diff --git a/Cargo.toml b/Cargo.toml
index 7fe966a..449eaf7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,6 +12,8 @@ arrow = "52"
 async-lock = "3"
 crossbeam-skiplist = "0.1"
 futures-core = "0.3"
+futures-channel = "0.3"
+futures-executor = "0.3"
 futures-io = "0.3"
 futures-util = "0.3"
 once_cell = "1"
@@ -20,6 +22,8 @@ pin-project-lite = "0.2"
 thiserror = "1"
 tokio = { version = "1", optional = true }
 tokio-util = { version = "0.7", features = ["compat"] }
+tracing = "0.1"
+ulid = "1"
 
 [dev-dependencies]
 tempfile = "3"
diff --git a/src/executor.rs b/src/executor.rs
index 1f26139..886130b 100644
--- a/src/executor.rs
+++ b/src/executor.rs
@@ -1,6 +1,8 @@
 use std::future::Future;
 
-pub trait Executor {
+use crate::fs::Fs;
+
+pub trait Executor: Fs {
     fn spawn<F>(&self, future: F)
     where
         F: Future<Output = ()> + Send + 'static;
@@ -10,15 +12,19 @@ pub trait Executor {
 pub mod tokio {
     use std::future::Future;
 
+    use tokio::runtime::Handle;
+
     use super::Executor;
 
     pub struct TokioExecutor {
-        tokio: tokio::runtime::Runtime,
+        handle: Handle,
     }
 
     impl TokioExecutor {
-        pub fn new(tokio: tokio::runtime::Runtime) -> Self {
-            Self { tokio }
+        pub fn new() -> Self {
+            Self {
+                handle: Handle::current(),
+            }
         }
     }
 
@@ -27,7 +33,7 @@ pub mod tokio {
         where
             F: Future<Output = ()> + Send + 'static,
         {
-            self.tokio.spawn(future);
+            self.handle.spawn(future);
         }
     }
 }
diff --git a/src/fs/mod.rs b/src/fs/mod.rs
index a3a2375..51adde4 100644
--- a/src/fs/mod.rs
+++ b/src/fs/mod.rs
@@ -1,16 +1,42 @@
 #[cfg(any(feature = "tokio", test))]
 pub mod tokio;
 
-use std::{future::Future, io, path::Path};
+use std::{
+    fmt::{Display, Formatter},
+    future::Future,
+    io,
+    path::Path,
+};
 
 use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
+use ulid::Ulid;
 
-pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static {}
+pub(crate) type FileId = Ulid;
 
-impl<T> AsyncFile for T where T: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static {}
+pub enum FileType {
+    WAL,
+    PARQUET,
+    LOG,
+}
+
+pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {}
+
+impl<T> AsyncFile for T where T: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {}
 
 pub trait Fs {
     type File: AsyncFile;
 
-    fn open(&self, path: impl AsRef<Path>) -> impl Future<Output = io::Result<Self::File>>;
+    fn open(path: impl AsRef<Path>) -> impl Future<Output = io::Result<Self::File>>;
+
+    fn remove(path: impl AsRef<Path>) -> impl Future<Output = io::Result<()>>;
+}
+
+impl Display for FileType {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            FileType::WAL => write!(f, "wal"),
+            FileType::PARQUET => write!(f, "parquet"),
+            FileType::LOG => write!(f, "log"),
+        }
+    }
 }
diff --git a/src/fs/tokio.rs b/src/fs/tokio.rs
index 9c41afe..e7fc7bd 100644
--- a/src/fs/tokio.rs
+++ b/src/fs/tokio.rs
@@ -1,17 +1,21 @@
 use std::{io, path::Path};
 
+use tokio::fs::{remove_file, File};
 use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
 
 use super::Fs;
+use crate::executor::tokio::TokioExecutor;
 
-pub struct TokioFs;
+impl Fs for TokioExecutor {
+    type File = Compat<File>;
 
-impl Fs for TokioFs {
-    type File = Compat<tokio::fs::File>;
-
-    async fn open(&self, path: impl AsRef<Path>) -> io::Result<Self::File> {
-        tokio::fs::File::create_new(path)
+    async fn open(path: impl AsRef<Path>) -> io::Result<Self::File> {
+        File::create_new(path)
             .await
             .map(TokioAsyncReadCompatExt::compat)
     }
+
+    async fn remove(path: impl AsRef<Path>) -> io::Result<()> {
+        remove_file(path).await
+    }
 }
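Note: `open` and `remove` are now associated functions with no receiver, so a filesystem is selected purely by type, and `Executor: Fs` means one type supplies both task spawning and file I/O. A minimal sketch of driving the `TokioExecutor` implementation from inside a Tokio runtime; the crate name is not shown in this diff, so `tonbo` below is an assumption:

```rust
use futures_util::AsyncWriteExt;
use tonbo::{executor::tokio::TokioExecutor, fs::Fs};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("demo.wal");
    // `open` maps to `File::create_new`, so it fails if the path already exists.
    let mut file = TokioExecutor::open(&path).await?;
    file.write_all(b"hello").await?; // futures-io API via the compat wrapper
    file.flush().await?;
    TokioExecutor::remove(&path).await?;
    Ok(())
}
```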
diff --git a/src/inmem/immutable.rs b/src/inmem/immutable.rs
index 509a69a..56725df 100644
--- a/src/inmem/immutable.rs
+++ b/src/inmem/immutable.rs
@@ -19,9 +19,9 @@ use crate::{
 pub trait ArrowArrays: Sized {
     type Record: Record;
 
-    type Buider: Builder<Self>;
+    type Builder: Builder<Self>;
 
-    fn builder(capacity: usize) -> Self::Buider;
+    fn builder(capacity: usize) -> Self::Builder;
 
     fn get(&self, offset: u32) -> Option<Option<<Self::Record as Record>::Ref<'_>>>;
 
@@ -167,9 +167,9 @@ pub(crate) mod tests {
     impl ArrowArrays for TestImmutableArrays {
         type Record = Test;
 
-        type Buider = TestBuilder;
+        type Builder = TestBuilder;
 
-        fn builder(capacity: usize) -> Self::Buider {
+        fn builder(capacity: usize) -> Self::Builder {
             TestBuilder {
                 vstring: StringBuilder::with_capacity(capacity, 0),
                 vu32: PrimitiveBuilder::<UInt32Type>::with_capacity(capacity),
diff --git a/src/lib.rs b/src/lib.rs
index d1f22fe..c9fc95b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,15 +1,20 @@
 #![allow(dead_code)]
 
 pub(crate) mod arrows;
-mod executor;
+pub mod executor;
 pub mod fs;
 mod inmem;
 mod ondisk;
 mod oracle;
 mod record;
+mod scope;
+pub mod serdes;
 mod stream;
 mod transaction;
+mod version;
 
-use std::{collections::VecDeque, io, mem, ops::Bound, sync::Arc};
+use std::{
+    collections::VecDeque, io, marker::PhantomData, mem, ops::Bound, path::PathBuf, sync::Arc,
+};
 
 use async_lock::{RwLock, RwLockReadGuard};
 use futures_core::Stream;
@@ -18,31 +23,99 @@ use inmem::{immutable::Immutable, mutable::Mutable};
 use oracle::Timestamp;
 use parquet::errors::ParquetError;
 use record::Record;
-use stream::{merge::MergeStream, Entry, ScanStream};
 
-pub struct DB<R>
+use crate::{
+    executor::Executor,
+    fs::{FileId, FileType},
+    stream::{merge::MergeStream, Entry, ScanStream},
+    version::Version,
+};
+
+#[derive(Debug)]
+pub struct DbOption {
+    pub path: PathBuf,
+    pub max_mem_table_size: usize,
+    pub immutable_chunk_num: usize,
+    pub major_threshold_with_sst_size: usize,
+    pub level_sst_magnification: usize,
+    pub max_sst_file_size: usize,
+    pub clean_channel_buffer: usize,
+}
+
+pub struct DB<R, E>
 where
     R: Record,
+    E: Executor,
 {
     schema: Arc<RwLock<Schema<R>>>,
+    _p: PhantomData<E>,
 }
 
-impl<R> Default for DB<R>
+impl DbOption {
+    pub fn new(path: impl Into<PathBuf> + Send) -> Self {
+        DbOption {
+            path: path.into(),
+            max_mem_table_size: 8 * 1024 * 1024,
+            immutable_chunk_num: 3,
+            major_threshold_with_sst_size: 10,
+            level_sst_magnification: 10,
+            max_sst_file_size: 24 * 1024 * 1024,
+            clean_channel_buffer: 10,
+        }
+    }
+
+    pub(crate) fn table_path(&self, gen: &FileId) -> PathBuf {
+        self.path.join(format!("{}.{}", gen, FileType::PARQUET))
+    }
+
+    pub(crate) fn wal_path(&self, gen: &FileId) -> PathBuf {
+        self.path.join(format!("{}.{}", gen, FileType::WAL))
+    }
+
+    pub(crate) fn version_path(&self) -> PathBuf {
+        self.path.join(format!("version.{}", FileType::LOG))
+    }
+
+    pub(crate) fn is_threshold_exceeded_major<R, E>(
+        &self,
+        version: &Version<R, E>,
+        level: usize,
+    ) -> bool
+    where
+        R: Record,
+        E: Executor,
+    {
+        Version::<R, E>::tables_len(version, level)
+            >= (self.major_threshold_with_sst_size * self.level_sst_magnification.pow(level as u32))
+    }
+}
+
+impl<R, E> Default for DB<R, E>
 where
     R: Record,
+    E: Executor,
 {
     fn default() -> Self {
         Self {
             schema: Arc::new(RwLock::new(Schema::default())),
+            _p: Default::default(),
        }
    }
 }
 
-impl<R> DB<R>
+impl<R, E> DB<R, E>
 where
     R: Record + Send + Sync,
-    R::Key: Send,
+    R::Key: Send + Sync,
+    E: Executor,
 {
+    pub fn empty() -> Self {
+        Self {
+            schema: Arc::new(RwLock::new(Schema::default())),
+            _p: Default::default(),
+        }
+    }
+
     pub(crate) async fn write(&self, record: R, ts: Timestamp) -> io::Result<()> {
         let columns = self.schema.read().await;
         columns.write(record, ts).await
@@ -95,25 +168,31 @@ where
         Ok(())
     }
 
-    async fn get<'get>(
+    async fn get<'get, E>(
         &'get self,
         key: &'get R::Key,
         ts: Timestamp,
-    ) -> Result<Option<Entry<'get, R>>, ParquetError> {
-        self.scan(Bound::Included(key), Bound::Unbounded, ts)
+    ) -> Result<Option<Entry<'get, R>>, ParquetError>
+    where
+        E: Executor,
+    {
+        self.scan::<E>(Bound::Included(key), Bound::Unbounded, ts)
             .await?
             .next()
             .await
             .transpose()
     }
 
-    async fn scan<'scan>(
+    async fn scan<'scan, E>(
         &'scan self,
         lower: Bound<&'scan R::Key>,
         uppwer: Bound<&'scan R::Key>,
         ts: Timestamp,
-    ) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, ParquetError> {
-        let mut streams = Vec::<ScanStream<R>>::with_capacity(self.immutables.len() + 1);
+    ) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, ParquetError>
+    where
+        E: Executor,
+    {
+        let mut streams = Vec::<ScanStream<R, E>>::with_capacity(self.immutables.len() + 1);
         streams.push(self.mutable.scan((lower, uppwer), ts).into());
         for immutable in &self.immutables {
             streams.push(immutable.scan((lower, uppwer), ts).into());
@@ -141,6 +220,7 @@ pub(crate) mod tests {
     use once_cell::sync::Lazy;
 
     use crate::{
+        executor::Executor,
         inmem::immutable::tests::TestImmutableArrays,
         record::{internal::InternalRecordRef, RecordRef},
         Record, DB,
@@ -227,8 +307,8 @@ pub(crate) mod tests {
         }
     }
 
-    pub(crate) async fn get_test_record_batch() -> RecordBatch {
-        let db = DB::default();
+    pub(crate) async fn get_test_record_batch<E: Executor>() -> RecordBatch {
+        let db: DB<Test, E> = DB::empty();
 
         db.write(
             Test {
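Note: `DbOption` centralizes layout and tuning: SSTables land at `<path>/<ulid>.parquet`, WAL segments at `<path>/<ulid>.wal`, and the version log at `<path>/version.log`, and the major-compaction trigger grows geometrically with the level. A sketch of the arithmetic under the defaults (the crate name `tonbo` is assumed, since the diff does not show it):

```rust
use tonbo::DbOption;

fn main() {
    let option = DbOption::new("/tmp/db");
    assert_eq!(option.max_mem_table_size, 8 * 1024 * 1024);

    // is_threshold_exceeded_major fires once a level holds
    // major_threshold_with_sst_size * level_sst_magnification^level tables:
    for level in 0..4u32 {
        let threshold = option.major_threshold_with_sst_size
            * option.level_sst_magnification.pow(level);
        println!("level {level} compacts at {threshold} tables"); // 10, 100, 1000, 10000
    }
}
```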
diff --git a/src/ondisk/scan.rs b/src/ondisk/scan.rs
index 5e5abc4..4508be6 100644
--- a/src/ondisk/scan.rs
+++ b/src/ondisk/scan.rs
@@ -9,32 +9,36 @@ use pin_project_lite::pin_project;
 use tokio_util::compat::Compat;
 
 use crate::{
-    fs::AsyncFile,
+    executor::Executor,
     record::Record,
     stream::record_batch::{RecordBatchEntry, RecordBatchIterator},
 };
 
 pin_project! {
     #[derive(Debug)]
-    pub struct SsTableScan<R> {
+    pub struct SsTableScan<R, E>
+    where
+        E: Executor,
+    {
         #[pin]
-        stream: ParquetRecordBatchStream<Compat<Box<dyn AsyncFile>>>,
+        stream: ParquetRecordBatchStream<Compat<E::File>>,
         iter: Option<RecordBatchIterator<R>>,
     }
 }
 
-impl<R> SsTableScan<R>
+impl<R, E> SsTableScan<R, E>
 where
-    R: Record,
+    E: Executor,
 {
-    pub fn new(stream: ParquetRecordBatchStream<Compat<Box<dyn AsyncFile>>>) -> Self {
+    pub fn new(stream: ParquetRecordBatchStream<Compat<E::File>>) -> Self {
         SsTableScan { stream, iter: None }
     }
 }
 
-impl<R> Stream for SsTableScan<R>
+impl<R, E> Stream for SsTableScan<R, E>
 where
     R: Record,
+    E: Executor,
 {
     type Item = Result<RecordBatchEntry<R>, parquet::errors::ParquetError>;
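Note: `ParquetRecordBatchStream` wants Tokio's I/O traits while `E::File` speaks futures-io, so the handle is adapted back through tokio-util's compat layer, exactly as `into_parquet_builder` does in the next file. A standalone sketch of that adaptation (hypothetical `row_count` helper, not part of the diff):

```rust
use parquet::arrow::{arrow_reader::ArrowReaderOptions, ParquetRecordBatchStreamBuilder};
use tokio::fs::File;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};

// `file` is the futures-io handle an `Fs` impl hands back (Compat<File> for
// TokioExecutor); `.compat()` wraps it once more to recover tokio's traits.
async fn row_count(file: Compat<File>) -> parquet::errors::Result<i64> {
    let builder = ParquetRecordBatchStreamBuilder::new_with_options(
        file.compat(),
        ArrowReaderOptions::default().with_page_index(true),
    )
    .await?;
    Ok(builder.metadata().file_metadata().num_rows())
}
```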
diff --git a/src/ondisk/sstable.rs b/src/ondisk/sstable.rs
index 70e9c42..334281e 100644
--- a/src/ondisk/sstable.rs
+++ b/src/ondisk/sstable.rs
@@ -17,32 +17,34 @@ use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};
 use super::scan::SsTableScan;
 use crate::{
     arrows::get_range_filter,
-    fs::AsyncFile,
+    executor::Executor,
     oracle::{timestamp::TimestampedRef, Timestamp},
     record::Record,
     stream::record_batch::RecordBatchEntry,
 };
 
-struct SsTable<R>
+pub(crate) struct SsTable<R, E>
 where
     R: Record,
+    E: Executor,
 {
-    file: Box<dyn AsyncFile>,
-    _marker: PhantomData<R>,
+    file: E::File,
+    _marker: PhantomData<(R, E)>,
 }
 
-impl<R> SsTable<R>
+impl<R, E> SsTable<R, E>
 where
     R: Record,
+    E: Executor,
 {
-    fn open(file: Box<dyn AsyncFile>) -> Self {
+    pub(crate) fn open(file: E::File) -> Self {
         SsTable {
             file,
             _marker: PhantomData,
         }
     }
 
-    fn create_writer(&mut self) -> AsyncArrowWriter<Compat<&mut Box<dyn AsyncFile>>> {
+    fn create_writer(&mut self) -> AsyncArrowWriter<Compat<&mut E::File>> {
         // TODO: expose writer options
         let options = ArrowWriterOptions::new().with_properties(
             WriterProperties::builder()
@@ -73,7 +75,7 @@ where
     async fn into_parquet_builder(
         self,
         limit: usize,
-    ) -> parquet::errors::Result<ArrowReaderBuilder<AsyncReader<Compat<Box<dyn AsyncFile>>>>> {
+    ) -> parquet::errors::Result<ArrowReaderBuilder<AsyncReader<Compat<E::File>>>> {
         Ok(ParquetRecordBatchStreamBuilder::new_with_options(
             self.file.compat(),
             ArrowReaderOptions::default().with_page_index(true),
@@ -82,7 +84,7 @@ where
         .with_limit(limit))
     }
 
-    async fn get(
+    pub(crate) async fn get(
         self,
         key: &TimestampedRef<R::Key>,
     ) -> parquet::errors::Result<Option<RecordBatchEntry<R>>> {
@@ -97,7 +99,7 @@ where
         self,
         range: (Bound<&'scan R::Key>, Bound<&'scan R::Key>),
         ts: Timestamp,
-    ) -> Result<SsTableScan<R>, parquet::errors::ParquetError> {
+    ) -> Result<SsTableScan<R, E>, parquet::errors::ParquetError> {
         let builder = self.into_parquet_builder(1).await?;
 
         let schema_descriptor = builder.metadata().file_metadata().schema_descr();
@@ -113,7 +115,8 @@ mod tests {
 
     use super::SsTable;
     use crate::{
-        fs::{tokio::TokioFs, AsyncFile, Fs},
+        executor::tokio::TokioExecutor,
+        fs::Fs,
         oracle::timestamp::Timestamped,
         tests::{get_test_record_batch, Test},
     };
@@ -121,12 +124,11 @@ mod tests {
     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
     async fn write_sstable() {
         let temp_dir = tempfile::tempdir().unwrap();
-        let file = temp_dir.path().join("test.parquet");
-
-        let file = TokioFs {}.open(&file).await.unwrap();
-        let mut sstable = SsTable::<Test>::open(Box::new(file) as Box<dyn AsyncFile>);
-
-        let record_batch = get_test_record_batch().await;
+        let record_batch = get_test_record_batch::<TokioExecutor>().await;
+        let file = TokioExecutor::open(&temp_dir.path().join("test.parquet"))
+            .await
+            .unwrap();
+        let mut sstable = SsTable::<Test, TokioExecutor>::open(file);
 
         sstable.write(record_batch).await.unwrap();
diff --git a/src/record/mod.rs b/src/record/mod.rs
index 0ceff14..79fc7ee 100644
--- a/src/record/mod.rs
+++ b/src/record/mod.rs
@@ -9,9 +9,12 @@ use arrow::{
 };
 use internal::InternalRecordRef;
 
-use crate::inmem::immutable::ArrowArrays;
+use crate::{
+    inmem::immutable::ArrowArrays,
+    serdes::{Decode, Encode},
+};
 
-pub trait Key: 'static + Ord + Send + Sync {
+pub trait Key: 'static + Encode + Decode + Ord + Clone + Send + Sync {
     type Ref<'r>: KeyRef<'r, Key = Self> + Copy
     where
         Self: 'r;
diff --git a/src/record/str.rs b/src/record/str.rs
index a50411d..deea23c 100644
--- a/src/record/str.rs
+++ b/src/record/str.rs
@@ -97,9 +97,9 @@ pub struct StringColumns {
 impl ArrowArrays for StringColumns {
     type Record = String;
 
-    type Buider = StringColumnsBuilder;
+    type Builder = StringColumnsBuilder;
 
-    fn builder(capacity: usize) -> Self::Buider {
+    fn builder(capacity: usize) -> Self::Builder {
         StringColumnsBuilder {
             _null: BooleanBufferBuilder::new(capacity),
             _ts: UInt32Builder::with_capacity(capacity),
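Note: `Key` now demands `Encode + Decode + Clone` because keys outlive the memtable: they are cloned into `Scope { min, max }` bounds (next file) and persisted in the version log. `String`, the key used by `StringColumns` above, satisfies the bound via the `serdes` impls later in this diff; a round-trip sketch (crate name `tonbo` assumed):

```rust
use futures_executor::block_on;
use futures_util::io::Cursor;
use tonbo::serdes::{Decode, Encode};

fn main() {
    block_on(async {
        let key = "user#42".to_string();

        let mut cursor = Cursor::new(Vec::new());
        key.encode(&mut cursor).await.unwrap();
        // u16 little-endian length prefix + raw UTF-8 bytes:
        assert_eq!(cursor.get_ref().len(), key.size());

        let mut cursor = Cursor::new(cursor.into_inner());
        assert_eq!(String::decode(&mut cursor).await.unwrap(), key);
    });
}
```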
diff --git a/src/scope.rs b/src/scope.rs
new file mode 100644
index 0000000..e6bf93a
--- /dev/null
+++ b/src/scope.rs
@@ -0,0 +1,124 @@
+use futures_io::{AsyncRead, AsyncWrite};
+use futures_util::{AsyncReadExt, AsyncWriteExt};
+
+use crate::{
+    fs::FileId,
+    serdes::{Decode, Encode},
+};
+
+#[derive(Debug, Eq, PartialEq)]
+pub struct Scope<K>
+where
+    K: Encode + Decode + Ord + Clone,
+{
+    pub(crate) min: K,
+    pub(crate) max: K,
+    pub(crate) gen: FileId,
+    pub(crate) wal_ids: Option<Vec<FileId>>,
+}
+
+impl<K> Clone for Scope<K>
+where
+    K: Encode + Decode + Ord + Clone,
+{
+    fn clone(&self) -> Self {
+        Scope {
+            min: self.min.clone(),
+            max: self.max.clone(),
+            gen: self.gen,
+            wal_ids: self.wal_ids.clone(),
+        }
+    }
+}
+
+impl<K> Scope<K>
+where
+    K: Encode + Decode + Ord + Clone,
+{
+    pub(crate) fn is_between(&self, key: &K) -> bool {
+        self.min.le(key) && self.max.ge(key)
+    }
+
+    pub(crate) fn is_meet(&self, target: &Scope<K>) -> bool {
+        (self.min.le(&target.min) && self.max.ge(&target.min))
+            || (self.min.le(&target.max) && self.max.ge(&target.max))
+            || (self.min.le(&target.min)) && self.max.ge(&target.max)
+            || (self.min.ge(&target.min)) && self.max.le(&target.max)
+    }
+}
+
+impl<K> Encode for Scope<K>
+where
+    K: Encode + Decode + Ord + Clone,
+{
+    type Error = <K as Encode>::Error;
+
+    async fn encode<W: AsyncWrite + Unpin + Send + Sync>(
+        &self,
+        writer: &mut W,
+    ) -> Result<(), Self::Error> {
+        self.min.encode(writer).await?;
+        self.max.encode(writer).await?;
+
+        writer.write_all(&self.gen.to_bytes()).await?;
+
+        match &self.wal_ids {
+            None => {
+                0u8.encode(writer).await?;
+            }
+            Some(ids) => {
+                1u8.encode(writer).await?;
+                (ids.len() as u32).encode(writer).await?;
+                for id in ids {
+                    writer.write_all(&id.to_bytes()).await?;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    fn size(&self) -> usize {
+        // ProcessUniqueId: usize + u64
+        self.min.size() + self.max.size() + 16
+    }
+}
+
+impl<K> Decode for Scope<K>
+where
+    K: Encode + Decode + Ord + Clone,
+{
+    type Error = <K as Decode>::Error;
+
+    async fn decode<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Self, Self::Error> {
+        let min = K::decode(reader).await?;
+        let max = K::decode(reader).await?;
+
+        let gen = {
+            let mut slice = [0; 16];
+            reader.read_exact(&mut slice).await?;
+            FileId::from_bytes(slice)
+        };
+        let wal_ids = match u8::decode(reader).await? {
+            0 => None,
+            1 => {
+                let len = u32::decode(reader).await? as usize;
+                let mut ids = Vec::with_capacity(len);
+
+                for _ in 0..len {
+                    let mut slice = [0; 16];
+                    reader.read_exact(&mut slice).await?;
+                    ids.push(FileId::from_bytes(slice));
+                }
+                Some(ids)
+            }
+            _ => unreachable!(),
+        };
+
+        Ok(Scope {
+            min,
+            max,
+            gen,
+            wal_ids,
+        })
+    }
+}
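Note: `is_between`/`is_meet` treat a scope as the closed interval `[min, max]`: two scopes meet when one contains an endpoint of the other or one encloses the other. The same test on plain integers (hypothetical helper, not part of the diff):

```rust
fn is_meet(a: (i32, i32), b: (i32, i32)) -> bool {
    (a.0 <= b.0 && a.1 >= b.0) // b starts inside a
        || (a.0 <= b.1 && a.1 >= b.1) // b ends inside a
        || (a.0 <= b.0 && a.1 >= b.1) // a encloses b
        || (a.0 >= b.0 && a.1 <= b.1) // b encloses a
}

fn main() {
    assert!(is_meet((0, 10), (5, 15))); // partial overlap
    assert!(is_meet((0, 10), (2, 3))); // containment
    assert!(!is_meet((0, 10), (11, 20))); // disjoint
}
```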
diff --git a/src/serdes/arc.rs b/src/serdes/arc.rs
new file mode 100644
index 0000000..8d45bc8
--- /dev/null
+++ b/src/serdes/arc.rs
@@ -0,0 +1,37 @@
+use std::sync::Arc;
+
+use futures_io::{AsyncRead, AsyncWrite};
+
+use super::{Decode, Encode};
+
+impl<T> Decode for Arc<T>
+where
+    T: Decode,
+{
+    type Error = T::Error;
+
+    async fn decode<R>(reader: &mut R) -> Result<Self, Self::Error>
+    where
+        R: AsyncRead + Unpin,
+    {
+        Ok(Arc::from(T::decode(reader).await?))
+    }
+}
+
+impl<T> Encode for Arc<T>
+where
+    T: Encode + Send + Sync,
+{
+    type Error = T::Error;
+
+    async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
+    where
+        W: AsyncWrite + Unpin + Send + Sync,
+    {
+        self.as_ref().encode(writer).await
+    }
+
+    fn size(&self) -> usize {
+        Encode::size(self.as_ref())
+    }
+}
diff --git a/src/serdes/boolean.rs b/src/serdes/boolean.rs
new file mode 100644
index 0000000..f8cc9e4
--- /dev/null
+++ b/src/serdes/boolean.rs
@@ -0,0 +1,34 @@
+use std::{io, mem::size_of};
+
+use futures_io::{AsyncRead, AsyncWrite};
+use futures_util::{AsyncReadExt, AsyncWriteExt};
+
+use crate::serdes::{Decode, Encode};
+
+impl Encode for bool {
+    type Error = io::Error;
+
+    async fn encode<W: AsyncWrite + Unpin + Send + Sync>(
+        &self,
+        writer: &mut W,
+    ) -> Result<(), Self::Error> {
+        writer
+            .write_all(&if *self { 1u8 } else { 0u8 }.to_le_bytes())
+            .await
+    }
+
+    fn size(&self) -> usize {
+        size_of::<u8>()
+    }
+}
+
+impl Decode for bool {
+    type Error = io::Error;
+
+    async fn decode<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Self, Self::Error> {
+        let buf = {
+            let mut buf = [0; size_of::<u8>()];
+            reader.read_exact(&mut buf).await?;
+            buf
+        };
+
+        Ok(u8::from_le_bytes(buf) == 1u8)
+    }
+}
diff --git a/src/serdes/mod.rs b/src/serdes/mod.rs
new file mode 100644
index 0000000..beb9e6b
--- /dev/null
+++ b/src/serdes/mod.rs
@@ -0,0 +1,42 @@
+mod arc;
+mod boolean;
+mod num;
+mod option;
+mod string;
+
+use std::{future::Future, io};
+
+use futures_io::{AsyncRead, AsyncWrite};
+
+pub trait Encode: Send + Sync {
+    type Error: From<io::Error> + std::error::Error + Send + Sync + 'static;
+
+    fn encode<W>(&self, writer: &mut W) -> impl Future<Output = Result<(), Self::Error>> + Send
+    where
+        W: AsyncWrite + Unpin + Send + Sync;
+
+    fn size(&self) -> usize;
+}
+
+impl<T: Encode> Encode for &T {
+    type Error = T::Error;
+
+    async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
+    where
+        W: AsyncWrite + Unpin + Send + Sync,
+    {
+        Encode::encode(*self, writer).await
+    }
+
+    fn size(&self) -> usize {
+        Encode::size(*self)
+    }
+}
+
+pub trait Decode: Sized {
+    type Error: From<io::Error> + std::error::Error + Send + Sync + 'static;
+
+    fn decode<R>(reader: &mut R) -> impl Future<Output = Result<Self, Self::Error>>
+    where
+        R: AsyncRead + Unpin;
+}
diff --git a/src/serdes/num.rs b/src/serdes/num.rs
new file mode 100644
index 0000000..de20692
--- /dev/null
+++ b/src/serdes/num.rs
@@ -0,0 +1,49 @@
+use std::{io, mem::size_of};
+
+use futures_io::{AsyncRead, AsyncWrite};
+use futures_util::{AsyncReadExt, AsyncWriteExt};
+
+use super::{Decode, Encode};
+
+#[macro_export]
+macro_rules! implement_encode_decode {
+    ($struct_name:ident) => {
+        impl Encode for $struct_name {
+            type Error = io::Error;
+
+            async fn encode<W: AsyncWrite + Unpin + Send + Sync>(
+                &self,
+                writer: &mut W,
+            ) -> Result<(), Self::Error> {
+                writer.write_all(&self.to_le_bytes()).await
+            }
+
+            fn size(&self) -> usize {
+                size_of::<Self>()
+            }
+        }
+
+        impl Decode for $struct_name {
+            type Error = io::Error;
+
+            async fn decode<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Self, Self::Error> {
+                let buf = {
+                    let mut buf = [0; size_of::<Self>()];
+                    reader.read_exact(&mut buf).await?;
+                    buf
+                };
+
+                Ok(Self::from_le_bytes(buf))
+            }
+        }
+    };
+}
+
+implement_encode_decode!(i8);
+implement_encode_decode!(i16);
+implement_encode_decode!(i32);
+implement_encode_decode!(i64);
+implement_encode_decode!(u8);
+implement_encode_decode!(u16);
+implement_encode_decode!(u32);
+implement_encode_decode!(u64);
diff --git a/src/serdes/option.rs b/src/serdes/option.rs
new file mode 100644
index 0000000..397e439
--- /dev/null
+++ b/src/serdes/option.rs
@@ -0,0 +1,73 @@
+use std::io;
+
+use futures_io::{AsyncRead, AsyncWrite};
+use futures_util::{AsyncReadExt, AsyncWriteExt};
+use thiserror::Error;
+
+use super::{Decode, Encode};
+
+#[derive(Debug, Error)]
+#[error("option encode error")]
+pub enum EncodeError<E>
+where
+    E: std::error::Error,
+{
+    #[error("io error: {0}")]
+    Io(#[from] io::Error),
+    #[error("inner error: {0}")]
+    Inner(#[source] E),
+}
+
+#[derive(Debug, Error)]
+#[error("option decode error")]
+pub enum DecodeError<E>
+where
+    E: std::error::Error,
+{
+    #[error("io error: {0}")]
+    Io(#[from] io::Error),
+    #[error("inner error: {0}")]
+    Inner(#[source] E),
+}
+
+impl<V> Encode for Option<V>
+where
+    V: Encode + Sync,
+{
+    type Error = EncodeError<V::Error>;
+
+    async fn encode<W: AsyncWrite + Unpin + Send + Sync>(
+        &self,
+        writer: &mut W,
+    ) -> Result<(), Self::Error> {
+        match self {
+            None => writer.write_all(&[0]).await?,
+            Some(v) => {
+                writer.write_all(&[1]).await?;
+                v.encode(writer).await.map_err(EncodeError::Inner)?;
+            }
+        }
+        Ok(())
+    }
+
+    fn size(&self) -> usize {
+        1
+    }
+}
+
+impl<V> Decode for Option<V>
+where
+    V: Decode,
+{
+    type Error = DecodeError<V::Error>;
+
+    async fn decode<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Self, Self::Error> {
+        let mut o = [0];
+        reader.read_exact(&mut o).await?;
+        match o[0] {
+            0 => Ok(None),
+            1 => Ok(Some(V::decode(reader).await.map_err(DecodeError::Inner)?)),
+            _ => panic!("invalid option tag"),
+        }
+    }
+}
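Note: the two traits compose, so user types can be assembled from the primitive impls above. A sketch for a hypothetical composite type (`Point` is not part of the diff; trait shapes as in `serdes/mod.rs`, crate name `tonbo` assumed):

```rust
use std::io;

use futures_io::{AsyncRead, AsyncWrite};
use tonbo::serdes::{Decode, Encode};

struct Point {
    x: u32,
    y: u32,
}

impl Encode for Point {
    type Error = io::Error;

    async fn encode<W>(&self, writer: &mut W) -> Result<(), Self::Error>
    where
        W: AsyncWrite + Unpin + Send + Sync,
    {
        // Field by field, little-endian, in declaration order.
        self.x.encode(writer).await?;
        self.y.encode(writer).await
    }

    fn size(&self) -> usize {
        self.x.size() + self.y.size()
    }
}

impl Decode for Point {
    type Error = io::Error;

    async fn decode<R>(reader: &mut R) -> Result<Self, Self::Error>
    where
        R: AsyncRead + Unpin,
    {
        Ok(Point {
            x: u32::decode(reader).await?,
            y: u32::decode(reader).await?,
        })
    }
}
```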
diff --git a/src/serdes/string.rs b/src/serdes/string.rs
new file mode 100644
index 0000000..3b9ea2d
--- /dev/null
+++ b/src/serdes/string.rs
@@ -0,0 +1,42 @@
+use std::{io, mem::size_of};
+
+use futures_io::{AsyncRead, AsyncWrite};
+use futures_util::{AsyncReadExt, AsyncWriteExt};
+
+use super::{Decode, Encode};
+
+impl Encode for String {
+    type Error = io::Error;
+
+    async fn encode<W: AsyncWrite + Unpin + Send + Sync>(
+        &self,
+        writer: &mut W,
+    ) -> Result<(), Self::Error> {
+        writer.write_all(&(self.len() as u16).to_le_bytes()).await?;
+        writer.write_all(self.as_bytes()).await
+    }
+
+    fn size(&self) -> usize {
+        size_of::<u16>() + self.len()
+    }
+}
+
+impl Decode for String {
+    type Error = io::Error;
+
+    async fn decode<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Self, Self::Error> {
+        let len = {
+            let mut len = [0; size_of::<u16>()];
+            reader.read_exact(&mut len).await?;
+            u16::from_le_bytes(len) as usize
+        };
+
+        let vec = {
+            let mut vec = vec![0; len];
+            reader.read_exact(&mut vec).await?;
+            vec
+        };
+
+        Ok(unsafe { String::from_utf8_unchecked(vec) })
+    }
+}
diff --git a/src/stream/merge.rs b/src/stream/merge.rs
index bb1d8de..2734572 100644
--- a/src/stream/merge.rs
+++ b/src/stream/merge.rs
@@ -10,25 +10,27 @@ use futures_util::stream::StreamExt;
 use pin_project_lite::pin_project;
 
 use super::{Entry, ScanStream};
-use crate::record::Record;
+use crate::{executor::Executor, record::Record};
 
 pin_project! {
-    pub(crate) struct MergeStream<'merge, R>
+    pub(crate) struct MergeStream<'merge, R, E>
     where
         R: Record,
+        E: Executor,
     {
-        streams: Vec<ScanStream<'merge, R>>,
+        streams: Vec<ScanStream<'merge, R, E>>,
         peeked: BinaryHeap<CmpEntry<'merge, R>>,
         buf: Option<Entry<'merge, R>>,
     }
 }
 
-impl<'merge, R> MergeStream<'merge, R>
+impl<'merge, R, E> MergeStream<'merge, R, E>
 where
     R: Record,
+    E: Executor,
 {
     pub(crate) async fn from_vec(
-        mut streams: Vec<ScanStream<'merge, R>>,
+        mut streams: Vec<ScanStream<'merge, R, E>>,
     ) -> Result<Self, parquet::errors::ParquetError> {
         let mut peeked = BinaryHeap::with_capacity(streams.len());
 
@@ -49,9 +51,10 @@ where
     }
 }
 
-impl<'merge, R> Stream for MergeStream<'merge, R>
+impl<'merge, R, E> Stream for MergeStream<'merge, R, E>
 where
     R: Record,
+    E: Executor,
 {
     type Item = Result<Entry<'merge, R>, parquet::errors::ParquetError>;
 
@@ -135,7 +138,7 @@ mod tests {
     use futures_util::StreamExt;
 
     use super::MergeStream;
-    use crate::inmem::mutable::Mutable;
+    use crate::{executor::tokio::TokioExecutor, inmem::mutable::Mutable};
 
     #[tokio::test]
     async fn merge_mutable() {
@@ -155,7 +158,7 @@ mod tests {
         let lower = "a".to_string();
         let upper = "e".to_string();
         let bound = (Bound::Included(&lower), Bound::Included(&upper));
-        let mut merge = MergeStream::from_vec(vec![
+        let mut merge = MergeStream::<String, TokioExecutor>::from_vec(vec![
             m1.scan(bound, 6.into()).into(),
             m2.scan(bound, 6.into()).into(),
             m3.scan(bound, 6.into()).into(),
@@ -183,9 +186,10 @@ mod tests {
         let lower = "1".to_string();
         let upper = "4".to_string();
         let bound = (Bound::Included(&lower), Bound::Included(&upper));
-        let mut merge = MergeStream::from_vec(vec![m1.scan(bound, 0.into()).into()])
-            .await
-            .unwrap();
+        let mut merge =
+            MergeStream::<String, TokioExecutor>::from_vec(vec![m1.scan(bound, 0.into()).into()])
+                .await
+                .unwrap();
 
         dbg!(merge.next().await);
         dbg!(merge.next().await);
@@ -194,9 +198,10 @@ mod tests {
         let lower = "1".to_string();
         let upper = "4".to_string();
         let bound = (Bound::Included(&lower), Bound::Included(&upper));
-        let mut merge = MergeStream::from_vec(vec![m1.scan(bound, 1.into()).into()])
-            .await
-            .unwrap();
+        let mut merge =
+            MergeStream::<String, TokioExecutor>::from_vec(vec![m1.scan(bound, 1.into()).into()])
+                .await
+                .unwrap();
 
         dbg!(merge.next().await);
         dbg!(merge.next().await);
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
index 819601a..75c1bcf 100644
--- a/src/stream/mod.rs
+++ b/src/stream/mod.rs
@@ -14,6 +14,7 @@ use pin_project_lite::pin_project;
 use record_batch::RecordBatchEntry;
 
 use crate::{
+    executor::Executor,
     inmem::{immutable::ImmutableScan, mutable::MutableScan},
     ondisk::scan::SsTableScan,
     oracle::timestamp::Timestamped,
@@ -65,9 +66,10 @@ where
 
 pin_project! {
     #[project = ScanStreamProject]
-    pub enum ScanStream<'scan, R>
+    pub enum ScanStream<'scan, R, E>
     where
         R: Record,
+        E: Executor,
     {
         Mutable {
             #[pin]
@@ -79,14 +81,15 @@ pin_project! {
         },
         SsTable {
             #[pin]
-            inner: SsTableScan<R>,
+            inner: SsTableScan<R, E>,
         },
     }
 }
 
-impl<'scan, R> From<MutableScan<'scan, R>> for ScanStream<'scan, R>
+impl<'scan, R, E> From<MutableScan<'scan, R>> for ScanStream<'scan, R, E>
 where
     R: Record,
+    E: Executor,
 {
     fn from(inner: MutableScan<'scan, R>) -> Self {
         ScanStream::Mutable {
@@ -95,9 +98,10 @@ where
         }
     }
 }
 
-impl<'scan, R> From<ImmutableScan<'scan, R>> for ScanStream<'scan, R>
+impl<'scan, R, E> From<ImmutableScan<'scan, R>> for ScanStream<'scan, R, E>
 where
     R: Record,
+    E: Executor,
 {
     fn from(inner: ImmutableScan<'scan, R>) -> Self {
         ScanStream::Immutable {
@@ -106,18 +110,20 @@ where
         }
     }
 }
 
-impl<'scan, R> From<SsTableScan<R>> for ScanStream<'scan, R>
+impl<'scan, R, E> From<SsTableScan<R, E>> for ScanStream<'scan, R, E>
 where
     R: Record,
+    E: Executor,
 {
-    fn from(inner: SsTableScan<R>) -> Self {
+    fn from(inner: SsTableScan<R, E>) -> Self {
         ScanStream::SsTable { inner }
     }
 }
 
-impl<R> fmt::Debug for ScanStream<'_, R>
+impl<R, E> fmt::Debug for ScanStream<'_, R, E>
 where
     R: Record,
+    E: Executor,
 {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         match self {
@@ -128,9 +134,10 @@ where
         }
     }
 }
 
-impl<'scan, R> Stream for ScanStream<'scan, R>
+impl<'scan, R, E> Stream for ScanStream<'scan, R, E>
 where
     R: Record,
+    E: Executor,
 {
     type Item = Result<Entry<'scan, R>, parquet::errors::ParquetError>;
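Note: `E` reaches `ScanStream` only through the `SsTable` variant, so a merge over purely in-memory scans cannot infer it, which is why the updated tests spell out `MergeStream::<String, TokioExecutor>`. The three sources funnel into one vector the way `scan` in `src/lib.rs` builds it; a sketch with assumed bindings (`mutable`, `immutable`, `table_scan`):

```rust
// Each `.into()` picks the matching ScanStream variant via the From impls above.
let mut streams = Vec::<ScanStream<Test, TokioExecutor>>::new();
streams.push(mutable.scan((lower, upper), ts).into()); // ScanStream::Mutable
streams.push(immutable.scan((lower, upper), ts).into()); // ScanStream::Immutable
streams.push(table_scan.into()); // ScanStream::SsTable
let merge = MergeStream::from_vec(streams).await?;
```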
diff --git a/src/transaction.rs b/src/transaction.rs
index f7054b8..20fdd36 100644
--- a/src/transaction.rs
+++ b/src/transaction.rs
@@ -1,21 +1,23 @@
 use std::{collections::BTreeMap, io, sync::Arc};
 
-use crate::{oracle::Timestamp, Record, DB};
+use crate::{executor::Executor, oracle::Timestamp, Record, DB};
 
-pub struct Transaction<R>
+pub struct Transaction<R, E>
 where
     R: Record,
+    E: Executor,
 {
-    db: Arc<DB<R>>,
+    db: Arc<DB<R, E>>,
     read_at: Timestamp,
     local: BTreeMap<R::Key, Option<R>>,
 }
 
-impl<R> Transaction<R>
+impl<R, E> Transaction<R, E>
 where
     R: Record,
+    E: Executor,
 {
-    pub(crate) fn new(db: Arc<DB<R>>, read_at: Timestamp) -> Self {
+    pub(crate) fn new(db: Arc<DB<R, E>>, read_at: Timestamp) -> Self {
         Self {
             db,
             read_at,
diff --git a/src/version/cleaner.rs b/src/version/cleaner.rs
new file mode 100644
index 0000000..daeb8bb
--- /dev/null
+++ b/src/version/cleaner.rs
@@ -0,0 +1,66 @@
+use std::{collections::BTreeMap, fs, io, sync::Arc};
+
+use futures_channel::mpsc::{channel, Receiver, Sender};
+use futures_util::StreamExt;
+
+use crate::{fs::FileId, DbOption};
+
+pub(crate) enum CleanTag {
+    Add {
+        version_num: usize,
+        gens: Vec<FileId>,
+    },
+    Clean {
+        version_num: usize,
+    },
+}
+
+pub(crate) struct Cleaner {
+    tag_recv: Receiver<CleanTag>,
+    gens_map: BTreeMap<usize, (Vec<FileId>, bool)>,
+    option: Arc<DbOption>,
+}
+
+impl Cleaner {
+    pub(crate) fn new(option: Arc<DbOption>) -> (Self, Sender<CleanTag>) {
+        let (tag_send, tag_recv) = channel(option.clean_channel_buffer);
+
+        (
+            Cleaner {
+                tag_recv,
+                gens_map: Default::default(),
+                option,
+            },
+            tag_send,
+        )
+    }
+
+    pub(crate) async fn listen(&mut self) -> Result<(), io::Error> {
+        loop {
+            match self.tag_recv.next().await {
+                None => break,
+                Some(CleanTag::Add { version_num, gens }) => {
+                    let _ = self.gens_map.insert(version_num, (gens, false));
+                }
+                Some(CleanTag::Clean { version_num }) => {
+                    if let Some((_, dropped)) = self.gens_map.get_mut(&version_num) {
+                        *dropped = true;
+                    }
+                    while let Some((first_version, (gens, dropped))) = self.gens_map.pop_first() {
+                        if !dropped {
+                            // The oldest version is still alive: re-queue it and wait
+                            // for its Clean tag before deleting anything newer.
+                            let _ = self.gens_map.insert(first_version, (gens, false));
+                            break;
+                        }
+                        for gen in gens {
+                            fs::remove_file(self.option.table_path(&gen))?;
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+// TODO: TestCase
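Note: `Cleaner` serializes file deletion behind version lifetimes: `Add` registers the generations a new version has obsoleted, `Clean` marks a version dropped, and files are removed only once every older version is gone. A hypothetical wiring helper inside the `version` module (not part of this diff; `Cleaner` is `pub(crate)`):

```rust
use std::sync::Arc;

use futures_channel::mpsc::Sender;
use tracing::error;

use self::cleaner::{CleanTag, Cleaner};
use crate::{executor::Executor, DbOption};

// Spawns the cleaner loop on the executor and returns the sender half that
// VersionSet::new (below) takes ownership of. The loop runs until every
// sender is dropped.
fn start_cleaner<E: Executor>(executor: &E, option: Arc<DbOption>) -> Sender<CleanTag> {
    let (mut cleaner, clean_sender) = Cleaner::new(option);
    executor.spawn(async move {
        if let Err(err) = cleaner.listen().await {
            error!("[Cleaner Error]: {}", err);
        }
    });
    clean_sender
}
```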
diff --git a/src/version/edit.rs b/src/version/edit.rs
new file mode 100644
index 0000000..2133864
--- /dev/null
+++ b/src/version/edit.rs
@@ -0,0 +1,152 @@
+use std::mem::size_of;
+
+use futures_io::{AsyncRead, AsyncWrite};
+use futures_util::{AsyncReadExt, AsyncWriteExt};
+
+use crate::{
+    fs::FileId,
+    scope::Scope,
+    serdes::{Decode, Encode},
+};
+
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub(crate) enum VersionEdit<K>
+where
+    K: Encode + Decode + Ord + Clone,
+{
+    Add { level: u8, scope: Scope<K> },
+    Remove { level: u8, gen: FileId },
+}
+
+impl<K> VersionEdit<K>
+where
+    K: Encode + Decode + Ord + Clone,
+{
+    pub(crate) async fn recover<R: AsyncRead + Unpin>(reader: &mut R) -> Vec<VersionEdit<K>> {
+        let mut edits = Vec::new();
+
+        while let Ok(edit) = VersionEdit::decode(reader).await {
+            edits.push(edit)
+        }
+        edits
+    }
+}
+
+impl<K> Encode for VersionEdit<K>
+where
+    K: Encode + Decode + Ord + Clone,
+{
+    type Error = <Scope<K> as Encode>::Error;
+
+    async fn encode<W: AsyncWrite + Unpin + Send + Sync>(
+        &self,
+        writer: &mut W,
+    ) -> Result<(), Self::Error> {
+        match self {
+            VersionEdit::Add { scope, level } => {
+                writer.write_all(&0u8.to_le_bytes()).await?;
+                writer.write_all(&level.to_le_bytes()).await?;
+                scope.encode(writer).await?;
+            }
+            VersionEdit::Remove { gen, level } => {
+                writer.write_all(&1u8.to_le_bytes()).await?;
+                writer.write_all(&level.to_le_bytes()).await?;
+                writer.write_all(&gen.to_bytes()).await?;
+            }
+        }
+
+        Ok(())
+    }
+
+    fn size(&self) -> usize {
+        size_of::<u8>()
+            + size_of::<u8>()
+            + match self {
+                VersionEdit::Add { scope, .. } => scope.size(),
+                VersionEdit::Remove { .. } => 16,
+            }
+    }
+}
+
+impl<K> Decode for VersionEdit<K>
+where
+    K: Encode + Decode + Ord + Clone,
+{
+    type Error = <Scope<K> as Decode>::Error;
+
+    async fn decode<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Self, Self::Error> {
+        let edit_type = {
+            let mut len = [0; size_of::<u8>()];
+            reader.read_exact(&mut len).await?;
+            u8::from_le_bytes(len) as usize
+        };
+        let level = {
+            let mut level = [0; size_of::<u8>()];
+            reader.read_exact(&mut level).await?;
+            u8::from_le_bytes(level)
+        };
+
+        Ok(match edit_type {
+            0 => {
+                let scope = Scope::<K>::decode(reader).await?;
+
+                VersionEdit::Add { level, scope }
+            }
+            1 => {
+                let gen = {
+                    let mut slice = [0; 16];
+                    reader.read_exact(&mut slice).await?;
+                    FileId::from_bytes(slice)
+                };
+                VersionEdit::Remove { level, gen }
+            }
+            _ => todo!(),
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use futures_executor::block_on;
+    use futures_util::io::Cursor;
+
+    use crate::{fs::FileId, scope::Scope, serdes::Encode, version::edit::VersionEdit};
+
+    #[test]
+    fn encode_and_decode() {
+        block_on(async {
+            let edits = vec![
+                VersionEdit::Add {
+                    level: 0,
+                    scope: Scope {
+                        min: "Min".to_string(),
+                        max: "Max".to_string(),
+                        gen: Default::default(),
+                        wal_ids: Some(vec![FileId::new(), FileId::new()]),
+                    },
+                },
+                VersionEdit::Remove {
+                    level: 1,
+                    gen: Default::default(),
+                },
+            ];
+
+            let bytes = {
+                let mut cursor = Cursor::new(vec![]);
+
+                for edit in edits.clone() {
+                    edit.encode(&mut cursor).await.unwrap();
+                }
+                cursor.into_inner()
+            };
+
+            let decode_edits = {
+                let mut cursor = Cursor::new(bytes);
+
+                VersionEdit::<String>::recover(&mut cursor).await
+            };
+
+            assert_eq!(edits, decode_edits);
+        })
+    }
+}
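Note: each log record is a 1-byte tag (`0` = `Add`, `1` = `Remove`), a level byte, then the payload (an encoded `Scope` or a 16-byte file id), and `recover` replays records until the first failed decode. A crash that tears the final record therefore truncates the log logically instead of failing recovery; a sketch in the style of the round-trip test above (same imports assumed):

```rust
#[test]
fn recover_stops_at_torn_tail() {
    block_on(async {
        let edit = VersionEdit::Remove {
            level: 1,
            gen: FileId::new(),
        };

        let mut bytes = {
            let mut cursor = Cursor::new(vec![]);
            edit.encode(&mut cursor).await.unwrap();
            cursor.into_inner()
        };
        bytes.push(0u8); // a lone Add tag with no payload behind it

        let mut cursor = Cursor::new(bytes);
        let edits = VersionEdit::<String>::recover(&mut cursor).await;
        assert_eq!(edits, vec![edit]); // the torn record is dropped
    })
}
```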
diff --git a/src/version/mod.rs b/src/version/mod.rs
new file mode 100644
index 0000000..6dd364b
--- /dev/null
+++ b/src/version/mod.rs
@@ -0,0 +1,196 @@
+mod cleaner;
+mod edit;
+mod set;
+
+use std::{marker::PhantomData, ops::Bound, sync::Arc};
+
+use futures_channel::mpsc::{SendError, Sender};
+use futures_executor::block_on;
+use futures_util::SinkExt;
+use thiserror::Error;
+use tracing::error;
+
+use crate::{
+    executor::Executor,
+    fs::FileId,
+    ondisk::sstable::SsTable,
+    oracle::{timestamp::TimestampedRef, Timestamp},
+    record::Record,
+    scope::Scope,
+    serdes::Encode,
+    stream::{record_batch::RecordBatchEntry, ScanStream},
+    version::cleaner::CleanTag,
+    DbOption,
+};
+
+pub const MAX_LEVEL: usize = 7;
+
+pub(crate) type VersionRef<R, E> = Arc<Version<R, E>>;
+
+pub(crate) struct Version<R, E>
+where
+    R: Record,
+    E: Executor,
+{
+    pub(crate) num: usize,
+    pub(crate) level_slice: [Vec<Scope<R::Key>>; MAX_LEVEL],
+    pub(crate) clean_sender: Sender<CleanTag>,
+    pub(crate) option: Arc<DbOption>,
+    _p: PhantomData<E>,
+}
+
+impl<R, E> Clone for Version<R, E>
+where
+    R: Record,
+    E: Executor,
+{
+    fn clone(&self) -> Self {
+        let mut level_slice = Version::<R, E>::level_slice_new();
+
+        for (level, scopes) in self.level_slice.iter().enumerate() {
+            for scope in scopes {
+                level_slice[level].push(scope.clone());
+            }
+        }
+
+        Self {
+            num: self.num,
+            level_slice,
+            clean_sender: self.clean_sender.clone(),
+            option: self.option.clone(),
+            _p: Default::default(),
+        }
+    }
+}
+
+impl<R, E> Version<R, E>
+where
+    R: Record,
+    E: Executor,
+{
+    pub(crate) async fn query(
+        &self,
+        key: &TimestampedRef<R::Key>,
+    ) -> Result<Option<RecordBatchEntry<R>>, VersionError<R>> {
+        for scope in self.level_slice[0].iter().rev() {
+            if !scope.is_between(key.value()) {
+                continue;
+            }
+            if let Some(entry) = Self::table_query(self, key, &scope.gen).await? {
+                return Ok(Some(entry));
+            }
+        }
+        for level in self.level_slice[1..6].iter() {
+            if level.is_empty() {
+                continue;
+            }
+            let index = Self::scope_search(key.value(), level);
+            if !level[index].is_between(key.value()) {
+                continue;
+            }
+            if let Some(entry) = Self::table_query(self, key, &level[index].gen).await? {
+                return Ok(Some(entry));
+            }
+        }
+
+        Ok(None)
+    }
+
+    async fn table_query(
+        &self,
+        key: &TimestampedRef<<R as Record>::Key>,
+        gen: &FileId,
+    ) -> Result<Option<RecordBatchEntry<R>>, VersionError<R>> {
+        let file = E::open(self.option.table_path(gen))
+            .await
+            .map_err(VersionError::Io)?;
+        let table = SsTable::<R, E>::open(file);
+        table.get(key).await.map_err(VersionError::Parquet)
+    }
+
+    pub(crate) fn scope_search(key: &R::Key, level: &[Scope<R::Key>]) -> usize {
+        level
+            .binary_search_by(|scope| scope.min.cmp(key))
+            .unwrap_or_else(|index| index.saturating_sub(1))
+    }
+
+    pub(crate) fn tables_len(&self, level: usize) -> usize {
+        self.level_slice[level].len()
+    }
+
+    pub(crate) fn level_slice_new() -> [Vec<Scope<R::Key>>; 7] {
+        [
+            Vec::new(),
+            Vec::new(),
+            Vec::new(),
+            Vec::new(),
+            Vec::new(),
+            Vec::new(),
+            Vec::new(),
+        ]
+    }
+
+    pub(crate) async fn iters<'a>(
+        &self,
+        iters: &mut Vec<ScanStream<'a, R, E>>,
+        range: (Bound<&'a R::Key>, Bound<&'a R::Key>),
+        ts: Timestamp,
+    ) -> Result<(), VersionError<R>> {
+        for scope in self.level_slice[0].iter() {
+            let file = E::open(self.option.table_path(&scope.gen))
+                .await
+                .map_err(VersionError::Io)?;
+            let table = SsTable::<R, E>::open(file);
+
+            iters.push(ScanStream::SsTable {
+                inner: table.scan(range, ts).await.map_err(VersionError::Parquet)?,
+            })
+        }
+        for scopes in self.level_slice[1..].iter() {
+            if scopes.is_empty() {
+                continue;
+            }
+            let _gens = scopes.iter().map(|scope| scope.gen).collect::<Vec<_>>();
+            todo!("level stream")
+            // iters.push(EStreamImpl::Level(
+            //     LevelStream::new(option, gens, lower, upper).await?,
+            // ));
+        }
+        Ok(())
+    }
+}
+
+impl<R, E> Drop for Version<R, E>
+where
+    R: Record,
+    E: Executor,
+{
+    fn drop(&mut self) {
+        block_on(async {
+            if let Err(err) = self
+                .clean_sender
+                .send(CleanTag::Clean {
+                    version_num: self.num,
+                })
+                .await
+            {
+                error!("[Version Drop Error]: {}", err)
+            }
+        });
+    }
+}
+
+#[derive(Debug, Error)]
+pub enum VersionError<R>
+where
+    R: Record,
+{
+    #[error("version encode error: {0}")]
+    Encode(#[source] <R::Key as Encode>::Error),
+    #[error("version io error: {0}")]
+    Io(#[source] std::io::Error),
+    #[error("version parquet error: {0}")]
+    Parquet(#[source] parquet::errors::ParquetError),
+    #[error("version send error: {0}")]
+    Send(#[source] SendError),
+}
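Note: lookups scan level 0 newest-first because its scopes may overlap, while levels ≥ 1 are sorted and disjoint, so `scope_search` can binary-search on `min` and fall back to the previous slot on a miss. The index arithmetic on plain integers:

```rust
fn main() {
    // mins of three sorted, disjoint scopes
    let mins = [10, 20, 30];
    let key = 25;
    // Err(2) is the insertion point; the candidate scope is the one before it.
    let index = mins
        .binary_search_by(|min| min.cmp(&key))
        .unwrap_or_else(|i| i.saturating_sub(1));
    assert_eq!(index, 1); // the scope starting at 20 may contain 25
}
```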
diff --git a/src/version/set.rs b/src/version/set.rs
new file mode 100644
index 0000000..96fdcc6
--- /dev/null
+++ b/src/version/set.rs
@@ -0,0 +1,139 @@
+use std::{io::SeekFrom, sync::Arc};
+
+use async_lock::RwLock;
+use futures_channel::mpsc::Sender;
+use futures_util::{AsyncSeekExt, AsyncWriteExt, SinkExt};
+
+use crate::{
+    executor::Executor,
+    fs::FileId,
+    record::Record,
+    serdes::Encode,
+    version::{cleaner::CleanTag, edit::VersionEdit, Version, VersionError, VersionRef},
+    DbOption,
+};
+
+pub(crate) struct VersionSetInner<R, E>
+where
+    R: Record,
+    E: Executor,
+{
+    current: VersionRef<R, E>,
+    log: E::File,
+}
+
+pub(crate) struct VersionSet<R, E>
+where
+    R: Record,
+    E: Executor,
+{
+    inner: Arc<RwLock<VersionSetInner<R, E>>>,
+    clean_sender: Sender<CleanTag>,
+    option: Arc<DbOption>,
+}
+
+impl<R, E> Clone for VersionSet<R, E>
+where
+    R: Record,
+    E: Executor,
+{
+    fn clone(&self) -> Self {
+        VersionSet {
+            inner: self.inner.clone(),
+            clean_sender: self.clean_sender.clone(),
+            option: self.option.clone(),
+        }
+    }
+}
+
+impl<R, E> VersionSet<R, E>
+where
+    R: Record,
+    E: Executor,
+{
+    pub(crate) async fn new(
+        clean_sender: Sender<CleanTag>,
+        option: Arc<DbOption>,
+    ) -> Result<Self, VersionError<R>> {
+        let mut log = E::open(option.version_path())
+            .await
+            .map_err(VersionError::Io)?;
+        let edits = VersionEdit::recover(&mut log).await;
+
+        log.seek(SeekFrom::End(0)).await.map_err(VersionError::Io)?;
+
+        let set = VersionSet::<R, E> {
+            inner: Arc::new(RwLock::new(VersionSetInner {
+                current: Arc::new(Version::<R, E> {
+                    num: 0,
+                    level_slice: Version::<R, E>::level_slice_new(),
+                    clean_sender: clean_sender.clone(),
+                    option: option.clone(),
+                    _p: Default::default(),
+                }),
+                log,
+            })),
+            clean_sender,
+            option,
+        };
+        set.apply_edits(edits, None, true).await?;
+
+        Ok(set)
+    }
+
+    pub(crate) async fn current(&self) -> VersionRef<R, E> {
+        self.inner.read().await.current.clone()
+    }
+
+    pub(crate) async fn apply_edits(
+        &self,
+        version_edits: Vec<VersionEdit<R::Key>>,
+        delete_gens: Option<Vec<FileId>>,
+        is_recover: bool,
+    ) -> Result<(), VersionError<R>> {
+        let mut guard = self.inner.write().await;
+
+        let mut new_version = Version::<R, E>::clone(&guard.current);
+
+        for version_edit in version_edits {
+            if !is_recover {
+                version_edit
+                    .encode(&mut guard.log)
+                    .await
+                    .map_err(VersionError::Encode)?;
+            }
+            match version_edit {
+                VersionEdit::Add { mut scope, level } => {
+                    if let Some(wal_ids) = scope.wal_ids.take() {
+                        for wal_id in wal_ids {
+                            E::remove(self.option.wal_path(&wal_id))
+                                .await
+                                .map_err(VersionError::Io)?;
+                        }
+                    }
+                    new_version.level_slice[level as usize].push(scope);
+                }
+                VersionEdit::Remove { gen, level } => {
+                    if let Some(i) = new_version.level_slice[level as usize]
+                        .iter()
+                        .position(|scope| scope.gen == gen)
+                    {
+                        new_version.level_slice[level as usize].remove(i);
+                    }
+                }
+            }
+        }
+        if let Some(delete_gens) = delete_gens {
+            new_version
+                .clean_sender
+                .send(CleanTag::Add {
+                    version_num: new_version.num,
+                    gens: delete_gens,
+                })
+                .await
+                .map_err(VersionError::Send)?;
+        }
+        guard.log.flush().await.map_err(VersionError::Io)?;
+        guard.current = Arc::new(new_version);
+        Ok(())
+    }
+}
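Note: `apply_edits` is the single commit point: each edit is appended to the version log first (unless replaying during recovery), WALs covered by a flushed `Add` are deleted, the log is flushed, and only then is the new `Version` swapped in. A crate-internal sketch of a flush commit under those rules (`version_set`, `scope`, and `obsolete_gen` are assumed bindings):

```rust
// Log-then-apply: readers keep their old VersionRef until the Arc swap, and
// the obsolete SSTable generation is only deleted once those readers drop.
version_set
    .apply_edits(
        vec![
            VersionEdit::Add { level: 0, scope },
            VersionEdit::Remove { level: 0, gen: obsolete_gen },
        ],
        Some(vec![obsolete_gen]),
        false, // not recovering: append the edits to the version log
    )
    .await?;
```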