Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/version #3

Closed
wants to merge 9 commits into from
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ arrow = "52"
async-lock = "3"
crossbeam-skiplist = "0.1"
futures-core = "0.3"
futures-channel = "0.3"
futures-executor = "0.3"
futures-io = "0.3"
futures-util = "0.3"
once_cell = "1"
Expand All @@ -20,6 +22,8 @@ pin-project-lite = "0.2"
thiserror = "1"
tokio = { version = "1", optional = true }
tokio-util = { version = "0.7", features = ["compat"] }
tracing = "0.1"
ulid = "1"

[dev-dependencies]
tempfile = "3"
Expand Down
16 changes: 11 additions & 5 deletions src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::future::Future;

pub trait Executor {
use crate::fs::Fs;

pub trait Executor: Fs {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static;
Expand All @@ -10,15 +12,19 @@ pub trait Executor {
pub mod tokio {
use std::future::Future;

use tokio::runtime::Handle;

use super::Executor;

pub struct TokioExecutor {
tokio: tokio::runtime::Runtime,
handle: Handle,
}

impl TokioExecutor {
pub fn new(tokio: tokio::runtime::Runtime) -> Self {
Self { tokio }
pub fn new() -> Self {
Self {
handle: Handle::current(),
}
}
}

Expand All @@ -27,7 +33,7 @@ pub mod tokio {
where
F: Future<Output = ()> + Send + 'static,
{
self.tokio.spawn(future);
self.handle.spawn(future);
}
}
}
34 changes: 30 additions & 4 deletions src/fs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,42 @@
#[cfg(any(feature = "tokio", test))]
pub mod tokio;

use std::{future::Future, io, path::Path};
use std::{
fmt::{Display, Formatter},
future::Future,
io,
path::Path,
};

use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
use ulid::Ulid;

pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static {}
/// Identifier assigned to each on-disk file; used to build file names
/// (see `DbOption::table_path` / `wal_path`). ULIDs are unique and sortable.
pub(crate) type FileId = Ulid;

impl<T> AsyncFile for T where T: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin + 'static {}
/// The kinds of files the database creates on disk. The `Display` impl in
/// this module renders the lowercase extension used in on-disk file names.
pub enum FileType {
    /// Write-ahead log file (extension "wal").
    WAL,
    /// Table data file in Apache Parquet format (extension "parquet").
    PARQUET,
    /// Version log file (extension "log"); see `DbOption::version_path`.
    LOG,
}

/// A file handle usable by the storage layer: asynchronously readable,
/// writable and seekable, and shareable across threads (`Send + Sync`).
pub trait AsyncFile: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {}

/// Blanket impl: anything with the required capabilities is an `AsyncFile`.
impl<T> AsyncFile for T where T: AsyncRead + AsyncWrite + AsyncSeek + Send + Sync + Unpin + 'static {}

pub trait Fs {
type File: AsyncFile;

fn open(&self, path: impl AsRef<Path>) -> impl Future<Output = io::Result<Self::File>>;
fn open(path: impl AsRef<Path>) -> impl Future<Output = io::Result<Self::File>>;

fn remove(path: impl AsRef<Path>) -> impl Future<Output = io::Result<()>>;
}

impl Display for FileType {
    /// Writes the lowercase file-name extension for this file type.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let extension = match self {
            FileType::WAL => "wal",
            FileType::PARQUET => "parquet",
            FileType::LOG => "log",
        };
        f.write_str(extension)
    }
}
16 changes: 10 additions & 6 deletions src/fs/tokio.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
use std::{io, path::Path};

use tokio::fs::{remove_file, File};
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};

use super::Fs;
use crate::executor::tokio::TokioExecutor;

pub struct TokioFs;
impl Fs for TokioExecutor {
type File = Compat<File>;

impl Fs for TokioFs {
type File = Compat<tokio::fs::File>;

async fn open(&self, path: impl AsRef<Path>) -> io::Result<Self::File> {
tokio::fs::File::create_new(path)
async fn open(path: impl AsRef<Path>) -> io::Result<Self::File> {
File::create_new(path)
.await
.map(TokioAsyncReadCompatExt::compat)
}

async fn remove(path: impl AsRef<Path>) -> io::Result<()> {
remove_file(path).await
}
}
8 changes: 4 additions & 4 deletions src/inmem/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use crate::{
pub trait ArrowArrays: Sized {
type Record: Record;

type Buider: Builder<Self>;
type Builder: Builder<Self>;

fn builder(capacity: usize) -> Self::Buider;
fn builder(capacity: usize) -> Self::Builder;

fn get(&self, offset: u32) -> Option<Option<<Self::Record as Record>::Ref<'_>>>;

Expand Down Expand Up @@ -167,9 +167,9 @@ pub(crate) mod tests {
impl ArrowArrays for TestImmutableArrays {
type Record = Test;

type Buider = TestBuilder;
type Builder = TestBuilder;

fn builder(capacity: usize) -> Self::Buider {
fn builder(capacity: usize) -> Self::Builder {
TestBuilder {
vstring: StringBuilder::with_capacity(capacity, 0),
vu32: PrimitiveBuilder::<UInt32Type>::with_capacity(capacity),
Expand Down
110 changes: 95 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
#![allow(dead_code)]
pub(crate) mod arrows;
mod executor;
pub mod executor;
pub mod fs;
mod inmem;
mod ondisk;
mod oracle;
mod record;
mod scope;
pub mod serdes;
mod stream;
mod transaction;
mod version;

use std::{collections::VecDeque, io, mem, ops::Bound, sync::Arc};
use std::{
collections::VecDeque, io, marker::PhantomData, mem, ops::Bound, path::PathBuf, sync::Arc,
};

use async_lock::{RwLock, RwLockReadGuard};
use futures_core::Stream;
Expand All @@ -18,31 +23,99 @@ use inmem::{immutable::Immutable, mutable::Mutable};
use oracle::Timestamp;
use parquet::errors::ParquetError;
use record::Record;
use stream::{merge::MergeStream, Entry, ScanStream};

pub struct DB<R>
use crate::{
executor::Executor,
fs::{FileId, FileType},
stream::{merge::MergeStream, Entry, ScanStream},
version::Version,
};

/// Tuning and layout options for a database instance.
/// Defaults are provided by `DbOption::new`.
#[derive(Debug)]
pub struct DbOption {
    /// Root directory where all database files are stored.
    pub path: PathBuf,
    /// Maximum size of the mutable memtable in bytes — presumably triggers a
    /// freeze when exceeded; TODO confirm against the write path.
    pub max_mem_table_size: usize,
    /// Number of immutable chunks kept before flushing — assumption; verify.
    pub immutable_chunk_num: usize,
    /// Base table count that triggers a major compaction at level 0
    /// (scaled per level — see `is_threshold_exceeded_major`).
    pub major_threshold_with_sst_size: usize,
    /// Per-level multiplier applied to the major-compaction threshold.
    pub level_sst_magnification: usize,
    /// Maximum size of a single SSTable file in bytes — not used in this
    /// view; TODO confirm semantics.
    pub max_sst_file_size: usize,
    /// Capacity of the cleanup channel — not used in this view; TODO confirm.
    pub clean_channel_buffer: usize,
}

/// An embedded database over records of type `R`, driven by executor `E`.
pub struct DB<R, E>
where
    R: Record,
    E: Executor,
{
    // Shared mutable state, guarded by an async RwLock for concurrent access.
    schema: Arc<RwLock<Schema<R>>>,
    // Ties the executor type parameter to the struct without storing a value.
    _p: PhantomData<E>,
}

impl<R> Default for DB<R>
impl DbOption {
    /// Creates options rooted at `path` with default tuning values.
    pub fn new(path: impl Into<PathBuf> + Send) -> Self {
        DbOption {
            path: path.into(),
            max_mem_table_size: 8 * 1024 * 1024, // 8 MiB
            immutable_chunk_num: 3,
            major_threshold_with_sst_size: 10,
            level_sst_magnification: 10,
            max_sst_file_size: 24 * 1024 * 1024, // 24 MiB
            clean_channel_buffer: 10,
        }
    }

    /// Path of the Parquet table file for `gen`: `<path>/<gen>.parquet`.
    pub(crate) fn table_path(&self, gen: &FileId) -> PathBuf {
        self.path.join(format!("{}.{}", gen, FileType::PARQUET))
    }

    /// Path of the write-ahead log for `gen`: `<path>/<gen>.wal`.
    pub(crate) fn wal_path(&self, gen: &FileId) -> PathBuf {
        self.path.join(format!("{}.{}", gen, FileType::WAL))
    }

    /// Path of the version log: `<path>/version.log`.
    pub(crate) fn version_path(&self) -> PathBuf {
        self.path.join(format!("version.{}", FileType::LOG))
    }

    /// Returns true when `level` holds enough tables to warrant a major
    /// compaction. The threshold grows geometrically with the level:
    /// `major_threshold_with_sst_size * level_sst_magnification^level`.
    pub(crate) fn is_threshold_exceeded_major<R, E>(
        &self,
        version: &Version<R, E>,
        level: usize,
    ) -> bool
    where
        R: Record,
        E: Executor,
    {
        Version::<R, E>::tables_len(version, level)
            >= (self.major_threshold_with_sst_size * self.level_sst_magnification.pow(level as u32))
    }
}

impl<R, E> Default for DB<R, E>
where
R: Record,
E: Executor,
{
fn default() -> Self {
Self {
schema: Arc::new(RwLock::new(Schema::default())),
_p: Default::default(),
}
}
}

impl<R> DB<R>
impl<R, E> DB<R, E>
where
R: Record + Send + Sync,
R::Key: Send,
R::Key: Send + Sync,
E: Executor,
{
    /// Creates a database with a fresh, empty schema and no stored records.
    pub fn empty() -> Self {
        Self {
            schema: Arc::new(RwLock::new(Schema::default())),
            _p: Default::default(),
        }
    }

pub(crate) async fn write(&self, record: R, ts: Timestamp) -> io::Result<()> {
let columns = self.schema.read().await;
columns.write(record, ts).await
Expand Down Expand Up @@ -95,25 +168,31 @@ where
Ok(())
}

async fn get<'get>(
async fn get<'get, E>(
&'get self,
key: &'get R::Key,
ts: Timestamp,
) -> Result<Option<Entry<'get, R>>, ParquetError> {
self.scan(Bound::Included(key), Bound::Unbounded, ts)
) -> Result<Option<Entry<'get, R>>, ParquetError>
where
E: Executor,
{
self.scan::<E>(Bound::Included(key), Bound::Unbounded, ts)
.await?
.next()
.await
.transpose()
}

async fn scan<'scan>(
async fn scan<'scan, E>(
&'scan self,
lower: Bound<&'scan R::Key>,
uppwer: Bound<&'scan R::Key>,
ts: Timestamp,
) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, ParquetError> {
let mut streams = Vec::<ScanStream<R>>::with_capacity(self.immutables.len() + 1);
) -> Result<impl Stream<Item = Result<Entry<'scan, R>, ParquetError>>, ParquetError>
where
E: Executor,
{
let mut streams = Vec::<ScanStream<R, E>>::with_capacity(self.immutables.len() + 1);
streams.push(self.mutable.scan((lower, uppwer), ts).into());
for immutable in &self.immutables {
streams.push(immutable.scan((lower, uppwer), ts).into());
Expand Down Expand Up @@ -141,6 +220,7 @@ pub(crate) mod tests {
use once_cell::sync::Lazy;

use crate::{
executor::Executor,
inmem::immutable::tests::TestImmutableArrays,
record::{internal::InternalRecordRef, RecordRef},
Record, DB,
Expand Down Expand Up @@ -227,8 +307,8 @@ pub(crate) mod tests {
}
}

pub(crate) async fn get_test_record_batch() -> RecordBatch {
let db = DB::default();
pub(crate) async fn get_test_record_batch<E: Executor>() -> RecordBatch {
let db: DB<Test, E> = DB::empty();

db.write(
Test {
Expand Down
18 changes: 11 additions & 7 deletions src/ondisk/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,36 @@ use pin_project_lite::pin_project;
use tokio_util::compat::Compat;

use crate::{
fs::AsyncFile,
executor::Executor,
record::Record,
stream::record_batch::{RecordBatchEntry, RecordBatchIterator},
};

pin_project! {
#[derive(Debug)]
pub struct SsTableScan<R> {
pub struct SsTableScan<R, E>
where
E: Executor
{
#[pin]
stream: ParquetRecordBatchStream<Compat<Box<dyn AsyncFile>>>,
stream: ParquetRecordBatchStream<Compat<E::File>>,
iter: Option<RecordBatchIterator<R>>,
}
}

impl<R> SsTableScan<R>
impl<R, E> SsTableScan<R, E>
where
R: Record,
E: Executor,
{
pub fn new(stream: ParquetRecordBatchStream<Compat<Box<dyn AsyncFile>>>) -> Self {
pub fn new(stream: ParquetRecordBatchStream<Compat<E::File>>) -> Self {
SsTableScan { stream, iter: None }
}
}

impl<R> Stream for SsTableScan<R>
impl<R, E> Stream for SsTableScan<R, E>
where
R: Record,
E: Executor,
{
type Item = Result<RecordBatchEntry<R>, parquet::errors::ParquetError>;

Expand Down
Loading
Loading