From 0d0d6340f1aaed4e8d578b3a4358e56f9666396e Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Thu, 14 Nov 2024 16:19:04 -0800 Subject: [PATCH] refactor/separate-paraquet-reader: Refactor column page reader creation and remove unused code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Centralize creation of SerializedPageReader in RowGroupBase::column_reader method. • Remove unused RowGroupCachedReader and related code from MemtableRowGroupPageFetcher. • Eliminate redundant error handling for invalid column index in multiple places. --- .../src/memtable/bulk/row_group_reader.rs | 31 ++----------- src/mito2/src/sst/parquet/row_group.rs | 43 ++++++++++++------- 2 files changed, 30 insertions(+), 44 deletions(-) diff --git a/src/mito2/src/memtable/bulk/row_group_reader.rs b/src/mito2/src/memtable/bulk/row_group_reader.rs index b2e368a73c71..14c3fbe68b83 100644 --- a/src/mito2/src/memtable/bulk/row_group_reader.rs +++ b/src/mito2/src/memtable/bulk/row_group_reader.rs @@ -21,16 +21,13 @@ use datatypes::arrow::error::ArrowError; use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowGroups, RowSelection}; use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask}; use parquet::column::page::{PageIterator, PageReader}; -use parquet::errors::ParquetError; use parquet::file::metadata::ParquetMetaData; -use parquet::file::reader::SerializedPageReader; use snafu::ResultExt; use crate::error; use crate::error::ReadDataPartSnafu; use crate::memtable::bulk::context::BulkIterContextRef; use crate::sst::parquet::format::ReadFormat; -use crate::sst::parquet::page_reader::RowGroupCachedReader; use crate::sst::parquet::reader::{RowGroupReaderBase, RowGroupReaderContext}; use crate::sst::parquet::row_group::{ColumnChunkIterator, RowGroupBase}; use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; @@ -62,6 +59,7 @@ impl<'a> MemtableRowGroupPageFetcher<'a> { page_locations, row_count, column_chunks: vec![None; 
metadata.columns().len()], + // the cached `column_uncompressed_pages` would never be used in Memtable readers. column_uncompressed_pages: vec![None; metadata.columns().len()], }, bytes, @@ -102,31 +100,8 @@ impl<'a> MemtableRowGroupPageFetcher<'a> { /// Creates a page reader to read column at `i`. fn column_page_reader(&self, i: usize) -> parquet::errors::Result> { - if let Some(cached_pages) = &self.base.column_uncompressed_pages[i] { - debug_assert!(!cached_pages.row_group.is_empty()); - // Hits the row group level page cache. - return Ok(Box::new(RowGroupCachedReader::new(&cached_pages.row_group))); - } - - let page_reader = match &self.base.column_chunks[i] { - None => { - return Err(ParquetError::General(format!( - "Invalid column index {i}, column was not fetched" - ))) - } - Some(data) => { - let page_locations = self.base.page_locations.map(|index| index[i].clone()); - SerializedPageReader::new( - data.clone(), - self.base.metadata.column(i), - self.base.row_count, - page_locations, - )? - } - }; - - // This column don't cache uncompressed pages. - Ok(Box::new(page_reader)) + let reader = self.base.column_reader(i)?; + Ok(Box::new(reader)) } } diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index b8d28e6b84cd..54266ae70858 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -188,6 +188,32 @@ impl<'a> RowGroupBase<'a> { } res } + + /// Create [PageReader] from [RowGroupBase::column_chunks] + pub(crate) fn column_reader( + &self, + col_idx: usize, + ) -> Result> { + let page_reader = match &self.column_chunks[col_idx] { + None => { + return Err(ParquetError::General(format!( + "Invalid column index {col_idx}, column was not fetched" + ))) + } + Some(data) => { + let page_locations = self.page_locations.map(|index| index[col_idx].clone()); + SerializedPageReader::new( + data.clone(), + self.metadata.column(col_idx), + self.row_count, + page_locations, + )? 
+ } + }; + + // This column doesn't cache uncompressed pages. + Ok(page_reader) + } } /// An in-memory collection of column chunks @@ -369,22 +395,7 @@ impl<'a> InMemoryRowGroup<'a> { return Ok(Box::new(RowGroupCachedReader::new(&cached_pages.row_group))); } - let page_reader = match &self.base.column_chunks[i] { - None => { - return Err(ParquetError::General(format!( - "Invalid column index {i}, column was not fetched" - ))) - } - Some(data) => { - let page_locations = self.base.page_locations.map(|index| index[i].clone()); - SerializedPageReader::new( - data.clone(), - self.base.metadata.column(i), - self.base.row_count, - page_locations, - )? - } - }; + let page_reader = self.base.column_reader(i)?; let Some(cache) = &self.cache_manager else { return Ok(Box::new(page_reader));