Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable partition-store db flush on shutdown #2365

Merged
merged 2 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub use metadata::{
};
pub use task_center::{
cancellation_token, cancellation_watcher, is_cancellation_requested, my_node_id, AsyncRuntime,
MetadataFutureExt, RuntimeError, RuntimeRootTaskHandle, TaskCenter, TaskCenterBuildError,
MetadataFutureExt, RuntimeError, RuntimeTaskHandle, TaskCenter, TaskCenterBuildError,
TaskCenterBuilder, TaskCenterFutureExt, TaskContext, TaskHandle, TaskId, TaskKind,
};

Expand Down
45 changes: 34 additions & 11 deletions crates/core/src/task_center.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl TaskCenter {
runtime_name: &'static str,
partition_id: Option<PartitionId>,
root_future: impl FnOnce() -> F + Send + 'static,
) -> Result<RuntimeRootTaskHandle<anyhow::Result<()>>, RuntimeError>
) -> Result<RuntimeTaskHandle<anyhow::Result<()>>, RuntimeError>
where
F: Future<Output = anyhow::Result<()>> + 'static,
{
Expand Down Expand Up @@ -276,7 +276,7 @@ struct TaskCenterInner {
pause_time: bool,
default_runtime_handle: tokio::runtime::Handle,
ingress_runtime_handle: tokio::runtime::Handle,
managed_runtimes: Mutex<HashMap<&'static str, Arc<tokio::runtime::Runtime>>>,
managed_runtimes: Mutex<HashMap<&'static str, OwnedRuntimeHandle>>,
start_time: Instant,
/// We hold on to the owned Runtime to ensure it's dropped when task center is dropped. If this
/// is None, it means that it's the responsibility of the Handle owner to correctly drop
Expand Down Expand Up @@ -555,7 +555,7 @@ impl TaskCenterInner {
runtime_name: &'static str,
partition_id: Option<PartitionId>,
root_future: impl FnOnce() -> F + Send + 'static,
) -> Result<RuntimeRootTaskHandle<anyhow::Result<()>>, RuntimeError>
) -> Result<RuntimeTaskHandle<anyhow::Result<()>>, RuntimeError>
where
F: Future<Output = anyhow::Result<()>> + 'static,
{
Expand Down Expand Up @@ -590,7 +590,10 @@ impl TaskCenterInner {

let rt_handle = Arc::new(rt);

runtimes_guard.insert(runtime_name, rt_handle.clone());
runtimes_guard.insert(
runtime_name,
OwnedRuntimeHandle::new(runtime_name, cancel.clone(), rt_handle.clone()),
);

// release the lock.
drop(runtimes_guard);
Expand Down Expand Up @@ -625,16 +628,22 @@ impl TaskCenterInner {
})
.unwrap();

Ok(RuntimeRootTaskHandle {
inner_handle: result_rx,
cancellation_token: cancel,
})
Ok(RuntimeTaskHandle::new(runtime_name, cancel, result_rx))
}

/// Runs **only** after the inner main thread has completed work and no other owner exists for
/// the runtime handle.
fn drop_runtime(self: &Arc<Self>, name: &'static str) {
let mut runtimes_guard = self.managed_runtimes.lock();
if runtimes_guard.remove(name).is_some() {
trace!("Runtime {} was dropped", name);
if let Some(runtime) = runtimes_guard.remove(name) {
// We must be the only owner of runtime at this point.
let name = runtime.name().to_owned();
debug!("Runtime {} completed", runtime.name());
let owner = Arc::into_inner(runtime.into_inner());
if let Some(runtime) = owner {
runtime.shutdown_timeout(Duration::from_secs(2));
trace!("Runtime {} shutdown completed", name);
}
}
}

Expand Down Expand Up @@ -803,6 +812,7 @@ impl TaskCenterInner {
} else {
info!(%reason, "** Shutdown requested");
}
self.initiate_managed_runtimes_shutdown();
self.cancel_tasks(None, None).await;
self.shutdown_managed_runtimes();
// notify outer components that we have completed the shutdown.
Expand Down Expand Up @@ -906,10 +916,23 @@ impl TaskCenterInner {
}
}

/// Signals every managed runtime to begin a graceful shutdown by calling `cancel()` on
/// each registered entry. This only sends the signal; nothing in this function waits for
/// the runtimes to finish.
fn initiate_managed_runtimes_shutdown(self: &Arc<Self>) {
// The lock guard on `managed_runtimes` is held for the duration of the loop.
let runtimes = self.managed_runtimes.lock();
for (name, runtime) in runtimes.iter() {
// asking the root task in the runtime to shutdown gracefully.
runtime.cancel();
trace!("Asked runtime {} to shutdown gracefully", name);
}
}

fn shutdown_managed_runtimes(self: &Arc<Self>) {
let mut runtimes = self.managed_runtimes.lock();
for (_, runtime) in runtimes.drain() {
if let Some(runtime) = Arc::into_inner(runtime) {
if let Some(runtime) = Arc::into_inner(runtime.into_inner()) {
// This really isn't doing much, but it's left here for completeness.
// The reason is: If the runtime is still running, then it'll hold the Arc until it
// finishes gracefully, yielding None here. If the runtime completed, it'll
// self-shutdown prior to reaching this point.
runtime.shutdown_background();
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/task_center/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tracing::instrument;
use crate::{Metadata, ShutdownError};

use super::{
RuntimeError, RuntimeRootTaskHandle, TaskCenterInner, TaskContext, TaskHandle, TaskId, TaskKind,
RuntimeError, RuntimeTaskHandle, TaskCenterInner, TaskContext, TaskHandle, TaskId, TaskKind,
};

#[derive(Clone, derive_more::Debug)]
Expand Down Expand Up @@ -83,7 +83,7 @@ impl Handle {
runtime_name: &'static str,
partition_id: Option<PartitionId>,
root_future: impl FnOnce() -> F + Send + 'static,
) -> Result<RuntimeRootTaskHandle<anyhow::Result<()>>, RuntimeError>
) -> Result<RuntimeTaskHandle<anyhow::Result<()>>, RuntimeError>
where
F: Future<Output = anyhow::Result<()>> + 'static,
{
Expand Down
5 changes: 4 additions & 1 deletion crates/core/src/task_center/monitoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ impl TaskCenterMonitoring for Handle {

fn managed_runtime_metrics(&self) -> Vec<(&'static str, RuntimeMetrics)> {
let guard = self.inner.managed_runtimes.lock();
guard.iter().map(|(k, v)| (*k, v.metrics())).collect()
guard
.iter()
.map(|(k, v)| (*k, v.runtime_handle().metrics()))
.collect()
}

/// How long has the task-center been running?
Expand Down
72 changes: 65 additions & 7 deletions crates/core/src/task_center/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,50 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::borrow::Cow;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll};

use futures::FutureExt;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;

/// A handle for a dedicated runtime managed by task-center
pub struct RuntimeRootTaskHandle<T> {
pub(crate) cancellation_token: CancellationToken,
pub(crate) inner_handle: oneshot::Receiver<T>,
pub struct RuntimeTaskHandle<T> {
name: Cow<'static, str>,
cancellation_token: CancellationToken,
inner_handle: oneshot::Receiver<T>,
}

impl<T> RuntimeRootTaskHandle<T> {
impl<T> RuntimeTaskHandle<T> {
/// Creates a handle from the runtime's name, the cancellation token that `cancel()`
/// triggers, and the oneshot receiver stored as the inner handle (presumably the channel
/// on which the root task's result is delivered; confirm against the spawning code).
pub fn new(
name: impl Into<Cow<'static, str>>,
cancellation_token: CancellationToken,
result_receiver: oneshot::Receiver<T>,
) -> Self {
Self {
name: name.into(),
cancellation_token,
inner_handle: result_receiver,
}
}
/// Returns the runtime name this handle was created with.
pub fn name(&self) -> &str {
&self.name
}
/// Trigger graceful shutdown of the runtime root task. Shutdown is not guaranteed, it depends
/// on whether the root task awaits the cancellation token or not.
pub fn cancel(&self) {
self.cancellation_token.cancel()
}

pub fn cancellation_token(&self) -> CancellationToken {
self.cancellation_token.clone()
pub fn cancellation_token(&self) -> &CancellationToken {
&self.cancellation_token
}
}

impl<T> std::future::Future for RuntimeRootTaskHandle<T> {
impl<T> std::future::Future for RuntimeTaskHandle<T> {
type Output = T;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand All @@ -42,3 +60,43 @@ impl<T> std::future::Future for RuntimeRootTaskHandle<T> {
)
}
}

/// Bundles a tokio runtime with the name and cancellation token supplied when it was
/// registered.
///
/// The runtime is held through an `Arc`, so other owners may exist; `into_inner()` gives
/// the `Arc` back to the caller.
pub(super) struct OwnedRuntimeHandle {
// Name supplied at construction; exposed through `name()`.
name: Cow<'static, str>,
// Token cancelled by `cancel()` to request a graceful shutdown.
cancellation_token: CancellationToken,
// Shared reference to the underlying tokio runtime.
inner: Arc<tokio::runtime::Runtime>,
}

impl OwnedRuntimeHandle {
/// Creates a new entry from the runtime's name, the cancellation token that `cancel()`
/// will trigger, and a shared reference to the tokio runtime.
pub fn new(
name: impl Into<Cow<'static, str>>,
cancellation_token: CancellationToken,
runtime: Arc<tokio::runtime::Runtime>,
) -> Self {
Self {
name: name.into(),
cancellation_token,
inner: runtime,
}
}

/// The runtime name
pub fn name(&self) -> &str {
&self.name
}

/// The tokio `Handle` of the underlying runtime, borrowed from the runtime itself.
pub fn runtime_handle(&self) -> &tokio::runtime::Handle {
self.inner.handle()
}

/// Trigger graceful shutdown of the runtime root task. Shutdown is not guaranteed, it depends
/// on whether the root task awaits the cancellation token or not.
pub fn cancel(&self) {
self.cancellation_token.cancel()
}

/// Consumes this entry and returns the shared reference to the tokio runtime. The name
/// and cancellation token are dropped.
pub fn into_inner(self) -> Arc<tokio::runtime::Runtime> {
self.inner
}
}
3 changes: 3 additions & 0 deletions crates/partition-store/src/partition_store_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ impl PartitionStoreManager {
cf_options(per_partition_memory_budget),
)
.ensure_column_families(partition_ids_to_cfs(initial_partition_set))
// This is added as an experiment. We might make this configurable to let users decide
// on the trade-off between shutdown time and startup catchup time.
.add_to_flush_on_shutdown(CfPrefixPattern::ANY)
.build()
.expect("valid spec");

Expand Down
8 changes: 8 additions & 0 deletions crates/rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ impl RocksDb {
);
return;
}
let start = Instant::now();

debug!(
db = %self.name,
Expand All @@ -448,6 +449,13 @@ impl RocksDb {
"Failed to flush memtables: {}",
e
);
} else {
info!(
db = %self.name,
"{} column families flushed in {:?}",
cfs_to_flush.len(),
start.elapsed(),
);
Comment on lines +453 to +458
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

}
self.db.cancel_all_background_work(true);
};
Expand Down
6 changes: 3 additions & 3 deletions crates/worker/src/partition_processor_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use restate_core::{
cancellation_watcher, my_node_id, Metadata, ShutdownError, TaskCenterFutureExt, TaskHandle,
TaskKind,
};
use restate_core::{RuntimeRootTaskHandle, TaskCenter};
use restate_core::{RuntimeTaskHandle, TaskCenter};
use restate_invoker_api::StatusHandle;
use restate_invoker_impl::{BuildError, ChannelStatusReader};
use restate_metadata_store::{MetadataStoreClient, ReadModifyWriteError};
Expand Down Expand Up @@ -456,7 +456,7 @@ impl PartitionProcessorManager {
fn await_runtime_task_result(
&mut self,
partition_id: PartitionId,
runtime_task_handle: RuntimeRootTaskHandle<anyhow::Result<()>>,
runtime_task_handle: RuntimeTaskHandle<anyhow::Result<()>>,
) {
self.asynchronous_operations.spawn(
async move {
Expand Down Expand Up @@ -881,7 +881,7 @@ struct AsynchronousEvent {

#[derive(strum::IntoStaticStr)]
enum EventKind {
Started(anyhow::Result<(StartedProcessor, RuntimeRootTaskHandle<anyhow::Result<()>>)>),
Started(anyhow::Result<(StartedProcessor, RuntimeTaskHandle<anyhow::Result<()>>)>),
Stopped(anyhow::Result<()>),
NewLeaderEpoch {
leader_epoch_token: LeaderEpochToken,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::sync::{mpsc, watch};
use tracing::instrument;

use restate_bifrost::Bifrost;
use restate_core::{Metadata, RuntimeRootTaskHandle, TaskCenter, TaskKind};
use restate_core::{Metadata, RuntimeTaskHandle, TaskCenter, TaskKind};
use restate_invoker_impl::Service as InvokerService;
use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager};
use restate_service_protocol::codec::ProtobufRawEntryCodec;
Expand Down Expand Up @@ -66,7 +66,7 @@ impl SpawnPartitionProcessorTask {
)]
pub async fn run(
self,
) -> anyhow::Result<(StartedProcessor, RuntimeRootTaskHandle<anyhow::Result<()>>)> {
) -> anyhow::Result<(StartedProcessor, RuntimeTaskHandle<anyhow::Result<()>>)> {
let Self {
task_name,
partition_id,
Expand Down Expand Up @@ -145,7 +145,7 @@ impl SpawnPartitionProcessorTask {
)?;

let state = StartedProcessor::new(
root_task_handle.cancellation_token(),
root_task_handle.cancellation_token().clone(),
key_range,
control_tx,
status_reader,
Expand Down
Loading