Skip to content

Commit

Permalink
refactor/separate-paraquet-reader: Refactor RowGroupReader traits and…
Browse files Browse the repository at this point in the history
… implementations in memtable and parquet reader modules

 • Rename RowGroupReaderVirtual to RowGroupReaderContext for clarity.
 • Replace BulkPartVirt with direct usage of BulkIterContextRef in MemtableRowGroupReader.
 • Simplify MemtableRowGroupReaderBuilder by directly passing context instead of creating a BulkPartVirt instance.
 • Update RowGroupReaderBase to use context field instead of virt, reflecting the trait renaming and usage.
 • Modify FileRangeVirt to FileRangeContextRef and adjust implementations accordingly.
  • Loading branch information
v0y4g3r committed Nov 14, 2024
1 parent ae32bc5 commit 80b6f9d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 35 deletions.
19 changes: 5 additions & 14 deletions src/mito2/src/memtable/bulk/row_group_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::error::ReadDataPartSnafu;
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::page_reader::RowGroupCachedReader;
use crate::sst::parquet::reader::{RowGroupReaderBase, RowGroupReaderVirtual};
use crate::sst::parquet::reader::{RowGroupReaderBase, RowGroupReaderContext};
use crate::sst::parquet::row_group::{ColumnChunkIterator, RowGroupBase};
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;

Expand Down Expand Up @@ -142,11 +142,7 @@ impl RowGroups for MemtableRowGroupPageFetcher<'_> {
}
}

pub(crate) struct BulkPartVirt {
context: BulkIterContextRef,
}

impl RowGroupReaderVirtual for BulkPartVirt {
impl RowGroupReaderContext for BulkIterContextRef {
fn map_result(
&self,
result: Result<Option<RecordBatch>, ArrowError>,
Expand All @@ -155,11 +151,11 @@ impl RowGroupReaderVirtual for BulkPartVirt {
}

fn read_format(&self) -> &ReadFormat {
self.context.read_format()
self.as_ref().read_format()
}
}

pub(crate) type MemtableRowGroupReader = RowGroupReaderBase<BulkPartVirt>;
pub(crate) type MemtableRowGroupReader = RowGroupReaderBase<BulkIterContextRef>;

pub(crate) struct MemtableRowGroupReaderBuilder {
context: BulkIterContextRef,
Expand Down Expand Up @@ -213,11 +209,6 @@ impl MemtableRowGroupReaderBuilder {
row_selection,
)
.context(ReadDataPartSnafu)?;
Ok(MemtableRowGroupReader::create(
BulkPartVirt {
context: self.context.clone(),
},
reader,
))
Ok(MemtableRowGroupReader::create(self.context.clone(), reader))
}
}
37 changes: 16 additions & 21 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1119,9 +1119,9 @@ impl ParquetReader {
}
}

/// RowGroupReaderVirtual represents the fields that cannot be shared
/// RowGroupReaderContext represents the fields that cannot be shared
/// between different `RowGroupReader`s.
pub(crate) trait RowGroupReaderVirtual: Send {
pub(crate) trait RowGroupReaderContext: Send {
fn map_result(
&self,
result: std::result::Result<Option<RecordBatch>, ArrowError>,
Expand All @@ -1130,34 +1130,29 @@ pub(crate) trait RowGroupReaderVirtual: Send {
fn read_format(&self) -> &ReadFormat;
}

pub(crate) struct FileRangeVirt {
/// Context for file ranges.
pub(crate) context: FileRangeContextRef,
}

impl RowGroupReaderVirtual for FileRangeVirt {
impl RowGroupReaderContext for FileRangeContextRef {
fn map_result(
&self,
result: std::result::Result<Option<RecordBatch>, ArrowError>,
) -> Result<Option<RecordBatch>> {
result.context(ArrowReaderSnafu {
path: self.context.file_path(),
path: self.file_path(),
})
}

fn read_format(&self) -> &ReadFormat {
self.context.read_format()
self.as_ref().read_format()
}
}

/// [RowGroupReader] that reads from [FileRange].
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeVirt>;
pub(crate) type RowGroupReader = RowGroupReaderBase<FileRangeContextRef>;

impl RowGroupReader {
/// Creates a new reader from file range.
pub(crate) fn new(context: FileRangeContextRef, reader: ParquetRecordBatchReader) -> Self {
Self {
virt: FileRangeVirt { context },
context,
reader,
batches: VecDeque::new(),
metrics: ReaderMetrics::default(),
Expand All @@ -1167,8 +1162,8 @@ impl RowGroupReader {

/// Reader to read a row group of a parquet file.
pub(crate) struct RowGroupReaderBase<T> {
/// Virtual parts of [RowGroupReader] so adapts to different underlying implementation.
virt: T,
/// Context of [RowGroupReader] so adapts to different underlying implementation.
context: T,
/// Inner parquet reader.
reader: ParquetRecordBatchReader,
/// Buffered batches to return.
Expand All @@ -1179,12 +1174,12 @@ pub(crate) struct RowGroupReaderBase<T> {

impl<T> RowGroupReaderBase<T>
where
T: RowGroupReaderVirtual,
T: RowGroupReaderContext,
{
/// Creates a new reader.
pub(crate) fn create(virt: T, reader: ParquetRecordBatchReader) -> Self {
pub(crate) fn create(context: T, reader: ParquetRecordBatchReader) -> Self {
Self {
virt,
context,
reader,
batches: VecDeque::new(),
metrics: ReaderMetrics::default(),
Expand All @@ -1198,12 +1193,12 @@ where

/// Gets [ReadFormat] of underlying reader.
pub(crate) fn read_format(&self) -> &ReadFormat {
self.virt.read_format()
self.context.read_format()
}

/// Tries to fetch next [RecordBatch] from the reader.
fn fetch_next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
self.virt.map_result(self.reader.next().transpose())
self.context.map_result(self.reader.next().transpose())
}

/// Returns the next [Batch].
Expand All @@ -1223,7 +1218,7 @@ where
};
self.metrics.num_record_batches += 1;

self.virt
self.context
.read_format()
.convert_record_batch(&record_batch, &mut self.batches)?;
self.metrics.num_batches += self.batches.len();
Expand All @@ -1238,7 +1233,7 @@ where
#[async_trait::async_trait]
impl<T> BatchReader for RowGroupReaderBase<T>
where
T: RowGroupReaderVirtual,
T: RowGroupReaderContext,
{
async fn next_batch(&mut self) -> Result<Option<Batch>> {
self.next_inner()
Expand Down

0 comments on commit 80b6f9d

Please sign in to comment.