Skip to content

Commit

Permalink
fix/use-cache-kv-manager: Moved InvalidateSchemaCacheHandler to a sep…
Browse files Browse the repository at this point in the history
…arate module

 • Extracted InvalidateSchemaCacheHandler and associated tests into a new file cache_invalidator.rs
 • Removed async_trait and CacheInvalidator related code from heartbeat.rs
 • Added cache_invalidator module declaration in handler.rs
  • Loading branch information
v0y4g3r committed Nov 26, 2024
1 parent 7ac1c28 commit 0876f62
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 142 deletions.
144 changes: 2 additions & 142 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,15 @@ use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
use async_trait::async_trait;
use catalog::kvbackend::CachedKvBackend;
use common_meta::cache_invalidator::{CacheInvalidator, Context};
use common_meta::datanode::REGION_STATISTIC_KEY;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::{
HandleControl, HandlerGroupExecutor, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
HeartbeatResponseHandlerExecutorRef,
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_meta::instruction::{CacheIdent, Instruction};
use common_telemetry::{debug, error, info, trace, warn};
use meta_client::client::{HeartbeatSender, MetaClient};
use meta_client::MetaClientRef;
Expand All @@ -44,6 +40,7 @@ use crate::alive_keeper::RegionAliveKeeper;
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::event_listener::RegionServerEventReceiver;
use crate::heartbeat::handler::cache_invalidator::InvalidateSchemaCacheHandler;
use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
use crate::region_server::RegionServer;

Expand Down Expand Up @@ -366,140 +363,3 @@ impl HeartbeatTask {
Ok(())
}
}

#[derive(Clone)]
pub struct InvalidateSchemaCacheHandler {
cached_kv_backend: Arc<CachedKvBackend>,
}

#[async_trait]
impl HeartbeatResponseHandler for InvalidateSchemaCacheHandler {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
matches!(
ctx.incoming_message.as_ref(),
Some((_, Instruction::InvalidateCaches(_)))
)
}

async fn handle(
&self,
ctx: &mut HeartbeatResponseHandlerContext,
) -> common_meta::error::Result<HandleControl> {
let Some((_, Instruction::InvalidateCaches(caches))) = ctx.incoming_message.take() else {
unreachable!("InvalidateSchemaCacheHandler: should be guarded by 'is_acceptable'")
};

debug!(
"InvalidateSchemaCacheHandler: invalidating caches: {:?}",
caches
);

let schema_caches = caches
.into_iter()
.filter(|i| matches!(i, CacheIdent::SchemaName(_)))
.collect::<Vec<_>>();

self.cached_kv_backend
.invalidate(&Context::default(), &schema_caches)
.await?;
Ok(HandleControl::Done)
}
}

impl InvalidateSchemaCacheHandler {
pub fn new(cached_kv_backend: Arc<CachedKvBackend>) -> Self {
Self { cached_kv_backend }
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::HeartbeatResponse;
use catalog::kvbackend::CachedKvBackendBuilder;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue};
use common_meta::key::{MetadataKey, SchemaMetadataManager};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackend;
use common_meta::rpc::store::PutRequest;

use crate::heartbeat::InvalidateSchemaCacheHandler;

#[tokio::test]
async fn test_invalidate_schema_cache_handler() {
let inner_kv = Arc::new(MemoryKvBackend::default());
let cached_kv = Arc::new(CachedKvBackendBuilder::new(inner_kv.clone()).build());
let schema_metadata_manager = SchemaMetadataManager::new(cached_kv.clone());

let schema_name = "test_schema";
let catalog_name = "test_catalog";
schema_metadata_manager
.register_region_table_info(
1,
"test_table",
schema_name,
catalog_name,
Some(SchemaNameValue {
ttl: Some(Duration::from_secs(1)),
}),
)
.await;

schema_metadata_manager
.get_schema_options_by_table_id(1)
.await
.unwrap();

let schema_key = SchemaNameKey::new(catalog_name, schema_name).to_bytes();
let new_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(3)),
}
.try_as_raw_value()
.unwrap();
inner_kv
.put(PutRequest {
key: schema_key.clone(),
value: new_schema_value,
prev_kv: false,
})
.await
.unwrap();

let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new(
InvalidateSchemaCacheHandler::new(cached_kv),
)]));

let (tx, _) = tokio::sync::mpsc::channel(8);
let mailbox = Arc::new(HeartbeatMailbox::new(tx));

// removes a valid key
let response = HeartbeatResponse::default();
let mut ctx: HeartbeatResponseHandlerContext =
HeartbeatResponseHandlerContext::new(mailbox, response);
ctx.incoming_message = Some((
MessageMeta::new_test(1, "hi", "foo", "bar"),
Instruction::InvalidateCaches(vec![CacheIdent::SchemaName(SchemaName {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
})]),
));
executor.handle(ctx).await.unwrap();

