Skip to content

Commit

Permalink
support alter twcs compression options
Browse files Browse the repository at this point in the history
  • Loading branch information
lyang24 committed Nov 13, 2024
1 parent 0e0c4fa commit 6854d51
Show file tree
Hide file tree
Showing 6 changed files with 305 additions and 25 deletions.
117 changes: 112 additions & 5 deletions src/mito2/src/worker/handle_alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,18 @@

//! Handling alter related requests.

use std::str::FromStr;
use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use humantime_serde::re::humantime;
use snafu::ResultExt;
use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::metadata::{
InvalidRegionOptionChangeRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
RegionMetadataRef,
};
use store_api::mito_engine_options;
use store_api::region_request::{AlterKind, ChangeOption, RegionAlterRequest};
use store_api::storage::RegionId;

Expand All @@ -27,6 +34,8 @@ use crate::error::{
};
use crate::flush::FlushReason;
use crate::manifest::action::RegionChange;
use crate::region::options::CompactionOptions::Twcs;
use crate::region::options::TwcsOptions;
use crate::region::version::VersionRef;
use crate::region::MitoRegionRef;
use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
Expand All @@ -50,7 +59,10 @@ impl<S> RegionWorkerLoop<S> {

// fast path for memory state changes like options.
if let AlterKind::ChangeRegionOptions { options } = request.kind {
self.handle_alter_region_options(region, version, options, sender);
match self.handle_alter_region_options(region, version, options) {
Ok(_) => sender.send(Ok(0)),
Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
}
return;
}

Expand Down Expand Up @@ -151,8 +163,7 @@ impl<S> RegionWorkerLoop<S> {
region: MitoRegionRef,
version: VersionRef,
options: Vec<ChangeOption>,
sender: OptionOutputTx,
) {
) -> std::result::Result<(), MetadataError> {
let mut current_options = version.options.clone();
for option in options {
match option {
Expand All @@ -167,10 +178,20 @@ impl<S> RegionWorkerLoop<S> {
current_options.ttl = Some(new_ttl);
}
}
ChangeOption::Twsc(key, value) => {
let Twcs(options) = &mut current_options.compaction;
change_twcs_options(
options,
&TwcsOptions::default(),
&key,
&value,
region.region_id,
)?;
}
}
}
region.version_control.alter_options(current_options);
sender.send(Ok(0));
Ok(())
}
}

Expand All @@ -191,3 +212,89 @@ fn metadata_after_alteration(

Ok(Arc::new(new_meta))
}

fn change_twcs_options(
options: &mut TwcsOptions,
default_option: &TwcsOptions,
key: &str,
value: &str,
region_id: RegionId,
) -> std::result::Result<(), MetadataError> {
match key {
mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_RUNS => {
let runs = parse_usize_with_default(key, value, default_option.max_active_window_runs)?;
log_option_update(region_id, key, options.max_active_window_runs, runs);
options.max_active_window_runs = runs;
}
mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_FILES => {
let files =
parse_usize_with_default(key, value, default_option.max_active_window_files)?;
log_option_update(region_id, key, options.max_active_window_files, files);
options.max_active_window_files = files;
}
mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_RUNS => {
let runs =
parse_usize_with_default(key, value, default_option.max_inactive_window_runs)?;
log_option_update(region_id, key, options.max_inactive_window_runs, runs);
options.max_inactive_window_runs = runs;
}
mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_FILES => {
let files =
parse_usize_with_default(key, value, default_option.max_inactive_window_files)?;
log_option_update(region_id, key, options.max_inactive_window_files, files);
options.max_inactive_window_files = files;
}
mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => {
let size =
if value.is_empty() {
default_option.max_output_file_size
} else {
Some(ReadableSize::from_str(value).map_err(|_| {
InvalidRegionOptionChangeRequestSnafu { key, value }.build()
})?)
};
log_option_update(region_id, key, options.max_output_file_size, size);
options.max_output_file_size = size;
}
mito_engine_options::TWCS_TIME_WINDOW => {
let window =
if value.is_empty() {
default_option.time_window
} else {
Some(humantime::parse_duration(value).map_err(|_| {
InvalidRegionOptionChangeRequestSnafu { key, value }.build()
})?)
};
log_option_update(region_id, key, options.time_window, window);
options.time_window = window;
}
_ => return InvalidRegionOptionChangeRequestSnafu { key, value }.fail(),
}
Ok(())
}

fn parse_usize_with_default(
key: &str,
value: &str,
default: usize,
) -> std::result::Result<usize, MetadataError> {
if value.is_empty() {
Ok(default)
} else {
value
.parse::<usize>()
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())
}
}

