From 9dfca4d6625dabeb306c730f87b1ec222d65ec1d Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Wed, 13 Nov 2024 11:05:21 +0800 Subject: [PATCH] refactor: is_physical_table --- .../information_schema/tables.rs | 38 ++-- src/datanode/src/#store.rs# | 213 ++++++++++++++++++ src/metric-engine/src/engine/create.rs | 10 +- src/metric-engine/src/engine/open.rs | 8 +- src/store-api/src/region_request.rs | 13 ++ src/table/src/metadata.rs | 9 + 6 files changed, 257 insertions(+), 34 deletions(-) create mode 100644 src/datanode/src/#store.rs# diff --git a/src/catalog/src/system_schema/information_schema/tables.rs b/src/catalog/src/system_schema/information_schema/tables.rs index a786cbca2b69..4e56face5d23 100644 --- a/src/catalog/src/system_schema/information_schema/tables.rs +++ b/src/catalog/src/system_schema/information_schema/tables.rs @@ -34,7 +34,6 @@ use datatypes::vectors::{ }; use futures::TryStreamExt; use snafu::{OptionExt, ResultExt}; -use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY; use store_api::storage::{RegionId, ScanRequest, TableId}; use table::metadata::{TableInfo, TableType}; @@ -260,27 +259,22 @@ impl InformationSchemaTablesBuilder { let table_info = table.table_info(); // TODO(dennis): make it working for metric engine - let table_region_stats = if table_info.meta.engine == MITO_ENGINE - || table_info - .meta - .options - .extra_options - .contains_key(PHYSICAL_TABLE_METADATA_KEY) - { - let region_ids = table_info - .meta - .region_numbers - .iter() - .map(|n| RegionId::new(table_info.ident.table_id, *n)) - .collect::>(); - - region_stats - .iter() - .filter(|stat| region_ids.contains(&stat.id)) - .collect::>() - } else { - vec![] - }; + let table_region_stats = + if table_info.meta.engine == MITO_ENGINE || table_info.is_physical_table() { + let region_ids = table_info + .meta + .region_numbers + .iter() + .map(|n| RegionId::new(table_info.ident.table_id, *n)) + .collect::>(); + + region_stats + .iter() + .filter(|stat| region_ids.contains(&stat.id)) + .collect::>() + } else { + vec![] + }; self.add_table( &predicates, diff --git a/src/datanode/src/#store.rs# b/src/datanode/src/#store.rs# new file mode 100644 index 000000000000..a5c51dccb006 --- /dev/null +++ b/src/datanode/src/#store.rs# @@ -0,0 +1,213 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! object storage utilities + +mod azblob; +mod fs; +mod gcs; +mod oss; +mod s3; + +use std::sync::Arc; +use std::time::Duration; +use std::{env, path}; + +use common_base::readable_size::ReadableSize; +use common_telemetry::{info, warn}; +use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer}; +use object_store::services::Fs; +use object_store::util::{join_dir, normalize_dir, with_instrument_layers}; +use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder}; +use snafu::prelude::*; + +use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; +use crate::error::{self, Result}; + +pub(crate) async fn new_raw_object_store( + store: &ObjectStoreConfig, + data_home: &str, +) -> Result { + let data_home = normalize_dir(data_home); + let object_store = match store { + ObjectStoreConfig::File(file_config) => { + fs::new_fs_object_store(&data_home, file_config).await + } + ObjectStoreConfig::S3(s3_config) => s3::new_s3_object_store(s3_config).await, + ObjectStoreConfig::Oss(oss_config) => oss::new_oss_object_store(oss_config).await, + ObjectStoreConfig::Azblob(azblob_config) => { + azblob::new_azblob_object_store(azblob_config).await + } + ObjectStoreConfig::Gcs(gcs_config) => gcs::new_gcs_object_store(gcs_config).await, + }?; + Ok(object_store) +} + +fn with_retry_layers(object_store: ObjectStore) -> ObjectStore { + object_store.layer( + RetryLayer::new() + .with_jitter() + .with_notify(PrintDetailedError), + ) +} + +pub(crate) async fn new_object_store_without_cache( + store: &ObjectStoreConfig, + data_home: &str, +) -> Result { + let object_store = new_raw_object_store(store, data_home).await?; + // Enable retry layer and cache layer for non-fs object storages + let object_store = with_retry_layers(object_store); + + let object_store = with_instrument_layers(object_store, true); + Ok(object_store) +} + +pub(crate) async fn new_object_store( + store: ObjectStoreConfig, + data_home: &str, +) -> Result { + let object_store = new_raw_object_store(&store, data_home).await?; + // Enable retry layer and cache layer for non-fs object storages + let object_store = { + let object_store = if let Some(cache_layer) = build_cache_layer(&store).await? { + // Adds cache layer + object_store.layer(cache_layer) + } else { + object_store + }; + + // Adds retry layer + with_retry_layers(object_store) + }; + + let object_store = with_instrument_layers(object_store, true); + Ok(object_store) +} + +async fn build_cache_layer( + store_config: &ObjectStoreConfig, +) -> Result>> { + let path = "/tmp/local_cache".to_string(); + let (cache_path, cache_capacity) = match store_config { + ObjectStoreConfig::S3(s3_config) => { + let path = s3_config.cache.cache_path.as_ref(); + let capacity = s3_config + .cache + .cache_capacity + .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); + (path, capacity) + } + ObjectStoreConfig::Oss(oss_config) => { + let path = oss_config.cache.cache_path.as_ref(); + let capacity = oss_config + .cache + .cache_capacity + .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); + (path, capacity) + } + ObjectStoreConfig::Azblob(azblob_config) => { + let path = azblob_config.cache.cache_path.as_ref(); + let capacity = azblob_config + .cache + .cache_capacity + .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); + (path, capacity) + } + ObjectStoreConfig::Gcs(gcs_config) => { + let path = gcs_config.cache.cache_path.as_ref(); + let capacity = gcs_config + .cache + .cache_capacity + .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); + (path, capacity) + } + _ => (Some(path.), ReadableSize(1024*1024*256)), + }; + + if let Some(path) = cache_path { + let atomic_temp_dir = join_dir(path, ".tmp/"); + clean_temp_dir(&atomic_temp_dir)?; + + let cache_store = Fs::default() + .root(path) + .atomic_write_dir(&atomic_temp_dir) + .build() + .context(error::InitBackendSnafu)?; + + let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize) + .await + .context(error::InitBackendSnafu)?; + + info!( + "Enabled local object storage cache, path: {}, capacity: {}.", + path, cache_capacity + ); + + Ok(Some(cache_layer)) + } else { + Ok(None) + } +} + +pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> { + if path::Path::new(&dir).exists() { + info!("Begin to clean temp storage directory: {}", dir); + std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?; + info!("Cleaned temp storage directory: {}", dir); + } + + Ok(()) +} + +pub(crate) fn build_http_client() -> Result { + let http_builder = { + let mut builder = reqwest::ClientBuilder::new(); + + // Pool max idle per host controls connection pool size. + // Default to no limit, set to `0` for disable it. + let pool_max_idle_per_host = env::var("_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(usize::MAX); + builder = builder.pool_max_idle_per_host(pool_max_idle_per_host); + + // Connect timeout default to 30s. + let connect_timeout = env::var("_GREPTIMEDB_HTTP_CONNECT_TIMEOUT") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(30); + builder = builder.connect_timeout(Duration::from_secs(connect_timeout)); + + // Pool connection idle timeout default to 90s. + let idle_timeout = env::var("_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT") + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(90); + + builder = builder.pool_idle_timeout(Duration::from_secs(idle_timeout)); + + builder + }; + + HttpClient::build(http_builder).context(error::InitBackendSnafu) +} +struct PrintDetailedError; + +// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying. +impl RetryInterceptor for PrintDetailedError { + fn intercept(&self, err: &Error, dur: Duration) { + warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err); + } +} diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index b90d49005810..0523dd1e5389 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -31,7 +31,6 @@ use store_api::metric_engine_consts::{ METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME, METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME, - PHYSICAL_TABLE_METADATA_KEY, }; use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY}; use store_api::region_engine::RegionEngine; @@ -61,7 +60,7 @@ impl MetricEngineInner { ) -> Result { Self::verify_region_create_request(&request)?; - let result = if request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) { + let result = if request.is_physical_table() { self.create_physical_region(region_id, request).await } else if request.options.contains_key(LOGICAL_TABLE_METADATA_KEY) { let physical_region_id = self.create_logical_region(region_id, request).await?; @@ -355,12 +354,11 @@ impl MetricEngineInner { // check if required table option is present ensure!( - request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) - || request.options.contains_key(LOGICAL_TABLE_METADATA_KEY), + request.is_physical_table() || request.options.contains_key(LOGICAL_TABLE_METADATA_KEY), MissingRegionOptionSnafu {} ); ensure!( - !(request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) + !(request.is_physical_table() && request.options.contains_key(LOGICAL_TABLE_METADATA_KEY)), ConflictRegionOptionSnafu {} ); @@ -543,7 +541,7 @@ impl MetricEngineInner { #[cfg(test)] mod test { - use store_api::metric_engine_consts::METRIC_ENGINE_NAME; + use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY}; use super::*; use crate::engine::MetricEngine; diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index 97b049e01dca..bf41099b39b3 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -18,9 +18,7 @@ use common_telemetry::info; use mito2::engine::MITO_ENGINE_NAME; use object_store::util::join_dir; use snafu::ResultExt; -use store_api::metric_engine_consts::{ - DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR, PHYSICAL_TABLE_METADATA_KEY, -}; +use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR}; use store_api::region_engine::RegionEngine; use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest}; use store_api::storage::RegionId; @@ -46,9 +44,7 @@ impl MetricEngineInner { region_id: RegionId, request: RegionOpenRequest, ) -> Result { - let is_opening_physical_region = request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY); - - if is_opening_physical_region { + if request.is_physical_table() { // open physical region and recover states self.open_physical_region(region_id, request).await?; self.recover_states(region_id).await?; diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index cd587a97d816..ce4ab9a5c93e 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -38,6 +38,7 @@ use crate::metadata::{ InvalidRegionOptionChangeRequestSnafu, InvalidRegionRequestSnafu, MetadataError, RegionMetadata, Result, }; +use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY; use crate::mito_engine_options::{ TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS, TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE, @@ -306,6 +307,11 @@ impl RegionCreateRequest { Ok(()) } + + /// Returns true when the region belongs to the metric engine's physical table. + pub fn is_physical_table(&self) -> bool { + self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) + } } #[derive(Debug, Clone, Default)] @@ -324,6 +330,13 @@ pub struct RegionOpenRequest { pub skip_wal_replay: bool, } +impl RegionOpenRequest { + /// Returns true when the region belongs to the metric engine's physical table. + pub fn is_physical_table(&self) -> bool { + self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) + } +} + /// Close region request. #[derive(Debug)] pub struct RegionCloseRequest {} diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 53f0cf8f2d16..88c9ab11f098 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -26,6 +26,7 @@ use datatypes::schema::{ use derive_builder::Builder; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; +use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY; use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS}; use store_api::region_request::ChangeOption; use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId}; @@ -797,6 +798,14 @@ impl TableInfo { pub fn full_table_name(&self) -> String { common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name) } + + /// Returns true when the table is the metric engine's physical table. + pub fn is_physical_table(&self) -> bool { + self.meta + .options + .extra_options + .contains_key(PHYSICAL_TABLE_METADATA_KEY) + } } impl TableInfoBuilder {