Skip to content

Commit

Permalink
refactor: is_physical_table
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Nov 13, 2024
1 parent 613452a commit 9dfca4d
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 34 deletions.
38 changes: 16 additions & 22 deletions src/catalog/src/system_schema/information_schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use datatypes::vectors::{
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use store_api::storage::{RegionId, ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};

Expand Down Expand Up @@ -260,27 +259,22 @@ impl InformationSchemaTablesBuilder {
let table_info = table.table_info();

// TODO(dennis): make it work for metric engine
let table_region_stats = if table_info.meta.engine == MITO_ENGINE
|| table_info
.meta
.options
.extra_options
.contains_key(PHYSICAL_TABLE_METADATA_KEY)
{
let region_ids = table_info
.meta
.region_numbers
.iter()
.map(|n| RegionId::new(table_info.ident.table_id, *n))
.collect::<HashSet<_>>();

region_stats
.iter()
.filter(|stat| region_ids.contains(&stat.id))
.collect::<Vec<_>>()
} else {
vec![]
};
let table_region_stats =
if table_info.meta.engine == MITO_ENGINE || table_info.is_physical_table() {
let region_ids = table_info
.meta
.region_numbers
.iter()
.map(|n| RegionId::new(table_info.ident.table_id, *n))
.collect::<HashSet<_>>();

region_stats
.iter()
.filter(|stat| region_ids.contains(&stat.id))
.collect::<Vec<_>>()
} else {
vec![]
};

self.add_table(
&predicates,
Expand Down
213 changes: 213 additions & 0 deletions src/datanode/src/#store.rs#
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! object storage utilities

mod azblob;
mod fs;
mod gcs;
mod oss;
mod s3;

use std::sync::Arc;
use std::time::Duration;
use std::{env, path};

use common_base::readable_size::ReadableSize;
use common_telemetry::{info, warn};
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;

use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};

/// Builds the bare object store described by `store`, without any layers.
///
/// `data_home` is normalized with `normalize_dir` and is only passed to the
/// file-system backend; the other backends are built from their own config.
///
/// # Errors
///
/// Propagates whatever error the selected backend constructor returns.
pub(crate) async fn new_raw_object_store(
    store: &ObjectStoreConfig,
    data_home: &str,
) -> Result<ObjectStore> {
    let root_dir = normalize_dir(data_home);

    // Dispatch to the backend-specific constructor; each arm bails out on failure.
    let raw_store = match store {
        ObjectStoreConfig::File(cfg) => fs::new_fs_object_store(&root_dir, cfg).await?,
        ObjectStoreConfig::S3(cfg) => s3::new_s3_object_store(cfg).await?,
        ObjectStoreConfig::Oss(cfg) => oss::new_oss_object_store(cfg).await?,
        ObjectStoreConfig::Azblob(cfg) => azblob::new_azblob_object_store(cfg).await?,
        ObjectStoreConfig::Gcs(cfg) => gcs::new_gcs_object_store(cfg).await?,
    };

    Ok(raw_store)
}

/// Wraps `object_store` in a jittered [`RetryLayer`] that reports every retry
/// through [`PrintDetailedError`].
fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
    let retry_layer = RetryLayer::new()
        .with_jitter()
        .with_notify(PrintDetailedError);

    object_store.layer(retry_layer)
}

/// Builds an object store with the retry and instrument layers, but no cache layer.
///
/// # Errors
///
/// Propagates any error from [`new_raw_object_store`].
pub(crate) async fn new_object_store_without_cache(
    store: &ObjectStoreConfig,
    data_home: &str,
) -> Result<ObjectStore> {
    let raw_store = new_raw_object_store(store, data_home).await?;

    // Only the retry layer is attached here; no cache layer is built.
    let retrying_store = with_retry_layers(raw_store);

    Ok(with_instrument_layers(retrying_store, true))
}

/// Builds an object store with the cache (when available), retry and
/// instrument layers applied, in that order.
///
/// # Errors
///
/// Propagates any error from [`new_raw_object_store`] or [`build_cache_layer`].
pub(crate) async fn new_object_store(
    store: ObjectStoreConfig,
    data_home: &str,
) -> Result<ObjectStore> {
    let raw_store = new_raw_object_store(&store, data_home).await?;

    // The cache layer is optional: it is only added when `build_cache_layer` yields one.
    let cached_store = match build_cache_layer(&store).await? {
        Some(cache_layer) => raw_store.layer(cache_layer),
        None => raw_store,
    };

    // The retry layer goes on after the cache layer.
    let retrying_store = with_retry_layers(cached_store);

    Ok(with_instrument_layers(retrying_store, true))
}

/// Builds the local LRU cache layer for the given object store configuration.
///
/// For the S3, OSS, Azblob and GCS backends the cache directory comes from the
/// backend's `cache.cache_path`, and the capacity from `cache.cache_capacity`
/// (falling back to `DEFAULT_OBJECT_STORE_CACHE_SIZE`). Every other backend
/// falls back to the fixed directory `/tmp/local_cache` with a 256 MiB capacity.
///
/// Returns `Ok(None)` when no cache path is available.
///
/// # Errors
///
/// Fails when the temporary directory under the cache path cannot be cleaned,
/// or when the cache backend or the cache layer cannot be initialized.
async fn build_cache_layer(
    store_config: &ObjectStoreConfig,
) -> Result<Option<LruCacheLayer<impl Access>>> {
    // Kept alive for the whole function so the fallback arm can lend out a
    // reference of the same shape as the configured `cache_path.as_ref()`.
    let fallback_path = "/tmp/local_cache".to_string();
    let (cache_path, cache_capacity) = match store_config {
        ObjectStoreConfig::S3(s3_config) => {
            let path = s3_config.cache.cache_path.as_ref();
            let capacity = s3_config
                .cache
                .cache_capacity
                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
            (path, capacity)
        }
        ObjectStoreConfig::Oss(oss_config) => {
            let path = oss_config.cache.cache_path.as_ref();
            let capacity = oss_config
                .cache
                .cache_capacity
                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
            (path, capacity)
        }
        ObjectStoreConfig::Azblob(azblob_config) => {
            let path = azblob_config.cache.cache_path.as_ref();
            let capacity = azblob_config
                .cache
                .cache_capacity
                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
            (path, capacity)
        }
        ObjectStoreConfig::Gcs(gcs_config) => {
            let path = gcs_config.cache.cache_path.as_ref();
            let capacity = gcs_config
                .cache
                .cache_capacity
                .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
            (path, capacity)
        }
        // Remaining backends: fixed cache directory, 256 MiB capacity.
        _ => (Some(&fallback_path), ReadableSize(1024 * 1024 * 256)),
    };

    if let Some(path) = cache_path {
        // Remove leftovers of earlier atomic writes before reusing the directory.
        let atomic_temp_dir = join_dir(path, ".tmp/");
        clean_temp_dir(&atomic_temp_dir)?;

        let cache_store = Fs::default()
            .root(path)
            .atomic_write_dir(&atomic_temp_dir)
            .build()
            .context(error::InitBackendSnafu)?;

        let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
            .await
            .context(error::InitBackendSnafu)?;

        info!(
            "Enabled local object storage cache, path: {}, capacity: {}.",
            path, cache_capacity
        );

        Ok(Some(cache_layer))
    } else {
        Ok(None)
    }
}

/// Removes `dir` and everything below it when it exists; does nothing otherwise.
///
/// # Errors
///
/// Returns a `RemoveDir` error carrying `dir` when the removal fails.
pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> {
    // Nothing to clean when the directory is absent.
    if !path::Path::new(dir).exists() {
        return Ok(());
    }

    info!("Begin to clean temp storage directory: {}", dir);
    std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?;
    info!("Cleaned temp storage directory: {}", dir);

    Ok(())
}

/// Builds the HTTP client, tuned through environment variables.
///
/// Each setting falls back to its default when the variable is unset or does
/// not parse:
/// - `_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST`: `usize::MAX`
/// - `_GREPTIMEDB_HTTP_CONNECT_TIMEOUT` (seconds): `30`
/// - `_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT` (seconds): `90`
///
/// # Errors
///
/// Returns an `InitBackend` error when the client cannot be built.
pub(crate) fn build_http_client() -> Result<HttpClient> {
    /// Reads `key` from the environment and parses it, yielding `fallback`
    /// when the variable is missing or malformed.
    fn env_or<T: std::str::FromStr>(key: &str, fallback: T) -> T {
        env::var(key)
            .ok()
            .and_then(|raw| raw.parse::<T>().ok())
            .unwrap_or(fallback)
    }

    // Max idle connections kept per host; unlimited by default, `0` disables pooling.
    let pool_max_idle_per_host =
        env_or::<usize>("_GREPTIMEDB_HTTP_POOL_MAX_IDLE_PER_HOST", usize::MAX);
    // Connect timeout in seconds, 30s by default.
    let connect_timeout = env_or::<u64>("_GREPTIMEDB_HTTP_CONNECT_TIMEOUT", 30);
    // Idle timeout of pooled connections in seconds, 90s by default.
    let idle_timeout = env_or::<u64>("_GREPTIMEDB_HTTP_POOL_IDLE_TIMEOUT", 90);

    let http_builder = reqwest::ClientBuilder::new()
        .pool_max_idle_per_host(pool_max_idle_per_host)
        .connect_timeout(Duration::from_secs(connect_timeout))
        .pool_idle_timeout(Duration::from_secs(idle_timeout));

    HttpClient::build(http_builder).context(error::InitBackendSnafu)
}
struct PrintDetailedError;

// PrintDetailedError is a retry interceptor that prints error in Debug format in retrying.
impl RetryInterceptor for PrintDetailedError {
fn intercept(&self, err: &Error, dur: Duration) {
warn!("Retry after {}s, error: {:#?}", dur.as_secs_f64(), err);
}
}
10 changes: 4 additions & 6 deletions src/metric-engine/src/engine/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use store_api::metric_engine_consts::{
METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
use store_api::region_engine::RegionEngine;
Expand Down Expand Up @@ -61,7 +60,7 @@ impl MetricEngineInner {
) -> Result<AffectedRows> {
Self::verify_region_create_request(&request)?;

let result = if request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) {
let result = if request.is_physical_table() {
self.create_physical_region(region_id, request).await
} else if request.options.contains_key(LOGICAL_TABLE_METADATA_KEY) {
let physical_region_id = self.create_logical_region(region_id, request).await?;
Expand Down Expand Up @@ -355,12 +354,11 @@ impl MetricEngineInner {

// check if required table option is present
ensure!(
request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
|| request.options.contains_key(LOGICAL_TABLE_METADATA_KEY),
request.is_physical_table() || request.options.contains_key(LOGICAL_TABLE_METADATA_KEY),
MissingRegionOptionSnafu {}
);
ensure!(
!(request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
!(request.is_physical_table()
&& request.options.contains_key(LOGICAL_TABLE_METADATA_KEY)),
ConflictRegionOptionSnafu {}
);
Expand Down Expand Up @@ -543,7 +541,7 @@ impl MetricEngineInner {

#[cfg(test)]
mod test {
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};

use super::*;
use crate::engine::MetricEngine;
Expand Down
8 changes: 2 additions & 6 deletions src/metric-engine/src/engine/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ use common_telemetry::info;
use mito2::engine::MITO_ENGINE_NAME;
use object_store::util::join_dir;
use snafu::ResultExt;
use store_api::metric_engine_consts::{
DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR, PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;
Expand All @@ -46,9 +44,7 @@ impl MetricEngineInner {
region_id: RegionId,
request: RegionOpenRequest,
) -> Result<AffectedRows> {
let is_opening_physical_region = request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY);

if is_opening_physical_region {
if request.is_physical_table() {
// open physical region and recover states
self.open_physical_region(region_id, request).await?;
self.recover_states(region_id).await?;
Expand Down
13 changes: 13 additions & 0 deletions src/store-api/src/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::metadata::{
InvalidRegionOptionChangeRequestSnafu, InvalidRegionRequestSnafu, MetadataError,
RegionMetadata, Result,
};
use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use crate::mito_engine_options::{
TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS,
TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE,
Expand Down Expand Up @@ -306,6 +307,11 @@ impl RegionCreateRequest {

Ok(())
}

/// Returns true when the region belongs to the metric engine's physical table.
///
/// The check is whether the request's `options` contain
/// `PHYSICAL_TABLE_METADATA_KEY`.
pub fn is_physical_table(&self) -> bool {
self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
}
}

#[derive(Debug, Clone, Default)]
Expand All @@ -324,6 +330,13 @@ pub struct RegionOpenRequest {
pub skip_wal_replay: bool,
}

impl RegionOpenRequest {
/// Returns true when the region belongs to the metric engine's physical table.
///
/// The check is whether the request's `options` contain
/// `PHYSICAL_TABLE_METADATA_KEY`.
pub fn is_physical_table(&self) -> bool {
self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
}
}

/// Close region request.
#[derive(Debug)]
pub struct RegionCloseRequest {}
Expand Down
9 changes: 9 additions & 0 deletions src/table/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use datatypes::schema::{
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
use store_api::region_request::ChangeOption;
use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
Expand Down Expand Up @@ -797,6 +798,14 @@ impl TableInfo {
/// Returns the full table name, produced by
/// `common_catalog::format_full_table_name` from this table's catalog name,
/// schema name and table name.
pub fn full_table_name(&self) -> String {
common_catalog::format_full_table_name(&self.catalog_name, &self.schema_name, &self.name)
}

/// Returns true when the table is the metric engine's physical table.
///
/// The check is whether the table's `meta.options.extra_options` contain
/// `PHYSICAL_TABLE_METADATA_KEY`.
pub fn is_physical_table(&self) -> bool {
self.meta
.options
.extra_options
.contains_key(PHYSICAL_TABLE_METADATA_KEY)
}
}

impl TableInfoBuilder {
Expand Down

0 comments on commit 9dfca4d

Please sign in to comment.