feat/memory-decoder: Updated decoding and logging in Parquet reader and row group modules

 • Modified decode_to_batches in bulk/part.rs to use placeholder names for its currently unused parameters.
 • Enhanced logging in ParquetReader's Drop implementation to include the region ID and row group metrics (the Location enum that supplies these is sketched after this list).
 • Added #[allow(dead_code)] to the new_memory function in ParquetReaderBuilder.
 • Implemented a region_id method on RowGroupReaderBuilder.
 • Added doc comments to RowGroupLocation methods for clarity.
 • Removed unnecessary comments and unused code.
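
For context, a rough sketch of the Location enum these changes revolve around, inferred from the diff below. The Memory variant and its fields appear in reader.rs; the Sst variant's contents are not shown in this commit and are assumptions:

enum Location {
    /// Parquet bytes held entirely in memory, for decoding bulk parts.
    Memory(Memory),
    /// A Parquet SST file read through the object store.
    Sst(Sst),
}

struct Memory {
    region_id: RegionId,
    data: Bytes,
}

struct Sst {
    // File handle and cache manager access, per the row_group.rs diff;
    // the exact fields are not visible in this commit.
}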
v0y4g3r committed Nov 8, 2024
1 parent d7fb332 commit 6d26697
Showing 3 changed files with 29 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/mito2/src/memtable/bulk/part.rs
@@ -115,7 +115,7 @@ impl BulkPartEncoder {
}

/// Decodes [BulkPart] to [Batch]es.
-    fn decode_to_batches(&self, part: &BulkPart, dest: &mut VecDeque<Batch>) -> Result<()> {
+    fn decode_to_batches(&self, _part: &BulkPart, _dest: &mut VecDeque<Batch>) -> Result<()> {
todo!()
}
}
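
The decode_to_batches body is intentionally left as todo!() in this commit. As a minimal sketch of the eventual shape, assuming BulkPart exposes its Parquet-encoded bytes as a data: Bytes field and a hypothetical batch_from_record_batch helper converts Arrow record batches into mito2 Batches:

use std::collections::VecDeque;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn decode_to_batches(&self, part: &BulkPart, dest: &mut VecDeque<Batch>) -> Result<()> {
    // `Bytes` implements parquet's `ChunkReader`, so the encoded part can be
    // decoded in place; mapping parquet/arrow errors into mito2's `Result` is
    // elided in this sketch.
    let reader = ParquetRecordBatchReaderBuilder::try_new(part.data.clone())
        .and_then(|builder| builder.build())
        .expect("map parquet errors into mito2 errors");
    for record_batch in reader {
        let record_batch = record_batch.expect("map arrow errors");
        // Hypothetical helper: converts an Arrow RecordBatch into a mito2 Batch.
        dest.push_back(batch_from_record_batch(&record_batch)?);
    }
    Ok(())
}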
43 changes: 20 additions & 23 deletions src/mito2/src/sst/parquet/reader.rs
@@ -23,7 +23,7 @@ use api::v1::SemanticType;
use async_trait::async_trait;
use bytes::Bytes;
use common_recordbatch::filter::SimpleFilterEvaluator;
-use common_telemetry::warn;
+use common_telemetry::{debug, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
@@ -96,6 +96,7 @@ pub struct ParquetReaderBuilder {

impl ParquetReaderBuilder {
/// Returns a new [ParquetReaderBuilder] to read specific Parquet file in memory.
+    #[allow(dead_code)]
pub fn new_memory(region_id: RegionId, data: Bytes) -> ParquetReaderBuilder {
ParquetReaderBuilder {
location: Location::Memory(Memory { region_id, data }),
@@ -1013,6 +1014,11 @@ pub(crate) struct RowGroupReaderBuilder {
}

impl RowGroupReaderBuilder {
+    /// Region Id of given file.
+    pub(crate) fn region_id(&self) -> RegionId {
+        self.file_location.region_id()
+    }

/// Path of the file to read.
pub(crate) fn file_path(&self) -> &str {
self.file_location.file_path()
@@ -1211,28 +1217,19 @@ impl BatchReader for ParquetReader {
impl Drop for ParquetReader {
fn drop(&mut self) {
let metrics = self.reader_state.metrics();
-        // todo(hl): we should carry region info and time range info in location variants.
-        // debug!(
-        //     "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
-        //     self.context
-        //         .reader_builder()
-        //         .location
-        //         .file_handle
-        //         .region_id(),
-        //     self.context.reader_builder().location.file_id(),
-        //     self.context
-        //         .reader_builder()
-        //         .location
-        //         .file_handle
-        //         .time_range(),
-        //     metrics.filter_metrics.num_row_groups_before_filtering
-        //         - metrics
-        //             .filter_metrics
-        //             .num_row_groups_inverted_index_filtered
-        //         - metrics.filter_metrics.num_row_groups_min_max_filtered,
-        //     metrics.filter_metrics.num_row_groups_before_filtering,
-        //     metrics
-        // );
+        //todo(hl): Also carry time range when we implement the memory parquet.
+        debug!(
+            "Read parquet {} {}, {}/{} row groups, metrics: {:?}",
+            self.context.reader_builder().region_id(),
+            self.context.reader_builder().file_id(),
+            metrics.filter_metrics.num_row_groups_before_filtering
+                - metrics
+                    .filter_metrics
+                    .num_row_groups_inverted_index_filtered
+                - metrics.filter_metrics.num_row_groups_min_max_filtered,
+            metrics.filter_metrics.num_row_groups_before_filtering,
+            metrics
+        );

// Report metrics.
READ_STAGE_ELAPSED
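
With new_memory in place (kept under #[allow(dead_code)] until it gains a caller), building a reader over in-memory Parquet bytes might look like the sketch below; build() and next_batch() are assumed to follow the existing ParquetReaderBuilder and BatchReader APIs, and the RegionId value is illustrative:

use bytes::Bytes;
use store_api::storage::RegionId;

// Hypothetical driver; error handling and the region ID are illustrative.
async fn read_in_memory(encoded_parquet: Vec<u8>) -> Result<()> {
    let data = Bytes::from(encoded_parquet); // assumed to be a complete Parquet file
    let mut reader = ParquetReaderBuilder::new_memory(RegionId::new(1024, 0), data)
        .build()
        .await?;
    while let Some(_batch) = reader.next_batch().await? {
        // Consume decoded batches here.
    }
    Ok(())
}

When such a reader is dropped, it now emits a debug line of the form "Read parquet <region_id> <file_id>, <selected>/<total> row groups, metrics: …", where the selected count is the total minus the row groups pruned by the inverted-index and min-max filters.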
10 changes: 8 additions & 2 deletions src/mito2/src/sst/parquet/row_group.rs
@@ -34,20 +34,24 @@ use crate::metrics::READ_STAGE_FETCH_PAGES;
use crate::sst::parquet::page_reader::RowGroupCachedReader;
use crate::sst::parquet::reader::Location;

+/// Location of row group.
struct RowGroupLocation<'a> {
file_location: &'a Location,
row_group_idx: usize,
}

impl<'a> RowGroupLocation<'a> {
-    async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> parquet::errors::Result<Vec<Bytes>> {
+    /// Fetches bytes from given ranges.
+    async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
self.file_location.fetch_bytes(ranges).await
}

+    /// Returns true if cache manager is enabled for given [RowGroupLocation].
fn has_cache_manager(&self) -> bool {
self.file_location.cache_manager().is_some()
}

+    /// Gets page value of given column index in row group.
fn get_cache(&self, col_idx: usize, compressed: bool) -> Option<Arc<PageValue>> {
let Location::Sst(sst) = &self.file_location else {
return None;
@@ -63,6 +67,7 @@ impl<'a> RowGroupLocation<'a> {
cache.get_pages(&page_key)
}

+    /// Puts compressed pages to page cache.
fn put_compressed_to_cache(&self, col_idx: usize, data: Bytes) {
let Location::Sst(sst) = &self.file_location else {
return;
@@ -83,6 +88,7 @@
);
}

+    /// Puts uncompressed pages to page cache.
fn put_uncompressed_to_cache(&self, col_idx: usize, data: Arc<PageValue>) {
let Location::Sst(sst) = &self.file_location else {
return;
@@ -118,6 +124,7 @@ pub struct InMemoryRowGroup<'a> {
}

impl<'a> InMemoryRowGroup<'a> {
+    /// Creates a new [InMemoryRowGroup] with given file location.
pub fn create(
row_group_idx: usize,
parquet_meta: &'a ParquetMetaData,
@@ -187,7 +194,6 @@ impl<'a> InMemoryRowGroup<'a> {
.collect::<Vec<_>>();

        let mut chunk_data = self.location.fetch_bytes(&fetch_ranges).await?.into_iter();
-
        let mut page_start_offsets = page_start_offsets.into_iter();

for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
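
Taken together, the newly documented RowGroupLocation helpers support a read-through caching pattern for column pages. A simplified sketch (RowGroupLocation is crate-private; the PageValue field name and the page assembly details are assumptions):

use std::ops::Range;
use bytes::Bytes;

async fn compressed_pages(
    loc: &RowGroupLocation<'_>,
    col_idx: usize,
    range: Range<u64>,
) -> parquet::errors::Result<Bytes> {
    // Serve from the page cache when possible; only SST locations have one.
    if let Some(pages) = loc.get_cache(col_idx, true) {
        return Ok(pages.compressed.clone()); // field name assumed
    }
    // Otherwise fetch from the underlying location (object store or memory).
    let mut chunks = loc.fetch_bytes(&[range]).await?;
    let data = chunks.swap_remove(0);
    // Populate the cache so later readers of this row group hit it.
    if loc.has_cache_manager() {
        loc.put_compressed_to_cache(col_idx, data.clone());
    }
    Ok(data)
}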
