Skip to content

Commit

Permalink
chore: detach Snapshot from Transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 8, 2024
1 parent 583c335 commit a61203a
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 44 deletions.
11 changes: 8 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ pub mod option;
pub mod record;
mod scope;
pub mod serdes;
pub mod snapshot;
pub mod stream;
pub mod timestamp;
pub mod transaction;
Expand Down Expand Up @@ -160,6 +161,7 @@ use crate::{
executor::Executor,
fs::{manager::StoreManager, parse_file_id, FileId, FileType},
serdes::Decode,
snapshot::Snapshot,
stream::{
mem_projection::MemProjectionStream, merge::MergeStream, package::PackageStream, Entry,
ScanStream,
Expand Down Expand Up @@ -287,10 +289,13 @@ where

/// open an optimistic ACID transaction
pub async fn transaction(&self) -> Transaction<'_, R> {
Transaction::new(
self.version_set.current().await,
Transaction::new(self.snapshot().await, self.lock_map.clone())
}

pub async fn snapshot(&self) -> Snapshot<'_, R> {
Snapshot::new(
self.schema.read().await,
self.lock_map.clone(),
self.version_set.current().await,
self.manager.clone(),
)
}
Expand Down
204 changes: 204 additions & 0 deletions src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
use std::{collections::Bound, sync::Arc};

use async_lock::RwLockReadGuard;
use parquet::arrow::ProjectionMask;

use crate::{
fs::manager::StoreManager,
record::Record,
stream,
stream::ScanStream,
timestamp::Timestamp,
version::{TransactionTs, VersionRef},
DbError, Projection, Scan, Schema,
};

/// A read-only, point-in-time view of the database state.
///
/// Holds a read guard over the shared schema state for its lifetime `'s`,
/// together with the version and store manager needed to serve reads at a
/// fixed timestamp. All reads through a `Snapshot` observe data as of `ts`.
pub struct Snapshot<'s, R: Record> {
    // Read timestamp captured from the version at construction time
    // (see `Snapshot::new`); every `get`/`scan` reads at this timestamp.
    ts: Timestamp,
    // Read guard over the shared schema state; keeping it alive for `'s`
    // pins the in-memory state the snapshot reads from.
    share: RwLockReadGuard<'s, Schema<R>>,
    // The version this snapshot reads from; also used to mint new
    // timestamps via `increase_ts`.
    version: VersionRef<R>,
    // Storage manager used to resolve file reads during `get`/`scan`.
    manager: Arc<StoreManager>,
}

