Skip to content

Commit

Permalink
Improve async cancellation safety of future::Cache
Browse files Browse the repository at this point in the history
- Add unit tests for async cancellations.
- Some refactoring.
  • Loading branch information
tatsuya6502 committed Aug 27, 2023
1 parent f629869 commit 0c21545
Show file tree
Hide file tree
Showing 5 changed files with 336 additions and 79 deletions.
35 changes: 18 additions & 17 deletions src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ where
}
}

/// Operation that has been interrupted by async cancellation.
pub(crate) enum PendingOp<K, V> {
/// Operation that has been interrupted (stopped polling) by async cancellation.
pub(crate) enum InterruptedOp<K, V> {
CallEvictionListener {
ts: Instant,
// 'static means that the future can capture only owned value and/or static
Expand All @@ -89,21 +89,22 @@ pub(crate) enum PendingOp<K, V> {
},
}

/// Drop guard for an operation being performed. If this guard is dropped while it is
/// still having the future or the write op, it will convert them to a PendingOp and
/// send to the pending operation channel, so that the operation can be retried
/// later.
struct PendingOpGuard<'a, K, V> {
pending_op_ch: &'a Sender<PendingOp<K, V>>,
/// Drop guard for an async task being performed. If this guard is dropped while it
/// still holds the shared `future` or the write `op`, it will convert them to an
/// `InterruptedOp` and send it to the interrupted operations channel. Later, the
/// interrupted op will be retried by the `retry_interrupted_ops` method of
/// `BaseCache`.
struct CancelGuard<'a, K, V> {
interrupted_op_ch: &'a Sender<InterruptedOp<K, V>>,
ts: Instant,
future: Option<Shared<BoxFuture<'static, ()>>>,
op: Option<WriteOp<K, V>>,
}

impl<'a, K, V> PendingOpGuard<'a, K, V> {
fn new(pending_op_ch: &'a Sender<PendingOp<K, V>>, ts: Instant) -> Self {
impl<'a, K, V> CancelGuard<'a, K, V> {
fn new(interrupted_op_ch: &'a Sender<InterruptedOp<K, V>>, ts: Instant) -> Self {
Self {
pending_op_ch,
interrupted_op_ch,
ts,
future: Default::default(),
op: Default::default(),
Expand All @@ -129,20 +130,20 @@ impl<'a, K, V> PendingOpGuard<'a, K, V> {
}
}

impl<'a, K, V> Drop for PendingOpGuard<'a, K, V> {
impl<'a, K, V> Drop for CancelGuard<'a, K, V> {
fn drop(&mut self) {
let pending_op = match (self.future.take(), self.op.take()) {
(Some(future), Some(op)) => PendingOp::CallEvictionListener {
let interrupted_op = match (self.future.take(), self.op.take()) {
(Some(future), Some(op)) => InterruptedOp::CallEvictionListener {
ts: self.ts,
future,
op,
},
(None, Some(op)) => PendingOp::SendWriteOp { ts: self.ts, op },
(None, Some(op)) => InterruptedOp::SendWriteOp { ts: self.ts, op },
_ => return,
};

self.pending_op_ch
.send(pending_op)
self.interrupted_op_ch
.send(interrupted_op)
.expect("Failed to send a pending op");
}
}
Expand Down
107 changes: 68 additions & 39 deletions src/future/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::{
invalidator::{GetOrRemoveEntry, Invalidator, KeyDateLite, PredicateFun},
key_lock::{KeyLock, KeyLockMap},
notifier::RemovalNotifier,
PendingOp, PredicateId,
InterruptedOp, PredicateId,
};

use crate::{
Expand All @@ -25,7 +25,7 @@ use crate::{
timer_wheel::{ReschedulingResult, TimerWheel},
CacheRegion,
},
future::PendingOpGuard,
future::CancelGuard,
notification::{AsyncEvictionListener, RemovalCause},
policy::ExpirationPolicy,
sync_base::iter::ScanningGet,
Expand Down Expand Up @@ -60,8 +60,8 @@ pub(crate) struct BaseCache<K, V, S = RandomState> {
pub(crate) inner: Arc<Inner<K, V, S>>,
read_op_ch: Sender<ReadOp<K, V>>,
pub(crate) write_op_ch: Sender<WriteOp<K, V>>,
pub(crate) pending_op_ch_snd: Sender<PendingOp<K, V>>,
pub(crate) pending_op_ch_rcv: Receiver<PendingOp<K, V>>,
pub(crate) interrupted_op_ch_snd: Sender<InterruptedOp<K, V>>,
pub(crate) interrupted_op_ch_rcv: Receiver<InterruptedOp<K, V>>,
pub(crate) housekeeper: Option<HouseKeeperArc>,
}

Expand All @@ -75,8 +75,8 @@ impl<K, V, S> Clone for BaseCache<K, V, S> {
inner: Arc::clone(&self.inner),
read_op_ch: self.read_op_ch.clone(),
write_op_ch: self.write_op_ch.clone(),
pending_op_ch_snd: self.pending_op_ch_snd.clone(),
pending_op_ch_rcv: self.pending_op_ch_rcv.clone(),
interrupted_op_ch_snd: self.interrupted_op_ch_snd.clone(),
interrupted_op_ch_rcv: self.interrupted_op_ch_rcv.clone(),
housekeeper: self.housekeeper.as_ref().map(Arc::clone),
}
}
Expand Down Expand Up @@ -129,7 +129,7 @@ impl<K, V, S> BaseCache<K, V, S> {
K: Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
self.inner.notify_invalidate_future(key, entry)
self.inner.notify_invalidate(key, entry)
}

#[cfg(feature = "unstable-debug-counters")]
Expand Down Expand Up @@ -174,7 +174,7 @@ where

let (r_snd, r_rcv) = crossbeam_channel::bounded(r_size);
let (w_snd, w_rcv) = crossbeam_channel::bounded(w_size);
let (p_snd, p_rcv) = crossbeam_channel::unbounded();
let (i_snd, i_rcv) = crossbeam_channel::unbounded();

let inner = Arc::new(Inner::new(
name,
Expand All @@ -193,8 +193,8 @@ where
inner,
read_op_ch: r_snd,
write_op_ch: w_snd,
pending_op_ch_snd: p_snd,
pending_op_ch_rcv: p_rcv,
interrupted_op_ch_snd: i_snd,
interrupted_op_ch_rcv: i_rcv,
housekeeper: Some(Arc::new(Housekeeper::default())),
}
}
Expand Down Expand Up @@ -244,7 +244,9 @@ where
return None;
}

self.process_pending_ops().await;
if record_read {
self.retry_interrupted_ops().await;
}

let mut now = self.current_time_from_expiration_clock();

Expand Down Expand Up @@ -373,13 +375,13 @@ where
pub(crate) async fn apply_reads_writes_if_needed(
inner: Arc<impl InnerSync + Send + Sync + 'static>,
ch: &Sender<WriteOp<K, V>>,
ts: Instant,
now: Instant,
housekeeper: Option<&HouseKeeperArc>,
) {
let w_len = ch.len();

if let Some(hk) = housekeeper {
if Self::should_apply_writes(hk, w_len, ts) {
if Self::should_apply_writes(hk, w_len, now) {
hk.try_run_pending_tasks(inner).await;
}
}
Expand Down Expand Up @@ -470,7 +472,7 @@ where
hash: u64,
value: V,
) -> (WriteOp<K, V>, Instant) {
self.process_pending_ops().await;
self.retry_interrupted_ops().await;

let ts = self.current_time_from_expiration_clock();
let weight = self.inner.weigh(&key, &value);
Expand Down Expand Up @@ -533,19 +535,19 @@ where
);

match (op1, op2) {
(Some((_cnt, ins_op)), None) => self.post_insert(ts, &key, ins_op),
(Some((_cnt, ins_op)), None) => self.do_post_insert_steps(ts, &key, ins_op),
(Some((cnt1, ins_op)), Some((cnt2, ..))) if cnt1 > cnt2 => {
self.post_insert(ts, &key, ins_op)
self.do_post_insert_steps(ts, &key, ins_op)
}
(_, Some((_cnt, old_entry, upd_op))) => {
self.post_update(ts, key, old_entry, upd_op, &self.pending_op_ch_snd)
self.do_post_update_steps(ts, key, old_entry, upd_op, &self.interrupted_op_ch_snd)
.await
}
(None, None) => unreachable!(),
}
}

fn post_insert(
fn do_post_insert_steps(
&self,
ts: Instant,
key: &Arc<K>,
Expand All @@ -559,13 +561,13 @@ where
(ins_op, ts)
}

async fn post_update<'a>(
async fn do_post_update_steps<'a>(
&self,
ts: Instant,
key: Arc<K>,
old_info: OldEntryInfo<K, V>,
upd_op: WriteOp<K, V>,
pending_op_ch: &'a Sender<PendingOp<K, V>>,
interrupted_op_ch: &'a Sender<InterruptedOp<K, V>>,
) -> (WriteOp<K, V>, Instant) {
use futures_util::FutureExt;

Expand Down Expand Up @@ -593,12 +595,18 @@ where
old_info.last_modified,
)
.shared();
let mut drop_guard = PendingOpGuard::new(pending_op_ch, ts);
drop_guard.set_future_and_op(future.clone(), upd_op.clone());
// Async Cancellation Safety: To ensure that the above future is executed
// even if our caller async task is cancelled, we create a cancel guard for
// the future (and the upd_op). If our caller is cancelled while we are
// awaiting the future, the cancel guard will send the future and the upd_op
// to the interrupted_op_ch channel, so that we can resume/retry later.
let mut cancel_guard = CancelGuard::new(interrupted_op_ch, ts);
cancel_guard.set_future_and_op(future.clone(), upd_op.clone());

// Notify the eviction listener.
future.await;
drop_guard.clear();
cancel_guard.clear();
}

crossbeam_epoch::pin().flush();
Expand All @@ -612,7 +620,19 @@ where
op: WriteOp<K, V>,
ts: Instant,
housekeeper: Option<&HouseKeeperArc>,
// Used only for testing.
_should_block: bool,
) -> Result<(), TrySendError<WriteOp<K, V>>> {
// Testing stuff.
#[cfg(test)]
if _should_block {
// Deliberately deadlock here to simulate a full channel.
let mutex = Mutex::new(());
let _guard = mutex.lock().await;
// This should dead-lock.
mutex.lock().await;
}

let mut op = op;
let mut spin_count = 0u8;
loop {
Expand Down Expand Up @@ -650,34 +670,43 @@ where
}
}

pub(crate) async fn process_pending_ops(&self) {
while let Ok(op) = self.pending_op_ch_rcv.try_recv() {
let mut drop_guard;
pub(crate) async fn retry_interrupted_ops(&self) {
while let Ok(op) = self.interrupted_op_ch_rcv.try_recv() {
// Async Cancellation Safety: Remember that we are in an async task here.
// If our caller is cancelled while we are awaiting the future, we will be
// cancelled too at the await point. In that case, the cancel guard below
// will send the future and the op to the interrupted_op_ch channel, so
// that we can resume/retry later.
let mut cancel_guard;

// Resume an interrupted future if there is one.
match op {
PendingOp::CallEvictionListener { ts, future, op } => {
drop_guard = PendingOpGuard::new(&self.pending_op_ch_snd, ts);
drop_guard.set_future_and_op(future.clone(), op);
// Resume the interrupted future.
InterruptedOp::CallEvictionListener { ts, future, op } => {
cancel_guard = CancelGuard::new(&self.interrupted_op_ch_snd, ts);
cancel_guard.set_future_and_op(future.clone(), op);
// Resume the interrupted future (which will notify an eviction
// to the eviction listener).
future.await;
// If we are here, it means the above future has been completed.
drop_guard.unset_future();
cancel_guard.unset_future();
}
PendingOp::SendWriteOp { ts, op } => {
drop_guard = PendingOpGuard::new(&self.pending_op_ch_snd, ts);
drop_guard.set_op(op);
InterruptedOp::SendWriteOp { ts, op } => {
cancel_guard = CancelGuard::new(&self.interrupted_op_ch_snd, ts);
cancel_guard.set_op(op);
}
}

let ts = drop_guard.ts;
let op = drop_guard.op.as_ref().cloned().unwrap();
// Retry to schedule the write op.
let ts = cancel_guard.ts;
let op = cancel_guard.op.as_ref().cloned().unwrap();
let hk = self.housekeeper.as_ref();
Self::schedule_write_op(&self.inner, &self.write_op_ch, op, ts, hk)
Self::schedule_write_op(&self.inner, &self.write_op_ch, op, ts, hk, false)
.await
.expect("Failed to reschedule a write op");

// If we are here, it means the above write op has been scheduled.
drop_guard.clear();
// We are all good now. Clear the cancel guard.
cancel_guard.clear();
}
}

Expand Down Expand Up @@ -2406,7 +2435,7 @@ where
}

#[inline]
fn notify_invalidate_future(
fn notify_invalidate(
&self,
key: &Arc<K>,
entry: &TrioArc<ValueEntry<K, V>>,
Expand Down
Loading

0 comments on commit 0c21545

Please sign in to comment.