Skip to content

Commit

Permalink
chore: replace channel to flume
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Jul 17, 2024
1 parent 4d66fbe commit a211a1e
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 60 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ tokio = ["dep:tokio"]
arrow = "52"
async-lock = "3"
crossbeam-skiplist = "0.1"
futures-channel = "0.3"
flume = { version = "0.11.0", features = ["async"] }
futures-core = "0.3"
futures-executor = "0.3"
futures-io = "0.3"
futures-util = "0.3"
once_cell = "1"
Expand Down
14 changes: 6 additions & 8 deletions src/version/cleaner.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{collections::BTreeMap, fs, io, sync::Arc};

use futures_channel::mpsc::{channel, Receiver, Sender};
use futures_util::StreamExt;
use flume::{Receiver, Sender};

use crate::{fs::FileId, DbOption};

Expand All @@ -23,7 +22,7 @@ pub(crate) struct Cleaner {

impl Cleaner {
pub(crate) fn new(option: Arc<DbOption>) -> (Self, Sender<CleanTag>) {
let (tag_send, tag_recv) = channel(option.clean_channel_buffer);
let (tag_send, tag_recv) = flume::bounded(option.clean_channel_buffer);

(
Cleaner {
Expand All @@ -36,13 +35,12 @@ impl Cleaner {
}

pub(crate) async fn listen(&mut self) -> Result<(), io::Error> {
loop {
match self.tag_recv.next().await {
None => break,
Some(CleanTag::Add { version_num, gens }) => {
while let Ok(tag) = self.tag_recv.recv_async().await {
match tag {
CleanTag::Add { version_num, gens } => {
let _ = self.gens_map.insert(version_num, (gens, false));
}
Some(CleanTag::Clean { version_num }) => {
CleanTag::Clean { version_num } => {
if let Some((_, dropped)) = self.gens_map.get_mut(&version_num) {
*dropped = true;
}
Expand Down
61 changes: 29 additions & 32 deletions src/version/edit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,46 +107,43 @@ where

#[cfg(test)]
mod tests {
use futures_executor::block_on;
use futures_util::io::Cursor;

use crate::{fs::FileId, scope::Scope, serdes::Encode, version::edit::VersionEdit};

#[test]
fn encode_and_decode() {
block_on(async {
let edits = vec![
VersionEdit::Add {
level: 0,
scope: Scope {
min: "Min".to_string(),
max: "Max".to_string(),
gen: Default::default(),
wal_ids: Some(vec![FileId::new(), FileId::new()]),
},
},
VersionEdit::Remove {
level: 1,
#[tokio::test]
async fn encode_and_decode() {
let edits = vec![
VersionEdit::Add {
level: 0,
scope: Scope {
min: "Min".to_string(),
max: "Max".to_string(),
gen: Default::default(),
wal_ids: Some(vec![FileId::new(), FileId::new()]),
},
];

let bytes = {
let mut cursor = Cursor::new(vec![]);

for edit in edits.clone() {
edit.encode(&mut cursor).await.unwrap();
}
cursor.into_inner()
};
},
VersionEdit::Remove {
level: 1,
gen: Default::default(),
},
];

let bytes = {
let mut cursor = Cursor::new(vec![]);

for edit in edits.clone() {
edit.encode(&mut cursor).await.unwrap();
}
cursor.into_inner()
};

let decode_edits = {
let mut cursor = Cursor::new(bytes);
let decode_edits = {
let mut cursor = Cursor::new(bytes);

VersionEdit::<String>::recover(&mut cursor).await
};
VersionEdit::<String>::recover(&mut cursor).await
};

assert_eq!(edits, decode_edits);
})
assert_eq!(edits, decode_edits);
}
}
22 changes: 7 additions & 15 deletions src/version/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ mod set;

use std::{marker::PhantomData, ops::Bound, sync::Arc};

use futures_channel::mpsc::{SendError, Sender};
use futures_executor::block_on;
use futures_util::SinkExt;
use flume::{SendError, Sender};
use thiserror::Error;
use tracing::error;

Expand Down Expand Up @@ -168,17 +166,11 @@ where
E: Executor,
{
fn drop(&mut self) {
block_on(async {
if let Err(err) = self
.clean_sender
.send(CleanTag::Clean {
version_num: self.num,
})
.await
{
error!("[Version Drop Error]: {}", err)
}
});
if let Err(err) = self.clean_sender.send(CleanTag::Clean {
version_num: self.num,
}) {
error!("[Version Drop Error]: {}", err)
}
}
}

Expand All @@ -194,5 +186,5 @@ where
#[error("version parquet error: {0}")]
Parquet(#[source] parquet::errors::ParquetError),
#[error("version send error: {0}")]
Send(#[source] SendError),
Send(#[source] SendError<CleanTag>),
}
6 changes: 3 additions & 3 deletions src/version/set.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{io::SeekFrom, sync::Arc};

use async_lock::RwLock;
use futures_channel::mpsc::Sender;
use futures_util::{AsyncSeekExt, AsyncWriteExt, SinkExt};
use flume::Sender;
use futures_util::{AsyncSeekExt, AsyncWriteExt};

use crate::{
executor::Executor,
Expand Down Expand Up @@ -125,7 +125,7 @@ where
if let Some(delete_gens) = delete_gens {
new_version
.clean_sender
.send(CleanTag::Add {
.send_async(CleanTag::Add {
version_num: new_version.num,
gens: delete_gens,
})
Expand Down

0 comments on commit a211a1e

Please sign in to comment.