Skip to content

Commit

Permalink
support alter twcs compression options
Browse files Browse the repository at this point in the history
  • Loading branch information
lyang24 committed Nov 5, 2024
1 parent be72d3b commit 0ff73fa
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 15 deletions.
88 changes: 88 additions & 0 deletions src/mito2/src/worker/handle_alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use crate::error::{
};
use crate::flush::FlushReason;
use crate::manifest::action::RegionChange;
use crate::region::options::CompactionOptions::Twcs;
use crate::region::options::TwcsOptions;
use crate::region::version::VersionRef;
use crate::region::MitoRegionRef;
use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
Expand Down Expand Up @@ -167,6 +169,92 @@ impl<S> RegionWorkerLoop<S> {
current_options.ttl = Some(new_ttl);
}
}
ChangeOption::TwscMaxActiveWindowRuns(runs) => {
let Twcs(options) = current_options.compaction;
let runs = runs.unwrap_or(TwcsOptions::default().max_active_window_runs);
info!(
"Update region compaction.twcs.max_active_window_runs: {}, previous: {} new: {}",
region.region_id, options.max_active_window_runs, runs
);
let mut new_option = options.clone();
new_option.max_active_window_runs = runs;
current_options.compaction = Twcs(new_option)
}
ChangeOption::TwscMaxActiveWindowFiles(files) => {
let Twcs(options) = current_options.compaction;
let files = files.unwrap_or(TwcsOptions::default().max_active_window_files);
info!(
"Update region compaction.twcs.max_active_window_files: {}, previous: {} new: {}",
region.region_id, options.max_active_window_files, files
);
let mut new_option = options.clone();
new_option.max_active_window_files = files;
current_options.compaction = Twcs(new_option)
}
ChangeOption::TwscMaxInactiveWindowRuns(runs) => {
let Twcs(options) = current_options.compaction;
let runs = runs.unwrap_or(TwcsOptions::default().max_inactive_window_runs);
info!(
"Update region compaction.twcs.max_inactive_window_runs: {}, previous: {} new: {}",
region.region_id, options.max_inactive_window_runs, runs
);
let mut new_option = options.clone();
new_option.max_inactive_window_runs = runs;
current_options.compaction = Twcs(new_option)
}
ChangeOption::TwscMaxInactiveWindowFiles(files) => {
let Twcs(options) = current_options.compaction;
let files = files.unwrap_or(TwcsOptions::default().max_inactive_window_files);
info!(
"Update region compaction.twcs.max_inactive_window_files: {}, previous: {} new: {}",
region.region_id, options.max_active_window_runs, files
);
let mut new_option = options.clone();
new_option.max_inactive_window_files = files;
current_options.compaction = Twcs(new_option)
}
ChangeOption::TwscMaxOutputFileSize(size) => {
let Twcs(options) = current_options.compaction;
info!(
"Update region compaction.twcs.max_output_file_size: {}, previous: {:?} new: {:?}",
region.region_id, options.max_output_file_size, size
);
let mut new_option = options.clone();
new_option.max_output_file_size = size;
current_options.compaction = Twcs(new_option)
}
ChangeOption::TwscTimeWindow(window) => {
let Twcs(options) = current_options.compaction;
info!(
"Update region compaction.twcs.time_window: {}, previous: {:?} new: {:?}",
region.region_id, options.time_window, window
);
let mut new_option = options.clone();
new_option.time_window = window;
current_options.compaction = Twcs(new_option)
}
ChangeOption::TwscRemoteCompaction(allow) => {
let Twcs(options) = current_options.compaction;
let allow = allow.unwrap_or(TwcsOptions::default().remote_compaction);
info!(
"Update region compaction.twcs.remote_compaction: {}, previous: {:?} new: {:?}",
region.region_id, options.remote_compaction, allow
);
let mut new_option = options.clone();
new_option.remote_compaction = allow;
current_options.compaction = Twcs(new_option)
}
ChangeOption::TwscFallbackToLocal(allow) => {
let Twcs(options) = current_options.compaction;
let allow = allow.unwrap_or(TwcsOptions::default().fallback_to_local);
info!(
"Update region compaction.twcs.fallback_to_local: {}, previous: {:?} new: {:?}",
region.region_id, options.remote_compaction, allow
);
let mut new_option = options.clone();
new_option.fallback_to_local = allow;
current_options.compaction = Twcs(new_option)
}
}
}
region.version_control.alter_options(current_options);
Expand Down
85 changes: 77 additions & 8 deletions src/store-api/src/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::HashMap;
use std::fmt;
use std::str::FromStr;
use std::time::Duration;

