Skip to content

Commit

Permalink
feat: get row group time range from cached metadata (#4869)
Browse files Browse the repository at this point in the history
* feat: get part range min-max from cache for unordered scan

* feat: seq scan push row groups if num_row_groups > 0

* test: test split

* feat: update comment

* test: fix split test

* refactor: rename get metadata method
  • Loading branch information
evenyag authored Nov 1, 2024
1 parent 758ad0a commit 39ab1a6
Show file tree
Hide file tree
Showing 5 changed files with 336 additions and 22 deletions.
23 changes: 17 additions & 6 deletions src/mito2/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,15 @@ impl CacheManager {
CacheManagerBuilder::default()
}

/// Gets cached [ParquetMetaData].
/// Gets cached [ParquetMetaData] from in-memory cache first.
/// If not found, tries to get it from write cache and fill the in-memory cache.
pub async fn get_parquet_meta_data(
&self,
region_id: RegionId,
file_id: FileId,
) -> Option<Arc<ParquetMetaData>> {
// Try to get metadata from sst meta cache
let metadata = self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
let value = sst_meta_cache.get(&SstMetaKey(region_id, file_id));
update_hit_miss(value, SST_META_TYPE)
});

let metadata = self.get_parquet_meta_data_from_mem_cache(region_id, file_id);
if metadata.is_some() {
return metadata;
}
Expand All @@ -110,6 +107,20 @@ impl CacheManager {
None
}

/// Looks up the [ParquetMetaData] for `(region_id, file_id)` in the
/// in-memory SST meta cache only.
///
/// Purely in-memory: unlike [`get_parquet_meta_data`], this never falls
/// back to the write cache, so it performs no I/O.
pub fn get_parquet_meta_data_from_mem_cache(
    &self,
    region_id: RegionId,
    file_id: FileId,
) -> Option<Arc<ParquetMetaData>> {
    // No meta cache configured means there is nothing to look up.
    let sst_meta_cache = self.sst_meta_cache.as_ref()?;
    let cached = sst_meta_cache.get(&SstMetaKey(region_id, file_id));
    // Record the hit/miss in the cache metrics and pass the value through.
    update_hit_miss(cached, SST_META_TYPE)
}

/// Puts [ParquetMetaData] into the cache.
pub fn put_parquet_meta_data(
&self,
Expand Down
281 changes: 268 additions & 13 deletions src/mito2/src/read/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ use common_time::Timestamp;
use smallvec::{smallvec, SmallVec};
use store_api::region_engine::PartitionRange;

use crate::cache::CacheManager;
use crate::memtable::MemtableRef;
use crate::read::scan_region::ScanInput;
use crate::sst::file::{overlaps, FileHandle, FileTimeRange};
use crate::sst::parquet::format::parquet_row_group_time_range;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;

const ALL_ROW_GROUPS: i64 = -1;

/// Index to access a row group.
#[derive(Clone, Copy, PartialEq)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct RowGroupIndex {
/// Index to the memtable/file.
pub(crate) index: usize,
Expand All @@ -38,6 +40,7 @@ pub(crate) struct RowGroupIndex {
/// Meta data of a partition range.
/// If the scanner is [UnorderedScan], each meta only has one row group or memtable.
/// If the scanner is [SeqScan], each meta may have multiple row groups and memtables.
#[derive(Debug, PartialEq)]
pub(crate) struct RangeMeta {
/// The time range of the range.
pub(crate) time_range: FileTimeRange,
Expand Down Expand Up @@ -84,7 +87,12 @@ impl RangeMeta {
pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
Self::push_unordered_mem_ranges(&input.memtables, &mut ranges);
Self::push_unordered_file_ranges(input.memtables.len(), &input.files, &mut ranges);
Self::push_unordered_file_ranges(
input.memtables.len(),
&input.files,
input.cache_manager.as_deref(),
&mut ranges,
);

ranges
}
Expand Down Expand Up @@ -164,12 +172,36 @@ impl RangeMeta {
fn push_unordered_file_ranges(
num_memtables: usize,
files: &[FileHandle],
cache: Option<&CacheManager>,
ranges: &mut Vec<RangeMeta>,
) {
// For append mode, we can parallelize reading row groups.
for (i, file) in files.iter().enumerate() {
let file_index = num_memtables + i;
if file.meta_ref().num_row_groups > 0 {
// Get parquet meta from the cache.
let parquet_meta = cache.and_then(|c| {
c.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id())
});
if let Some(parquet_meta) = parquet_meta {
// Scans each row group.
for row_group_index in 0..file.meta_ref().num_row_groups {
let time_range = parquet_row_group_time_range(
file.meta_ref(),
&parquet_meta,
row_group_index as usize,
);
let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
ranges.push(RangeMeta {
time_range: time_range.unwrap_or_else(|| file.time_range()),
indices: smallvec![file_index],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: row_group_index as i64,
}],
num_rows: num_rows as usize,
});
}
} else if file.meta_ref().num_row_groups > 0 {
// Scans each row group.
for row_group_index in 0..file.meta_ref().num_row_groups {
ranges.push(RangeMeta {
Expand Down Expand Up @@ -217,7 +249,6 @@ impl RangeMeta {
}
}

// TODO(yingwen): Support multiple row groups in a range so we can split them later.
fn push_seq_file_ranges(
num_memtables: usize,
files: &[FileHandle],
Expand All @@ -226,15 +257,31 @@ impl RangeMeta {
// For non append-only mode, each range only contains one file.
for (i, file) in files.iter().enumerate() {
let file_index = num_memtables + i;
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: ALL_ROW_GROUPS,
}],
num_rows: file.meta_ref().num_rows as usize,
});
if file.meta_ref().num_row_groups > 0 {
// All row groups share the same time range.
let row_group_indices = (0..file.meta_ref().num_row_groups)
.map(|row_group_index| RowGroupIndex {
index: file_index,
row_group_index: row_group_index as i64,
})
.collect();
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
row_group_indices,
num_rows: file.meta_ref().num_rows as usize,
});
} else {
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: ALL_ROW_GROUPS,
}],
num_rows: file.meta_ref().num_rows as usize,
});
}
}
}
}
Expand Down Expand Up @@ -366,4 +413,212 @@ mod tests {
&[(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)],
);
}

