Skip to content

Commit

Permalink
feat: merge close ranges
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jul 29, 2024
1 parent 924e101 commit e38f1e3
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 63 deletions.
47 changes: 30 additions & 17 deletions src/log-store/src/kafka/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use pin_project::pin_project;
use rskafka::client::partition::PartitionClient;
use rskafka::record::RecordAndOffset;

use super::index::RegionWalIndexIterator;
use super::index::{NextBatch, RegionWalIndexIterator};

#[async_trait::async_trait]
pub trait FetchClient: std::fmt::Debug + Send + Sync {
Expand Down Expand Up @@ -54,8 +54,8 @@ impl FetchClient for PartitionClient {

struct FetchResult {
records_and_offsets: Vec<RecordAndOffset>,
batch_num: usize,
max_batch_size: i32,
batch_size: usize,
fetch_size: i32,
watermark: i64,
used_offset: i64,
}
Expand All @@ -72,6 +72,8 @@ pub struct Consumer {

avg_record_size: usize,

max_gap_size: usize,

terminated: bool,

buffer: RecordsBuffer,
Expand Down Expand Up @@ -144,11 +146,19 @@ impl Stream for Consumer {
let client = Arc::clone(this.client);
let max_wait_ms = *this.max_wait_ms as i32;
let offset = next_offset as i64;
let next_batch_size = this.buffer.index.next_batch_size().max(1);
let max_batch_size = (*this.avg_record_size * next_batch_size as usize)
.min(*this.max_batch_size)
as i32;
let bytes = 1i32..max_batch_size;
let NextBatch {
fetch_size,
batch_size,
} = this
.buffer
.index
.next_batch(*this.avg_record_size, *this.max_gap_size)
.unwrap_or(NextBatch {
fetch_size: *this.avg_record_size,
batch_size: 1,
});

let bytes = 1i32..fetch_size as i32;

*this.fetch_fut = FutureExt::fuse(Box::pin(async move {
let (records_and_offsets, watermark) =
Expand All @@ -158,8 +168,8 @@ impl Stream for Consumer {
records_and_offsets,
watermark,
used_offset: offset,
max_batch_size,
batch_num: next_batch_size as usize,
fetch_size: fetch_size as i32,
batch_size,
})
}));
}
Expand All @@ -176,19 +186,19 @@ impl Stream for Consumer {
mut records_and_offsets,
watermark,
used_offset,
max_batch_size,
batch_num,
fetch_size,
batch_size,
}) => {
// Sort records by offset in case they aren't in order
records_and_offsets.sort_by_key(|x| x.offset);
*this.last_high_watermark = watermark;
if !records_and_offsets.is_empty() {
*this.avg_record_size = max_batch_size as usize / records_and_offsets.len();
*this.avg_record_size = fetch_size as usize / records_and_offsets.len();
debug!("set avg_record_size: {}", *this.avg_record_size);
}

debug!(
"Fetch result: {:?}, used_offset: {used_offset}, max_batch_size: {max_batch_size}, expected batch_num: {batch_num}, actual batch_num: {}",
"Fetch result: {:?}, used_offset: {used_offset}, max_batch_size: {fetch_size}, expected batch_num: {batch_size}, actual batch_num: {}",
records_and_offsets
.iter()
.map(|record| record.offset)
Expand Down Expand Up @@ -264,17 +274,18 @@ mod tests {
async fn test_consumer_with_index() {
common_telemetry::init_default_ut_logging();
let record = test_record();
let record_size = record.approximate_size() * 3;
let record_size = record.approximate_size();
let mock_client = MockFetchClient {
record: record.clone(),
};
let index = RegionWalVecIndex::new([1, 3, 5, 7, 8, 10, 12]);
let index = RegionWalVecIndex::new([1, 3, 4, 8, 10, 12]);
let consumer = Consumer {
last_high_watermark: -1,
client: Arc::new(mock_client),
max_batch_size: usize::MAX,
max_wait_ms: 500,
avg_record_size: record_size,
max_gap_size: record_size * 2 + 1,
terminated: false,
buffer: RecordsBuffer {
buffer: VecDeque::new(),
Expand All @@ -289,7 +300,7 @@ mod tests {
.into_iter()
.map(|(x, _)| x.offset)
.collect::<Vec<_>>(),
vec![1, 3, 5, 7, 8, 10, 12]
vec![1, 3, 4, 8, 10, 12]
)
}

Expand All @@ -307,6 +318,7 @@ mod tests {
max_batch_size: usize::MAX,
max_wait_ms: 500,
avg_record_size: record.approximate_size(),
max_gap_size: record.approximate_size() * 2,
terminated: false,
buffer: RecordsBuffer {
buffer: VecDeque::new(),
Expand Down Expand Up @@ -344,6 +356,7 @@ mod tests {
max_batch_size: usize::MAX,
max_wait_ms: 500,
avg_record_size: record.approximate_size(),
max_gap_size: record.approximate_size() * 2,
terminated: false,
buffer: RecordsBuffer {
buffer: VecDeque::new(),
Expand Down
Loading

0 comments on commit e38f1e3

Please sign in to comment.