Skip to content

Commit

Permalink
Merge pull request #1 from bittcrafter/dev/0.6.0
Browse files Browse the repository at this point in the history
Implement Redis Cluster Mode
  • Loading branch information
bittcrafter authored Oct 28, 2024
2 parents cf7cb5d + 08349d4 commit 36c0bd5
Show file tree
Hide file tree
Showing 6 changed files with 2,227 additions and 70 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rmqtt-storage"
version = "0.5.1"
version = "0.6.0"
authors = ["rmqtt <rmqttd@126.com>"]
edition = "2021"
license = "MIT OR Apache-2.0"
Expand All @@ -23,7 +23,7 @@ len = []
[dependencies]
sled = "0.34"
tokio = { version = "1", features = ["sync", "rt"] }
redis = { version = "0.24", features = [ "tokio-comp", "connection-manager" ] }
redis = { version = "0.27", features = [ "tokio-comp", "connection-manager", "cluster", "cluster-async" ] }

futures = "0.3"
serde = { version = "1.0", features = ["derive"] }
Expand All @@ -33,7 +33,7 @@ async-trait = "0.1"
bincode = "1.3"
log = "0.4"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
dashmap = "5.5"
dashmap = "6.1"
ahash = "0.8"
convert = { package = "box-convert", version = "0.1", features = ["bytesize"] }

Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Add this to your `Cargo.toml`:

```toml
[dependencies]
rmqtt-storage = "0.5"
rmqtt-storage = "0.6"
```

## Features
Expand All @@ -25,3 +25,4 @@ rmqtt-storage = "0.5"
- Supports key expiration.
- Provides an implementation for 'sled'.
- Provides an implementation for 'redis'.
- Provides an implementation for 'redis cluster'. Note: the 'len' feature is not supported yet.
158 changes: 152 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ extern crate serde;
use anyhow::Error;
use serde::de;

use storage_redis::RedisStorageDB;

mod storage;
mod storage_redis;
mod storage_redis_cluster;
mod storage_sled;

pub use storage::{DefaultStorageDB, List, Map, StorageDB, StorageList, StorageMap};
pub use storage_redis::RedisConfig;
pub use storage_redis::{RedisConfig, RedisStorageDB};
pub use storage_redis_cluster::{
RedisConfig as RedisClusterConfig, RedisStorageDB as RedisClusterStorageDB,
};
pub use storage_sled::{SledConfig, SledStorageDB};

pub type Result<T> = anyhow::Result<T>;
Expand All @@ -26,6 +28,10 @@ pub async fn init_db(cfg: &Config) -> Result<DefaultStorageDB> {
let db = RedisStorageDB::new(cfg.redis.clone()).await?;
Ok(DefaultStorageDB::Redis(db))
}
StorageType::RedisCluster => {
let db = RedisClusterStorageDB::new(cfg.redis_cluster.clone()).await?;
Ok(DefaultStorageDB::RedisCluster(db))
}
}
}
#[derive(Debug, Clone, Deserialize, Serialize)]
Expand All @@ -37,6 +43,8 @@ pub struct Config {
pub sled: SledConfig,
#[serde(default)]
pub redis: RedisConfig,
#[serde(default, rename = "redis-cluster")]
pub redis_cluster: RedisClusterConfig,
}

impl Default for Config {
Expand All @@ -45,6 +53,7 @@ impl Default for Config {
typ: Config::storage_type_default(),
sled: SledConfig::default(),
redis: RedisConfig::default(),
redis_cluster: RedisClusterConfig::default(),
}
}
}
Expand All @@ -61,6 +70,8 @@ pub enum StorageType {
Sled,
//redis:
Redis,
//redis cluster:
RedisCluster,
}

