Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support alter twcs compression options #4939

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions src/mito2/src/worker/handle_alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use crate::error::{
};
use crate::flush::FlushReason;
use crate::manifest::action::RegionChange;
use crate::region::options::CompactionOptions::Twcs;
use crate::region::options::TwcsOptions;
use crate::region::version::VersionRef;
use crate::region::MitoRegionRef;
use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
Expand Down Expand Up @@ -167,6 +169,67 @@ impl<S> RegionWorkerLoop<S> {
current_options.ttl = Some(new_ttl);
}
}
ChangeOption::TwscMaxActiveWindowRuns(runs) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we change all these variants into Twcs(String), then we can refactor these match clauses into:

ChangeOption::Twcs(options) =>
{
    match options.as_str() => {
       "compaction.twcs.max_active_window_runs" => { .. }
      ......
    }
}

let Twcs(options) = &mut current_options.compaction;
let runs = runs.unwrap_or(TwcsOptions::default().max_active_window_runs);
info!(
"Update region compaction.twcs.max_active_window_runs: {}, previous: {} new: {}",
region.region_id, options.max_active_window_runs, runs
);
options.max_active_window_runs = runs;
}
ChangeOption::TwscMaxActiveWindowFiles(files) => {
let Twcs(options) = &mut current_options.compaction;
let files = files.unwrap_or(TwcsOptions::default().max_active_window_files);
info!(
"Update region compaction.twcs.max_active_window_files: {}, previous: {} new: {}",
region.region_id, options.max_active_window_files, files
);
options.max_active_window_files = files;
}
ChangeOption::TwscMaxInactiveWindowRuns(runs) => {
let Twcs(options) = &mut current_options.compaction;
let runs = runs.unwrap_or(TwcsOptions::default().max_inactive_window_runs);
info!(
"Update region compaction.twcs.max_inactive_window_runs: {}, previous: {} new: {}",
region.region_id, options.max_inactive_window_runs, runs
);
options.max_inactive_window_runs = runs;
}
ChangeOption::TwscMaxInactiveWindowFiles(files) => {
let Twcs(options) = &mut current_options.compaction;
let files = files.unwrap_or(TwcsOptions::default().max_inactive_window_files);
info!(
"Update region compaction.twcs.max_inactive_window_files: {}, previous: {} new: {}",
region.region_id, options.max_inactive_window_files, files
);
options.max_inactive_window_files = files;
}
ChangeOption::TwscMaxOutputFileSize(size) => {
let Twcs(options) = &mut current_options.compaction;
info!(
"Update region compaction.twcs.max_output_file_size: {}, previous: {:?} new: {:?}",
region.region_id, options.max_output_file_size, size
);
options.max_output_file_size = size;
}
ChangeOption::TwscTimeWindow(window) => {
let Twcs(options) = &mut current_options.compaction;
info!(
"Update region compaction.twcs.time_window: {}, previous: {:?} new: {:?}",
region.region_id, options.time_window, window
);
options.time_window = window;
}
ChangeOption::TwscFallbackToLocal(allow) => {
let Twcs(options) = &mut current_options.compaction;
let allow = allow.unwrap_or(TwcsOptions::default().fallback_to_local);
info!(
"Update region compaction.twcs.fallback_to_local: {}, previous: {:?} new: {:?}",
region.region_id, options.remote_compaction, allow
);
options.fallback_to_local = allow;
}
}
}
region.version_control.alter_options(current_options);
Expand Down
28 changes: 21 additions & 7 deletions src/store-api/src/mito_engine_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,34 @@ pub const APPEND_MODE_KEY: &str = "append_mode";
pub const MERGE_MODE_KEY: &str = "merge_mode";
/// Option key for TTL(time-to-live)
pub const TTL_KEY: &str = "ttl";
/// Option key for twcs max active window runs.
pub const TWCS_MAX_ACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_active_window_runs";
/// Option key for twcs max active window files.
pub const TWCS_MAX_ACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_active_window_files";
/// Option key for twcs max inactive window runs.
pub const TWCS_MAX_INACTIVE_WINDOW_RUNS: &str = "compaction.twcs.max_inactive_window_runs";
/// Option key for twcs max inactive window files.
pub const TWCS_MAX_INACTIVE_WINDOW_FILES: &str = "compaction.twcs.max_inactive_window_files";
/// Option key for twcs max output file size.
pub const TWCS_MAX_OUTPUT_FILE_SIZE: &str = "compaction.twcs.max_output_file_size";
/// Option key for twcs time window.
pub const TWCS_TIME_WINDOW: &str = "compaction.twcs.time_window";
/// Option key for twcs fallback to local.
pub const TWCS_FALLBACK_TO_LOCAL: &str = "compaction.twcs.fallback_to_local";

