Skip to content

Commit

Permalink
feat/memory-decoder:
Browse files Browse the repository at this point in the history
 Refactor Parquet reader and row group handling in mito2

 - Make `Location` enum public and add async byte fetching method
 - Introduce `RowGroupLocation` to manage row group specific operations
 - Move byte fetching logic from `Sst` to `Location` and `RowGroupLocation`
 - Change `Location` fields and methods to `pub(crate)` for internal use
 - Update `RowGroupReaderBuilder` to use `file_location` instead of `location`
 - Add `fetch_bytes` and cache management methods to `RowGroupLocation`
 - Simplify `InMemoryRowGroup` creation with new `create` method
 - Adjust tests to reflect changes in Parquet reader and row group creation
  • Loading branch information
v0y4g3r committed Nov 8, 2024
1 parent 74c0f2d commit d7fb332
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 193 deletions.
135 changes: 89 additions & 46 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ use parquet::arrow::arrow_reader::{
ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowSelection,
};
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use parquet::errors::ParquetError;
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, RegionId};
use table::predicate::Predicate;

use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::CacheManagerRef;
use crate::error;
use crate::error::{
Expand All @@ -61,6 +63,7 @@ use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::helper::fetch_byte_ranges;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::{
Expand Down Expand Up @@ -248,7 +251,7 @@ impl ParquetReaderBuilder {
parquet_meta,
projection: projection_mask,
field_levels,
location: self.location.clone(),
file_location: self.location.clone(),
};

let mut filters = if let Some(predicate) = &self.predicate {
Expand Down Expand Up @@ -809,7 +812,7 @@ impl ReaderMetrics {

/// Location of Parquet file.
#[derive(Clone)]
enum Location {
pub enum Location {
/// SST file
Sst(Sst),
/// In memory Parquet file.
Expand All @@ -818,7 +821,7 @@ enum Location {

impl Location {
/// Returns true if inverted index is enabled.
fn inverted_index_available(&self) -> bool {
pub(crate) fn inverted_index_available(&self) -> bool {
match self {
Location::Sst(sst) => sst.file_handle.meta_ref().inverted_index_available(),
Location::Memory(_) => {
Expand All @@ -829,7 +832,7 @@ impl Location {
}

/// Returns true if fulltext index is enabled.
fn fulltext_index_available(&self) -> bool {
pub(crate) fn fulltext_index_available(&self) -> bool {
match self {
Location::Sst(sst) => sst.file_handle.meta_ref().fulltext_index_available(),
Location::Memory(_) => {
Expand All @@ -840,39 +843,49 @@ impl Location {
}

/// Returns the region id of file.
fn region_id(&self) -> RegionId {
pub(crate) fn region_id(&self) -> RegionId {
match self {
Location::Sst(sst) => sst.file_handle.region_id(),
Location::Memory(memory) => memory.region_id,
}
}

/// Path of the file to read.
fn file_path(&self) -> &str {
pub(crate) fn file_path(&self) -> &str {
match self {
Location::Sst(sst) => &sst.file_path,
Location::Memory(_) => "MEMORY",
}
}

/// Id of the file to read.
fn file_id(&self) -> FileId {
pub(crate) fn file_id(&self) -> FileId {
match self {
Location::Sst(sst) => sst.file_handle.file_id(),
Location::Memory(_) => FileId::default(),
}
}

/// Returns the [CacheManagerRef] if enabled.
fn cache_manager(&self) -> &Option<CacheManagerRef> {
pub(crate) fn cache_manager(&self) -> &Option<CacheManagerRef> {
match self {
Location::Sst(sst) => &sst.cache_manager,
Location::Memory(_) => &None,
}
}

pub(crate) async fn fetch_bytes(
&self,
ranges: &[Range<u64>],
) -> parquet::errors::Result<Vec<Bytes>> {
match self {
Location::Sst(sst) => sst.fetch_bytes(ranges).await,
Location::Memory(mem) => mem.fetch_bytes(ranges),
}
}

/// Reads Parquet metadata from current location.
async fn read_parquet_metadata(&self) -> Result<Arc<ParquetMetaData>> {
pub(crate) async fn read_parquet_metadata(&self) -> Result<Arc<ParquetMetaData>> {
match self {
Location::Sst(sst) => sst.read_parquet_metadata().await,
Location::Memory(memory) => {
Expand All @@ -885,30 +898,40 @@ impl Location {
}

#[derive(Clone)]
struct Memory {
pub struct Memory {
/// Region for the memory Parquet file
region_id: RegionId,
/// Data for the memory Parquety file.
data: Bytes,
pub(crate) region_id: RegionId,
/// Data for the memory Parquet file.
pub(crate) data: Bytes,
}

impl Memory {
pub(crate) fn fetch_bytes(&self, ranges: &[Range<u64>]) -> parquet::errors::Result<Vec<Bytes>> {
let data = ranges
.iter()
.map(|range| self.data.slice(range.start as usize..range.end as usize))
.collect();
Ok(data)
}
}

#[derive(Clone)]
struct Sst {
pub struct Sst {
/// SST file to read.
///
/// Holds the file handle to prevent the file from being purged.
file_handle: FileHandle,
pub(crate) file_handle: FileHandle,
/// Path of the file.
file_path: String,
pub(crate) file_path: String,
/// Object store as an Operator.
object_store: ObjectStore,
pub(crate) object_store: ObjectStore,
/// Cache.
cache_manager: Option<CacheManagerRef>,
pub(crate) cache_manager: Option<CacheManagerRef>,
}

impl Sst {
/// Reads parquet metadata of specific file.
async fn read_parquet_metadata(&self) -> error::Result<Arc<ParquetMetaData>> {
async fn read_parquet_metadata(&self) -> Result<Arc<ParquetMetaData>> {
let _t = READ_STAGE_ELAPSED
.with_label_values(&["read_parquet_metadata"])
.start_timer();
Expand Down Expand Up @@ -937,6 +960,44 @@ impl Sst {
}
Ok(metadata)
}

/// Reads the given byte ranges of this SST's Parquet file.
///
/// The write cache is tried first. If it yields nothing, the ranges are read
/// directly from the object store, and any error from that read is wrapped
/// in [ParquetError::External].
pub(crate) async fn fetch_bytes(
    &self,
    ranges: &[Range<u64>],
) -> parquet::errors::Result<Vec<Bytes>> {
    let cache_key = IndexKey::new(
        self.file_handle.region_id(),
        self.file_handle.file_id(),
        FileType::Parquet,
    );
    if let Some(cached) = self.fetch_ranges_from_write_cache(cache_key, ranges).await {
        return Ok(cached);
    }

    // Cache miss: read from the object store and time the read.
    let _timer = READ_STAGE_ELAPSED
        .with_label_values(&["cache_miss_read"])
        .start_timer();
    fetch_byte_ranges(&self.file_path, self.object_store.clone(), ranges)
        .await
        .map_err(|err| ParquetError::External(Box::new(err)))
}

/// Looks up the given byte ranges in the write cache.
///
/// Returns `None` when no cache manager is configured, when the cache
/// manager has no write cache, or when the file cache lookup itself
/// returns `None`.
async fn fetch_ranges_from_write_cache(
    &self,
    key: IndexKey,
    ranges: &[Range<u64>],
) -> Option<Vec<Bytes>> {
    let write_cache = self.cache_manager.as_ref()?.write_cache()?;
    write_cache.file_cache().read_ranges(key, ranges).await
}
}

/// Builder to build a [ParquetRecordBatchReader] for a row group from file.
Expand All @@ -947,22 +1008,23 @@ pub(crate) struct RowGroupReaderBuilder {
projection: ProjectionMask,
/// Field levels to read.
field_levels: FieldLevels,
location: Location,
/// Location of Parquet file.
file_location: Location,
}

impl RowGroupReaderBuilder {
/// Path of the file to read.
pub(crate) fn file_path(&self) -> &str {
&self.location.file_path()
self.file_location.file_path()
}

/// Id of the file to read.
pub(crate) fn file_id(&self) -> FileId {
self.location.file_id()
self.file_location.file_id()
}

pub(crate) fn cache_manager(&self) -> &Option<CacheManagerRef> {
self.location.cache_manager()
self.file_location.cache_manager()
}

pub(crate) fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
Expand All @@ -975,34 +1037,15 @@ impl RowGroupReaderBuilder {
row_group_idx: usize,
row_selection: Option<RowSelection>,
) -> Result<ParquetRecordBatchReader> {
let mut row_group = match &self.location {
Location::Sst(Sst {
file_handle,
file_path,
object_store,
cache_manager,
}) => InMemoryRowGroup::create_sst(
file_handle.region_id(),
file_handle.file_id(),
&self.parquet_meta,
row_group_idx,
cache_manager.clone(),
&file_path,
object_store.clone(),
),
Location::Memory(memory) => InMemoryRowGroup::create_memory(
row_group_idx,
&self.parquet_meta,
memory.data.clone(),
),
};
let mut row_group =
InMemoryRowGroup::create(row_group_idx, &self.parquet_meta, &self.file_location);

// Fetches data into memory.
row_group
.fetch(&self.projection, row_selection.as_ref())
.await
.context(ReadParquetSnafu {
path: self.location.file_path(),
path: self.file_location.file_path(),
})?;

// Builds the parquet reader.
Expand All @@ -1014,7 +1057,7 @@ impl RowGroupReaderBuilder {
row_selection,
)
.context(ReadParquetSnafu {
path: self.location.file_path(),
path: self.file_location.file_path(),
})
}
}
Expand Down
Loading

0 comments on commit d7fb332

Please sign in to comment.