impl<'de> de::Deserialize<'de> for StorageType {
Expand All @@ -75,6 +86,7 @@ impl<'de> de::Deserialize<'de> for StorageType {
{
"sled" => StorageType::Sled,
"redis" => StorageType::Redis,
"redis-cluster" => StorageType::RedisCluster,
_ => StorageType::Sled,
};
Ok(t)
Expand Down Expand Up @@ -107,7 +119,7 @@ mod tests {

fn get_cfg(name: &str) -> Config {
let cfg = Config {
typ: StorageType::Sled,
typ: StorageType::RedisCluster,
sled: SledConfig {
path: format!("./.catch/{}", name),
cleanup_f: |_db| {},
Expand All @@ -117,6 +129,15 @@ mod tests {
url: "redis://127.0.0.1:6379/".into(),
prefix: name.to_owned(),
},
redis_cluster: RedisClusterConfig {
urls: [
"redis://127.0.0.1:6380/".into(),
"redis://127.0.0.1:6381/".into(),
"redis://127.0.0.1:6382/".into(),
]
.into(),
prefix: name.to_owned(),
},
};
cfg
}
Expand Down Expand Up @@ -183,6 +204,15 @@ mod tests {
url: "redis://127.0.0.1:6379/".into(),
prefix: "sled_cleanup".to_owned(),
},
redis_cluster: RedisClusterConfig {
urls: [
"redis://127.0.0.1:6380/".into(),
"redis://127.0.0.1:6381/".into(),
"redis://127.0.0.1:6382/".into(),
]
.into(),
prefix: "sled_cleanup".to_owned(),
},
};

let db = SledStorageDB::new(cfg.sled.clone()).await.unwrap();
Expand Down Expand Up @@ -485,6 +515,31 @@ mod tests {
assert_eq!(list_002.is_empty().await.unwrap(), true);
}

#[tokio::main]
#[test]
async fn test_db_contains_key2() {
let cfg = get_cfg("db_contains_key2");
let db = init_db(&cfg).await.unwrap();
let max = 10;
for i in 0..max {
db.insert(format!("key_{}", i), &1).await.unwrap();
}

for i in 0..max {
let c_res = db.contains_key(format!("key_{}", i)).await.unwrap();
assert!(c_res);
}

for i in 0..max {
db.remove(format!("key_{}", i)).await.unwrap();
}

for i in 0..max {
let c_res = db.contains_key(format!("key_{}", i)).await.unwrap();
assert!(!c_res);
}
}

#[cfg(feature = "ttl")]
#[tokio::main]
#[test]
Expand Down Expand Up @@ -896,6 +951,45 @@ mod tests {
assert_eq!(kv001.is_empty().await.unwrap(), true);
}

#[tokio::main]
#[test]
async fn test_map_iter2() {
let cfg = get_cfg("map_iter2");
let mut db = init_db(&cfg).await.unwrap();

let mut map_iter = db.map_iter().await.unwrap();
while let Some(map) = map_iter.next().await {
let map = map.unwrap();
map.clear().await.unwrap();
}

drop(map_iter);

let max = 10;

for i in 0..max {
let map1 = db.map(format!("map-{}", i), None).await.unwrap();
map1.insert(format!("map-{}-data", i), &i).await.unwrap();
}

let mut map_iter = db.map_iter().await.unwrap();

// let aa = collect(map_iter).await;
let mut count = 0;
while let Some(map) = map_iter.next().await {
let mut map = map.unwrap();
let mut iter = map.iter::<i32>().await.unwrap();
while let Some(item) = iter.next().await {
let (key, val) = item.unwrap();
println!("key: {:?}, val: {:?}", String::from_utf8_lossy(&key), val);
}
count += 1;
}

println!("max: {:?}, count: {:?}", max, count);
assert_eq!(max, count);
}

// #[tokio::main]
// #[test]
// async fn test_map_retain() {
Expand Down Expand Up @@ -1228,6 +1322,44 @@ mod tests {
}
}

#[tokio::main]
#[test]
async fn test_list_iter2() {
let cfg = get_cfg("list_iter2");
let mut db = init_db(&cfg).await.unwrap();

let mut list_iter = db.list_iter().await.unwrap();
while let Some(list) = list_iter.next().await {
let list = list.unwrap();
list.clear().await.unwrap();
}

drop(list_iter);

let max = 10;

for i in 0..max {
let list1 = db.list(format!("list-{}", i), None).await.unwrap();
list1.push(&i).await.unwrap();
}

let mut list_iter = db.list_iter().await.unwrap();

let mut count = 0;
while let Some(list) = list_iter.next().await {
let mut list = list.unwrap();
let mut iter = list.iter::<i32>().await.unwrap();
while let Some(item) = iter.next().await {
let val = item.unwrap();
println!("val: {:?}", val);
}
count += 1;
}

println!("max: {:?}, count: {:?}", max, count);
assert_eq!(max, count);
}

#[tokio::main]
#[test]
async fn test_map_iter() {
Expand Down Expand Up @@ -1579,6 +1711,15 @@ mod tests {
url: "redis://127.0.0.1:6379/".into(),
prefix: "map_expire_list".to_owned(),
},
redis_cluster: RedisClusterConfig {
urls: [
"redis://127.0.0.1:6380/".into(),
"redis://127.0.0.1:6381/".into(),
"redis://127.0.0.1:6382/".into(),
]
.into(),
prefix: "map_expire_list".to_owned(),
},
};

let mut db = SledStorageDB::new(cfg.sled.clone()).await.unwrap();
Expand Down Expand Up @@ -1620,7 +1761,7 @@ mod tests {
for item in collect(iter).await {
db.remove(item).await.unwrap();
}

println!("test_db_size db_size: {:?}", db.db_size().await);
db.insert("k1", &1).await.unwrap();
db.insert("k2", &2).await.unwrap();
db.insert("k3", &3).await.unwrap();
Expand Down Expand Up @@ -1655,6 +1796,7 @@ mod tests {
let mut db = init_db(&cfg).await.unwrap();
let iter = db.scan("*").await.unwrap();
for item in collect(iter).await {
println!("removed item: {:?}", String::from_utf8_lossy(&item));
db.remove(item).await.unwrap();
}
println!("test_scan db_size: {:?}", db.db_size().await);
Expand Down Expand Up @@ -1692,7 +1834,11 @@ mod tests {
let topic = format_topic("foo/abcd/#");
println!("topic: {}", topic);
let iter = db.scan(topic.as_bytes()).await.unwrap();
assert_eq!(collect(iter).await.len(), 6);
let items = collect(iter).await;
for item in items.iter() {
println!("item: {:?}", String::from_utf8_lossy(&item));
}
assert_eq!(items.len(), 6);

//"foo/abcd/\\**"
let topic = format_topic("foo/abcd/*/#");
Expand Down
Loading

0 comments on commit 36c0bd5

Please sign in to comment.