assert_eq!(
Some(Duration::from_secs(3)),
SchemaNameValue::try_from_raw_value(
&inner_kv.get(&schema_key).await.unwrap().unwrap().value
)
.unwrap()
.unwrap()
.ttl
);
}
}
1 change: 1 addition & 0 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use futures::future::BoxFuture;
use snafu::OptionExt;
use store_api::storage::RegionId;

pub(crate) mod cache_invalidator;
mod close_region;
mod downgrade_region;
mod open_region;
Expand Down
163 changes: 163 additions & 0 deletions src/datanode/src/heartbeat/handler/cache_invalidator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Schema cache invalidator handler

use std::sync::Arc;

use async_trait::async_trait;
use catalog::kvbackend::CachedKvBackend;
use common_meta::cache_invalidator::{CacheInvalidator, Context};
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{CacheIdent, Instruction};
use common_telemetry::debug;

#[derive(Clone)]
pub(crate) struct InvalidateSchemaCacheHandler {
cached_kv_backend: Arc<CachedKvBackend>,
}

#[async_trait]
impl HeartbeatResponseHandler for InvalidateSchemaCacheHandler {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
matches!(
ctx.incoming_message.as_ref(),
Some((_, Instruction::InvalidateCaches(_)))
)
}

async fn handle(
&self,
ctx: &mut HeartbeatResponseHandlerContext,
) -> common_meta::error::Result<HandleControl> {
let Some((_, Instruction::InvalidateCaches(caches))) = ctx.incoming_message.take() else {
unreachable!("InvalidateSchemaCacheHandler: should be guarded by 'is_acceptable'")
};

debug!(
"InvalidateSchemaCacheHandler: invalidating caches: {:?}",
caches
);

let schema_caches = caches
.into_iter()
.filter(|i| matches!(i, CacheIdent::SchemaName(_)))
.collect::<Vec<_>>();

self.cached_kv_backend
.invalidate(&Context::default(), &schema_caches)
.await?;
Ok(HandleControl::Done)
}
}

impl InvalidateSchemaCacheHandler {
pub fn new(cached_kv_backend: Arc<CachedKvBackend>) -> Self {
Self { cached_kv_backend }
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::HeartbeatResponse;
use catalog::kvbackend::CachedKvBackendBuilder;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue};
use common_meta::key::{MetadataKey, SchemaMetadataManager};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackend;
use common_meta::rpc::store::PutRequest;

use crate::heartbeat::handler::cache_invalidator::InvalidateSchemaCacheHandler;

#[tokio::test]
async fn test_invalidate_schema_cache_handler() {
let inner_kv = Arc::new(MemoryKvBackend::default());
let cached_kv = Arc::new(CachedKvBackendBuilder::new(inner_kv.clone()).build());
let schema_metadata_manager = SchemaMetadataManager::new(cached_kv.clone());

let schema_name = "test_schema";
let catalog_name = "test_catalog";
schema_metadata_manager
.register_region_table_info(
1,
"test_table",
schema_name,
catalog_name,
Some(SchemaNameValue {
ttl: Some(Duration::from_secs(1)),
}),
)
.await;

schema_metadata_manager
.get_schema_options_by_table_id(1)
.await
.unwrap();

let schema_key = SchemaNameKey::new(catalog_name, schema_name).to_bytes();
let new_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(3)),
}
.try_as_raw_value()
.unwrap();
inner_kv
.put(PutRequest {
key: schema_key.clone(),
value: new_schema_value,
prev_kv: false,
})
.await
.unwrap();

let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new(
InvalidateSchemaCacheHandler::new(cached_kv),
)]));

let (tx, _) = tokio::sync::mpsc::channel(8);
let mailbox = Arc::new(HeartbeatMailbox::new(tx));

// removes a valid key
let response = HeartbeatResponse::default();
let mut ctx: HeartbeatResponseHandlerContext =
HeartbeatResponseHandlerContext::new(mailbox, response);
ctx.incoming_message = Some((
MessageMeta::new_test(1, "hi", "foo", "bar"),
Instruction::InvalidateCaches(vec![CacheIdent::SchemaName(SchemaName {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
})]),
));
executor.handle(ctx).await.unwrap();

assert_eq!(
Some(Duration::from_secs(3)),
SchemaNameValue::try_from_raw_value(
&inner_kv.get(&schema_key).await.unwrap().unwrap().value
)
.unwrap()
.unwrap()
.ttl
);
}
}

0 comments on commit 0876f62

Please sign in to comment.