From 42db98248ac846ddbdfb13a717551b336642ddde Mon Sep 17 00:00:00 2001 From: lyang24 Date: Tue, 5 Nov 2024 00:54:45 -0800 Subject: [PATCH] support alter twcs compression options --- src/mito2/src/worker/handle_alter.rs | 63 +++++++++ src/store-api/src/mito_engine_options.rs | 28 +++- src/store-api/src/region_request.rs | 127 ++++++++++++++++-- src/table/src/metadata.rs | 11 ++ .../common/alter/alter_table_options.result | 81 +++++++++++ .../common/alter/alter_table_options.sql | 20 +++ 6 files changed, 314 insertions(+), 16 deletions(-) diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index d73b24c46aa3..9faa70a5fcc0 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -27,6 +27,8 @@ use crate::error::{ }; use crate::flush::FlushReason; use crate::manifest::action::RegionChange; +use crate::region::options::CompactionOptions::Twcs; +use crate::region::options::TwcsOptions; use crate::region::version::VersionRef; use crate::region::MitoRegionRef; use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest}; @@ -167,6 +169,67 @@ impl RegionWorkerLoop { current_options.ttl = Some(new_ttl); } } + ChangeOption::TwscMaxActiveWindowRuns(runs) => { + let Twcs(options) = &mut current_options.compaction; + let runs = runs.unwrap_or(TwcsOptions::default().max_active_window_runs); + info!( + "Update region compaction.twcs.max_active_window_runs: {}, previous: {} new: {}", + region.region_id, options.max_active_window_runs, runs + ); + options.max_active_window_runs = runs; + } + ChangeOption::TwscMaxActiveWindowFiles(files) => { + let Twcs(options) = &mut current_options.compaction; + let files = files.unwrap_or(TwcsOptions::default().max_active_window_files); + info!( + "Update region compaction.twcs.max_active_window_files: {}, previous: {} new: {}", + region.region_id, options.max_active_window_files, files + ); + options.max_active_window_files = files; + } + ChangeOption::TwscMaxInactiveWindowRuns(runs) => { + let Twcs(options) = &mut current_options.compaction; + let runs = runs.unwrap_or(TwcsOptions::default().max_inactive_window_runs); + info!( + "Update region compaction.twcs.max_inactive_window_runs: {}, previous: {} new: {}", + region.region_id, options.max_inactive_window_runs, runs + ); + options.max_inactive_window_runs = runs; + } + ChangeOption::TwscMaxInactiveWindowFiles(files) => { + let Twcs(options) = &mut current_options.compaction; + let files = files.unwrap_or(TwcsOptions::default().max_inactive_window_files); + info!( + "Update region compaction.twcs.max_inactive_window_files: {}, previous: {} new: {}", + region.region_id, options.max_inactive_window_files, files + ); + options.max_inactive_window_files = files; + } + ChangeOption::TwscMaxOutputFileSize(size) => { + let Twcs(options) = &mut current_options.compaction; + info!( + "Update region compaction.twcs.max_output_file_size: {}, previous: {:?} new: {:?}", + region.region_id, options.max_output_file_size, size + ); + options.max_output_file_size = size; + } + ChangeOption::TwscTimeWindow(window) => { + let Twcs(options) = &mut current_options.compaction; + info!( + "Update region compaction.twcs.time_window: {}, previous: {:?} new: {:?}", + region.region_id, options.time_window, window + ); + options.time_window = window; + } + ChangeOption::TwscFallbackToLocal(allow) => { + let Twcs(options) = &mut current_options.compaction; + let allow = allow.unwrap_or(TwcsOptions::default().fallback_to_local); + info!( + "Update region compaction.twcs.fallback_to_local: {}, previous: {:?} new: {:?}", + region.region_id, options.remote_compaction, allow + ); + options.fallback_to_local = allow; + } } } region.version_control.alter_options(current_options); diff --git a/src/store-api/src/mito_engine_options.rs b/src/store-api/src/mito_engine_options.rs index 0e0f3fdac790..0c9c666df37c 100644 --- a/src/store-api/src/mito_engine_options.rs +++ b/src/store-api/src/mito_engine_options.rs @@ -23,20 +23,34 @@ pub const APPEND_MODE_KEY: &str = "append_mode"; pub const MERGE_MODE_KEY: &str = "merge_mode"; /// Option key for TTL(time-to-live) pub const TTL_KEY: &str = "ttl"; +/// Option key for twcs max active window runs. +pub const TWCS_MAX_ACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_active_window_runs"; +/// Option key for twcs max active window files. +pub const TWCS_MAX_ACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_active_window_files"; +/// Option key for twcs max inactive window runs. +pub const TWCS_MAX_INACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_inactive_window_runs"; +/// Option key for twcs max inactive window files. +pub const TWCS_MAX_INACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_inactive_window_files"; +/// Option key for twcs max output file size. +pub const TWCS_MAX_OUTPUT_FILE_SIZE: &str = "compaction.twcs.max_output_file_size"; +/// Option key for twcs time window. +pub const TWCS_TIME_WINDOW: &str = "compaction.twcs.time_window"; +/// Option key for twcs fallback to local. +pub const TWCS_FALLBACK_TO_LOCAL: &str = "compaction.twcs.fallback_to_local"; /// Returns true if the `key` is a valid option key for the mito engine. pub fn is_mito_engine_option_key(key: &str) -> bool { [ "ttl", "compaction.type", - "compaction.twcs.max_active_window_runs", - "compaction.twcs.max_active_window_files", - "compaction.twcs.max_inactive_window_runs", - "compaction.twcs.max_inactive_window_files", - "compaction.twcs.max_output_file_size", - "compaction.twcs.time_window", + TWCS_MAX_ACTIVE_WINDOW_RUNS, + TWCS_MAX_ACTIVE_WINDOW_FILES, + TWCS_MAX_INACTIVE_WINDOW_RUNS, + TWCS_MAX_INACTIVE_WINDOW_FILES, + TWCS_MAX_OUTPUT_FILE_SIZE, + TWCS_TIME_WINDOW, "compaction.twcs.remote_compaction", - "compaction.twcs.fallback_to_local", + TWCS_FALLBACK_TO_LOCAL, "storage", "index.inverted_index.ignore_column_ids", "index.inverted_index.segment_row_count", diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 867f23175445..accd8f02d67a 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::fmt; +use std::str::FromStr; use std::time::Duration; use api::helper::ColumnDataTypeWrapper; @@ -25,6 +26,7 @@ use api::v1::region::{ FlushRequest, InsertRequests, OpenRequest, TruncateRequest, }; use api::v1::{self, ChangeTableOption, Rows, SemanticType}; +use common_base::readable_size::ReadableSize; pub use common_base::AffectedRows; use datatypes::data_type::ConcreteDataType; use serde::{Deserialize, Serialize}; @@ -36,7 +38,11 @@ use crate::metadata::{ ColumnMetadata, InvalidRawRegionRequestSnafu, InvalidRegionOptionChangeRequestSnafu, InvalidRegionRequestSnafu, MetadataError, RegionMetadata, Result, }; -use crate::mito_engine_options::TTL_KEY; +use crate::mito_engine_options::{ + TTL_KEY, TWCS_FALLBACK_TO_LOCAL, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS, + TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE, + TWCS_TIME_WINDOW, +}; use crate::path_utils::region_dir; use crate::storage::{ColumnId, RegionId, ScanRequest}; @@ -661,6 +667,54 @@ impl From for ChangeColumnType { #[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] pub enum ChangeOption { TTL(Duration), + TwscMaxActiveWindowRuns(Option), + TwscMaxActiveWindowFiles(Option), + TwscMaxInactiveWindowRuns(Option), + TwscMaxInactiveWindowFiles(Option), + TwscMaxOutputFileSize(Option), + TwscTimeWindow(Option), + TwscFallbackToLocal(Option), +} + +impl ChangeOption { + pub fn to_change_table_option(&self) -> ChangeTableOption { + match self { + ChangeOption::TTL(duration) => ChangeTableOption { + key: "ttl".to_string(), + value: humantime::format_duration(*duration).to_string(), + }, + ChangeOption::TwscMaxActiveWindowRuns(runs) => ChangeTableOption { + key: TWCS_MAX_ACTIVE_WINDOW_RUNS.to_string(), + value: runs.map(|s| s.to_string()).unwrap_or_default(), + }, + ChangeOption::TwscMaxActiveWindowFiles(files) => ChangeTableOption { + key: TWCS_MAX_ACTIVE_WINDOW_FILES.to_string(), + value: files.map(|s| s.to_string()).unwrap_or_default(), + }, + ChangeOption::TwscMaxInactiveWindowRuns(runs) => ChangeTableOption { + key: TWCS_MAX_INACTIVE_WINDOW_RUNS.to_string(), + value: runs.map(|s| s.to_string()).unwrap_or_default(), + }, + ChangeOption::TwscMaxInactiveWindowFiles(files) => ChangeTableOption { + key: TWCS_MAX_INACTIVE_WINDOW_FILES.to_string(), + value: files.map(|s| s.to_string()).unwrap_or_default(), + }, + ChangeOption::TwscMaxOutputFileSize(size) => ChangeTableOption { + key: TWCS_MAX_OUTPUT_FILE_SIZE.to_string(), + value: size.map(|s| s.to_string()).unwrap_or_default(), + }, + ChangeOption::TwscTimeWindow(window) => ChangeTableOption { + key: TWCS_TIME_WINDOW.to_string(), + value: window + .map(|s| humantime::format_duration(s).to_string()) + .unwrap_or_default(), + }, + ChangeOption::TwscFallbackToLocal(flag) => ChangeTableOption { + key: TWCS_FALLBACK_TO_LOCAL.to_string(), + value: flag.map(|s| s.to_string()).unwrap_or_default(), + }, + } + } } impl TryFrom<&ChangeTableOption> for ChangeOption { @@ -668,16 +722,71 @@ impl TryFrom<&ChangeTableOption> for ChangeOption { fn try_from(value: &ChangeTableOption) -> std::result::Result { let ChangeTableOption { key, value } = value; - if key == TTL_KEY { - let ttl = if value.is_empty() { - Duration::from_secs(0) + + // Inline helper for parsing Option + let parse_optional_usize = |value: &str| { + if value.is_empty() { + Ok(None) + } else { + value + .parse::() + .map(Some) + .map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build()) + } + }; + + // Inline helper for parsing Option + let parse_optional_bool = |value: &str| { + if value.is_empty() { + Ok(None) + } else { + value + .parse::() + .map(Some) + .map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build()) + } + }; + + // Inline helper for parsing Option + let parse_optional_duration = |value: &str| { + if value.is_empty() { + Ok(None) } else { humantime::parse_duration(value) - .map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())? - }; - Ok(Self::TTL(ttl)) - } else { - InvalidRegionOptionChangeRequestSnafu { key, value }.fail() + .map(Some) + .map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build()) + } + }; + + match key.as_str() { + TTL_KEY => Ok(Self::TTL( + parse_optional_duration(value)?.unwrap_or(Duration::from_secs(0)), + )), + TWCS_MAX_ACTIVE_WINDOW_RUNS => { + Ok(Self::TwscMaxActiveWindowRuns(parse_optional_usize(value)?)) + } + TWCS_MAX_ACTIVE_WINDOW_FILES => { + Ok(Self::TwscMaxActiveWindowFiles(parse_optional_usize(value)?)) + } + TWCS_MAX_INACTIVE_WINDOW_RUNS => Ok(Self::TwscMaxInactiveWindowRuns( + parse_optional_usize(value)?, + )), + TWCS_MAX_INACTIVE_WINDOW_FILES => Ok(Self::TwscMaxInactiveWindowFiles( + parse_optional_usize(value)?, + )), + TWCS_MAX_OUTPUT_FILE_SIZE => { + let size = if value.is_empty() { + None + } else { + Some(ReadableSize::from_str(value).map_err(|_| { + InvalidRegionOptionChangeRequestSnafu { key, value }.build() + })?) + }; + Ok(Self::TwscMaxOutputFileSize(size)) + } + TWCS_TIME_WINDOW => Ok(Self::TwscTimeWindow(parse_optional_duration(value)?)), + TWCS_FALLBACK_TO_LOCAL => Ok(Self::TwscFallbackToLocal(parse_optional_bool(value)?)), + _ => InvalidRegionOptionChangeRequestSnafu { key, value }.fail(), } } } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 2b56e214cb77..013a488206a3 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -219,6 +219,17 @@ impl TableMeta { new_options.ttl = Some(*new_ttl); } } + _ => { + let change_option = request.to_change_table_option(); + if !change_option.value.is_empty() { + new_options + .extra_options + .insert(change_option.key, change_option.value); + } else { + // Invalidate the previous change option if an empty value has been set. + new_options.extra_options.remove(&change_option.key); + } + } } } let mut builder = self.new_meta_builder(); diff --git a/tests/cases/standalone/common/alter/alter_table_options.result b/tests/cases/standalone/common/alter/alter_table_options.result index 8e31766f0108..27c1f0247106 100644 --- a/tests/cases/standalone/common/alter/alter_table_options.result +++ b/tests/cases/standalone/common/alter/alter_table_options.result @@ -137,6 +137,87 @@ SELECT i FROM ato; | 2 | +---+ +ALTER TABLE ato SET 'compaction.twcs.time_window'='2h'; + +Affected Rows: 0 + +ALTER TABLE ato SET 'compaction.twcs.max_output_file_size'='500MB'; + +Affected Rows: 0 + +ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_files'='2'; + +Affected Rows: 0 + +ALTER TABLE ato SET 'compaction.twcs.max_active_window_files'='2'; + +Affected Rows: 0 + +ALTER TABLE ato SET 'compaction.twcs.max_active_window_runs'='6'; + +Affected Rows: 0 + +ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='6'; + +Affected Rows: 0 + +ALTER TABLE ato SET 'compaction.twcs.fallback_to_local'='false'; + +Affected Rows: 0 + +SHOW CREATE TABLE ato; + ++-------+------------------------------------------------------+ +| Table | Create Table | ++-------+------------------------------------------------------+ +| ato | CREATE TABLE IF NOT EXISTS "ato" ( | +| | "i" INT NULL, | +| | "j" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("j"), | +| | PRIMARY KEY ("i") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | compaction.twcs.fallback_to_local = 'false', | +| | compaction.twcs.max_active_window_files = '2', | +| | compaction.twcs.max_active_window_runs = '6', | +| | compaction.twcs.max_inactive_window_files = '2', | +| | compaction.twcs.max_inactive_window_runs = '6', | +| | compaction.twcs.max_output_file_size = '500.0MiB', | +| | compaction.twcs.time_window = '2h', | +| | ttl = '1s' | +| | ) | ++-------+------------------------------------------------------+ + +ALTER TABLE ato SET 'compaction.twcs.time_window'=''; + +Affected Rows: 0 + +SHOW CREATE TABLE ato; + ++-------+------------------------------------------------------+ +| Table | Create Table | ++-------+------------------------------------------------------+ +| ato | CREATE TABLE IF NOT EXISTS "ato" ( | +| | "i" INT NULL, | +| | "j" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("j"), | +| | PRIMARY KEY ("i") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | compaction.twcs.fallback_to_local = 'false', | +| | compaction.twcs.max_active_window_files = '2', | +| | compaction.twcs.max_active_window_runs = '6', | +| | compaction.twcs.max_inactive_window_files = '2', | +| | compaction.twcs.max_inactive_window_runs = '6', | +| | compaction.twcs.max_output_file_size = '500.0MiB', | +| | ttl = '1s' | +| | ) | ++-------+------------------------------------------------------+ + DROP TABLE ato; Affected Rows: 0 diff --git a/tests/cases/standalone/common/alter/alter_table_options.sql b/tests/cases/standalone/common/alter/alter_table_options.sql index aab558543de2..0483c388cd3c 100644 --- a/tests/cases/standalone/common/alter/alter_table_options.sql +++ b/tests/cases/standalone/common/alter/alter_table_options.sql @@ -28,4 +28,24 @@ SHOW CREATE TABLE ato; SELECT i FROM ato; +ALTER TABLE ato SET 'compaction.twcs.time_window'='2h'; + +ALTER TABLE ato SET 'compaction.twcs.max_output_file_size'='500MB'; + +ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_files'='2'; + +ALTER TABLE ato SET 'compaction.twcs.max_active_window_files'='2'; + +ALTER TABLE ato SET 'compaction.twcs.max_active_window_runs'='6'; + +ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='6'; + +ALTER TABLE ato SET 'compaction.twcs.fallback_to_local'='false'; + +SHOW CREATE TABLE ato; + +ALTER TABLE ato SET 'compaction.twcs.time_window'=''; + +SHOW CREATE TABLE ato; + DROP TABLE ato;