/// Returns true if the `key` is a valid option key for the mito engine.
pub fn is_mito_engine_option_key(key: &str) -> bool {
[
"ttl",
"compaction.type",
"compaction.twcs.max_active_window_runs",
"compaction.twcs.max_active_window_files",
"compaction.twcs.max_inactive_window_runs",
"compaction.twcs.max_inactive_window_files",
"compaction.twcs.max_output_file_size",
"compaction.twcs.time_window",
TWCS_MAX_ACTIVE_WINDOW_RUNS,
TWCS_MAX_ACTIVE_WINDOW_FILES,
TWCS_MAX_INACTIVE_WINDOW_RUNS,
TWCS_MAX_INACTIVE_WINDOW_FILES,
TWCS_MAX_OUTPUT_FILE_SIZE,
TWCS_TIME_WINDOW,
"compaction.twcs.remote_compaction",
"compaction.twcs.fallback_to_local",
TWCS_FALLBACK_TO_LOCAL,
"storage",
"index.inverted_index.ignore_column_ids",
"index.inverted_index.segment_row_count",
Expand Down
127 changes: 118 additions & 9 deletions src/store-api/src/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::HashMap;
use std::fmt;
use std::str::FromStr;
use std::time::Duration;

use api::helper::ColumnDataTypeWrapper;
Expand All @@ -25,6 +26,7 @@ use api::v1::region::{
FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
};
use api::v1::{self, ChangeTableOption, Rows, SemanticType};
use common_base::readable_size::ReadableSize;
pub use common_base::AffectedRows;
use datatypes::data_type::ConcreteDataType;
use serde::{Deserialize, Serialize};
Expand All @@ -36,7 +38,11 @@ use crate::metadata::{
ColumnMetadata, InvalidRawRegionRequestSnafu, InvalidRegionOptionChangeRequestSnafu,
InvalidRegionRequestSnafu, MetadataError, RegionMetadata, Result,
};
use crate::mito_engine_options::TTL_KEY;
use crate::mito_engine_options::{
TTL_KEY, TWCS_FALLBACK_TO_LOCAL, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS,
TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE,
TWCS_TIME_WINDOW,
};
use crate::path_utils::region_dir;
use crate::storage::{ColumnId, RegionId, ScanRequest};

Expand Down Expand Up @@ -661,23 +667,126 @@ impl From<v1::ChangeColumnType> for ChangeColumnType {
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum ChangeOption {
TTL(Duration),
TwscMaxActiveWindowRuns(Option<usize>),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about changing these variants into one

Twcs(String)

The String value is the option name such as compaction.twcs.max_active_window_runs etc.

I think it looks much better and we can add new options in the future without adding new variant of ChangeOption.

Copy link
Collaborator Author

@lyang24 lyang24 Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the draw back would be that we cannot validate the type of value at storage api layer - i.e user can pass max_active_window_runs='abcde' and this will error out at mito engine code path (where we handle the alter options).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the draw back would be that we cannot validate the type of value at storage api layer - i.e user can pass max_active_window_runs='abcde' and this will error out at mito engine code path (where we handle the alter options).

We can match the option to verify its validity, as the next comment explains.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah i see great call

Copy link
Collaborator Author

@lyang24 lyang24 Nov 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a try at implement this approach, I had a few concerns:

  1. we also need the value of the change, the enum would look like Twcs(String, String)
  2. the second comments suggest that we match and validate values at Mito side when handling the alter request which will cause some drift in logics. (i.e. ttl change values are parsed and validated on building the request and it will return a metadata error - InvalidRegionOptionChangeRequest).

This is a draft of the working attempt https://github.com/GreptimeTeam/greptimedb/pull/4965/files if you like it i can push to this branch.

TwscMaxActiveWindowFiles(Option<usize>),
TwscMaxInactiveWindowRuns(Option<usize>),
TwscMaxInactiveWindowFiles(Option<usize>),
TwscMaxOutputFileSize(Option<ReadableSize>),
TwscTimeWindow(Option<Duration>),
TwscFallbackToLocal(Option<bool>),
}

impl ChangeOption {
pub fn to_change_table_option(&self) -> ChangeTableOption {
match self {
ChangeOption::TTL(duration) => ChangeTableOption {
key: "ttl".to_string(),
value: humantime::format_duration(*duration).to_string(),
},
ChangeOption::TwscMaxActiveWindowRuns(runs) => ChangeTableOption {
key: TWCS_MAX_ACTIVE_WINDOW_RUNS.to_string(),
value: runs.map(|s| s.to_string()).unwrap_or_default(),
},
ChangeOption::TwscMaxActiveWindowFiles(files) => ChangeTableOption {
key: TWCS_MAX_ACTIVE_WINDOW_FILES.to_string(),
value: files.map(|s| s.to_string()).unwrap_or_default(),
},
ChangeOption::TwscMaxInactiveWindowRuns(runs) => ChangeTableOption {
key: TWCS_MAX_INACTIVE_WINDOW_RUNS.to_string(),
value: runs.map(|s| s.to_string()).unwrap_or_default(),
},
ChangeOption::TwscMaxInactiveWindowFiles(files) => ChangeTableOption {
key: TWCS_MAX_INACTIVE_WINDOW_FILES.to_string(),
value: files.map(|s| s.to_string()).unwrap_or_default(),
},
ChangeOption::TwscMaxOutputFileSize(size) => ChangeTableOption {
key: TWCS_MAX_OUTPUT_FILE_SIZE.to_string(),
value: size.map(|s| s.to_string()).unwrap_or_default(),
},
ChangeOption::TwscTimeWindow(window) => ChangeTableOption {
key: TWCS_TIME_WINDOW.to_string(),
value: window
.map(|s| humantime::format_duration(s).to_string())
.unwrap_or_default(),
},
ChangeOption::TwscFallbackToLocal(flag) => ChangeTableOption {
key: TWCS_FALLBACK_TO_LOCAL.to_string(),
value: flag.map(|s| s.to_string()).unwrap_or_default(),
},
}
}
}

impl TryFrom<&ChangeTableOption> for ChangeOption {
type Error = MetadataError;

fn try_from(value: &ChangeTableOption) -> std::result::Result<Self, Self::Error> {
let ChangeTableOption { key, value } = value;
if key == TTL_KEY {
let ttl = if value.is_empty() {
Duration::from_secs(0)

// Inline helper for parsing Option<usize>
let parse_optional_usize = |value: &str| {
if value.is_empty() {
Ok(None)
} else {
value
.parse::<usize>()
.map(Some)
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())
}
};

// Inline helper for parsing Option<bool>
let parse_optional_bool = |value: &str| {
if value.is_empty() {
Ok(None)
} else {
value
.parse::<bool>()
.map(Some)
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())
}
};

// Inline helper for parsing Option<Duration>
let parse_optional_duration = |value: &str| {
if value.is_empty() {
Ok(None)
} else {
humantime::parse_duration(value)
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())?
};
Ok(Self::TTL(ttl))
} else {
InvalidRegionOptionChangeRequestSnafu { key, value }.fail()
.map(Some)
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())
}
};

match key.as_str() {
TTL_KEY => Ok(Self::TTL(
parse_optional_duration(value)?.unwrap_or(Duration::from_secs(0)),
)),
TWCS_MAX_ACTIVE_WINDOW_RUNS => {
Ok(Self::TwscMaxActiveWindowRuns(parse_optional_usize(value)?))
}
TWCS_MAX_ACTIVE_WINDOW_FILES => {
Ok(Self::TwscMaxActiveWindowFiles(parse_optional_usize(value)?))
}
TWCS_MAX_INACTIVE_WINDOW_RUNS => Ok(Self::TwscMaxInactiveWindowRuns(
parse_optional_usize(value)?,
)),
TWCS_MAX_INACTIVE_WINDOW_FILES => Ok(Self::TwscMaxInactiveWindowFiles(
parse_optional_usize(value)?,
)),
TWCS_MAX_OUTPUT_FILE_SIZE => {
let size = if value.is_empty() {
None
} else {
Some(ReadableSize::from_str(value).map_err(|_| {
InvalidRegionOptionChangeRequestSnafu { key, value }.build()
})?)
};
Ok(Self::TwscMaxOutputFileSize(size))
}
TWCS_TIME_WINDOW => Ok(Self::TwscTimeWindow(parse_optional_duration(value)?)),
TWCS_FALLBACK_TO_LOCAL => Ok(Self::TwscFallbackToLocal(parse_optional_bool(value)?)),
_ => InvalidRegionOptionChangeRequestSnafu { key, value }.fail(),
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/table/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,17 @@ impl TableMeta {
new_options.ttl = Some(*new_ttl);
}
}
_ => {
let change_option = request.to_change_table_option();
if !change_option.value.is_empty() {
new_options
.extra_options
.insert(change_option.key, change_option.value);
} else {
// Invalidate the previous change option if an empty value has been set.
new_options.extra_options.remove(&change_option.key);
}
}
}
}
let mut builder = self.new_meta_builder();
Expand Down
81 changes: 81 additions & 0 deletions tests/cases/standalone/common/alter/alter_table_options.result
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,87 @@ SELECT i FROM ato;
| 2 |
+---+

