Skip to content

Commit

Permalink
refactor: use parquet lru reader as trait object
Browse files Browse the repository at this point in the history
  • Loading branch information
ethe committed Nov 19, 2024
1 parent 2e3b9ae commit dda70d4
Show file tree
Hide file tree
Showing 19 changed files with 263 additions and 309 deletions.
4 changes: 1 addition & 3 deletions bindings/python/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::Arc;

use parquet_lru::NoopCache;
use pyo3::{
prelude::*,
pyclass, pymethods,
Expand All @@ -10,7 +9,6 @@ use pyo3::{
use pyo3_asyncio::tokio::{future_into_py, get_runtime};
use tonbo::{
executor::tokio::TokioExecutor,
fs::FileId,
record::{ColumnDesc, DynRecord},
DB,
};
Expand All @@ -30,7 +28,7 @@ type PyExecutor = TokioExecutor;
pub struct TonboDB {
desc: Arc<Vec<Column>>,
primary_key_index: usize,
db: Arc<DB<DynRecord, PyExecutor, NoopCache<FileId>>>,
db: Arc<DB<DynRecord, PyExecutor>>,
}

#[pymethods]
Expand Down
19 changes: 9 additions & 10 deletions bindings/python/src/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::{mem::transmute, sync::Arc};

use parquet_lru::NoopCache;
use pyo3::{
pyclass, pymethods,
types::{PyAnyMethods, PyMapping, PyMappingMethods, PySequenceMethods, PyTuple},
Bound, IntoPy, Py, PyAny, PyResult, Python,
};
use pyo3_asyncio::tokio::future_into_py;
use tonbo::{fs::FileId, record::DynRecord, transaction, Projection};
use tonbo::{record::DynRecord, transaction, Projection};

use crate::{
column::Column,
Expand All @@ -19,14 +18,14 @@ use crate::{

#[pyclass]
pub struct Transaction {
txn: Option<transaction::Transaction<'static, DynRecord, NoopCache<FileId>>>,
txn: Option<transaction::Transaction<'static, DynRecord>>,
desc: Arc<Vec<Column>>,
primary_key_index: usize,
}

impl Transaction {
pub(crate) fn new<'txn>(
txn: transaction::Transaction<'txn, DynRecord, NoopCache<FileId>>,
txn: transaction::Transaction<'txn, DynRecord>,
desc: Arc<Vec<Column>>,
) -> Self {
let primary_key_index = desc
Expand All @@ -38,8 +37,8 @@ impl Transaction {
Transaction {
txn: Some(unsafe {
transmute::<
transaction::Transaction<'txn, DynRecord, NoopCache<FileId>>,
transaction::Transaction<'static, DynRecord, NoopCache<FileId>>,
transaction::Transaction<'txn, DynRecord>,
transaction::Transaction<'static, DynRecord>,
>(txn)
}),
desc,
Expand Down Expand Up @@ -85,8 +84,8 @@ impl Transaction {
let txn = self.txn.as_ref().unwrap();
let txn = unsafe {
transmute::<
&transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
&'static transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
&transaction::Transaction<'_, DynRecord>,
&'static transaction::Transaction<'_, DynRecord>,
>(txn)
};

Expand Down Expand Up @@ -170,8 +169,8 @@ impl Transaction {
let txn = self.txn.as_ref().unwrap();
let txn = unsafe {
transmute::<
&transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
&'static transaction::Transaction<'_, DynRecord, NoopCache<FileId>>,
&transaction::Transaction<'_, DynRecord>,
&'static transaction::Transaction<'_, DynRecord>,
>(txn)
};
let col_desc = self.desc.get(self.primary_key_index).unwrap();
Expand Down
8 changes: 4 additions & 4 deletions examples/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion::{
use fusio::path::Path;
use futures_core::Stream;
use futures_util::StreamExt;
use parquet_lru::NoopCache;
use parquet_lru::NoCache;
use tokio::fs;
use tonbo::{
executor::tokio::TokioExecutor, fs::FileId, inmem::immutable::ArrowArrays, record::Record,
Expand All @@ -43,12 +43,12 @@ pub struct Music {
}

struct MusicProvider {
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
db: Arc<DB<Music, TokioExecutor, NoCache<FileId>>>,
}

struct MusicExec {
cache: PlanProperties,
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
db: Arc<DB<Music, TokioExecutor, NoCache<FileId>>>,
projection: Option<Vec<usize>>,
limit: Option<usize>,
range: (Bound<<Music as Record>::Key>, Bound<<Music as Record>::Key>),
Expand Down Expand Up @@ -98,7 +98,7 @@ impl TableProvider for MusicProvider {

impl MusicExec {
fn new(
db: Arc<DB<Music, TokioExecutor, NoopCache<FileId>>>,
db: Arc<DB<Music, TokioExecutor, NoCache<FileId>>>,
projection: Option<&Vec<usize>>,
) -> Self {
let schema = Music::arrow_schema();
Expand Down
3 changes: 1 addition & 2 deletions parquet-lru/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ name = "parquet-lru"
version = "0.1.0"

[features]
default = []
foyer = ["dep:foyer", "dep:serde"]
full = ["foyer"]

[dependencies]
bytes = { version = "1.8.0", features = ["serde"] }
Expand All @@ -17,4 +17,3 @@ futures-core = "0.3.31"
futures-util = "0.3.31"
parquet = { version = "53.2.0", features = ["async"] }
serde = { version = "1.0.214", optional = true }
thiserror = "2.0.3"
49 changes: 49 additions & 0 deletions parquet-lru/src/dyn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::{ops::Range, sync::Arc};

use bytes::Bytes;
use futures_core::future::BoxFuture;
use parquet::{
arrow::async_reader::AsyncFileReader, errors::Result, file::metadata::ParquetMetaData,
};

use crate::LruCache;

pub struct BoxedFileReader {
inner: Box<dyn AsyncFileReader>,
}

impl BoxedFileReader {
pub fn new<T: AsyncFileReader + 'static>(inner: T) -> Self {
Self {
inner: Box::new(inner),
}
}
}

impl AsyncFileReader for BoxedFileReader {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
self.inner.get_bytes(range)
}

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
self.inner.get_metadata()
}

fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
self.inner.get_byte_ranges(ranges)
}
}

pub trait DynLruCache<K> {
fn get_reader(&self, key: K, reader: BoxedFileReader) -> BoxFuture<'_, BoxedFileReader>;
}

impl<K, C> DynLruCache<K> for C
where
K: 'static + Send,
C: LruCache<K> + Sized + Send + Sync,
{
fn get_reader(&self, key: K, reader: BoxedFileReader) -> BoxFuture<'_, BoxedFileReader> {
Box::pin(async move { BoxedFileReader::new(self.get_reader(key, reader).await) })
}
}
25 changes: 8 additions & 17 deletions parquet-lru/src/foyer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use parquet::{
};
use serde::{Deserialize, Serialize};

use crate::{Error, LruCache, Options};
use crate::LruCache;

#[derive(Clone)]
pub struct FoyerCache<K>
Expand All @@ -32,23 +32,14 @@ impl<K> LruCache<K> for FoyerCache<K>
where
for<'a> K: Send + Sync + Hash + Eq + Serialize + Deserialize<'a> + Clone + 'static,
{
type LruReader<R: AsyncFileReader + 'static> = FoyerReader<K, R>;
type LruReader<R> = FoyerReader<K, R>
where
R: AsyncFileReader + 'static;

async fn new(options: Options) -> Result<Self, Error> {
Ok(Self {
inner: Arc::new(FoyerCacheInner {
meta: foyer::CacheBuilder::new(options.meta_capacity).build(),
data: foyer::HybridCacheBuilder::new()
.memory(options.data_capacity)
.storage(foyer::Engine::Large)
.build()
.await
.map_err(|e| Error::External(e.into()))?,
}),
})
}

async fn get_reader<R: AsyncFileReader>(&self, key: K, reader: R) -> FoyerReader<K, R> {
async fn get_reader<R>(&self, key: K, reader: R) -> FoyerReader<K, R>
where
R: AsyncFileReader,
{
FoyerReader::new(self.clone(), key, reader)
}
}
Expand Down
77 changes: 35 additions & 42 deletions parquet-lru/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,56 @@
mod r#dyn;
#[cfg(feature = "foyer")]
pub mod foyer;

use std::{future::Future, marker::PhantomData};

use parquet::{arrow::async_reader::AsyncFileReader, errors::Result};
use thiserror::Error;
use parquet::arrow::async_reader::AsyncFileReader;

#[derive(Default)]
pub struct Options {
meta_capacity: usize,
data_capacity: usize,
}

impl Options {
pub fn meta_capacity(mut self, meta_capacity: usize) -> Self {
self.meta_capacity = meta_capacity;
self
}

pub fn data_capacity(mut self, data_capacity: usize) -> Self {
self.data_capacity = data_capacity;
self
}
}
pub use crate::r#dyn::*;

pub trait LruCache<K>: Clone + Send + Sync + 'static {
type LruReader<R: AsyncFileReader + 'static>: AsyncFileReader + 'static;

fn new(options: Options) -> impl Future<Output = Result<Self, Error>> + Send;
pub trait LruCache<K>
where
K: 'static,
{
type LruReader<R>: AsyncFileReader + 'static
where
R: AsyncFileReader + 'static;

fn get_reader<R>(&self, key: K, reader: R) -> impl Future<Output = Self::LruReader<R>> + Send
where
R: AsyncFileReader + 'static;
}

#[derive(Clone, Default)]
pub struct NoopCache<K> {
#[derive(Default)]
pub struct NoCache<K> {
_phantom: PhantomData<K>,
}

impl<K> LruCache<K> for NoopCache<K>
where
K: Send + Sync + Clone + 'static,
{
type LruReader<R: AsyncFileReader + 'static> = R;

async fn new(_options: Options) -> Result<Self, Error> {
Ok(Self {
impl<K> Clone for NoCache<K> {
fn clone(&self) -> Self {
Self {
_phantom: PhantomData,
})
}

async fn get_reader<R: AsyncFileReader>(&self, _key: K, reader: R) -> R {
reader
}
}
}

#[derive(Debug, Error)]
pub enum Error {
#[error("External lru implementation error: {0}")]
External(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
unsafe impl<K> Send for NoCache<K> {}

unsafe impl<K> Sync for NoCache<K> {}

impl<K> LruCache<K> for NoCache<K>
where
K: 'static,
{
type LruReader<R> = R
where
R: AsyncFileReader + 'static;

#[allow(clippy::manual_async_fn)]
fn get_reader<R>(&self, _key: K, reader: R) -> impl Future<Output = R> + Send
where
R: AsyncFileReader,
{
async move { reader }
}
}
Loading

0 comments on commit dda70d4

Please sign in to comment.