use api::helper::ColumnDataTypeWrapper;
Expand All @@ -25,6 +26,7 @@ use api::v1::region::{
FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
};
use api::v1::{self, ChangeTableOption, Rows, SemanticType};
use common_base::readable_size::ReadableSize;
pub use common_base::AffectedRows;
use datatypes::data_type::ConcreteDataType;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -661,23 +663,90 @@ impl From<v1::ChangeColumnType> for ChangeColumnType {
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum ChangeOption {
TTL(Duration),
TwscMaxActiveWindowRuns(Option<usize>),
TwscMaxActiveWindowFiles(Option<usize>),
TwscMaxInactiveWindowRuns(Option<usize>),
TwscMaxInactiveWindowFiles(Option<usize>),
TwscMaxOutputFileSize(Option<ReadableSize>),
TwscTimeWindow(Option<Duration>),
TwscRemoteCompaction(Option<bool>),
TwscFallbackToLocal(Option<bool>),
}

impl TryFrom<&ChangeTableOption> for ChangeOption {
type Error = MetadataError;

fn try_from(value: &ChangeTableOption) -> std::result::Result<Self, Self::Error> {
let ChangeTableOption { key, value } = value;
if key == TTL_KEY {
let ttl = if value.is_empty() {
Duration::from_secs(0)

// Inline helper for parsing Option<usize>
let parse_optional_usize = |value: &str| {
if value.is_empty() {
Ok(None)
} else {
value
.parse::<usize>()
.map(Some)
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())
}
};

// Inline helper for parsing Option<bool>
let parse_optional_bool = |value: &str| {
if value.is_empty() {
Ok(None)
} else {
value
.parse::<bool>()
.map(Some)
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())
}
};

// Inline helper for parsing Option<Duration>
let parse_optional_duration = |value: &str| {
if value.is_empty() {
Ok(None)
} else {
humantime::parse_duration(value)
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())?
};
Ok(Self::TTL(ttl))
} else {
InvalidRegionOptionChangeRequestSnafu { key, value }.fail()
.map(Some)
.map_err(|_| InvalidRegionOptionChangeRequestSnafu { key, value }.build())
}
};

match key.as_str() {
TTL_KEY => Ok(Self::TTL(
parse_optional_duration(value)?.unwrap_or(Duration::from_secs(0)),
)),
"compaction.twcs.max_active_window_runs" => {
Ok(Self::TwscMaxActiveWindowRuns(parse_optional_usize(value)?))
}
"compaction.twcs.max_active_window_files" => {
Ok(Self::TwscMaxActiveWindowRuns(parse_optional_usize(value)?))
}
"compaction.twcs.max_inactive_window_files" => Ok(Self::TwscMaxInactiveWindowFiles(
parse_optional_usize(value)?,
)),
"compaction.twcs.max_output_file_size" => {
let size = if value.is_empty() {
None
} else {
Some(ReadableSize::from_str(value).map_err(|_| {
InvalidRegionOptionChangeRequestSnafu { key, value }.build()
})?)
};
Ok(Self::TwscMaxOutputFileSize(size))
}
"compaction.twcs.time_window" => {
Ok(Self::TwscTimeWindow(parse_optional_duration(value)?))
}
"compaction.twcs.remote_compaction" => {
Ok(Self::TwscRemoteCompaction(parse_optional_bool(value)?))
}
"compaction.twcs.fallback_to_local" => {
Ok(Self::TwscFallbackToLocal(parse_optional_bool(value)?))
}
_ => InvalidRegionOptionChangeRequestSnafu { key, value }.fail(),
}
}
}
Expand Down
12 changes: 5 additions & 7 deletions src/table/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,11 @@ impl TableMeta {
let mut new_options = self.options.clone();

Check warning on line 220 in src/table/src/metadata.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/greptimedb/greptimedb/src/table/src/metadata.rs
for request in requests {
match request {
ChangeOption::TTL(new_ttl) => {
if new_ttl.is_zero() {
new_options.ttl = None;
} else {
new_options.ttl = Some(*new_ttl);
}
if let ChangeOption::TTL(new_ttl) = request {
if new_ttl.is_zero() {
new_options.ttl = None;
} else {
new_options.ttl = Some(*new_ttl);
}
}
}
Expand Down

0 comments on commit 0ff73fa

Please sign in to comment.