
Commit

chore: optimization with review comments
KKould committed Nov 11, 2024
1 parent 3b5e11a commit e00775b
Showing 9 changed files with 59 additions and 55 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
@@ -11,7 +11,7 @@ resolver = "2"
version = "0.2.0"

[package.metadata]
msrv = "1.81.0"
msrv = "1.82.0"

[features]
bench = ["redb", "rocksdb", "sled"]
@@ -58,19 +58,19 @@ crc32fast = "1"
crossbeam-skiplist = "0.1"
datafusion = { version = "42", optional = true }
flume = { version = "0.11", features = ["async"] }
-fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio", version = "0.3.1", features = [
+fusio = { package = "fusio", version = "0.3.3", features = [
"aws",
"dyn",
"fs",
"object_store",
"tokio",
"tokio-http",
] }
-fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-dispatch", version = "0.2.1", features = [
+fusio-dispatch = { package = "fusio-dispatch", version = "0.2.2", features = [
"aws",
"tokio",
] }
-fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-parquet", version = "0.2.1" }
+fusio-parquet = { package = "fusio-parquet", version = "0.2.2" }
futures-core = "0.3"
futures-io = "0.3"
futures-util = "0.3"
4 changes: 2 additions & 2 deletions bindings/python/src/db.rs
@@ -11,7 +11,7 @@ use tonbo::{
record::{ColumnDesc, DynRecord},
DB,
};
-use tonbo_ext_reader::foyer_reader::FoyerReader;
+use tonbo_ext_reader::lru_reader::LruReader;
use crate::{
column::Column,
error::{CommitError, DbError},
@@ -27,7 +27,7 @@ type PyExecutor = TokioExecutor;
pub struct TonboDB {
desc: Arc<Vec<Column>>,
primary_key_index: usize,
-db: Arc<DB<DynRecord, PyExecutor, FoyerReader>>,
+db: Arc<DB<DynRecord, PyExecutor, LruReader>>,
}

#[pymethods]
18 changes: 9 additions & 9 deletions bindings/python/src/transaction.rs
@@ -7,7 +7,7 @@ use pyo3::{
};
use pyo3_asyncio::tokio::future_into_py;
use tonbo::{record::DynRecord, transaction, Projection};
-use tonbo_ext_reader::foyer_reader::FoyerReader;
+use tonbo_ext_reader::lru_reader::LruReader;
use crate::{
column::Column,
error::{repeated_commit_err, CommitError, DbError},
@@ -18,14 +18,14 @@ use crate::{

#[pyclass]
pub struct Transaction {
-txn: Option<transaction::Transaction<'static, DynRecord, FoyerReader>>,
+txn: Option<transaction::Transaction<'static, DynRecord, LruReader>>,
desc: Arc<Vec<Column>>,
primary_key_index: usize,
}

impl Transaction {
pub(crate) fn new<'txn>(
-txn: transaction::Transaction<'txn, DynRecord, FoyerReader>,
+txn: transaction::Transaction<'txn, DynRecord, LruReader>,
desc: Arc<Vec<Column>>,
) -> Self {
let primary_key_index = desc
@@ -37,8 +37,8 @@ impl Transaction {
Transaction {
txn: Some(unsafe {
transmute::<
-transaction::Transaction<'txn, DynRecord, FoyerReader>,
-transaction::Transaction<'static, DynRecord, FoyerReader>,
+transaction::Transaction<'txn, DynRecord, LruReader>,
+transaction::Transaction<'static, DynRecord, LruReader>,
>(txn)
}),
desc,
@@ -84,8 +84,8 @@ impl Transaction {
let txn = self.txn.as_ref().unwrap();
let txn = unsafe {
transmute::<
-&transaction::Transaction<'_, DynRecord, FoyerReader>,
-&'static transaction::Transaction<'_, DynRecord, FoyerReader>,
+&transaction::Transaction<'_, DynRecord, LruReader>,
+&'static transaction::Transaction<'_, DynRecord, LruReader>,
>(txn)
};

@@ -169,8 +169,8 @@ impl Transaction {
let txn = self.txn.as_ref().unwrap();
let txn = unsafe {
transmute::<
-&transaction::Transaction<'_, DynRecord, FoyerReader>,
-&'static transaction::Transaction<'_, DynRecord, FoyerReader>,
+&transaction::Transaction<'_, DynRecord, LruReader>,
+&'static transaction::Transaction<'_, DynRecord, LruReader>,
>(txn)
};
let col_desc = self.desc.get(self.primary_key_index).unwrap();
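
The transmutes above only swap the cache-reader type parameter; the underlying pattern is unchanged: the transaction's borrow lifetime is erased to 'static so the value can live in a #[pyclass] field that cannot name the lifetime. A minimal sketch of that pattern, using a hypothetical Txn type rather than tonbo's Transaction (the soundness contract is the same: whatever the value borrows must outlive the holder):

```rust
use std::mem::transmute;

// Hypothetical stand-in for a borrowed transaction handle.
#[allow(dead_code)]
struct Txn<'a> {
    data: &'a str,
}

// A container whose type cannot name the borrow's lifetime (as with a #[pyclass]).
#[allow(dead_code)]
struct Holder {
    txn: Option<Txn<'static>>,
}

impl Holder {
    fn new<'a>(txn: Txn<'a>) -> Self {
        Holder {
            // SAFETY: the caller must keep whatever `txn` borrows alive for as
            // long as this Holder exists; the binding relies on the same contract.
            txn: Some(unsafe { transmute::<Txn<'a>, Txn<'static>>(txn) }),
        }
    }
}

fn main() {
    let payload = String::from("payload");
    let holder = Holder::new(Txn { data: &payload });
    // Locals drop in reverse declaration order, so `holder` goes before `payload`.
    drop(holder);
}
```
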
2 changes: 1 addition & 1 deletion rust-toolchain.toml
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.81.0"
channel = "1.82.0"
components = ["clippy", "rust-analyzer", "rustfmt"]
6 changes: 3 additions & 3 deletions src/inmem/mutable.rs
@@ -5,7 +5,7 @@ use crossbeam_skiplist::{
map::{Entry, Range},
SkipMap,
};
-use fusio::{buffered::BufWriter, dynamic::DynFile, DynFs};
+use fusio::{buffered::BufWriter, DynFs, DynWrite};
use ulid::Ulid;

use crate::{
@@ -37,7 +37,7 @@
R: Record,
{
pub(crate) data: SkipMap<Timestamped<R::Key>, Option<R>>,
-wal: Option<Mutex<WalFile<Box<dyn DynFile>, R>>>,
+wal: Option<Mutex<WalFile<Box<dyn DynWrite>, R>>>,
pub(crate) trigger: Arc<Box<dyn Trigger<R> + Send + Sync>>,
}

@@ -61,7 +61,7 @@ where
)
.await?,
option.wal_buffer_size,
-)) as Box<dyn DynFile>;
+)) as Box<dyn DynWrite>;

wal = Some(Mutex::new(WalFile::new(file, file_id)));
};
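
The WAL handle above narrows from Box<dyn DynFile> to Box<dyn DynWrite>: the mutable memtable only appends to its log, so it can hold a write-only trait object instead of a full file handle. A rough sketch of the same idea using std::io types (Wal, append, and the buffer size are illustrative stand-ins, not fusio's or tonbo's API):

```rust
use std::io::{BufWriter, Result, Write};

// The log only needs to append bytes, so it stores the narrowest trait object
// that supports that, rather than a full read/write/seek file handle.
struct Wal {
    file: Box<dyn Write + Send>,
}

impl Wal {
    fn new(inner: impl Write + Send + 'static, buffer_size: usize) -> Self {
        // Buffer writes, then erase the concrete type down to the write-only object.
        let file = Box::new(BufWriter::with_capacity(buffer_size, inner)) as Box<dyn Write + Send>;
        Wal { file }
    }

    fn append(&mut self, record: &[u8]) -> Result<()> {
        self.file.write_all(record)
    }
}

fn main() -> Result<()> {
    // A Vec<u8> stands in for the on-disk log file.
    let mut wal = Wal::new(Vec::<u8>::new(), 4096);
    wal.append(b"key=value")?;
    Ok(())
}
```
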
8 changes: 4 additions & 4 deletions tonbo_ext_reader/Cargo.toml
@@ -11,24 +11,24 @@ bytes = { version = "1.7", features = ["serde"] }
foyer = { version = "0.12" }
futures-core = "0.3"
futures-util = "0.3"
-fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-parquet", version = "0.2.1" }
+fusio-parquet = { package = "fusio-parquet", version = "0.2.2" }
lru = "0.12"
parking_lot = "0.12"
parquet = { version = "53", features = ["async"] }
thiserror = "1"
ulid = { version = "1", features = ["serde"] }

[dev-dependencies]
-fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio", version = "0.3.1", features = [
+fusio = { package = "fusio", version = "0.3.3", features = [
"aws",
"dyn",
"fs",
"object_store",
"tokio",
"tokio-http",
] }
-fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "db305599ae88c7a934ab1ab235d8b3efda95a242", package = "fusio-dispatch", version = "0.2.1", features = [
+fusio-dispatch = { package = "fusio-dispatch", version = "0.2.2", features = [
"tokio",
] }
tempfile = "3"
-tokio = { version = "1", features = ["full"] }
+tokio = { version = "1", features = ["full"] }
17 changes: 11 additions & 6 deletions tonbo_ext_reader/src/foyer_reader.rs
@@ -10,7 +10,7 @@ use futures_util::FutureExt;
use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData};
use ulid::Ulid;

-use crate::{CacheError, CacheReader, MetaCache, RangeCache};
+use crate::{CacheError, CacheReader, TonboCache};

#[derive(Debug, Clone)]
pub struct FoyerMetaCache(Cache<Ulid, Arc<ParquetMetaData>>);
@@ -24,17 +24,17 @@ pub struct FoyerReader {
meta_cache: FoyerMetaCache,
}

-impl MetaCache for FoyerMetaCache {
-fn get(&self, gen: &Ulid) -> Option<Arc<ParquetMetaData>> {
-self.0.get(gen).map(|entry| entry.value().clone())
+impl TonboCache<Ulid, Arc<ParquetMetaData>> for FoyerMetaCache {
+async fn get(&self, gen: &Ulid) -> Result<Option<Arc<ParquetMetaData>>, CacheError> {
+Ok(self.0.get(gen).map(|entry| entry.value().clone()))
}

fn insert(&self, gen: Ulid, data: Arc<ParquetMetaData>) -> Arc<ParquetMetaData> {
self.0.insert(gen, data).value().clone()
}
}

-impl RangeCache for FoyerRangeCache {
+impl TonboCache<(Ulid, Range<usize>), Bytes> for FoyerRangeCache {
async fn get(&self, key: &(Ulid, Range<usize>)) -> Result<Option<Bytes>, CacheError> {
Ok(self.0.get(key).await?.map(|entry| entry.value().clone()))
}
@@ -112,7 +112,7 @@ impl AsyncFileReader for FoyerReader {

fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
async move {
-if let Some(meta) = self.meta_cache.get(&self.gen) {
+if let Some(meta) = self
+.meta_cache
+.get(&self.gen)
+.await
+.map_err(|e| parquet::errors::ParquetError::External(From::from(e)))?
+{
return Ok(meta);
}

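
Because TonboCache::get is now async and fallible, get_metadata has to await the lookup and convert a CacheError into parquet's error type, as the hunk above shows. A hedged sketch of a helper that would centralize that conversion; to_parquet_err and DemoError are hypothetical, and the only assumption about CacheError is that it implements std::error::Error + Send + Sync, which the From::from call in the diff already relies on:

```rust
use std::fmt;

use parquet::errors::ParquetError;

// Hypothetical helper: wrap any cache error in parquet's External variant, so a
// single `.map_err(to_parquet_err)?` could replace the repeated closure above.
fn to_parquet_err<E>(e: E) -> ParquetError
where
    E: std::error::Error + Send + Sync + 'static,
{
    ParquetError::External(Box::new(e))
}

// Stand-in error for the demo; the real CacheError comes from tonbo_ext_reader.
#[derive(Debug)]
struct DemoError;

impl fmt::Display for DemoError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "cache lookup failed")
    }
}

impl std::error::Error for DemoError {}

fn main() {
    let err: ParquetError = to_parquet_err(DemoError);
    println!("{err}");
}
```
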
34 changes: 14 additions & 20 deletions tonbo_ext_reader/src/lib.rs
@@ -9,24 +9,18 @@ use ulid::Ulid;
pub mod foyer_reader;
pub mod lru_reader;

-pub trait MetaCache: Sync + Send + Clone + Debug {
-fn get(&self, gen: &Ulid) -> Option<Arc<ParquetMetaData>>;
-
-fn insert(&self, gen: Ulid, data: Arc<ParquetMetaData>) -> Arc<ParquetMetaData>;
-}
-
-pub trait RangeCache: Sync + Send + Clone + Debug {
+pub trait TonboCache<K, V>: Sync + Send + Clone + Debug {
fn get(
&self,
-key: &(Ulid, Range<usize>),
-) -> impl std::future::Future<Output = Result<Option<Bytes>, CacheError>> + Send;
+key: &K,
+) -> impl std::future::Future<Output = Result<Option<V>, CacheError>> + Send;

-fn insert(&self, key: (Ulid, Range<usize>), bytes: Bytes) -> Bytes;
+fn insert(&self, key: K, value: V) -> V;
}

pub trait CacheReader: AsyncFileReader + Unpin {
-type MetaCache: MetaCache;
-type RangeCache: RangeCache;
+type MetaCache: TonboCache<Ulid, Arc<ParquetMetaData>>;
+type RangeCache: TonboCache<(Ulid, Range<usize>), Bytes>;

fn new(
meta_cache: Self::MetaCache,
@@ -37,14 +31,14 @@ pub trait CacheReader: AsyncFileReader + Unpin {

#[allow(clippy::too_many_arguments)]
fn build_caches(
-cache_path: impl AsRef<std::path::Path> + Send,
-cache_meta_capacity: usize,
-cache_meta_shards: usize,
-cache_meta_ratio: f64,
-cache_range_memory: usize,
-cache_range_disk: usize,
-cache_range_capacity: usize,
-cache_range_shards: usize,
+path: impl AsRef<std::path::Path> + Send,
+meta_capacity: usize,
+meta_shards: usize,
+meta_ratio: f64,
+range_memory: usize,
+range_disk: usize,
+range_capacity: usize,
+range_shards: usize,
) -> impl std::future::Future<Output = Result<(Self::MetaCache, Self::RangeCache), CacheError>> + Send;
}

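
The two specialized traits (MetaCache, RangeCache) collapse into a single generic TonboCache<K, V>, which CacheReader instantiates once for metadata and once for byte ranges. A toy map-backed implementation against the trait as shown above; MapCache and the inline CacheError placeholder are illustrative only, not part of the crate:

```rust
use std::{collections::HashMap, fmt::Debug, hash::Hash, sync::Arc};

use parking_lot::Mutex;

// Placeholder error type for the sketch; the crate defines its own CacheError.
#[derive(Debug)]
pub struct CacheError;

// The unified trait, copied from lib.rs above.
pub trait TonboCache<K, V>: Sync + Send + Clone + Debug {
    fn get(
        &self,
        key: &K,
    ) -> impl std::future::Future<Output = Result<Option<V>, CacheError>> + Send;

    fn insert(&self, key: K, value: V) -> V;
}

// Toy in-memory cache: one impl can now serve both instantiations CacheReader
// needs (Ulid -> Arc<ParquetMetaData> and (Ulid, Range<usize>) -> Bytes).
#[derive(Debug, Clone)]
pub struct MapCache<K, V>(Arc<Mutex<HashMap<K, V>>>);

impl<K, V> TonboCache<K, V> for MapCache<K, V>
where
    K: Hash + Eq + Clone + Send + Sync + Debug,
    V: Clone + Send + Sync + Debug,
{
    // An async fn in the impl satisfies the `-> impl Future + Send` declaration,
    // just as the FoyerMetaCache and LruMetaCache impls above do.
    async fn get(&self, key: &K) -> Result<Option<V>, CacheError> {
        Ok(self.0.lock().get(key).cloned())
    }

    fn insert(&self, key: K, value: V) -> V {
        self.0.lock().insert(key, value.clone());
        value
    }
}
```
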
17 changes: 11 additions & 6 deletions tonbo_ext_reader/src/lru_reader.rs
@@ -15,7 +15,7 @@ use parking_lot::Mutex;
use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData};
use ulid::Ulid;

-use crate::{CacheError, CacheReader, MetaCache, RangeCache};
+use crate::{CacheError, CacheReader, TonboCache};

pub(crate) trait SharedKey<S: BuildHasher>: Hash + PartialEq + Eq {
fn shared(&self, hash_builder: &S, shared: usize) -> usize;
@@ -96,9 +96,9 @@ pub struct LruReader {
meta_cache: LruMetaCache,
}

-impl MetaCache for LruMetaCache {
-fn get(&self, gen: &Ulid) -> Option<Arc<ParquetMetaData>> {
-self.0.get(gen, |v| v.map(Arc::clone))
+impl TonboCache<Ulid, Arc<ParquetMetaData>> for LruMetaCache {
+async fn get(&self, gen: &Ulid) -> Result<Option<Arc<ParquetMetaData>>, CacheError> {
+Ok(self.0.get(gen, |v| v.map(Arc::clone)))
}

fn insert(&self, gen: Ulid, data: Arc<ParquetMetaData>) -> Arc<ParquetMetaData> {
@@ -107,7 +107,7 @@ impl MetaCache for LruMetaCache {
}
}

-impl RangeCache for LruRangeCache {
+impl TonboCache<(Ulid, Range<usize>), Bytes> for LruRangeCache {
async fn get(&self, key: &(Ulid, Range<usize>)) -> Result<Option<Bytes>, CacheError> {
Ok(self.0.get(key, |v| v.cloned()))
}
@@ -139,7 +139,7 @@ impl AsyncFileReader for LruReader {

fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
async move {
-if let Some(meta) = self.meta_cache.get(&self.gen) {
+if let Some(meta) = self
+.meta_cache
+.get(&self.gen)
+.await
+.map_err(|e| parquet::errors::ParquetError::External(From::from(e)))?
+{
return Ok(meta);
}

