Skip to content

Commit

Permalink
support alter twcs compression options
Browse files Browse the repository at this point in the history
  • Loading branch information
lyang24 committed Nov 12, 2024
1 parent 0e0c4fa commit 96113b2
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 24 deletions.
121 changes: 116 additions & 5 deletions src/mito2/src/worker/handle_alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,18 @@

//! Handling alter related requests.

use std::str::FromStr;
use std::sync::Arc;

use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use humantime_serde::re::humantime;
use snafu::ResultExt;
use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::metadata::{
InvalidRegionOptionChangeRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
RegionMetadataRef,
};
use store_api::mito_engine_options;
use store_api::region_request::{AlterKind, ChangeOption, RegionAlterRequest};
use store_api::storage::RegionId;

Expand All @@ -27,6 +34,8 @@ use crate::error::{
};
use crate::flush::FlushReason;
use crate::manifest::action::RegionChange;
use crate::region::options::CompactionOptions::Twcs;
use crate::region::options::TwcsOptions;
use crate::region::version::VersionRef;
use crate::region::MitoRegionRef;
use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
Expand All @@ -50,7 +59,10 @@ impl<S> RegionWorkerLoop<S> {

// fast path for memory state changes like options.
if let AlterKind::ChangeRegionOptions { options } = request.kind {
self.handle_alter_region_options(region, version, options, sender);
match self.handle_alter_region_options(region, version, options) {
Ok(_) => sender.send(Ok(0)),
Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
}
return;
}

Expand Down Expand Up @@ -151,8 +163,7 @@ impl<S> RegionWorkerLoop<S> {
region: MitoRegionRef,
version: VersionRef,
options: Vec<ChangeOption>,
sender: OptionOutputTx,
) {
) -> std::result::Result<(), MetadataError> {
let mut current_options = version.options.clone();
for option in options {
match option {
Expand All @@ -167,10 +178,20 @@ impl<S> RegionWorkerLoop<S> {
current_options.ttl = Some(new_ttl);
}
}
ChangeOption::Twsc(key, value) => {
let Twcs(options) = &mut current_options.compaction;
change_twcs_options(
options,
&TwcsOptions::default(),
&key,
&value,
region.region_id,
)?;
}
}
}
region.version_control.alter_options(current_options);
sender.send(Ok(0));
Ok(())
}
}

Expand All @@ -191,3 +212,93 @@ fn metadata_after_alteration(

Ok(Arc::new(new_meta))
}

fn change_twcs_options(
options: &mut TwcsOptions,
default_option: &TwcsOptions,
key: &str,
value: &str,
region_id: RegionId,
) -> std::result::Result<(), MetadataError> {
match key {
mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_RUNS => {
let runs =
parse_usize_with_default(&key, &value, default_option.max_active_window_runs)?;

Check failure on line 226 in src/mito2/src/worker/handle_alter.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler

Check failure on line 226 in src/mito2/src/worker/handle_alter.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler
log_option_update(region_id, key, options.max_active_window_runs, runs);
options.max_active_window_runs = runs;
}
mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_FILES => {
let files =
parse_usize_with_default(&key, &value, default_option.max_active_window_files)?;

Check failure on line 232 in src/mito2/src/worker/handle_alter.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler

Check failure on line 232 in src/mito2/src/worker/handle_alter.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler
log_option_update(region_id, key, options.max_active_window_files, files);
options.max_active_window_files = files;
}
mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_RUNS => {
let runs =
parse_usize_with_default(&key, &value, default_option.max_inactive_window_runs)?;

Check failure on line 238 in src/mito2/src/worker/handle_alter.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler

Check failure on line 238 in src/mito2/src/worker/handle_alter.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler
log_option_update(region_id, key, options.max_inactive_window_runs, runs);
options.max_inactive_window_runs = runs;
}
mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_FILES => {
let files =
parse_usize_with_default(&key, &value, default_option.max_inactive_window_files)?;

Check failure on line 244 in src/mito2/src/worker/handle_alter.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler

Check failure on line 244 in src/mito2/src/worker/handle_alter.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler
log_option_update(region_id, key, options.max_inactive_window_files, files);
options.max_inactive_window_files = files;
}
mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => {
let size =
if value.is_empty() {
default_option.max_output_file_size
} else {
Some(ReadableSize::from_str(&value).map_err(|_| {

Check failure on line 253 in src/mito2/src/worker/handle_alter.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler
InvalidRegionOptionChangeRequestSnafu { key, value }.build()
})?)
};
log_option_update(region_id, key, options.max_output_file_size, size);
options.max_output_file_size = size;
}
mito_engine_options::TWCS_TIME_WINDOW => {
let window =
if value.is_empty() {
default_option.time_window
} else {
Some(humantime::parse_duration(&value).map_err(|_| {

Check failure on line 265 in src/mito2/src/worker/handle_alter.rs

View workflow job for this annotation

GitHub Actions / Clippy

this expression creates a reference which is immediately dereferenced by the compiler
InvalidRegionOptionChangeRequestSnafu { key, value }.build()
})?)
};
log_option_update(region_id, key, options.time_window, window);
options.time_window = window;
}
_ => return InvalidRegionOptionChangeRequestSnafu { key, value }.fail(),
}
Ok(())
}

