diff --git a/MIGRATION-GUIDE.md b/MIGRATION-GUIDE.md index 0a7d09a5..e7b87bf4 100644 --- a/MIGRATION-GUIDE.md +++ b/MIGRATION-GUIDE.md @@ -146,7 +146,7 @@ let cache = Cache::builder() `async_eviction_listener` takes a closure that returns a `Future`. If you need to `.await` something in the eviction listener, use this method. The actual return type -of the closure is `future::ListenerFuture`, which is a type alias of +of the closure is `notification::ListenerFuture`, which is a type alias of `Pin + Send>>`. You can use the `boxed` method of `future::FutureExt` trait to convert a regular `Future` into this type. @@ -242,7 +242,6 @@ free some memory), you do not need to call `run_pending_tasks` method. - To enable it, see [Enabling the thread pool](#enabling-the-thread-pool) for more details. - #### Enabling the thread pool To enable the thread pool, do the followings: diff --git a/src/common/concurrent.rs b/src/common/concurrent.rs index 96ca2193..c5e5d97f 100644 --- a/src/common/concurrent.rs +++ b/src/common/concurrent.rs @@ -1,7 +1,7 @@ use crate::common::{deque::DeqNode, time::Instant}; use parking_lot::Mutex; -use std::{ptr::NonNull, sync::Arc}; +use std::{fmt, ptr::NonNull, sync::Arc}; use tagptr::TagNonNull; use triomphe::Arc as TrioArc; @@ -112,6 +112,15 @@ impl KvEntry { } } +impl Clone for KvEntry { + fn clone(&self) -> Self { + Self { + key: Arc::clone(&self.key), + entry: TrioArc::clone(&self.entry), + } + } +} + impl AccessTime for DeqNode> { #[inline] fn last_accessed(&self) -> Option { @@ -316,3 +325,49 @@ pub(crate) enum WriteOp { }, Remove(KvEntry), } + +/// Cloning a WriteOp is safe and cheap because it uses Arc and TrioArc pointers to +/// the actual data. 
+impl Clone for WriteOp { + fn clone(&self) -> Self { + match self { + Self::Upsert { + key_hash, + value_entry, + old_weight, + new_weight, + } => Self::Upsert { + key_hash: key_hash.clone(), + value_entry: TrioArc::clone(value_entry), + old_weight: *old_weight, + new_weight: *new_weight, + }, + Self::Remove(kv_hash) => Self::Remove(kv_hash.clone()), + } + } +} + +impl fmt::Debug for WriteOp { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Upsert { .. } => f.debug_struct("Upsert").finish(), + Self::Remove(..) => f.debug_tuple("Remove").finish(), + } + } +} + +pub(crate) struct OldEntryInfo { + pub(crate) entry: TrioArc>, + pub(crate) last_accessed: Option, + pub(crate) last_modified: Option, +} + +impl OldEntryInfo { + pub(crate) fn new(entry: &TrioArc>) -> Self { + Self { + entry: TrioArc::clone(entry), + last_accessed: entry.last_accessed(), + last_modified: entry.last_modified(), + } + } +} diff --git a/src/future.rs b/src/future.rs index b12a08e2..da172656 100644 --- a/src/future.rs +++ b/src/future.rs @@ -4,10 +4,13 @@ //! To use this module, enable a crate feature called "future". use async_lock::Mutex; -use futures_util::future::BoxFuture; +use crossbeam_channel::Sender; +use futures_util::future::{BoxFuture, Shared}; use once_cell::sync::Lazy; use std::{future::Future, hash::Hash, sync::Arc}; +use crate::common::{concurrent::WriteOp, time::Instant}; + mod base_cache; mod builder; mod cache; @@ -71,6 +74,80 @@ where } } +/// Operation that has been interrupted (stopped polling) by async cancellation. +pub(crate) enum InterruptedOp { + CallEvictionListener { + ts: Instant, + // 'static means that the future can capture only owned value and/or static + // references. No non-static references are allowed. + future: Shared>, + op: WriteOp, + }, + SendWriteOp { + ts: Instant, + op: WriteOp, + }, +} + +/// Drop guard for an async task being performed. 
If this guard is dropped while it +/// is still having the shared `future` or the write `op`, it will convert them to an +/// `InterruptedOp` and send it to the interrupted operations channel. Later, the +/// interrupted op will be retried by `retry_interrupted_ops` method of +/// `BaseCache`. +struct CancelGuard<'a, K, V> { + interrupted_op_ch: &'a Sender>, + ts: Instant, + future: Option>>, + op: Option>, +} + +impl<'a, K, V> CancelGuard<'a, K, V> { + fn new(interrupted_op_ch: &'a Sender>, ts: Instant) -> Self { + Self { + interrupted_op_ch, + ts, + future: Default::default(), + op: Default::default(), + } + } + + fn set_future_and_op(&mut self, future: Shared>, op: WriteOp) { + self.future = Some(future); + self.op = Some(op); + } + + fn set_op(&mut self, op: WriteOp) { + self.op = Some(op); + } + + fn unset_future(&mut self) { + self.future = None; + } + + fn clear(&mut self) { + self.future = None; + self.op = None; + } +} + +impl<'a, K, V> Drop for CancelGuard<'a, K, V> { + fn drop(&mut self) { + let interrupted_op = match (self.future.take(), self.op.take()) { + (Some(future), Some(op)) => InterruptedOp::CallEvictionListener { + ts: self.ts, + future, + op, + }, + (None, Some(op)) => InterruptedOp::SendWriteOp { ts: self.ts, op }, + _ => return, + }; + + self.interrupted_op_ch + .send(interrupted_op) + .expect("Failed to send a pending op"); + } +} + /// May yield to other async tasks. 
pub(crate) async fn may_yield() { static LOCK: Lazy> = Lazy::new(Default::default); diff --git a/src/future/base_cache.rs b/src/future/base_cache.rs index 683bc4bb..59481965 100644 --- a/src/future/base_cache.rs +++ b/src/future/base_cache.rs @@ -3,7 +3,7 @@ use super::{ invalidator::{GetOrRemoveEntry, Invalidator, KeyDateLite, PredicateFun}, key_lock::{KeyLock, KeyLockMap}, notifier::RemovalNotifier, - PredicateId, + InterruptedOp, PredicateId, }; use crate::{ @@ -16,7 +16,8 @@ use crate::{ }, deques::Deques, entry_info::EntryInfo, - AccessTime, KeyHash, KeyHashDate, KvEntry, ReadOp, ValueEntry, Weigher, WriteOp, + AccessTime, KeyHash, KeyHashDate, KvEntry, OldEntryInfo, ReadOp, ValueEntry, Weigher, + WriteOp, }, deque::{DeqNode, Deque}, frequency_sketch::FrequencySketch, @@ -24,6 +25,7 @@ use crate::{ timer_wheel::{ReschedulingResult, TimerWheel}, CacheRegion, }, + future::CancelGuard, notification::{AsyncEvictionListener, RemovalCause}, policy::ExpirationPolicy, sync_base::iter::ScanningGet, @@ -37,6 +39,7 @@ use async_lock::{Mutex, MutexGuard, RwLock}; use async_trait::async_trait; use crossbeam_channel::{Receiver, Sender, TrySendError}; use crossbeam_utils::atomic::AtomicCell; +use futures_util::future::BoxFuture; use parking_lot::RwLock as SyncRwLock; use smallvec::SmallVec; use std::{ @@ -57,6 +60,8 @@ pub(crate) struct BaseCache { pub(crate) inner: Arc>, read_op_ch: Sender>, pub(crate) write_op_ch: Sender>, + pub(crate) interrupted_op_ch_snd: Sender>, + pub(crate) interrupted_op_ch_rcv: Receiver>, pub(crate) housekeeper: Option, } @@ -70,6 +75,8 @@ impl Clone for BaseCache { inner: Arc::clone(&self.inner), read_op_ch: self.read_op_ch.clone(), write_op_ch: self.write_op_ch.clone(), + interrupted_op_ch_snd: self.interrupted_op_ch_snd.clone(), + interrupted_op_ch_rcv: self.interrupted_op_ch_rcv.clone(), housekeeper: self.housekeeper.as_ref().map(Arc::clone), } } @@ -113,12 +120,16 @@ impl BaseCache { self.inner.current_time_from_expiration_clock() } - 
pub(crate) async fn notify_invalidate(&self, key: &Arc, entry: &TrioArc>) + pub(crate) fn notify_invalidate( + &self, + key: &Arc, + entry: &TrioArc>, + ) -> BoxFuture<'static, ()> where K: Send + Sync + 'static, V: Clone + Send + Sync + 'static, { - self.inner.notify_invalidate(key, entry).await; + self.inner.notify_invalidate(key, entry) } #[cfg(feature = "unstable-debug-counters")] @@ -163,6 +174,7 @@ where let (r_snd, r_rcv) = crossbeam_channel::bounded(r_size); let (w_snd, w_rcv) = crossbeam_channel::bounded(w_size); + let (i_snd, i_rcv) = crossbeam_channel::unbounded(); let inner = Arc::new(Inner::new( name, @@ -181,6 +193,8 @@ where inner, read_op_ch: r_snd, write_op_ch: w_snd, + interrupted_op_ch_snd: i_snd, + interrupted_op_ch_rcv: i_rcv, housekeeper: Some(Arc::new(Housekeeper::default())), } } @@ -230,6 +244,10 @@ where return None; } + if record_read { + self.retry_interrupted_ops().await; + } + let mut now = self.current_time_from_expiration_clock(); let maybe_kv_and_op = self @@ -454,6 +472,8 @@ where hash: u64, value: V, ) -> (WriteOp, Instant) { + self.retry_interrupted_ops().await; + let ts = self.current_time_from_expiration_clock(); let weight = self.inner.weigh(&key, &value); let op_cnt1 = Arc::new(AtomicU8::new(0)); @@ -483,123 +503,210 @@ where // on_insert || { let entry = self.new_value_entry(&key, hash, value.clone(), ts, weight); + let ins_op = WriteOp::Upsert { + key_hash: KeyHash::new(Arc::clone(&key), hash), + value_entry: TrioArc::clone(&entry), + old_weight: 0, + new_weight: weight, + }; let cnt = op_cnt1.fetch_add(1, Ordering::Relaxed); - op1 = Some(( - cnt, - WriteOp::Upsert { - key_hash: KeyHash::new(Arc::clone(&key), hash), - value_entry: TrioArc::clone(&entry), - old_weight: 0, - new_weight: weight, - }, - )); + op1 = Some((cnt, ins_op)); entry }, // on_modify |_k, old_entry| { - // NOTES on `new_value_entry_from` method: - // 1. The internal EntryInfo will be shared between the old and new - // ValueEntries. - // 2. 
This method will set the is_dirty to prevent this new - // ValueEntry from being evicted by an expiration policy. - // 3. This method will update the policy_weight with the new weight. let old_weight = old_entry.policy_weight(); - let old_timestamps = (old_entry.last_accessed(), old_entry.last_modified()); + + // Create this OldEntryInfo _before_ creating a new ValueEntry, so + // that the OldEntryInfo can preserve the old EntryInfo's + // last_accessed and last_modified timestamps. + let old_info = OldEntryInfo::new(old_entry); let entry = self.new_value_entry_from(value.clone(), ts, weight, old_entry); + let upd_op = WriteOp::Upsert { + key_hash: KeyHash::new(Arc::clone(&key), hash), + value_entry: TrioArc::clone(&entry), + old_weight, + new_weight: weight, + }; let cnt = op_cnt2.fetch_add(1, Ordering::Relaxed); - op2 = Some(( - cnt, - TrioArc::clone(old_entry), - old_timestamps, - WriteOp::Upsert { - key_hash: KeyHash::new(Arc::clone(&key), hash), - value_entry: TrioArc::clone(&entry), - old_weight, - new_weight: weight, - }, - )); + op2 = Some((cnt, old_info, upd_op)); entry }, ); match (op1, op2) { - (Some((_cnt, ins_op)), None) => { - if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) = - (&self.inner.expiration_policy.expiry(), &ins_op) - { - Self::expire_after_create(expiry, &key, value_entry, ts, self.inner.clocks()); - } - (ins_op, ts) + (Some((_cnt, ins_op)), None) => self.do_post_insert_steps(ts, &key, ins_op), + (Some((cnt1, ins_op)), Some((cnt2, ..))) if cnt1 > cnt2 => { + self.do_post_insert_steps(ts, &key, ins_op) } - (None, Some((_cnt, old_entry, (old_last_accessed, old_last_modified), upd_op))) => { - if let (Some(expiry), WriteOp::Upsert { value_entry, .. 
}) = - (&self.inner.expiration_policy.expiry(), &upd_op) - { - Self::expire_after_read_or_update( - |k, v, t, d| expiry.expire_after_update(k, v, t, d), - &key, - value_entry, - self.inner.expiration_policy.time_to_live(), - self.inner.expiration_policy.time_to_idle(), - ts, - self.inner.clocks(), - ); + (_, Some((_cnt, old_entry, upd_op))) => { + self.do_post_update_steps(ts, key, old_entry, upd_op, &self.interrupted_op_ch_snd) + .await + } + (None, None) => unreachable!(), + } + } + + fn do_post_insert_steps( + &self, + ts: Instant, + key: &Arc, + ins_op: WriteOp, + ) -> (WriteOp, Instant) { + if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) = + (&self.inner.expiration_policy.expiry(), &ins_op) + { + Self::expire_after_create(expiry, key, value_entry, ts, self.inner.clocks()); + } + (ins_op, ts) + } + + async fn do_post_update_steps<'a>( + &self, + ts: Instant, + key: Arc, + old_info: OldEntryInfo, + upd_op: WriteOp, + interrupted_op_ch: &'a Sender>, + ) -> (WriteOp, Instant) { + use futures_util::FutureExt; + + if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) = + (&self.inner.expiration_policy.expiry(), &upd_op) + { + Self::expire_after_read_or_update( + |k, v, t, d| expiry.expire_after_update(k, v, t, d), + &key, + value_entry, + self.inner.expiration_policy.time_to_live(), + self.inner.expiration_policy.time_to_idle(), + ts, + self.inner.clocks(), + ); + } + + if self.is_removal_notifier_enabled() { + let future = self + .inner + .notify_upsert( + key, + &old_info.entry, + old_info.last_accessed, + old_info.last_modified, + ) + .shared(); + // Async Cancellation Safety: To ensure the above future should be + // executed even if our caller async task is cancelled, we create a + // cancel guard for the future (and the upd_op). If our caller is + // cancelled while we are awaiting for the future, the cancel guard will + // save the future and the upd_op to the interrupted_op_ch channel, so + // that we can resume/retry later. 
+ let mut cancel_guard = CancelGuard::new(interrupted_op_ch, ts); + cancel_guard.set_future_and_op(future.clone(), upd_op.clone()); + + // Notify the eviction listener. + future.await; + cancel_guard.clear(); + } + + crossbeam_epoch::pin().flush(); + (upd_op, ts) + } + + #[inline] + pub(crate) async fn schedule_write_op( + inner: &Arc, + ch: &Sender>, + op: WriteOp, + ts: Instant, + housekeeper: Option<&HouseKeeperArc>, + // Used only for testing. + _should_block: bool, + ) -> Result<(), TrySendError>> { + // Testing stuff. + #[cfg(test)] + if _should_block { + // We are going to do a dead-lock here to simulate a full channel. + let mutex = Mutex::new(()); + let _guard = mutex.lock().await; + // This should dead-lock. + mutex.lock().await; + } + + let mut op = op; + let mut spin_count = 0u8; + loop { + BaseCache::::apply_reads_writes_if_needed( + Arc::clone(inner), + ch, + ts, + housekeeper, + ) + .await; + match ch.try_send(op) { + Ok(()) => return Ok(()), + Err(TrySendError::Full(op1)) => { + op = op1; } + Err(e @ TrySendError::Disconnected(_)) => return Err(e), + } - if self.is_removal_notifier_enabled() { - // TODO: Make this one resumable. (Pass `kl`, `_klg`, `upd_op` - // and `ts`) - self.inner - .notify_upsert(key, &old_entry, old_last_accessed, old_last_modified) - .await; + // We have got a `TrySendError::Full` above. Wait for a bit and try + // again. + if spin_count < 10 { + spin_count += 1; + // Wastes some CPU time with a hint to indicate to the CPU that we + // are spinning + for _ in 0..8 { + std::hint::spin_loop(); } - crossbeam_epoch::pin().flush(); - (upd_op, ts) + } else { + spin_count = 0; + // Try to yield to other tasks. We have to yield sometimes, otherwise + // other task, which is draining the `ch`, will not make any + // progress. If this happens, we will stuck in this loop forever. 
+ super::may_yield().await; } - ( - Some((cnt1, ins_op)), - Some((cnt2, old_entry, (old_last_accessed, old_last_modified), upd_op)), - ) => { - if cnt1 > cnt2 { - if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) = - (&self.inner.expiration_policy.expiry(), &ins_op) - { - Self::expire_after_create( - expiry, - &key, - value_entry, - ts, - self.inner.clocks(), - ); - } - (ins_op, ts) - } else { - if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) = - (&self.inner.expiration_policy.expiry(), &upd_op) - { - Self::expire_after_read_or_update( - |k, v, t, d| expiry.expire_after_update(k, v, t, d), - &key, - value_entry, - self.inner.expiration_policy.time_to_live(), - self.inner.expiration_policy.time_to_idle(), - ts, - self.inner.clocks(), - ); - } + } + } - if self.is_removal_notifier_enabled() { - // TODO: Make this one resumable. (Pass `kl`, `_klg`, `upd_op` - // and `ts`) - self.inner - .notify_upsert(key, &old_entry, old_last_accessed, old_last_modified) - .await; - } - crossbeam_epoch::pin().flush(); - (upd_op, ts) + pub(crate) async fn retry_interrupted_ops(&self) { + while let Ok(op) = self.interrupted_op_ch_rcv.try_recv() { + // Async Cancellation Safety: Remember that we are in an async task here. + // If our caller is cancelled while we are awaiting for the future, we + // will be cancelled too at the await point. In that case, the cancel + // guard below will save the future and the op to the interrupted_op_ch + // channel, so that we can resume/retry later. + let mut cancel_guard; + + // Resume an interrupted future if there is one. + match op { + InterruptedOp::CallEvictionListener { ts, future, op } => { + cancel_guard = CancelGuard::new(&self.interrupted_op_ch_snd, ts); + cancel_guard.set_future_and_op(future.clone(), op); + // Resume the interrupted future (which will notify an eviction + // to the eviction listener). + future.await; + // If we are here, it means the above future has been completed. 
+ cancel_guard.unset_future(); + } + InterruptedOp::SendWriteOp { ts, op } => { + cancel_guard = CancelGuard::new(&self.interrupted_op_ch_snd, ts); + cancel_guard.set_op(op); } } - (None, None) => unreachable!(), + + // Retry to schedule the write op. + let ts = cancel_guard.ts; + let op = cancel_guard.op.as_ref().cloned().unwrap(); + let hk = self.housekeeper.as_ref(); + Self::schedule_write_op(&self.inner, &self.write_op_ch, op, ts, hk, false) + .await + .expect("Failed to reschedule a write op"); + + // If we are here, it means the above write op has been scheduled. + // We are all good now. Clear the cancel guard. + cancel_guard.clear(); } } @@ -752,14 +859,14 @@ where struct EvictionState<'a, K, V> { counters: EvictionCounters, - notifier: Option<&'a RemovalNotifier>, + notifier: Option<&'a Arc>>, } impl<'a, K, V> EvictionState<'a, K, V> { fn new( entry_count: u64, weighted_size: u64, - notifier: Option<&'a RemovalNotifier>, + notifier: Option<&'a Arc>>, ) -> Self { Self { counters: EvictionCounters::new(entry_count, weighted_size), @@ -908,7 +1015,7 @@ pub(crate) struct Inner { expiration_policy: ExpirationPolicy, valid_after: AtomicInstant, weigher: Option>, - removal_notifier: Option>, + removal_notifier: Option>>, key_locks: Option>, invalidator: Option>, clocks: Clocks, @@ -1074,7 +1181,7 @@ where let timer_wheel = Mutex::new(TimerWheel::new(now)); let (removal_notifier, key_locks) = if let Some(listener) = eviction_listener { - let rn = RemovalNotifier::new(listener, name.clone()); + let rn = Arc::new(RemovalNotifier::new(listener, name.clone())); let kl = KeyLockMap::with_hasher(build_hasher.clone()); (Some(rn), Some(kl)) } else { @@ -2287,13 +2394,15 @@ where } #[inline] - async fn notify_upsert( + fn notify_upsert( &self, key: Arc, entry: &TrioArc>, last_accessed: Option, last_modified: Option, - ) { + ) -> BoxFuture<'static, ()> { + use futures_util::future::FutureExt; + let now = self.current_time_from_expiration_clock(); let exp = 
&self.expiration_policy; @@ -2313,11 +2422,26 @@ where } } - self.notify_single_removal(key, entry, cause).await; + if let Some(notifier) = &self.removal_notifier { + let notifier = Arc::clone(notifier); + let value = entry.value.clone(); + async move { + notifier.notify(key, value, cause).await; + } + .boxed() + } else { + std::future::ready(()).boxed() + } } #[inline] - async fn notify_invalidate(&self, key: &Arc, entry: &TrioArc>) { + fn notify_invalidate( + &self, + key: &Arc, + entry: &TrioArc>, + ) -> BoxFuture<'static, ()> { + use futures_util::future::FutureExt; + let now = self.current_time_from_expiration_clock(); let exp = &self.expiration_policy; @@ -2335,8 +2459,14 @@ where } } - self.notify_single_removal(Arc::clone(key), entry, cause) - .await; + if let Some(notifier) = &self.removal_notifier { + let notifier = Arc::clone(notifier); + let key = Arc::clone(key); + let value = entry.value.clone(); + async move { notifier.notify(key, value, cause).await }.boxed() + } else { + std::future::ready(()).boxed() + } } } diff --git a/src/future/cache.rs b/src/future/cache.rs index ff1d5e89..fb23c310 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -1,16 +1,11 @@ use super::{ - base_cache::{BaseCache, HouseKeeperArc}, - housekeeper::InnerSync, + base_cache::BaseCache, value_initializer::{GetOrInsert, InitResult, ValueInitializer}, - CacheBuilder, Iter, OwnedKeyEntrySelector, PredicateId, RefKeyEntrySelector, + CacheBuilder, CancelGuard, Iter, OwnedKeyEntrySelector, PredicateId, RefKeyEntrySelector, + WriteOp, }; use crate::{ - common::{ - concurrent::{Weigher, WriteOp}, - time::Instant, - }, - notification::AsyncEvictionListener, - policy::ExpirationPolicy, + common::concurrent::Weigher, notification::AsyncEvictionListener, policy::ExpirationPolicy, Entry, Policy, PredicateError, }; @@ -19,7 +14,6 @@ use crate::common::concurrent::debug_counters::CacheDebugStats; use async_lock::Mutex; use async_trait::async_trait; -use crossbeam_channel::{Sender, 
TrySendError}; use std::{ borrow::Borrow, collections::hash_map::RandomState, @@ -30,6 +24,9 @@ use std::{ sync::Arc, }; +#[cfg(test)] +use std::sync::atomic::{AtomicBool, Ordering}; + /// A thread-safe, futures-aware concurrent in-memory cache. /// /// `Cache` supports full concurrency of retrievals and a high expected concurrency @@ -650,6 +647,9 @@ use std::{ pub struct Cache { base: BaseCache, value_initializer: Arc>, + + #[cfg(test)] + schedule_write_op_should_block: AtomicBool, } // TODO: https://github.com/moka-rs/moka/issues/54 @@ -680,6 +680,11 @@ impl Clone for Cache { Self { base: self.base.clone(), value_initializer: Arc::clone(&self.value_initializer), + + #[cfg(test)] + schedule_write_op_should_block: AtomicBool::new( + self.schedule_write_op_should_block.load(Ordering::Acquire), + ), } } } @@ -843,6 +848,9 @@ where invalidator_enabled, ), value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)), + + #[cfg(test)] + schedule_write_op_should_block: Default::default(), // false } } @@ -1383,6 +1391,10 @@ where K: Borrow, Q: Hash + Eq + ?Sized, { + use futures_util::FutureExt; + + self.base.retry_interrupted_ops().await; + // Lock the key for removal if blocking removal notification is enabled. let mut kl = None; let mut klg = None; @@ -1412,11 +1424,36 @@ where Some(kv) => { let now = self.base.current_time_from_expiration_clock(); + let maybe_v = if need_value { + Some(kv.entry.value.clone()) + } else { + None + }; + + let op = WriteOp::Remove(kv.clone()); + + // Async Cancellation Safety: To ensure the below future should be + // executed even if our caller async task is cancelled, we create a + // cancel guard for the future (and the op). If our caller is + // cancelled while we are awaiting for the future, the cancel guard + // will save the future and the op to the interrupted_op_ch channel, + // so that we can resume/retry later. 
+ let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, now); + if self.base.is_removal_notifier_enabled() { - // TODO: Make this one resumable. (Pass `kl`, `klg`, `kv` and - // `now`) - self.base.notify_invalidate(&kv.key, &kv.entry).await + let future = self + .base + .notify_invalidate(&kv.key, &kv.entry) + .boxed() + .shared(); + cancel_guard.set_future_and_op(future.clone(), op.clone()); + // Send notification to the eviction listener. + future.await; + cancel_guard.unset_future(); + } else { + cancel_guard.set_op(op.clone()); } + // Drop the locks before scheduling write op to avoid a potential // dead lock. (Scheduling write can do spin lock when the queue is // full, and queue will be drained by the housekeeping thread that @@ -1424,20 +1461,30 @@ where std::mem::drop(klg); std::mem::drop(kl); - let maybe_v = if need_value { - Some(kv.entry.value.clone()) - } else { - None - }; - - let op = WriteOp::Remove(kv); let hk = self.base.housekeeper.as_ref(); - // TODO: If enclosing future is being dropped, save `op` and `now` so - // that we can resume later. 
(maybe we can send to an unbound mpsc - // channel) - Self::schedule_write_op(&self.base.inner, &self.base.write_op_ch, op, now, hk) - .await - .expect("Failed to remove"); + + let should_block; + #[cfg(not(test))] + { + should_block = false; + } + #[cfg(test)] + { + should_block = self.schedule_write_op_should_block.load(Ordering::Acquire); + } + + BaseCache::::schedule_write_op( + &self.base.inner, + &self.base.write_op_ch, + op, + now, + hk, + should_block, + ) + .await + .expect("Failed to schedule write op for remove"); + cancel_guard.clear(); + crossbeam_epoch::pin().flush(); maybe_v } @@ -1554,6 +1601,7 @@ where pub async fn run_pending_tasks(&self) { if let Some(hk) = &self.base.housekeeper { + self.base.retry_interrupted_ops().await; hk.run_pending_tasks(Arc::clone(&self.base.inner)).await; } } @@ -1879,58 +1927,32 @@ where return; } - let (op, now) = self.base.do_insert_with_hash(key, hash, value).await; + let (op, ts) = self.base.do_insert_with_hash(key, hash, value).await; let hk = self.base.housekeeper.as_ref(); - // TODO: If enclosing future is being dropped, save `op` and `now` so that - // we can resume later. (maybe we can send to an unbound mpsc channel) - Self::schedule_write_op(&self.base.inner, &self.base.write_op_ch, op, now, hk) - .await - .expect("Failed to insert"); - } - - #[inline] - async fn schedule_write_op( - inner: &Arc, - ch: &Sender>, - op: WriteOp, - now: Instant, - housekeeper: Option<&HouseKeeperArc>, - ) -> Result<(), TrySendError>> { - let mut op = op; - let mut spin_count = 0u8; - loop { - BaseCache::::apply_reads_writes_if_needed( - Arc::clone(inner), - ch, - now, - housekeeper, - ) - .await; - match ch.try_send(op) { - Ok(()) => return Ok(()), - Err(TrySendError::Full(op1)) => { - op = op1; - } - Err(e @ TrySendError::Disconnected(_)) => return Err(e), - } + let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, ts); + cancel_guard.set_op(op.clone()); - // We have got a `TrySendError::Full` above. 
Wait for a bit and try - // again. - if spin_count < 10 { - spin_count += 1; - // Wastes some CPU time with a hint to indicate to the CPU that we - // are spinning - for _ in 0..8 { - std::hint::spin_loop(); - } - } else { - spin_count = 0; - // Try to yield to other tasks. We have to yield sometimes, otherwise - // other task, which is draining the `ch`, will not make any - // progress. If this happens, we will stuck in this loop forever. - super::may_yield().await; - } + let should_block; + #[cfg(not(test))] + { + should_block = false; } + #[cfg(test)] + { + should_block = self.schedule_write_op_should_block.load(Ordering::Acquire); + } + + BaseCache::::schedule_write_op( + &self.base.inner, + &self.base.write_op_ch, + op, + ts, + hk, + should_block, + ) + .await + .expect("Failed to schedule write op for insert"); + cancel_guard.clear(); } } @@ -1990,7 +2012,6 @@ where } fn run_pending_tasks_initiation_count(&self) -> usize { - use std::sync::atomic::Ordering; self.base .housekeeper .as_ref() @@ -1999,7 +2020,6 @@ where } fn run_pending_tasks_completion_count(&self) -> usize { - use std::sync::atomic::Ordering; self.base .housekeeper .as_ref() @@ -4499,6 +4519,196 @@ mod tests { assert_eq!(listener_completion_count.load(Ordering::Acquire), 1); } + #[tokio::test] + async fn cancel_future_while_calling_eviction_listener() { + use crate::future::FutureExt; + use futures_util::future::poll_immediate; + use tokio::task::yield_now; + + let listener_initiation_count: Arc = Default::default(); + let listener_completion_count: Arc = Default::default(); + + let listener = { + // Variables to capture. + let init_count = Arc::clone(&listener_initiation_count); + let comp_count = Arc::clone(&listener_completion_count); + + // Our eviction listener closure. 
+ move |_k, _v, _r| { + init_count.fetch_add(1, Ordering::AcqRel); + let comp_count1 = Arc::clone(&comp_count); + + async move { + yield_now().await; + comp_count1.fetch_add(1, Ordering::AcqRel); + } + .boxed() + } + }; + + let mut cache: Cache = Cache::builder() + .time_to_live(Duration::from_millis(10)) + .async_eviction_listener(listener) + .build(); + + cache.reconfigure_for_testing().await; + + // Make the cache exterior immutable. + let cache = cache; + + // ------------------------------------------------------------ + // Interrupt the eviction listener while calling `insert` + // ------------------------------------------------------------ + + cache.insert(1, 1).await; + + let fut = cache.insert(1, 2); + // Poll the fut only once, and drop it. The fut should not be completed (so + // it is cancelled) because the eviction listener performed a yield_now(). + assert!(poll_immediate(fut).await.is_none()); + + // The listener is initiated but not completed. + assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1); + assert_eq!(listener_completion_count.load(Ordering::Acquire), 0); + + // This will call retry_interrupted_ops() and resume the interrupted + // listener. + cache.run_pending_tasks().await; + assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1); + assert_eq!(listener_completion_count.load(Ordering::Acquire), 1); + + // ------------------------------------------------------------ + // Interrupt the eviction listener while calling `invalidate` + // ------------------------------------------------------------ + + let fut = cache.invalidate(&1); + // Cancel the fut after one poll. + assert!(poll_immediate(fut).await.is_none()); + // The listener is initiated but not completed. + assert_eq!(listener_initiation_count.load(Ordering::Acquire), 2); + assert_eq!(listener_completion_count.load(Ordering::Acquire), 1); + + // This will call retry_interrupted_ops() and resume the interrupted + // listener. 
+ cache.get(&99).await; + assert_eq!(listener_initiation_count.load(Ordering::Acquire), 2); + assert_eq!(listener_completion_count.load(Ordering::Acquire), 2); + + // ------------------------------------------------------------ + // Ensure retry_interrupted_ops() is called + // ------------------------------------------------------------ + + // Repeat the same test with `insert`, but this time, call different methods + // to ensure retry_interrupted_ops() is called. + let prepare = || async { + cache.invalidate(&1).await; + + // Reset the counters. + listener_initiation_count.store(0, Ordering::Release); + listener_completion_count.store(0, Ordering::Release); + + cache.insert(1, 1).await; + + let fut = cache.insert(1, 2); + // Poll the fut only once, and drop it. The fut should not be completed (so + // it is cancelled) because the eviction listener performed a yield_now(). + assert!(poll_immediate(fut).await.is_none()); + + // The listener is initiated but not completed. + assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1); + assert_eq!(listener_completion_count.load(Ordering::Acquire), 0); + }; + + // Methods to test: + // + // - run_pending_tasks (Already tested in a previous test) + // - get (Already tested in a previous test) + // - insert + // - invalidate + // - remove + + // insert + prepare().await; + cache.insert(99, 99).await; + assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1); + assert_eq!(listener_completion_count.load(Ordering::Acquire), 1); + + // invalidate + prepare().await; + cache.invalidate(&88).await; + assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1); + assert_eq!(listener_completion_count.load(Ordering::Acquire), 1); + + // remove + prepare().await; + cache.remove(&77).await; + assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1); + assert_eq!(listener_completion_count.load(Ordering::Acquire), 1); + } + + #[tokio::test] + async fn cancel_future_while_scheduling_write_op() { + use 
futures_util::future::poll_immediate; + + let mut cache: Cache = Cache::builder().build(); + cache.reconfigure_for_testing().await; + + // Make the cache exterior immutable. + let cache = cache; + + // -------------------------------------------------------------- + // Interrupt `insert` while blocking in `schedule_write_op` + // -------------------------------------------------------------- + + cache + .schedule_write_op_should_block + .store(true, Ordering::Release); + let fut = cache.insert(1, 1); + // Poll the fut only once, and drop it. The fut should not be completed (so + // it is cancelled) because schedule_write_op should be awaiting for a lock. + assert!(poll_immediate(fut).await.is_none()); + + assert_eq!(cache.base.interrupted_op_ch_snd.len(), 1); + assert_eq!(cache.base.write_op_ch.len(), 0); + + // This should retry the interrupted operation. + cache + .schedule_write_op_should_block + .store(false, Ordering::Release); + cache.get(&99).await; + assert_eq!(cache.base.interrupted_op_ch_snd.len(), 0); + assert_eq!(cache.base.write_op_ch.len(), 1); + + cache.run_pending_tasks().await; + assert_eq!(cache.base.write_op_ch.len(), 0); + + // -------------------------------------------------------------- + // Interrupt `invalidate` while blocking in `schedule_write_op` + // -------------------------------------------------------------- + + cache + .schedule_write_op_should_block + .store(true, Ordering::Release); + let fut = cache.invalidate(&1); + // Poll the fut only once, and drop it. The fut should not be completed (so + // it is cancelled) because schedule_write_op should be awaiting for a lock. + assert!(poll_immediate(fut).await.is_none()); + + assert_eq!(cache.base.interrupted_op_ch_snd.len(), 1); + assert_eq!(cache.base.write_op_ch.len(), 0); + + // This should retry the interrupted operation. 
+ cache + .schedule_write_op_should_block + .store(false, Ordering::Release); + cache.get(&99).await; + assert_eq!(cache.base.interrupted_op_ch_snd.len(), 0); + assert_eq!(cache.base.write_op_ch.len(), 1); + + cache.run_pending_tasks().await; + assert_eq!(cache.base.write_op_ch.len(), 0); + } + // This test ensures that the `contains_key`, `get` and `invalidate` can use // borrowed form `&[u8]` for key with type `Vec`. // https://github.com/moka-rs/moka/issues/166 diff --git a/src/future/housekeeper.rs b/src/future/housekeeper.rs index 06e6c340..f70648f7 100644 --- a/src/future/housekeeper.rs +++ b/src/future/housekeeper.rs @@ -101,13 +101,15 @@ impl Housekeeper { let now = cache.now(); + // Async Cancellation Safety: Our maintenance task is cancellable as we save + // it in the lock. If it is canceled, we will resume it in the next run. + if let Some(task) = &*current_task { - // This task was being resolved, but did not complete. This means the - // previous run was canceled due to the enclosing Future was dropped. - // Resume the task now by awaiting. + // This task was cancelled in the previous run because the enclosing + // Future was dropped. Resume the task now by awaiting. task.clone().await; } else { - // Create a new maintenance task and resolve it. + // Create a new maintenance task and await it. let task = async move { cache.run_pending_tasks(MAX_SYNC_REPEATS).await } .boxed() .shared(); @@ -119,7 +121,7 @@ impl Housekeeper { task.await; } - // If we are here, it means that the maintenance task has been resolved. + // If we are here, it means that the maintenance task has been completed. // We can remove it from the lock. 
*current_task = None; self.run_after.set_instant(Self::sync_after(now)); diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index 6b6df9c8..ffd86610 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -17,7 +17,8 @@ use crate::{ deques::Deques, entry_info::EntryInfo, housekeeper::{self, Housekeeper, InnerSync, SyncPace}, - AccessTime, KeyHash, KeyHashDate, KvEntry, ReadOp, ValueEntry, Weigher, WriteOp, + AccessTime, KeyHash, KeyHashDate, KvEntry, OldEntryInfo, ReadOp, ValueEntry, Weigher, + WriteOp, }, deque::{DeqNode, Deque}, frequency_sketch::FrequencySketch, @@ -520,123 +521,96 @@ where // on_insert || { let entry = self.new_value_entry(&key, hash, value.clone(), ts, weight); + let ins_op = WriteOp::Upsert { + key_hash: KeyHash::new(Arc::clone(&key), hash), + value_entry: TrioArc::clone(&entry), + old_weight: 0, + new_weight: weight, + }; let cnt = op_cnt1.fetch_add(1, Ordering::Relaxed); - op1 = Some(( - cnt, - WriteOp::Upsert { - key_hash: KeyHash::new(Arc::clone(&key), hash), - value_entry: TrioArc::clone(&entry), - old_weight: 0, - new_weight: weight, - }, - )); + op1 = Some((cnt, ins_op)); entry }, // on_modify |_k, old_entry| { - // NOTES on `new_value_entry_from` method: - // 1. The internal EntryInfo will be shared between the old and new ValueEntries. - // 2. This method will set the last_accessed and last_modified to the max value to - // prevent this new ValueEntry from being evicted by an expiration policy. - // 3. This method will update the policy_weight with the new weight. let old_weight = old_entry.policy_weight(); - let old_timestamps = (old_entry.last_accessed(), old_entry.last_modified()); + + // Create this OldEntryInfo _before_ creating a new ValueEntry, so + // that the OldEntryInfo can preserve the old EntryInfo's + // last_accessed and last_modified timestamps. 
+ let old_info = OldEntryInfo::new(old_entry); let entry = self.new_value_entry_from(value.clone(), ts, weight, old_entry); + let upd_op = WriteOp::Upsert { + key_hash: KeyHash::new(Arc::clone(&key), hash), + value_entry: TrioArc::clone(&entry), + old_weight, + new_weight: weight, + }; let cnt = op_cnt2.fetch_add(1, Ordering::Relaxed); - op2 = Some(( - cnt, - TrioArc::clone(old_entry), - old_timestamps, - WriteOp::Upsert { - key_hash: KeyHash::new(Arc::clone(&key), hash), - value_entry: TrioArc::clone(&entry), - old_weight, - new_weight: weight, - }, - )); + op2 = Some((cnt, old_info, upd_op)); entry }, ); match (op1, op2) { - (Some((_cnt, ins_op)), None) => { - if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) = - (&self.inner.expiration_policy.expiry(), &ins_op) - { - Self::expire_after_create(expiry, &key, value_entry, ts, self.inner.clocks()); - } - (ins_op, ts) + (Some((_cnt, ins_op)), None) => self.do_post_insert_steps(ts, &key, ins_op), + (Some((cnt1, ins_op)), Some((cnt2, ..))) if cnt1 > cnt2 => { + self.do_post_insert_steps(ts, &key, ins_op) } - (None, Some((_cnt, old_entry, (old_last_accessed, old_last_modified), upd_op))) => { - if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) = - (&self.inner.expiration_policy.expiry(), &upd_op) - { - Self::expire_after_read_or_update( - |k, v, t, d| expiry.expire_after_update(k, v, t, d), - &key, - value_entry, - self.inner.expiration_policy.time_to_live(), - self.inner.expiration_policy.time_to_idle(), - ts, - self.inner.clocks(), - ); - } - - if self.is_removal_notifier_enabled() { - self.inner - .notify_upsert(key, &old_entry, old_last_accessed, old_last_modified); - } - crossbeam_epoch::pin().flush(); - (upd_op, ts) - } - ( - Some((cnt1, ins_op)), - Some((cnt2, old_entry, (old_last_accessed, old_last_modified), upd_op)), - ) => { - if cnt1 > cnt2 { - if let (Some(expiry), WriteOp::Upsert { value_entry, .. 
}) = - (&self.inner.expiration_policy.expiry(), &ins_op) - { - Self::expire_after_create( - expiry, - &key, - value_entry, - ts, - self.inner.clocks(), - ); - } - (ins_op, ts) - } else { - if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) = - (&self.inner.expiration_policy.expiry(), &upd_op) - { - Self::expire_after_read_or_update( - |k, v, t, d| expiry.expire_after_update(k, v, t, d), - &key, - value_entry, - self.inner.expiration_policy.time_to_live(), - self.inner.expiration_policy.time_to_idle(), - ts, - self.inner.clocks(), - ); - } - - if self.is_removal_notifier_enabled() { - self.inner.notify_upsert( - key, - &old_entry, - old_last_accessed, - old_last_modified, - ); - } - crossbeam_epoch::pin().flush(); - (upd_op, ts) - } + (_, Some((_cnt, old_info, upd_op))) => { + self.do_post_update_steps(ts, key, old_info, upd_op) } (None, None) => unreachable!(), } } + fn do_post_insert_steps( + &self, + ts: Instant, + key: &Arc, + ins_op: WriteOp, + ) -> (WriteOp, Instant) { + if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) = + (&self.inner.expiration_policy.expiry(), &ins_op) + { + Self::expire_after_create(expiry, key, value_entry, ts, self.inner.clocks()); + } + (ins_op, ts) + } + + fn do_post_update_steps( + &self, + ts: Instant, + key: Arc, + old_info: OldEntryInfo, + upd_op: WriteOp, + ) -> (WriteOp, Instant) { + if let (Some(expiry), WriteOp::Upsert { value_entry, .. 
}) = + (&self.inner.expiration_policy.expiry(), &upd_op) + { + Self::expire_after_read_or_update( + |k, v, t, d| expiry.expire_after_update(k, v, t, d), + &key, + value_entry, + self.inner.expiration_policy.time_to_live(), + self.inner.expiration_policy.time_to_idle(), + ts, + self.inner.clocks(), + ); + } + + if self.is_removal_notifier_enabled() { + self.inner.notify_upsert( + key, + &old_info.entry, + old_info.last_accessed, + old_info.last_modified, + ); + } + crossbeam_epoch::pin().flush(); + (upd_op, ts) + } + #[inline] fn apply_reads_if_needed(&self, inner: &Inner, now: Instant) { let len = self.read_op_ch.len();