Skip to content

Commit

Permalink
refactor: manifest wal config (#437)
Browse files Browse the repository at this point in the history
* refactor manifest wal config.

* address CR.
  • Loading branch information
Rachelint authored Dec 1, 2022
1 parent 15a9908 commit 9250cef
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 21 deletions.
106 changes: 94 additions & 12 deletions analytic_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ mod wal_synchronizer;
#[cfg(any(test, feature = "test"))]
pub mod tests;

use common_util::config::ReadableSize;
use common_util::config::{ReadableDuration, ReadableSize};
use message_queue::kafka::config::Config as KafkaConfig;
use meta::details::Options as ManifestOptions;
use serde::Serialize;
use serde_derive::Deserialize;
use storage_options::{LocalOptions, ObjectStoreOptions, StorageOptions};
use table_kv::config::ObkvConfig;
Expand Down Expand Up @@ -119,27 +120,108 @@ impl Default for Config {
}

/// Config of wal based on obkv
#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Default, Clone, Deserialize)]
#[serde(default)]
pub struct ObkvWalConfig {
/// Obkv client config
pub obkv: ObkvConfig,
/// Wal (stores data) namespace config
pub wal: NamespaceConfig,
pub wal: WalNamespaceConfig,
/// Manifest (stores meta data) namespace config
pub manifest: NamespaceConfig,
pub manifest: ManifestNamespaceConfig,
}

/// Config of obkv wal based manifest
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ManifestNamespaceConfig {
/// Decide how many wal data shards will be created
///
/// NOTICE: it can just be set once, the later setting makes no effect.
pub shard_num: usize,

/// Decide how many wal meta shards will be created
///
/// NOTICE: it can just be set once, the later setting makes no effect.
pub meta_shard_num: usize,

pub init_scan_timeout: ReadableDuration,
pub init_scan_batch_size: i32,
pub clean_scan_timeout: ReadableDuration,
pub clean_scan_batch_size: usize,
}

impl Default for ObkvWalConfig {
impl Default for ManifestNamespaceConfig {
fn default() -> Self {
let namespace_config = NamespaceConfig::default();

Self {
obkv: ObkvConfig::default(),
wal: NamespaceConfig::default(),
manifest: NamespaceConfig {
// Manifest has no ttl.
ttl: None,
..Default::default()
},
shard_num: namespace_config.wal_shard_num,
meta_shard_num: namespace_config.table_unit_meta_shard_num,
init_scan_timeout: namespace_config.init_scan_timeout,
init_scan_batch_size: namespace_config.init_scan_batch_size,
clean_scan_timeout: namespace_config.clean_scan_timeout,
clean_scan_batch_size: namespace_config.clean_scan_batch_size,
}
}
}

impl From<ManifestNamespaceConfig> for NamespaceConfig {
fn from(manifest_config: ManifestNamespaceConfig) -> Self {
NamespaceConfig {
wal_shard_num: manifest_config.shard_num,
table_unit_meta_shard_num: manifest_config.meta_shard_num,
ttl: None,
init_scan_timeout: manifest_config.init_scan_timeout,
init_scan_batch_size: manifest_config.init_scan_batch_size,
clean_scan_timeout: manifest_config.clean_scan_timeout,
clean_scan_batch_size: manifest_config.clean_scan_batch_size,
}
}
}

/// Config of obkv wal based wal module
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WalNamespaceConfig {
/// Decide how many wal data shards will be created
///
/// NOTICE: it can just be set once, the later setting makes no effect.
pub shard_num: usize,

/// Decide how many wal meta shards will be created
///
/// NOTICE: it can just be set once, the later setting makes no effect.
pub meta_shard_num: usize,

pub ttl: ReadableDuration,
pub init_scan_timeout: ReadableDuration,
pub init_scan_batch_size: i32,
}

impl Default for WalNamespaceConfig {
fn default() -> Self {
let namespace_config = NamespaceConfig::default();

Self {
shard_num: namespace_config.wal_shard_num,
meta_shard_num: namespace_config.table_unit_meta_shard_num,
ttl: namespace_config.ttl.unwrap(),
init_scan_timeout: namespace_config.init_scan_timeout,
init_scan_batch_size: namespace_config.init_scan_batch_size,
}
}
}

impl From<WalNamespaceConfig> for NamespaceConfig {
fn from(wal_config: WalNamespaceConfig) -> Self {
Self {
wal_shard_num: wal_config.shard_num,
table_unit_meta_shard_num: wal_config.meta_shard_num,
ttl: Some(wal_config.ttl),
init_scan_timeout: wal_config.init_scan_timeout,
init_scan_batch_size: wal_config.init_scan_batch_size,
..Default::default()
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions analytic_engine/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ async fn open_wal_and_manifest_with_table_kv<T: TableKv>(
table_kv.clone(),
runtimes.clone(),
WAL_DIR_NAME,
config.wal.clone(),
config.wal.clone().into(),
)
.await
.context(OpenWal)?;
Expand All @@ -328,7 +328,7 @@ async fn open_wal_and_manifest_with_table_kv<T: TableKv>(
table_kv,
runtimes,
MANIFEST_DIR_NAME,
config.manifest.clone(),
config.manifest.clone().into(),
)
.await
.context(OpenManifestWal)?;
Expand Down
5 changes: 5 additions & 0 deletions wal/src/table_kv_impl/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ impl Default for NamespaceEntry {
pub struct NamespaceConfig {
pub wal_shard_num: usize,
pub table_unit_meta_shard_num: usize,

/// Outdated log cleaning strategy
///
/// NOTICE: you can just set once, if change after setting, it will lead to
/// error. Recommend to use default setting.
pub ttl: Option<ReadableDuration>,

pub init_scan_timeout: ReadableDuration,
Expand Down
25 changes: 18 additions & 7 deletions wal/src/table_kv_impl/namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -936,13 +936,24 @@ impl<T: TableKv> Namespace<T> {

let namespace =
match Self::load_namespace_from_meta(table_kv, consts::META_TABLE_NAME, name)? {
Some(namespace_entry) => Namespace::new(
runtimes,
table_kv.clone(),
consts::META_TABLE_NAME,
namespace_entry,
config,
)?,
Some(namespace_entry) => {
assert!(
namespace_entry.wal.enable_ttl == config.ttl.is_some(),
"It's impossible to be different because the it can't be set by user actually,
but now the original ttl set is:{}, current in config is:{}",
namespace_entry.wal.enable_ttl,
config.ttl.is_some()
);

Namespace::new(
runtimes,
table_kv.clone(),
consts::META_TABLE_NAME,
namespace_entry,
config,
)?
}

None => Namespace::try_persist_namespace(
runtimes,
table_kv,
Expand Down

0 comments on commit 9250cef

Please sign in to comment.