impl<'s, R: Record> Snapshot<'s, R> {
    /// Point lookup of `key` at this snapshot's read timestamp.
    ///
    /// Returns `Ok(None)` when the key is absent — or when the stored entry
    /// carries no value (a deletion marker), which is hidden from callers.
    ///
    /// # Errors
    ///
    /// Propagates any [`DbError`] raised by the underlying schema read.
    pub async fn get<'get>(
        &'get self,
        key: &'get R::Key,
        projection: Projection,
    ) -> Result<Option<stream::Entry<'get, R>>, DbError<R>> {
        Ok(self
            .share
            .get(&self.version, &self.manager, key, self.ts, projection)
            .await?
            // An entry whose value is `None` is a deletion marker;
            // filter it out so callers only ever see live records.
            .filter(|entry| entry.value().is_some()))
    }

    /// Build a range scan over this snapshot.
    ///
    /// Delegates to [`Self::_scan`] with a no-op pre-stream hook so the
    /// `Scan` construction lives in exactly one place.
    pub fn scan<'scan>(
        &'scan self,
        range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>),
    ) -> Scan<'scan, R> {
        self._scan(range, Box::new(move |_: Option<ProjectionMask>| None))
    }

    /// Create a snapshot over `share`/`version`, capturing the version's
    /// current load timestamp as the snapshot's read timestamp.
    pub(crate) fn new(
        share: RwLockReadGuard<'s, Schema<R>>,
        version: VersionRef<R>,
        manager: Arc<StoreManager>,
    ) -> Self {
        Self {
            // Capture the read timestamp once, up front; all subsequent
            // reads through this snapshot use it.
            ts: version.load_ts(),
            share,
            version,
            manager,
        }
    }

    /// The read timestamp this snapshot was taken at.
    pub(crate) fn ts(&self) -> Timestamp {
        self.ts
    }

    /// Mint a fresh (higher) timestamp from the underlying version,
    /// e.g. for a transaction commit built on top of this snapshot.
    pub(crate) fn increase_ts(&self) -> Timestamp {
        self.version.increase_ts()
    }

    /// Borrow the schema state this snapshot holds a read guard over.
    pub(crate) fn schema(&self) -> &Schema<R> {
        &self.share
    }

    /// Build a range scan, letting the caller inject an extra in-memory
    /// stream (e.g. a transaction's local writes) via `fn_pre_stream`,
    /// which receives the projection mask chosen for the scan.
    pub(crate) fn _scan<'scan>(
        &'scan self,
        range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>),
        fn_pre_stream: Box<
            dyn FnOnce(Option<ProjectionMask>) -> Option<ScanStream<'scan, R>> + Send + 'scan,
        >,
    ) -> Scan<'scan, R> {
        Scan::new(
            &self.share,
            &self.manager,
            range,
            self.ts,
            &self.version,
            fn_pre_stream,
        )
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::Bound, sync::Arc};

    use fusio::path::Path;
    use fusio_dispatch::FsOptions;
    use futures_util::StreamExt;
    use tempfile::TempDir;

    use crate::{
        compaction::tests::build_version,
        executor::tokio::TokioExecutor,
        fs::manager::StoreManager,
        tests::{build_db, build_schema},
        version::TransactionTs,
        DbOption,
    };

    /// Full-range scan through a standalone snapshot: entries from the
    /// pre-built version and schema come back in key order, with the
    /// projection applied.
    #[tokio::test]
    async fn snapshot_scan() {
        let temp_dir = TempDir::new().unwrap();
        let manager = Arc::new(StoreManager::new(FsOptions::Local, vec![]).unwrap());
        let option = Arc::new(DbOption::from(
            Path::from_filesystem_path(temp_dir.path()).unwrap(),
        ));

        // Both on-disk directories must exist before building the version/db.
        for dir in [option.version_log_dir_path(), option.wal_dir_path()] {
            manager.base_fs().create_dir_all(&dir).await.unwrap();
        }

        let (_, version) = build_version(&option, &manager).await;
        let (schema, compaction_rx) = build_schema(option.clone(), manager.base_fs())
            .await
            .unwrap();
        let db = build_db(
            option,
            compaction_rx,
            TokioExecutor::new(),
            schema,
            version,
            manager,
        )
        .await
        .unwrap();

        {
            // to increase timestamps to 1 because the data ts built in advance is 1
            db.version_set.increase_ts();
        }
        let mut snapshot = db.snapshot().await;

        let mut stream = snapshot
            .scan((Bound::Unbounded, Bound::Unbounded))
            .projection(vec![1])
            .take()
            .await
            .unwrap();

        // The numeric keys carry no `vbool` under projection(vec![1]);
        // assert both the order and the absent field.
        for expected in ["1", "2", "3", "4", "5", "6", "7", "8", "9"] {
            let entry = stream.next().await.unwrap().unwrap();
            assert_eq!(entry.key().value, expected);
            assert!(entry.value().unwrap().vbool.is_none());
        }

        // The name keys follow in order; only the key itself is checked.
        for expected in ["alice", "ben", "carl", "dice", "erika", "funk"] {
            let entry = stream.next().await.unwrap().unwrap();
            assert_eq!(entry.key().value, expected);
        }
    }
}
}
73 changes: 32 additions & 41 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,20 @@ use std::{
},
io,
mem::transmute,
sync::Arc,
};

use async_lock::RwLockReadGuard;
use flume::SendError;
use lockable::AsyncLimit;
use parquet::{arrow::ProjectionMask, errors::ParquetError};
use thiserror::Error;

