Skip to content

Commit

Permalink
refactor/separate-paraquet-reader: Refactor column page reader creati…
Browse files Browse the repository at this point in the history
…on and remove unused code

 • Centralize creation of SerializedPageReader in RowGroupBase::column_reader method.
 • Remove unused RowGroupCachedReader and related code from MemtableRowGroupPageFetcher.
 • Eliminate redundant error handling for invalid column index in multiple places.
  • Loading branch information
v0y4g3r committed Nov 15, 2024
1 parent 80b6f9d commit 0d0d634
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 44 deletions.
31 changes: 3 additions & 28 deletions src/mito2/src/memtable/bulk/row_group_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@ use datatypes::arrow::error::ArrowError;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowGroups, RowSelection};
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use parquet::column::page::{PageIterator, PageReader};
use parquet::errors::ParquetError;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::reader::SerializedPageReader;
use snafu::ResultExt;

use crate::error;
use crate::error::ReadDataPartSnafu;
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::page_reader::RowGroupCachedReader;
use crate::sst::parquet::reader::{RowGroupReaderBase, RowGroupReaderContext};
use crate::sst::parquet::row_group::{ColumnChunkIterator, RowGroupBase};
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
Expand Down Expand Up @@ -62,6 +59,7 @@ impl<'a> MemtableRowGroupPageFetcher<'a> {
page_locations,
row_count,
column_chunks: vec![None; metadata.columns().len()],
// the cached `column_uncompressed_pages` would never be used in Memtable readers.
column_uncompressed_pages: vec![None; metadata.columns().len()],
},
bytes,
Expand Down Expand Up @@ -102,31 +100,8 @@ impl<'a> MemtableRowGroupPageFetcher<'a> {

/// Creates a page reader to read column at `i`.
fn column_page_reader(&self, i: usize) -> parquet::errors::Result<Box<dyn PageReader>> {
if let Some(cached_pages) = &self.base.column_uncompressed_pages[i] {
debug_assert!(!cached_pages.row_group.is_empty());
// Hits the row group level page cache.
return Ok(Box::new(RowGroupCachedReader::new(&cached_pages.row_group)));
}

let page_reader = match &self.base.column_chunks[i] {
None => {
return Err(ParquetError::General(format!(
"Invalid column index {i}, column was not fetched"
)))
}
Some(data) => {
let page_locations = self.base.page_locations.map(|index| index[i].clone());
SerializedPageReader::new(
data.clone(),
self.base.metadata.column(i),
self.base.row_count,
page_locations,
)?
}
};

// This column don't cache uncompressed pages.
Ok(Box::new(page_reader))
let reader = self.base.column_reader(i)?;
Ok(Box::new(reader))
}
}

Expand Down
43 changes: 27 additions & 16 deletions src/mito2/src/sst/parquet/row_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,32 @@ impl<'a> RowGroupBase<'a> {
}
res
}

/// Create [PageReader] from [RowGroupBase::column_chunks]
pub(crate) fn column_reader(
&self,
col_idx: usize,
) -> Result<SerializedPageReader<ColumnChunkData>> {
let page_reader = match &self.column_chunks[col_idx] {
None => {
return Err(ParquetError::General(format!(
"Invalid column index {col_idx}, column was not fetched"
)))
}
Some(data) => {
let page_locations = self.page_locations.map(|index| index[col_idx].clone());
SerializedPageReader::new(
data.clone(),
self.metadata.column(col_idx),
self.row_count,
page_locations,
)?
}
};

// This column don't cache uncompressed pages.
Ok(page_reader)
}
}

/// An in-memory collection of column chunks
Expand Down Expand Up @@ -369,22 +395,7 @@ impl<'a> InMemoryRowGroup<'a> {
return Ok(Box::new(RowGroupCachedReader::new(&cached_pages.row_group)));
}

let page_reader = match &self.base.column_chunks[i] {
None => {
return Err(ParquetError::General(format!(
"Invalid column index {i}, column was not fetched"
)))
}
Some(data) => {
let page_locations = self.base.page_locations.map(|index| index[i].clone());
SerializedPageReader::new(
data.clone(),
self.base.metadata.column(i),
self.base.row_count,
page_locations,
)?
}
};
let page_reader = self.base.column_reader(i)?;

let Some(cache) = &self.cache_manager else {
return Ok(Box::new(page_reader));
Expand Down

0 comments on commit 0d0d634

Please sign in to comment.