ALTER TABLE ato SET 'compaction.twcs.time_window'='2h';

Affected Rows: 0

ALTER TABLE ato SET 'compaction.twcs.max_output_file_size'='500MB';

Affected Rows: 0

ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_files'='2';

Affected Rows: 0

ALTER TABLE ato SET 'compaction.twcs.max_active_window_files'='2';

Affected Rows: 0

ALTER TABLE ato SET 'compaction.twcs.max_active_window_runs'='6';

Affected Rows: 0

ALTER TABLE ato SET 'compaction.twcs.max_inactive_window_runs'='6';

Affected Rows: 0

ALTER TABLE ato SET 'compaction.twcs.fallback_to_local'='false';

Affected Rows: 0

SHOW CREATE TABLE ato;

+-------+------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | compaction.twcs.fallback_to_local = 'false', |
| | compaction.twcs.max_active_window_files = '2', |
| | compaction.twcs.max_active_window_runs = '6', |
| | compaction.twcs.max_inactive_window_files = '2', |
| | compaction.twcs.max_inactive_window_runs = '6', |
| | compaction.twcs.max_output_file_size = '500.0MiB', |
| | compaction.twcs.time_window = '2h', |
| | ttl = '1s' |
| | ) |
+-------+------------------------------------------------------+

ALTER TABLE ato SET 'compaction.twcs.time_window'='';

Affected Rows: 0

SHOW CREATE TABLE ato;

+-------+------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------+
| ato | CREATE TABLE IF NOT EXISTS "ato" ( |
| | "i" INT NULL, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("i") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | compaction.twcs.fallback_to_local = 'false', |
| | compaction.twcs.max_active_window_files = '2', |
| | compaction.twcs.max_active_window_runs = '6', |
| | compaction.twcs.max_inactive_window_files = '2', |
| | compaction.twcs.max_inactive_window_runs = '6', |
| | compaction.twcs.max_output_file_size = '500.0MiB', |
| | ttl = '1s' |
| | ) |
+-------+------------------------------------------------------+

DROP TABLE ato;

Affected Rows: 0
Expand Down
Loading