use crate::{
compaction::CompactTask,
fs::manager::StoreManager,
record::{Key, KeyRef},
snapshot::Snapshot,
stream,
stream::mem_projection::MemProjectionStream,
timestamp::{Timestamp, Timestamped},
version::{TransactionTs, VersionRef},
wal::log::LogType,
DbError, LockMap, Projection, Record, Scan, Schema,
};
Expand Down Expand Up @@ -49,31 +46,20 @@ pub struct Transaction<'txn, R>
where
R: Record,
{
ts: Timestamp,
local: BTreeMap<R::Key, Option<R>>,
share: RwLockReadGuard<'txn, Schema<R>>,
version: VersionRef<R>,
snapshot: Snapshot<'txn, R>,
lock_map: LockMap<R::Key>,
manager: Arc<StoreManager>,
}

impl<'txn, R> Transaction<'txn, R>
where
R: Record + Send,
{
pub(crate) fn new(
version: VersionRef<R>,
share: RwLockReadGuard<'txn, Schema<R>>,
lock_map: LockMap<R::Key>,
manager: Arc<StoreManager>,
) -> Self {
pub(crate) fn new(snapshot: Snapshot<'txn, R>, lock_map: LockMap<R::Key>) -> Self {
Self {
ts: version.load_ts(),
local: BTreeMap::new(),
share,
version,
snapshot,
lock_map,
manager,
}
}

Expand All @@ -87,16 +73,10 @@ where
Ok(match self.local.get(key).and_then(|v| v.as_ref()) {
Some(v) => Some(TransactionEntry::Local(v.as_record_ref())),
None => self
.share
.get(&self.version, &self.manager, key, self.ts, projection)
.snapshot
.get(key, projection)
.await?
.and_then(|entry| {
if entry.value().is_none() {
None
} else {
Some(TransactionEntry::Stream(entry))
}
}),
.map(TransactionEntry::Stream),
})
}

Expand All @@ -105,16 +85,12 @@ where
&'scan self,
range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>),
) -> Scan<'scan, R> {
Scan::new(
&self.share,
&self.manager,
self.snapshot._scan(
range,
self.ts,
&self.version,
Box::new(move |projection_mask: Option<ProjectionMask>| {
let mut transaction_scan = TransactionScan {
inner: self.local.range(range),
ts: self.ts,
ts: self.snapshot.ts(),
}
.into();
if let Some(mask) = projection_mask {
Expand Down Expand Up @@ -159,7 +135,11 @@ where
);
}
for (key, _) in self.local.iter() {
if self.share.check_conflict(key, self.ts) {
if self
.snapshot
.schema()
.check_conflict(key, self.snapshot.ts())
{
return Err(CommitError::WriteConflict(key.clone()));
}
}
Expand All @@ -168,27 +148,38 @@ where
let is_excess = match len {
0 => false,
1 => {
let new_ts = self.version.increase_ts();
let new_ts = self.snapshot.increase_ts();
let (key, record) = self.local.pop_first().unwrap();
Self::append(&self.share, LogType::Full, key, record, new_ts).await?
Self::append(self.snapshot.schema(), LogType::Full, key, record, new_ts).await?
}
_ => {
let new_ts = self.version.increase_ts();
let new_ts = self.snapshot.increase_ts();
let mut iter = self.local.into_iter();

let (key, record) = iter.next().unwrap();
Self::append(&self.share, LogType::First, key, record, new_ts).await?;
Self::append(self.snapshot.schema(), LogType::First, key, record, new_ts).await?;

for (key, record) in (&mut iter).take(len - 2) {
Self::append(&self.share, LogType::Middle, key, record, new_ts).await?;
Self::append(
self.snapshot.schema(),
LogType::Middle,
key,
record,
new_ts,
)
.await?;
}

let (key, record) = iter.next().unwrap();
Self::append(&self.share, LogType::Last, key, record, new_ts).await?
Self::append(self.snapshot.schema(), LogType::Last, key, record, new_ts).await?
}
};
if is_excess {
let _ = self.share.compaction_tx.try_send(CompactTask::Freeze);
let _ = self
.snapshot
.schema()
.compaction_tx
.try_send(CompactTask::Freeze);
}
Ok(())
}
Expand Down

0 comments on commit a61203a

Please sign in to comment.