fn parse_usize_with_default(
key: &str,
value: &str,
default: usize,
) -> std::result::Result<usize, MetadataError> {
if value.is_empty() {
Ok(default)
} else {
value
.parse::<usize>()
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())
}
}

fn log_option_update<T: std::fmt::Debug>(
region_id: RegionId,
option_name: &str,
prev_value: T,
cur_value: T,
) {
info!(
"Update region {}: {}, previous: {:?}, new: {:?}",
option_name,
region_id,
prev_value,
Some(cur_value)
);
}
32 changes: 24 additions & 8 deletions src/store-api/src/mito_engine_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,36 @@ pub const APPEND_MODE_KEY: &str = "append_mode";
pub const MERGE_MODE_KEY: &str = "merge_mode";
/// Option key for TTL(time-to-live)
pub const TTL_KEY: &str = "ttl";
/// Option key for twcs max active window runs.
pub const TWCS_MAX_ACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_active_window_runs";
/// Option key for twcs max active window files.
pub const TWCS_MAX_ACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_active_window_files";
/// Option key for twcs max inactive window runs.
pub const TWCS_MAX_INACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_inactive_window_runs";
/// Option key for twcs max inactive window files.
pub const TWCS_MAX_INACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_inactive_window_files";
/// Option key for twcs max output file size.
pub const TWCS_MAX_OUTPUT_FILE_SIZE: &str = "compaction.twcs.max_output_file_size";
/// Option key for twcs time window.
pub const TWCS_TIME_WINDOW: &str = "compaction.twcs.time_window";
/// Option key for twcs remote compaction.
pub const REMOTE_COMPACTION: &str = "compaction.twcs.remote_compaction";
/// Option key for twcs fallback to local.
pub const TWCS_FALLBACK_TO_LOCAL: &str = "compaction.twcs.fallback_to_local";