fn log_option_update<T: std::fmt::Debug>(
region_id: RegionId,
option_name: &str,
prev_value: T,
cur_value: T,
) {
info!(
"Update region {}: {}, previous: {:?}, new: {:?}",
option_name, region_id, prev_value, cur_value
);
}
38 changes: 29 additions & 9 deletions src/store-api/src/mito_engine_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,40 @@ pub const APPEND_MODE_KEY: &str = "append_mode";
pub const MERGE_MODE_KEY: &str = "merge_mode";
/// Option key for TTL(time-to-live)
pub const TTL_KEY: &str = "ttl";
/// Option key for compaction type.
pub const COMPACTION_TYPE: &str = "compaction.type";
/// TWCS compaction strategy.
pub const COMPACTION_TYPE_TWCS: &str = "twcs";
/// Option key for twcs max active window runs.
pub const TWCS_MAX_ACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_active_window_runs";
/// Option key for twcs max active window files.
pub const TWCS_MAX_ACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_active_window_files";
/// Option key for twcs max inactive window runs.
pub const TWCS_MAX_INACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_inactive_window_runs";
/// Option key for twcs max inactive window files.
pub const TWCS_MAX_INACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_inactive_window_files";
/// Option key for twcs max output file size.
pub const TWCS_MAX_OUTPUT_FILE_SIZE: &str = "compaction.twcs.max_output_file_size";
/// Option key for twcs time window.
pub const TWCS_TIME_WINDOW: &str = "compaction.twcs.time_window";
/// Option key for twcs remote compaction.
pub const REMOTE_COMPACTION: &str = "compaction.twcs.remote_compaction";
/// Option key for twcs fallback to local.
pub const TWCS_FALLBACK_TO_LOCAL: &str = "compaction.twcs.fallback_to_local";

/// Returns true if the `key` is a valid option key for the mito engine.
pub fn is_mito_engine_option_key(key: &str) -> bool {
[
"ttl",
"compaction.type",
"compaction.twcs.max_active_window_runs",
"compaction.twcs.max_active_window_files",
"compaction.twcs.max_inactive_window_runs",
"compaction.twcs.max_inactive_window_files",
"compaction.twcs.max_output_file_size",
"compaction.twcs.time_window",
"compaction.twcs.remote_compaction",
"compaction.twcs.fallback_to_local",
COMPACTION_TYPE,
TWCS_MAX_ACTIVE_WINDOW_RUNS,
TWCS_MAX_ACTIVE_WINDOW_FILES,
TWCS_MAX_INACTIVE_WINDOW_RUNS,
TWCS_MAX_INACTIVE_WINDOW_FILES,
TWCS_MAX_OUTPUT_FILE_SIZE,
TWCS_TIME_WINDOW,
REMOTE_COMPACTION,
TWCS_FALLBACK_TO_LOCAL,
"storage",
"index.inverted_index.ignore_column_ids",
"index.inverted_index.segment_row_count",
Expand Down
36 changes: 25 additions & 11 deletions src/store-api/src/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ use crate::metadata::{
ColumnMetadata, InvalidRawRegionRequestSnafu, InvalidRegionOptionChangeRequestSnafu,
InvalidRegionRequestSnafu, MetadataError, RegionMetadata, Result,
};
use crate::mito_engine_options::TTL_KEY;
use crate::mito_engine_options::{
TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS,
TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE,
TWCS_TIME_WINDOW,
};
use crate::path_utils::region_dir;
use crate::storage::{ColumnId, RegionId, ScanRequest};

Expand Down Expand Up @@ -661,23 +665,33 @@ impl From<v1::ChangeColumnType> for ChangeColumnType {
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum ChangeOption {
TTL(Duration),
// Modifying TwscOptions with values as (option name, new value).
Twsc(String, String),
}

impl TryFrom<&ChangeTableOption> for ChangeOption {
type Error = MetadataError;

fn try_from(value: &ChangeTableOption) -> std::result::Result<Self, Self::Error> {
let ChangeTableOption { key, value } = value;
if key == TTL_KEY {
let ttl = if value.is_empty() {
Duration::from_secs(0)
} else {
humantime::parse_duration(value)
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())?
};
Ok(Self::TTL(ttl))
} else {
InvalidRegionOptionChangeRequestSnafu { key, value }.fail()

match key.as_str() {
TTL_KEY => {
let ttl = if value.is_empty() {
Duration::from_secs(0)
} else {
humantime::parse_duration(value)
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())?
};
Ok(Self::TTL(ttl))
}
TWCS_MAX_ACTIVE_WINDOW_RUNS
| TWCS_MAX_ACTIVE_WINDOW_FILES
| TWCS_MAX_INACTIVE_WINDOW_FILES
| TWCS_MAX_INACTIVE_WINDOW_RUNS
| TWCS_MAX_OUTPUT_FILE_SIZE
| TWCS_TIME_WINDOW => Ok(Self::Twsc(key.to_string(), value.to_string())),
_ => InvalidRegionOptionChangeRequestSnafu { key, value }.fail(),
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions src/table/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRe
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
use store_api::region_request::ChangeOption;
use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};

Expand Down Expand Up @@ -219,6 +220,21 @@ impl TableMeta {
new_options.ttl = Some(*new_ttl);
}
}
ChangeOption::Twsc(key, value) => {
if !value.is_empty() {
new_options
.extra_options
.insert(key.to_string(), value.to_string());
// Ensure node restart correctly.
new_options.extra_options.insert(
COMPACTION_TYPE.to_string(),
COMPACTION_TYPE_TWCS.to_string(),
);
} else {
// Invalidate the previous change option if an empty value has been set.
new_options.extra_options.remove(key.as_str());
}
}
}
}
let mut builder = self.new_meta_builder();
Expand Down
Loading

0 comments on commit 6854d51

Please sign in to comment.