fix(producer): send messages to leader (#32)
* fix(producer): send messages to leader

* chore: rename field

* chore: make fmt happy
iamazy authored Mar 22, 2024
1 parent 56110d8 · commit e650773
Showing 7 changed files with 76 additions and 62 deletions.
2 changes: 1 addition & 1 deletion examples/simple_producer.rs
@@ -40,7 +40,7 @@ async fn main() -> Result<(), Box<Error>> {
let record = TestData::new(&format!("hello - kafka {i}"));
let ret = producer.send(&topic, record).await?;
let _ = tx.send(ret).await;
// tokio::time::sleep(Duration::from_millis(100)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
info!("elapsed: {:?}", now.elapsed());
// wait till all cached records send to kafka
14 changes: 7 additions & 7 deletions src/client.rs
@@ -49,7 +49,7 @@ pub struct Kafka<Exe: Executor> {
pub manager: Arc<ConnectionManager<Exe>>,
pub operation_retry_options: OperationRetryOptions,
pub executor: Arc<Exe>,
pub cluster_meta: Arc<Cluster>,
pub cluster: Arc<Cluster>,
supported_versions: Arc<DashMap<i16, VersionRange>>,
}

@@ -122,20 +122,20 @@ impl<Exe: Executor> Kafka<Exe> {
manager,
operation_retry_options,
executor,
cluster_meta: Arc::new(Cluster::default()),
cluster: Arc::new(Cluster::default()),
supported_versions: Arc::new(supported_versions),
})
}

pub fn topic_id(&self, topic_name: &TopicName) -> Uuid {
match self.cluster_meta.topic_id(topic_name) {
match self.cluster.topic_id(topic_name) {
Some(topic_id) => topic_id,
None => Uuid::nil(),
}
}

pub fn partitions(&self, topic: &TopicName) -> Result<PartitionRef> {
self.cluster_meta.partitions(topic)
self.cluster.partitions(topic)
}

pub fn version_range(&self, key: ApiKey) -> Option<VersionRange> {
@@ -284,7 +284,7 @@ impl<Exe: Executor> Kafka<Exe> {
let request = RequestKind::MetadataRequest(request);
let response = self.manager.invoke(&self.manager.url, request).await?;
if let ResponseKind::MetadataResponse(metadata) = response {
self.cluster_meta.update_metadata(metadata)
self.cluster.update_metadata(metadata)
} else {
Err(Error::Connection(ConnectionError::UnexpectedResponse(
format!("{response:?}"),
@@ -293,8 +293,8 @@
}

pub async fn update_full_metadata(&self) -> Result<()> {
let mut topics = Vec::with_capacity(self.cluster_meta.topics.len());
for topic in self.cluster_meta.topics.iter() {
let mut topics = Vec::with_capacity(self.cluster.topics.len());
for topic in self.cluster.topics.iter() {
topics.push(topic.key().clone());
}
self.update_metadata(topics).await
12 changes: 6 additions & 6 deletions src/consumer/fetcher.rs
@@ -61,7 +61,7 @@ impl<Exe: Executor> Fetcher<Exe> {
completed_fetches_tx: mpsc::UnboundedSender<CompletedFetch>,
) -> Self {
let sessions = DashMap::new();
for node in client.cluster_meta.nodes.iter() {
for node in client.cluster.nodes.iter() {
sessions.insert(node.id, FetchSession::new(node.id));
}

@@ -100,7 +100,7 @@ impl<Exe: Executor> Fetcher<Exe> {
version = 12;
}
let metadata = fetch_request_data.metadata;
if let Some(node) = self.client.cluster_meta.nodes.get(&node) {
if let Some(node) = self.client.cluster.nodes.get(&node) {
let fetch_request =
self.fetch_builder(&mut fetch_request_data, version).await?;
trace!("Send fetch request: {:?}", fetch_request);
@@ -239,7 +239,7 @@ impl<Exe: Executor> Fetcher<Exe> {
match rx.await {
Ok(partitions) => {
for tp in partitions {
let current_leader = self.client.cluster_meta.current_leader(&tp);
let current_leader = self.client.cluster.current_leader(&tp);
self.event_tx.unbounded_send(
CoordinatorEvent::MaybeValidatePositionForCurrentLeader {
partition: tp,
@@ -428,7 +428,7 @@ impl<Exe: Executor> Fetcher<Exe> {
let position = FetchPosition {
offset: offset_data.offset - 1,
offset_epoch: None,
current_leader: self.client.cluster_meta.current_leader(&partition),
current_leader: self.client.cluster.current_leader(&partition),
};
// TODO: metadata update last seen epoch if newer
self.event_tx
@@ -603,8 +603,8 @@ impl<Exe: Executor> Fetcher<Exe> {
offset_reset_timestamps: &mut HashMap<TopicPartition, i64>,
) -> Result<HashMap<Node, ListOffsetsRequest>> {
let mut node_request = HashMap::new();
for node_entry in self.client.cluster_meta.nodes.iter() {
if let Ok(node_topology) = self.client.cluster_meta.drain_node(node_entry.value().id) {
for node_entry in self.client.cluster.nodes.iter() {
if let Ok(node_topology) = self.client.cluster.drain_node(node_entry.value().id) {
let partitions = node_topology.value();

let mut topics = HashMap::new();
63 changes: 40 additions & 23 deletions src/coordinator/consumer.rs
@@ -53,7 +53,7 @@ use crate::{

const CONSUMER_PROTOCOL_TYPE: &str = "consumer";

macro_rules! offset_fetch_block {
macro_rules! fetch_offsets_block {
($self:ident, $source:ident) => {
for topic in $source.topics {
for partition in topic.partitions {
@@ -63,13 +63,22 @@
};
if partition.error_code.is_ok() {
if let Some(partition_state) = $self.subscriptions.assignments.get_mut(&tp) {
partition_state.position.offset = partition.committed_offset;
partition_state.position.offset_epoch =
Some(partition.committed_leader_epoch);
info!(
"Fetch {tp} offset success, offset: {}",
partition.committed_offset
);
// record the position with the offset (-1 indicates no committed offset to
// fetch)
if partition.committed_offset >= 0 {
partition_state.position.offset = partition.committed_offset;
partition_state.position.offset_epoch =
Some(partition.committed_leader_epoch);
info!(
"Fetch {tp} offset success, offset: {}",
partition.committed_offset
);
} else {
debug!(
"Found no committed offset for partition {}",
partition.partition_index
);
}
}
} else {
error!(
@@ -86,7 +95,7 @@
if let Some(tp_state) = $self.subscriptions.assignments.get_mut(&tp) {
tp_state.position.offset = offset;
tp_state.position.offset_epoch = None;
tp_state.position.current_leader = $self.client.cluster_meta.current_leader(&tp);
tp_state.position.current_leader = $self.client.cluster.current_leader(&tp);
info!("Seek {tp} with offset: {offset}",);
}
}
@@ -370,7 +379,7 @@ impl<Exe: Executor> CoordinatorInner<Exe> {

self.join_group().await?;
self.sync_group().await?;
self.offset_fetch().await?;
self.fetch_offsets().await?;

// resume fetch thread.
self.fetcher.resume();
@@ -557,22 +566,30 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
};
let mut tp_state = TopicPartitionState::new(*partition);
tp_state.position.current_leader =
self.client.cluster_meta.current_leader(&tp);
self.client.cluster.current_leader(&tp);
self.subscriptions.assignments.insert(tp, tp_state);
}
}
self.state = MemberState::Stable;

let group_id = self.group_meta.group_id.as_str();
info!(
"Sync group [{}] success, leader = {}, member_id = {}, generation_id \
= {}, protocol_type = {}, protocol_name = {}, assignments = <{}>",
self.group_meta.group_id.as_str(),
"Sync group [{group_id}] success, leader = {}, member_id = {}, \
generation_id = {}, assignments = <{}>",
self.group_meta.leader.as_str(),
self.group_meta.member_id.as_str(),
self.group_meta.generation_id,
self.group_meta.protocol_type.as_ref().unwrap().as_str(),
self.group_meta.protocol_name.as_ref().unwrap().as_str(),
crate::array_display(self.subscriptions.assignments.keys()),
);

if let Some(protocol_type) = self.group_meta.protocol_type.as_ref() {
info!(
"Sync group [{group_id}] success, protocol_type = {}, \
protocol_name = {}",
protocol_type.as_str(),
self.group_meta.protocol_name.as_ref().unwrap().as_str()
);
}
Ok(())
}
Some(error) => Err(error.into()),
@@ -582,7 +599,7 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
}
}

pub async fn offset_fetch(&mut self) -> Result<()> {
pub async fn fetch_offsets(&mut self) -> Result<()> {
match self.client.version_range(ApiKey::OffsetFetchKey) {
Some(version_range) => {
let mut offset_fetch_response = self
@@ -595,9 +612,9 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
match offset_fetch_response.error_code.err() {
None => {
if let Some(group) = offset_fetch_response.groups.pop() {
offset_fetch_block!(self, group);
fetch_offsets_block!(self, group);
} else {
offset_fetch_block!(self, offset_fetch_response);
fetch_offsets_block!(self, offset_fetch_response);
}
Ok(())
}
@@ -829,7 +846,7 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
match self.group_meta.protocol_name {
Some(ref protocol) => {
let assignor = self.look_up_assignor(&protocol.to_string())?;
let cluster = self.client.cluster_meta.clone();
let cluster = self.client.cluster.clone();
request.assignments =
serialize_assignments(assignor.assign(cluster, &self.group_subscription)?)?;
}
@@ -858,7 +875,7 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
if version <= 7 {
let mut topics = Vec::with_capacity(self.subscriptions.topics.len());
for assign in self.subscriptions.topics.iter() {
let partitions = self.client.cluster_meta.partitions(assign)?;
let partitions = self.client.cluster.partitions(assign)?;

let mut topic = OffsetFetchRequestTopic::default();
topic.name = assign.clone();
@@ -870,7 +887,7 @@
} else {
let mut topics = Vec::with_capacity(self.subscriptions.topics.len());
for assign in self.subscriptions.topics.iter() {
let partitions = self.client.cluster_meta.partitions(assign)?;
let partitions = self.client.cluster.partitions(assign)?;

let mut topic = OffsetFetchRequestTopics::default();
topic.name = assign.clone();
@@ -984,7 +1001,7 @@ async fn coordinator_loop<Exe: Executor>(
CoordinatorEvent::JoinGroup => coordinator.join_group().await,
CoordinatorEvent::SyncGroup => coordinator.sync_group().await,
CoordinatorEvent::LeaveGroup(reason) => coordinator.maybe_leave_group(reason).await,
CoordinatorEvent::OffsetFetch => coordinator.offset_fetch().await,
CoordinatorEvent::OffsetFetch => coordinator.fetch_offsets().await,
CoordinatorEvent::PauseFetch => {
coordinator.fetcher.pause();
Ok(())
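The guard added to the offset-fetch macro keys off Kafka's convention that a committed offset of -1 means the group has never committed an offset for that partition, as the new comment in the diff notes. A minimal sketch of that check, using hypothetical simplified types rather than the crate's real `TopicPartitionState`:

```rust
/// Hypothetical, simplified partition position for illustration only.
#[derive(Debug, Default)]
struct PartitionPosition {
    offset: i64,
    offset_epoch: Option<i32>,
}

/// Apply a fetched committed offset to the local position.
/// Returns false when the broker reported -1 (nothing committed yet),
/// leaving the partition to the offset-reset path instead.
fn apply_committed_offset(
    position: &mut PartitionPosition,
    committed_offset: i64,
    committed_leader_epoch: i32,
) -> bool {
    if committed_offset >= 0 {
        position.offset = committed_offset;
        position.offset_epoch = Some(committed_leader_epoch);
        true
    } else {
        // No committed offset for this partition; do not overwrite the position.
        false
    }
}
```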
20 changes: 11 additions & 9 deletions src/metadata.rs
@@ -299,17 +299,19 @@ impl Cluster {
if !self.nodes.contains_key(&node) {
return Err(Error::NodeNotAvailable { node });
}
return if let Some(node_entry) = self.partitions_by_nodes.get(&node) {
Ok(node_entry)
return if let Some(node_ref) = self.partitions_by_nodes.get(&node) {
Ok(node_ref)
} else {
let mut topic_partitions = Vec::new();
for topic_entry in self.topics.iter() {
for partition in topic_entry.partitions.iter() {
let tp = TopicPartition {
topic: topic_entry.key().clone(),
partition: partition.partition,
};
topic_partitions.push(tp);
for topic in self.topics.iter() {
for partition in topic.partitions.iter() {
if partition.leader == node {
let tp = TopicPartition {
topic: topic.key().clone(),
partition: partition.partition,
};
topic_partitions.push(tp);
}
}
}
self.partitions_by_nodes.insert(node, topic_partitions);
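The new `partition.leader == node` filter is the core of the fix: `drain_node` now returns only the partitions a given node actually leads, so produce requests are addressed to the leader. A rough sketch of the same grouping, with hypothetical simplified metadata types standing in for the crate's cluster metadata:

```rust
use std::collections::HashMap;

// Hypothetical, simplified metadata types for illustration only.
struct PartitionMetadata {
    partition: i32,
    leader: i32, // broker (node) id of the current partition leader
}
struct TopicMetadata {
    name: String,
    partitions: Vec<PartitionMetadata>,
}

/// Group (topic, partition) pairs by the broker that currently leads them,
/// so each batch can be sent to the right node.
fn partitions_by_leader(topics: &[TopicMetadata]) -> HashMap<i32, Vec<(String, i32)>> {
    let mut by_leader: HashMap<i32, Vec<(String, i32)>> = HashMap::new();
    for topic in topics {
        for p in &topic.partitions {
            by_leader
                .entry(p.leader)
                .or_default()
                .push((topic.name.clone(), p.partition));
        }
    }
    by_leader
}
```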
14 changes: 9 additions & 5 deletions src/producer/mod.rs
@@ -332,9 +332,13 @@ impl<Exe: Executor> Producer<Exe> {
encode_options: &RecordEncodeOptions,
) -> Result<Vec<(Node, FlushResult)>> {
let mut result = Vec::new();
for node_entry in self.client.cluster_meta.nodes.iter() {
if let Ok(node_topology) = self.client.cluster_meta.drain_node(node_entry.value().id) {
let partitions = node_topology.value();
for node_entry in self.client.cluster.nodes.iter() {
if let Ok(node) = self.client.cluster.drain_node(node_entry.value().id) {
let partitions = node.value();
if partitions.is_empty() {
continue;
}

let mut topic_data = IndexMap::new();
let mut topics_thunks = BTreeMap::new();

@@ -444,7 +448,7 @@ impl<Exe: Executor> TopicProducer<Exe> {
pub async fn new(client: Arc<Kafka<Exe>>, topic: TopicName) -> Result<Arc<TopicProducer<Exe>>> {
client.update_metadata(vec![topic.clone()]).await?;

let partitions = client.cluster_meta.partitions(&topic)?;
let partitions = client.cluster.partitions(&topic)?;
let partitions = partitions.value();
let num_partitions = partitions.len();
let batches = DashMap::with_capacity_and_hasher(num_partitions, FxBuildHasher::default());
@@ -473,7 +477,7 @@ impl<Exe: Executor> TopicProducer<Exe> {
&self.topic,
record.key(),
record.value(),
&self.client.cluster_meta,
&self.client.cluster,
)?;
}
return match self.batches.get_mut(&partition) {
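With that leader map, the flush path walks the cluster's nodes, drains each node's leader-owned partitions, and now skips nodes that lead none of them so no empty request is assembled. A compact sketch of that loop shape, with hypothetical types in place of the crate's real batch and request builders:

```rust
use std::collections::HashMap;

// Hypothetical, simplified types for illustration only.
type NodeId = i32;
type TopicPartition = (String, i32);
type Batch = Vec<u8>;

/// Build one request payload per node, skipping nodes that currently
/// lead none of the partitions we hold batches for.
fn build_per_node_requests(
    nodes: &[NodeId],
    partitions_of: impl Fn(NodeId) -> Vec<TopicPartition>,
    batches: &HashMap<TopicPartition, Batch>,
) -> HashMap<NodeId, Vec<(TopicPartition, Batch)>> {
    let mut requests = HashMap::new();
    for &node in nodes {
        let partitions = partitions_of(node);
        if partitions.is_empty() {
            continue; // nothing to send to this node
        }
        let mut payload = Vec::new();
        for tp in partitions {
            if let Some(batch) = batches.get(&tp) {
                payload.push((tp.clone(), batch.clone()));
            }
        }
        if !payload.is_empty() {
            requests.insert(node, payload);
        }
    }
    requests
}
```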
13 changes: 2 additions & 11 deletions src/protocol.rs
@@ -160,6 +160,7 @@ impl KafkaCodec {
}
Ok(())
}

fn encode_request<Req: Request>(
&mut self,
mut header: RequestHeader,
@@ -192,18 +193,8 @@ impl KafkaCodec {
Ok(())
}

fn response_header_version(&self, api_key: i16, api_version: i16) -> i16 {
if let Some(version_range) = self.support_versions.get(&api_key) {
if api_version >= version_range.max {
return 1;
}
}
0
}

fn decode_response(&mut self, src: &mut BytesMut) -> Result<Option<Command>, ConnectionError> {
let mut correlation_id_bytes = src.try_peek_bytes(0..4)?;
let correlation_id = correlation_id_bytes.get_i32();
let correlation_id = src.peek_bytes(0..4).get_i32();

let request_header = self
.active_requests
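`decode_response` now reads the correlation id from the first four bytes of the buffered frame without consuming them, so the pending request header can be looked up before the body is decoded. A minimal sketch of that peek using the `bytes` crate (the codec's own `peek_bytes` helper is assumed to do effectively this):

```rust
use bytes::{Buf, BytesMut};

/// Peek the big-endian correlation id at the start of a buffered
/// response frame without advancing the buffer.
fn peek_correlation_id(src: &BytesMut) -> Option<i32> {
    if src.len() < 4 {
        return None;
    }
    let mut head: &[u8] = &src[..4];
    // `get_i32` advances `head` (a local slice), not `src`.
    Some(head.get_i32())
}
```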