/// Returns true if the `key` is a valid option key for the mito engine.
pub fn is_mito_engine_option_key(key: &str) -> bool {
[
"ttl",
"compaction.type",
"compaction.twcs.max_active_window_runs",
"compaction.twcs.max_active_window_files",
"compaction.twcs.max_inactive_window_runs",
"compaction.twcs.max_inactive_window_files",
"compaction.twcs.max_output_file_size",
"compaction.twcs.time_window",
"compaction.twcs.remote_compaction",
"compaction.twcs.fallback_to_local",
TWCS_MAX_ACTIVE_WINDOW_RUNS,
TWCS_MAX_ACTIVE_WINDOW_FILES,
TWCS_MAX_INACTIVE_WINDOW_RUNS,
TWCS_MAX_INACTIVE_WINDOW_FILES,
TWCS_MAX_OUTPUT_FILE_SIZE,
TWCS_TIME_WINDOW,
REMOTE_COMPACTION,
TWCS_FALLBACK_TO_LOCAL,
"storage",
"index.inverted_index.ignore_column_ids",
"index.inverted_index.segment_row_count",
Expand Down
35 changes: 24 additions & 11 deletions src/store-api/src/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ use crate::metadata::{
ColumnMetadata, InvalidRawRegionRequestSnafu, InvalidRegionOptionChangeRequestSnafu,
InvalidRegionRequestSnafu, MetadataError, RegionMetadata, Result,
};
use crate::mito_engine_options::TTL_KEY;
use crate::mito_engine_options::{
TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS,
TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE,
TWCS_TIME_WINDOW,
};
use crate::path_utils::region_dir;
use crate::storage::{ColumnId, RegionId, ScanRequest};

Expand Down Expand Up @@ -661,23 +665,32 @@ impl From<v1::ChangeColumnType> for ChangeColumnType {
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum ChangeOption {
TTL(Duration),
Twsc(String, String),
}

impl TryFrom<&ChangeTableOption> for ChangeOption {
type Error = MetadataError;

fn try_from(value: &ChangeTableOption) -> std::result::Result<Self, Self::Error> {
let ChangeTableOption { key, value } = value;
if key == TTL_KEY {
let ttl = if value.is_empty() {
Duration::from_secs(0)
} else {
humantime::parse_duration(value)
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())?
};
Ok(Self::TTL(ttl))
} else {
InvalidRegionOptionChangeRequestSnafu { key, value }.fail()

match key.as_str() {
TTL_KEY => {
let ttl = if value.is_empty() {
Duration::from_secs(0)
} else {
humantime::parse_duration(value)
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())?
};
Ok(Self::TTL(ttl))
}
TWCS_MAX_ACTIVE_WINDOW_RUNS
| TWCS_MAX_ACTIVE_WINDOW_FILES
| TWCS_MAX_INACTIVE_WINDOW_FILES
| TWCS_MAX_INACTIVE_WINDOW_RUNS
| TWCS_MAX_OUTPUT_FILE_SIZE
| TWCS_TIME_WINDOW => Ok(Self::Twsc(key.to_string(), value.to_string())),
_ => InvalidRegionOptionChangeRequestSnafu { key, value }.fail(),
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions src/table/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,16 @@ impl TableMeta {
new_options.ttl = Some(*new_ttl);
}
}
ChangeOption::Twsc(key, value) => {
if !value.is_empty() {
new_options
.extra_options
.insert(key.to_string(), value.to_string());
} else {
// Invalidate the previous change option if an empty value has been set.
new_options.extra_options.remove(key.as_str());
}
}
}
}
let mut builder = self.new_meta_builder();
Expand Down
75 changes: 75 additions & 0 deletions tests/cases/standalone/common/alter/alter_table_options.result
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,81 @@ SELECT i FROM ato;
| 2 |
+---+

ALTER TABLE ato SET 'compaction.twcs.time_window'='2h';

Affected Rows: 0

ALTER TABLE ato SET 'compaction.twcs.max_output_file_size'='500MB';

Affected Rows: 0

ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_files'='2';

Affected Rows: 0

ALTER TABLE ato SET 'compaction.twcs.max_active_window_files'='2';

Affected Rows: 0

ALTER TABLE ato SET 'compaction.twcs.max_active_window_runs'='6';

Affected Rows: 0

ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='6';

Affected Rows: 0

SHOW CREATE TABLE ato;

+-------+----------------------------------------------------+
| Table | Create Table |
+-------+----------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | compaction.twcs.max_active_window_files = '2', |
| | compaction.twcs.max_active_window_runs = '6', |
| | compaction.twcs.max_inactive_window_files = '2', |
| | compaction.twcs.max_inactive_window_runs = '6', |
| | compaction.twcs.max_output_file_size = '500MB', |
| | compaction.twcs.time_window = '2h', |
| | ttl = '1s' |
| | ) |
+-------+----------------------------------------------------+

ALTER TABLE ato SET 'compaction.twcs.time_window'='';

Affected Rows: 0

SHOW CREATE TABLE ato;

+-------+----------------------------------------------------+
| Table | Create Table |
+-------+----------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | compaction.twcs.max_active_window_files = '2', |
| | compaction.twcs.max_active_window_runs = '6', |
| | compaction.twcs.max_inactive_window_files = '2', |
| | compaction.twcs.max_inactive_window_runs = '6', |
| | compaction.twcs.max_output_file_size = '500MB', |
| | ttl = '1s' |
| | ) |
+-------+----------------------------------------------------+

DROP TABLE ato;

Affected Rows: 0
Expand Down
18 changes: 18 additions & 0 deletions tests/cases/standalone/common/alter/alter_table_options.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,22 @@ SHOW CREATE TABLE ato;

SELECT i FROM ato;

ALTER TABLE ato SET 'compaction.twcs.time_window'='2h';

ALTER TABLE ato SET 'compaction.twcs.max_output_file_size'='500MB';

ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_files'='2';

ALTER TABLE ato SET 'compaction.twcs.max_active_window_files'='2';

ALTER TABLE ato SET 'compaction.twcs.max_active_window_runs'='6';

ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='6';

SHOW CREATE TABLE ato;

ALTER TABLE ato SET 'compaction.twcs.time_window'='';

SHOW CREATE TABLE ato;

DROP TABLE ato;

0 comments on commit 96113b2

Please sign in to comment.