#[test]
fn test_merge_range() {
    // Two overlapping ranges coming from different files (indices 1 and 2).
    let mut merged = RangeMeta {
        time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
        indices: smallvec![1],
        row_group_indices: smallvec![
            RowGroupIndex { index: 1, row_group_index: 1 },
            RowGroupIndex { index: 1, row_group_index: 2 },
        ],
        num_rows: 5,
    };
    let other = RangeMeta {
        time_range: (Timestamp::new_second(800), Timestamp::new_second(1200)),
        indices: smallvec![2],
        row_group_indices: smallvec![
            RowGroupIndex { index: 2, row_group_index: 1 },
            RowGroupIndex { index: 2, row_group_index: 2 },
        ],
        num_rows: 4,
    };

    merged.merge(other);

    // The merged range covers the union of both time ranges, concatenates
    // indices and row group indices, and sums the row counts.
    let expected = RangeMeta {
        time_range: (Timestamp::new_second(800), Timestamp::new_second(2000)),
        indices: smallvec![1, 2],
        row_group_indices: smallvec![
            RowGroupIndex { index: 1, row_group_index: 1 },
            RowGroupIndex { index: 1, row_group_index: 2 },
            RowGroupIndex { index: 2, row_group_index: 1 },
            RowGroupIndex { index: 2, row_group_index: 2 },
        ],
        num_rows: 9,
    };
    assert_eq!(merged, expected);
}

#[test]
fn test_split_range() {
    // A range backed by a single file with two row groups can be split
    // without breaking ordering.
    let meta = RangeMeta {
        time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
        indices: smallvec![1],
        row_group_indices: smallvec![
            RowGroupIndex { index: 1, row_group_index: 1 },
            RowGroupIndex { index: 1, row_group_index: 2 },
        ],
        num_rows: 5,
    };
    assert!(meta.can_split_preserve_order());

    let mut split = Vec::new();
    meta.maybe_split(&mut split);

    // Each output range holds exactly one row group; the row count is
    // divided among the outputs.
    let expected = [
        RangeMeta {
            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
            indices: smallvec![1],
            row_group_indices: smallvec![RowGroupIndex { index: 1, row_group_index: 1 }],
            num_rows: 2,
        },
        RangeMeta {
            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
            indices: smallvec![1],
            row_group_indices: smallvec![RowGroupIndex { index: 1, row_group_index: 2 }],
            num_rows: 2,
        },
    ];
    assert_eq!(split, expected);
}

#[test]
fn test_not_split_range() {
    // A range spanning two different files (indices 1 and 2) must not be
    // split, since that could break ordering across files.
    let meta = RangeMeta {
        time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
        indices: smallvec![1, 2],
        row_group_indices: smallvec![
            RowGroupIndex { index: 1, row_group_index: 1 },
            RowGroupIndex { index: 2, row_group_index: 1 },
        ],
        num_rows: 5,
    };
    assert!(!meta.can_split_preserve_order());

    let mut split = Vec::new();
    meta.maybe_split(&mut split);
    // The range is passed through as a single element.
    assert_eq!(split.len(), 1);
}

#[test]
fn test_maybe_split_ranges() {
    // Input: the first range has one file with two row groups (splittable);
    // the second spans two files (must stay intact).
    let input = vec![
        RangeMeta {
            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
            indices: smallvec![1],
            row_group_indices: smallvec![
                RowGroupIndex { index: 1, row_group_index: 0 },
                RowGroupIndex { index: 1, row_group_index: 1 },
            ],
            num_rows: 4,
        },
        RangeMeta {
            time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
            indices: smallvec![2, 3],
            row_group_indices: smallvec![
                RowGroupIndex { index: 2, row_group_index: 0 },
                RowGroupIndex { index: 3, row_group_index: 0 },
            ],
            num_rows: 5,
        },
    ];

    let output = maybe_split_ranges_for_seq_scan(input);

    // The splittable range becomes one range per row group with rows divided
    // among them; the multi-file range is forwarded unchanged.
    let expected = vec![
        RangeMeta {
            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
            indices: smallvec![1],
            row_group_indices: smallvec![RowGroupIndex { index: 1, row_group_index: 0 }],
            num_rows: 2,
        },
        RangeMeta {
            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
            indices: smallvec![1],
            row_group_indices: smallvec![RowGroupIndex { index: 1, row_group_index: 1 }],
            num_rows: 2,
        },
        RangeMeta {
            time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
            indices: smallvec![2, 3],
            row_group_indices: smallvec![
                RowGroupIndex { index: 2, row_group_index: 0 },
                RowGroupIndex { index: 3, row_group_index: 0 },
            ],
            num_rows: 5,
        },
    ];
    assert_eq!(output, expected);
}
}
3 changes: 2 additions & 1 deletion src/mito2/src/sst/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ pub struct FileMeta {
pub region_id: RegionId,
/// Compared to normal file names, FileId ignore the extension
pub file_id: FileId,
/// Timestamp range of file.
/// Timestamp range of file. The timestamps have the same time unit as the
/// data in the SST.
pub time_range: FileTimeRange,
/// SST level of the file.
pub level: Level,
Expand Down
Loading

0 comments on commit 39ab1a6

Please sign in to comment.