diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index d73b24c46aa3..8ab7e55bda23 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -14,11 +14,18 @@ //! Handling alter related requests. +use std::str::FromStr; use std::sync::Arc; +use common_base::readable_size::ReadableSize; use common_telemetry::{debug, info}; +use humantime_serde::re::humantime; use snafu::ResultExt; -use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef}; +use store_api::metadata::{ + InvalidRegionOptionChangeRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder, + RegionMetadataRef, +}; +use store_api::mito_engine_options; use store_api::region_request::{AlterKind, ChangeOption, RegionAlterRequest}; use store_api::storage::RegionId; @@ -27,6 +34,8 @@ use crate::error::{ }; use crate::flush::FlushReason; use crate::manifest::action::RegionChange; +use crate::region::options::CompactionOptions::Twcs; +use crate::region::options::TwcsOptions; use crate::region::version::VersionRef; use crate::region::MitoRegionRef; use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest}; @@ -50,7 +59,10 @@ impl RegionWorkerLoop { // fast path for memory state changes like options. if let AlterKind::ChangeRegionOptions { options } = request.kind { - self.handle_alter_region_options(region, version, options, sender); + match self.handle_alter_region_options(region, version, options) { + Ok(_) => sender.send(Ok(0)), + Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)), + } return; } @@ -151,8 +163,7 @@ impl RegionWorkerLoop { region: MitoRegionRef, version: VersionRef, options: Vec, - sender: OptionOutputTx, - ) { + ) -> std::result::Result<(), MetadataError> { let mut current_options = version.options.clone(); for option in options { match option { @@ -167,10 +178,20 @@ impl RegionWorkerLoop { current_options.ttl = Some(new_ttl); } } + ChangeOption::Twsc(key, value) => { + let Twcs(options) = &mut current_options.compaction; + change_twcs_options( + options, + &TwcsOptions::default(), + &key, + &value, + region.region_id, + )?; + } } } region.version_control.alter_options(current_options); - sender.send(Ok(0)); + Ok(()) } } @@ -191,3 +212,89 @@ fn metadata_after_alteration( Ok(Arc::new(new_meta)) } + +fn change_twcs_options( + options: &mut TwcsOptions, + default_option: &TwcsOptions, + key: &str, + value: &str, + region_id: RegionId, +) -> std::result::Result<(), MetadataError> { + match key { + mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_RUNS => { + let runs = parse_usize_with_default(key, value, default_option.max_active_window_runs)?; + log_option_update(region_id, key, options.max_active_window_runs, runs); + options.max_active_window_runs = runs; + } + mito_engine_options::TWCS_MAX_ACTIVE_WINDOW_FILES => { + let files = + parse_usize_with_default(key, value, default_option.max_active_window_files)?; + log_option_update(region_id, key, options.max_active_window_files, files); + options.max_active_window_files = files; + } + mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_RUNS => { + let runs = + parse_usize_with_default(key, value, default_option.max_inactive_window_runs)?; + log_option_update(region_id, key, options.max_inactive_window_runs, runs); + options.max_inactive_window_runs = runs; + } + mito_engine_options::TWCS_MAX_INACTIVE_WINDOW_FILES => { + let files = + parse_usize_with_default(key, value, default_option.max_inactive_window_files)?; + log_option_update(region_id, key, options.max_inactive_window_files, files); + options.max_inactive_window_files = files; + } + mito_engine_options::TWCS_MAX_OUTPUT_FILE_SIZE => { + let size = + if value.is_empty() { + default_option.max_output_file_size + } else { + Some(ReadableSize::from_str(value).map_err(|_| { + InvalidRegionOptionChangeRequestSnafu { key, value }.build() + })?) + }; + log_option_update(region_id, key, options.max_output_file_size, size); + options.max_output_file_size = size; + } + mito_engine_options::TWCS_TIME_WINDOW => { + let window = + if value.is_empty() { + default_option.time_window + } else { + Some(humantime::parse_duration(value).map_err(|_| { + InvalidRegionOptionChangeRequestSnafu { key, value }.build() + })?) + }; + log_option_update(region_id, key, options.time_window, window); + options.time_window = window; + } + _ => return InvalidRegionOptionChangeRequestSnafu { key, value }.fail(), + } + Ok(()) +} + +fn parse_usize_with_default( + key: &str, + value: &str, + default: usize, +) -> std::result::Result { + if value.is_empty() { + Ok(default) + } else { + value + .parse::() + .map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build()) + } +} + +fn log_option_update( + region_id: RegionId, + option_name: &str, + prev_value: T, + cur_value: T, +) { + info!( + "Update region {}: {}, previous: {:?}, new: {:?}", + option_name, region_id, prev_value, cur_value + ); +} diff --git a/src/store-api/src/mito_engine_options.rs b/src/store-api/src/mito_engine_options.rs index 0e0f3fdac790..470f6f44bbce 100644 --- a/src/store-api/src/mito_engine_options.rs +++ b/src/store-api/src/mito_engine_options.rs @@ -23,20 +23,40 @@ pub const APPEND_MODE_KEY: &str = "append_mode"; pub const MERGE_MODE_KEY: &str = "merge_mode"; /// Option key for TTL(time-to-live) pub const TTL_KEY: &str = "ttl"; +/// Option key for compaction type. +pub const COMPACTION_TYPE: &str = "compaction.type"; +/// TWCS compaction strategy. +pub const COMPACTION_TYPE_TWCS: &str = "twcs"; +/// Option key for twcs max active window runs. +pub const TWCS_MAX_ACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_active_window_runs"; +/// Option key for twcs max active window files. +pub const TWCS_MAX_ACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_active_window_files"; +/// Option key for twcs max inactive window runs. +pub const TWCS_MAX_INACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_inactive_window_runs"; +/// Option key for twcs max inactive window files. +pub const TWCS_MAX_INACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_inactive_window_files"; +/// Option key for twcs max output file size. +pub const TWCS_MAX_OUTPUT_FILE_SIZE: &str = "compaction.twcs.max_output_file_size"; +/// Option key for twcs time window. +pub const TWCS_TIME_WINDOW: &str = "compaction.twcs.time_window"; +/// Option key for twcs remote compaction. +pub const REMOTE_COMPACTION: &str = "compaction.twcs.remote_compaction"; +/// Option key for twcs fallback to local. +pub const TWCS_FALLBACK_TO_LOCAL: &str = "compaction.twcs.fallback_to_local"; /// Returns true if the `key` is a valid option key for the mito engine. pub fn is_mito_engine_option_key(key: &str) -> bool { [ "ttl", - "compaction.type", - "compaction.twcs.max_active_window_runs", - "compaction.twcs.max_active_window_files", - "compaction.twcs.max_inactive_window_runs", - "compaction.twcs.max_inactive_window_files", - "compaction.twcs.max_output_file_size", - "compaction.twcs.time_window", - "compaction.twcs.remote_compaction", - "compaction.twcs.fallback_to_local", + COMPACTION_TYPE, + TWCS_MAX_ACTIVE_WINDOW_RUNS, + TWCS_MAX_ACTIVE_WINDOW_FILES, + TWCS_MAX_INACTIVE_WINDOW_RUNS, + TWCS_MAX_INACTIVE_WINDOW_FILES, + TWCS_MAX_OUTPUT_FILE_SIZE, + TWCS_TIME_WINDOW, + REMOTE_COMPACTION, + TWCS_FALLBACK_TO_LOCAL, "storage", "index.inverted_index.ignore_column_ids", "index.inverted_index.segment_row_count", diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 867f23175445..48fbc93da727 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -36,7 +36,11 @@ use crate::metadata::{ ColumnMetadata, InvalidRawRegionRequestSnafu, InvalidRegionOptionChangeRequestSnafu, InvalidRegionRequestSnafu, MetadataError, RegionMetadata, Result, }; -use crate::mito_engine_options::TTL_KEY; +use crate::mito_engine_options::{ + TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS, + TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE, + TWCS_TIME_WINDOW, +}; use crate::path_utils::region_dir; use crate::storage::{ColumnId, RegionId, ScanRequest}; @@ -661,6 +665,8 @@ impl From for ChangeColumnType { #[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] pub enum ChangeOption { TTL(Duration), + // Modifying TwscOptions with values as (option name, new value). + Twsc(String, String), } impl TryFrom<&ChangeTableOption> for ChangeOption { @@ -668,16 +674,24 @@ impl TryFrom<&ChangeTableOption> for ChangeOption { fn try_from(value: &ChangeTableOption) -> std::result::Result { let ChangeTableOption { key, value } = value; - if key == TTL_KEY { - let ttl = if value.is_empty() { - Duration::from_secs(0) - } else { - humantime::parse_duration(value) - .map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())? - }; - Ok(Self::TTL(ttl)) - } else { - InvalidRegionOptionChangeRequestSnafu { key, value }.fail() + + match key.as_str() { + TTL_KEY => { + let ttl = if value.is_empty() { + Duration::from_secs(0) + } else { + humantime::parse_duration(value) + .map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())? + }; + Ok(Self::TTL(ttl)) + } + TWCS_MAX_ACTIVE_WINDOW_RUNS + | TWCS_MAX_ACTIVE_WINDOW_FILES + | TWCS_MAX_INACTIVE_WINDOW_FILES + | TWCS_MAX_INACTIVE_WINDOW_RUNS + | TWCS_MAX_OUTPUT_FILE_SIZE + | TWCS_TIME_WINDOW => Ok(Self::Twsc(key.to_string(), value.to_string())), + _ => InvalidRegionOptionChangeRequestSnafu { key, value }.fail(), } } } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 2b56e214cb77..4369376d55e2 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -24,6 +24,7 @@ use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRe use derive_builder::Builder; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; +use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS}; use store_api::region_request::ChangeOption; use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId}; @@ -219,6 +220,21 @@ impl TableMeta { new_options.ttl = Some(*new_ttl); } } + ChangeOption::Twsc(key, value) => { + if !value.is_empty() { + new_options + .extra_options + .insert(key.to_string(), value.to_string()); + // Ensure node restart correctly. + new_options.extra_options.insert( + COMPACTION_TYPE.to_string(), + COMPACTION_TYPE_TWCS.to_string(), + ); + } else { + // Invalidate the previous change option if an empty value has been set. + new_options.extra_options.remove(key.as_str()); + } + } } } let mut builder = self.new_meta_builder(); diff --git a/tests/cases/standalone/common/alter/alter_table_options.result b/tests/cases/standalone/common/alter/alter_table_options.result index 8e31766f0108..53e765b0d23a 100644 --- a/tests/cases/standalone/common/alter/alter_table_options.result +++ b/tests/cases/standalone/common/alter/alter_table_options.result @@ -137,6 +137,108 @@ SELECT i FROM ato; | 2 | +---+ +ALTER TABLE ato SET 'compaction.twcs.time_window'='2h'; + +Affected Rows: 0 + +ALTER TABLE ato SET 'compaction.twcs.max_output_file_size'='500MB'; + +Affected Rows: 0 + +ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_files'='2'; + +Affected Rows: 0 + +ALTER TABLE ato SET 'compaction.twcs.max_active_window_files'='2'; + +Affected Rows: 0 + +ALTER TABLE ato SET 'compaction.twcs.max_active_window_runs'='6'; + +Affected Rows: 0 + +ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='6'; + +Affected Rows: 0 + +SHOW CREATE TABLE ato; + ++-------+----------------------------------------------------+ +| Table | Create Table | ++-------+----------------------------------------------------+ +| ato | CREATE TABLE IF NOT EXISTS "ato" ( | +| | "i" INT NULL, | +| | "j" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("j"), | +| | PRIMARY KEY ("i") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | compaction.twcs.max_active_window_files = '2', | +| | compaction.twcs.max_active_window_runs = '6', | +| | compaction.twcs.max_inactive_window_files = '2', | +| | compaction.twcs.max_inactive_window_runs = '6', | +| | compaction.twcs.max_output_file_size = '500MB', | +| | compaction.twcs.time_window = '2h', | +| | compaction.type = 'twcs', | +| | ttl = '1s' | +| | ) | ++-------+----------------------------------------------------+ + +ALTER TABLE ato SET 'compaction.twcs.time_window'=''; + +Affected Rows: 0 + +SHOW CREATE TABLE ato; + ++-------+----------------------------------------------------+ +| Table | Create Table | ++-------+----------------------------------------------------+ +| ato | CREATE TABLE IF NOT EXISTS "ato" ( | +| | "i" INT NULL, | +| | "j" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("j"), | +| | PRIMARY KEY ("i") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | compaction.twcs.max_active_window_files = '2', | +| | compaction.twcs.max_active_window_runs = '6', | +| | compaction.twcs.max_inactive_window_files = '2', | +| | compaction.twcs.max_inactive_window_runs = '6', | +| | compaction.twcs.max_output_file_size = '500MB', | +| | compaction.type = 'twcs', | +| | ttl = '1s' | +| | ) | ++-------+----------------------------------------------------+ + +-- SQLNESS ARG restart=true +SHOW CREATE TABLE ato; + ++-------+----------------------------------------------------+ +| Table | Create Table | ++-------+----------------------------------------------------+ +| ato | CREATE TABLE IF NOT EXISTS "ato" ( | +| | "i" INT NULL, | +| | "j" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("j"), | +| | PRIMARY KEY ("i") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | compaction.twcs.max_active_window_files = '2', | +| | compaction.twcs.max_active_window_runs = '6', | +| | compaction.twcs.max_inactive_window_files = '2', | +| | compaction.twcs.max_inactive_window_runs = '6', | +| | compaction.twcs.max_output_file_size = '500MB', | +| | compaction.type = 'twcs', | +| | ttl = '1s' | +| | ) | ++-------+----------------------------------------------------+ + DROP TABLE ato; Affected Rows: 0 diff --git a/tests/cases/standalone/common/alter/alter_table_options.sql b/tests/cases/standalone/common/alter/alter_table_options.sql index aab558543de2..d2ad4546c97d 100644 --- a/tests/cases/standalone/common/alter/alter_table_options.sql +++ b/tests/cases/standalone/common/alter/alter_table_options.sql @@ -28,4 +28,25 @@ SHOW CREATE TABLE ato; SELECT i FROM ato; +ALTER TABLE ato SET 'compaction.twcs.time_window'='2h'; + +ALTER TABLE ato SET 'compaction.twcs.max_output_file_size'='500MB'; + +ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_files'='2'; + +ALTER TABLE ato SET 'compaction.twcs.max_active_window_files'='2'; + +ALTER TABLE ato SET 'compaction.twcs.max_active_window_runs'='6'; + +ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='6'; + +SHOW CREATE TABLE ato; + +ALTER TABLE ato SET 'compaction.twcs.time_window'=''; + +SHOW CREATE TABLE ato; + +-- SQLNESS ARG restart=true +SHOW CREATE TABLE ato; + DROP TABLE ato;