From 6d266970309a922d889b861b94bff2b6a606d92a Mon Sep 17 00:00:00 2001
From: "Lei, HUANG"
Date: Fri, 8 Nov 2024 14:39:38 -0800
Subject: [PATCH] feat/memory-decoder: Updated decoding and logging in Parquet
 reader and row group modules
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

• Modified decode_to_batches in bulk/part.rs to use placeholders for unused parameters.
• Enhanced logging in ParquetReader's Drop implementation to include region ID and row group metrics.
• Added #[allow(dead_code)] to the new_memory function in ParquetReaderBuilder.
• Implemented the region_id method in RowGroupReaderBuilder.
• Added doc comments to RowGroupLocation methods for clarity.
• Removed unnecessary comments and unused code.
---
 src/mito2/src/memtable/bulk/part.rs    |  2 +-
 src/mito2/src/sst/parquet/reader.rs    | 43 ++++++++++++--------------
 src/mito2/src/sst/parquet/row_group.rs | 10 ++++--
 3 files changed, 29 insertions(+), 26 deletions(-)

diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs
index 617f5b82843c..74a390dc253f 100644
--- a/src/mito2/src/memtable/bulk/part.rs
+++ b/src/mito2/src/memtable/bulk/part.rs
@@ -115,7 +115,7 @@ impl BulkPartEncoder {
     }
 
     /// Decodes [BulkPart] to [Batch]es.
-    fn decode_to_batches(&self, part: &BulkPart, dest: &mut VecDeque<Batch>) -> Result<()> {
+    fn decode_to_batches(&self, _part: &BulkPart, _dest: &mut VecDeque<Batch>) -> Result<()> {
         todo!()
     }
 }
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 2ca52edfbaa0..ef8d57817a75 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -23,7 +23,7 @@ use api::v1::SemanticType;
 use async_trait::async_trait;
 use bytes::Bytes;
 use common_recordbatch::filter::SimpleFilterEvaluator;
-use common_telemetry::warn;
+use common_telemetry::{debug, warn};
 use common_time::range::TimestampRange;
 use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
@@ -96,6 +96,7 @@ pub struct ParquetReaderBuilder {
 
 impl ParquetReaderBuilder {
     /// Returns a new [ParquetReaderBuilder] to read specific Parquet file in memory.
+    #[allow(dead_code)]
     pub fn new_memory(region_id: RegionId, data: Bytes) -> ParquetReaderBuilder {
         ParquetReaderBuilder {
             location: Location::Memory(Memory { region_id, data }),
@@ -1013,6 +1014,11 @@ pub(crate) struct RowGroupReaderBuilder {
 }
 
 impl RowGroupReaderBuilder {
+    /// Region ID of the file to read.
+    pub(crate) fn region_id(&self) -> RegionId {
+        self.file_location.region_id()
+    }
+
     /// Path of the file to read.
     pub(crate) fn file_path(&self) -> &str {
         self.file_location.file_path()
@@ -1211,28 +1217,19 @@ impl BatchReader for ParquetReader {
 impl Drop for ParquetReader {
     fn drop(&mut self) {
         let metrics = self.reader_state.metrics();
-        // todo(hl): we should carry region info and time range info in location variants.
-        // debug!(
-        //     "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
-        //     self.context
-        //         .reader_builder()
-        //         .location
-        //         .file_handle
-        //         .region_id(),
-        //     self.context.reader_builder().location.file_id(),
-        //     self.context
-        //         .reader_builder()
-        //         .location
-        //         .file_handle
-        //         .time_range(),
-        //     metrics.filter_metrics.num_row_groups_before_filtering
-        //         - metrics
-        //             .filter_metrics
-        //             .num_row_groups_inverted_index_filtered
-        //         - metrics.filter_metrics.num_row_groups_min_max_filtered,
-        //     metrics.filter_metrics.num_row_groups_before_filtering,
-        //     metrics
-        // );
+        // todo(hl): also carry the time range when we implement the in-memory parquet.
+        debug!(
+            "Read parquet {} {}, {}/{} row groups, metrics: {:?}",
+            self.context.reader_builder().region_id(),
+            self.context.reader_builder().file_id(),
+            metrics.filter_metrics.num_row_groups_before_filtering
+                - metrics
+                    .filter_metrics
+                    .num_row_groups_inverted_index_filtered
+                - metrics.filter_metrics.num_row_groups_min_max_filtered,
+            metrics.filter_metrics.num_row_groups_before_filtering,
+            metrics
+        );
 
         // Report metrics.
         READ_STAGE_ELAPSED
diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs
index 42584abcc8b4..63f4b3b24d09 100644
--- a/src/mito2/src/sst/parquet/row_group.rs
+++ b/src/mito2/src/sst/parquet/row_group.rs
@@ -34,20 +34,24 @@ use crate::metrics::READ_STAGE_FETCH_PAGES;
 use crate::sst::parquet::page_reader::RowGroupCachedReader;
 use crate::sst::parquet::reader::Location;
 
+/// Location of a row group.
 struct RowGroupLocation<'a> {
     file_location: &'a Location,
     row_group_idx: usize,
 }
 
 impl<'a> RowGroupLocation<'a> {
-    async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> parquet::errors::Result<Vec<Bytes>> {
+    /// Fetches bytes from the given ranges.
+    async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
         self.file_location.fetch_bytes(ranges).await
     }
 
+    /// Returns true if the cache manager is enabled for this [RowGroupLocation].
     fn has_cache_manager(&self) -> bool {
         self.file_location.cache_manager().is_some()
     }
 
+    /// Gets the page value of the given column index in this row group.
     fn get_cache(&self, col_idx: usize, compressed: bool) -> Option<Arc<PageValue>> {
         let Location::Sst(sst) = &self.file_location else {
             return None;
@@ -63,6 +67,7 @@ impl<'a> RowGroupLocation<'a> {
         cache.get_pages(&page_key)
     }
 
+    /// Puts compressed pages into the page cache.
     fn put_compressed_to_cache(&self, col_idx: usize, data: Bytes) {
         let Location::Sst(sst) = &self.file_location else {
             return;
@@ -83,6 +88,7 @@ impl<'a> RowGroupLocation<'a> {
         );
     }
 
+    /// Puts uncompressed pages into the page cache.
     fn put_uncompressed_to_cache(&self, col_idx: usize, data: Arc<PageValue>) {
         let Location::Sst(sst) = &self.file_location else {
             return;
@@ -118,6 +124,7 @@ pub struct InMemoryRowGroup<'a> {
 }
 
 impl<'a> InMemoryRowGroup<'a> {
+    /// Creates a new [InMemoryRowGroup] with the given file location.
     pub fn create(
         row_group_idx: usize,
         parquet_meta: &'a ParquetMetaData,
@@ -187,7 +194,6 @@ impl<'a> InMemoryRowGroup<'a> {
             .collect::<Vec<_>>();
 
         let mut chunk_data = self.location.fetch_bytes(&fetch_ranges).await?.into_iter();
-        let mut page_start_offsets = page_start_offsets.into_iter();
 
         for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
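
Note (not part of the patch): the numerator of the "{}/{} row groups" fragment
in the restored debug! call is the total number of row groups seen minus those
pruned by the inverted index and by min-max statistics. A minimal sketch of
that arithmetic, using a hypothetical FilterMetrics struct that mirrors only
the three counters named above:

    /// Hypothetical stand-in for the filter metrics referenced in the patch;
    /// it models only the three counters the restored log line reads.
    struct FilterMetrics {
        num_row_groups_before_filtering: usize,
        num_row_groups_inverted_index_filtered: usize,
        num_row_groups_min_max_filtered: usize,
    }

    impl FilterMetrics {
        /// Row groups actually read, i.e. the numerator of the
        /// "{read}/{total} row groups" fragment in the log line.
        fn num_row_groups_read(&self) -> usize {
            self.num_row_groups_before_filtering
                - self.num_row_groups_inverted_index_filtered
                - self.num_row_groups_min_max_filtered
        }
    }

    fn main() {
        let metrics = FilterMetrics {
            num_row_groups_before_filtering: 8,
            num_row_groups_inverted_index_filtered: 3,
            num_row_groups_min_max_filtered: 2,
        };
        // Prints "3/8 row groups", matching the shape of the restored log line.
        println!(
            "{}/{} row groups",
            metrics.num_row_groups_read(),
            metrics.num_row_groups_before_filtering
        );
    }

If the two pruning counters could ever count the same row group twice, the
plain subtraction would underflow a usize; saturating_sub would be the
defensive choice, but the log line assumes the filters count disjoint row
groups.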