diff --git a/src/future.rs b/src/future.rs index 0636b974..da172656 100644 --- a/src/future.rs +++ b/src/future.rs @@ -74,8 +74,8 @@ where } } -/// Operation that has been interrupted by async cancellation. -pub(crate) enum PendingOp { +/// Operation that has been interrupted (stopped polling) by async cancellation. +pub(crate) enum InterruptedOp { CallEvictionListener { ts: Instant, // 'static means that the future can capture only owned value and/or static @@ -89,21 +89,22 @@ pub(crate) enum PendingOp { }, } -/// Drop guard for an operation being performed. If this guard is dropped while it is -/// still having the future or the write op, it will convert them to a PendingOp and -/// send to the pending operation channel, so that the operation can be retried -/// later. -struct PendingOpGuard<'a, K, V> { - pending_op_ch: &'a Sender>, +/// Drop guard for an async task being performed. If this guard is dropped while it +/// is still having the shared `future` or the write `op`, it will convert them to an +/// `InterruptedOp` and send it to the interrupted operations channel. Later, the +/// interrupted op will be retried by `retry_interrupted_ops` method of +/// `BaseCache`. +struct CancelGuard<'a, K, V> { + interrupted_op_ch: &'a Sender>, ts: Instant, future: Option>>, op: Option>, } -impl<'a, K, V> PendingOpGuard<'a, K, V> { - fn new(pending_op_ch: &'a Sender>, ts: Instant) -> Self { +impl<'a, K, V> CancelGuard<'a, K, V> { + fn new(interrupted_op_ch: &'a Sender>, ts: Instant) -> Self { Self { - pending_op_ch, + interrupted_op_ch, ts, future: Default::default(), op: Default::default(), @@ -129,20 +130,20 @@ impl<'a, K, V> PendingOpGuard<'a, K, V> { } } -impl<'a, K, V> Drop for PendingOpGuard<'a, K, V> { +impl<'a, K, V> Drop for CancelGuard<'a, K, V> { fn drop(&mut self) { - let pending_op = match (self.future.take(), self.op.take()) { - (Some(future), Some(op)) => PendingOp::CallEvictionListener { + let interrupted_op = match (self.future.take(), self.op.take()) { + (Some(future), Some(op)) => InterruptedOp::CallEvictionListener { ts: self.ts, future, op, }, - (None, Some(op)) => PendingOp::SendWriteOp { ts: self.ts, op }, + (None, Some(op)) => InterruptedOp::SendWriteOp { ts: self.ts, op }, _ => return, }; - self.pending_op_ch - .send(pending_op) + self.interrupted_op_ch + .send(interrupted_op) .expect("Failed to send a pending op"); } } diff --git a/src/future/base_cache.rs b/src/future/base_cache.rs index 93e6560e..59481965 100644 --- a/src/future/base_cache.rs +++ b/src/future/base_cache.rs @@ -3,7 +3,7 @@ use super::{ invalidator::{GetOrRemoveEntry, Invalidator, KeyDateLite, PredicateFun}, key_lock::{KeyLock, KeyLockMap}, notifier::RemovalNotifier, - PendingOp, PredicateId, + InterruptedOp, PredicateId, }; use crate::{ @@ -25,7 +25,7 @@ use crate::{ timer_wheel::{ReschedulingResult, TimerWheel}, CacheRegion, }, - future::PendingOpGuard, + future::CancelGuard, notification::{AsyncEvictionListener, RemovalCause}, policy::ExpirationPolicy, sync_base::iter::ScanningGet, @@ -60,8 +60,8 @@ pub(crate) struct BaseCache { pub(crate) inner: Arc>, read_op_ch: Sender>, pub(crate) write_op_ch: Sender>, - pub(crate) pending_op_ch_snd: Sender>, - pub(crate) pending_op_ch_rcv: Receiver>, + pub(crate) interrupted_op_ch_snd: Sender>, + pub(crate) interrupted_op_ch_rcv: Receiver>, pub(crate) housekeeper: Option, } @@ -75,8 +75,8 @@ impl Clone for BaseCache { inner: Arc::clone(&self.inner), read_op_ch: self.read_op_ch.clone(), write_op_ch: self.write_op_ch.clone(), - pending_op_ch_snd: self.pending_op_ch_snd.clone(), - pending_op_ch_rcv: self.pending_op_ch_rcv.clone(), + interrupted_op_ch_snd: self.interrupted_op_ch_snd.clone(), + interrupted_op_ch_rcv: self.interrupted_op_ch_rcv.clone(), housekeeper: self.housekeeper.as_ref().map(Arc::clone), } } @@ -129,7 +129,7 @@ impl BaseCache { K: Send + Sync + 'static, V: Clone + Send + Sync + 'static, { - self.inner.notify_invalidate_future(key, entry) + self.inner.notify_invalidate(key, entry) } #[cfg(feature = "unstable-debug-counters")] @@ -174,7 +174,7 @@ where let (r_snd, r_rcv) = crossbeam_channel::bounded(r_size); let (w_snd, w_rcv) = crossbeam_channel::bounded(w_size); - let (p_snd, p_rcv) = crossbeam_channel::unbounded(); + let (i_snd, i_rcv) = crossbeam_channel::unbounded(); let inner = Arc::new(Inner::new( name, @@ -193,8 +193,8 @@ where inner, read_op_ch: r_snd, write_op_ch: w_snd, - pending_op_ch_snd: p_snd, - pending_op_ch_rcv: p_rcv, + interrupted_op_ch_snd: i_snd, + interrupted_op_ch_rcv: i_rcv, housekeeper: Some(Arc::new(Housekeeper::default())), } } @@ -244,7 +244,9 @@ where return None; } - self.process_pending_ops().await; + if record_read { + self.retry_interrupted_ops().await; + } let mut now = self.current_time_from_expiration_clock(); @@ -373,13 +375,13 @@ where pub(crate) async fn apply_reads_writes_if_needed( inner: Arc, ch: &Sender>, - ts: Instant, + now: Instant, housekeeper: Option<&HouseKeeperArc>, ) { let w_len = ch.len(); if let Some(hk) = housekeeper { - if Self::should_apply_writes(hk, w_len, ts) { + if Self::should_apply_writes(hk, w_len, now) { hk.try_run_pending_tasks(inner).await; } } @@ -470,7 +472,7 @@ where hash: u64, value: V, ) -> (WriteOp, Instant) { - self.process_pending_ops().await; + self.retry_interrupted_ops().await; let ts = self.current_time_from_expiration_clock(); let weight = self.inner.weigh(&key, &value); @@ -533,19 +535,19 @@ where ); match (op1, op2) { - (Some((_cnt, ins_op)), None) => self.post_insert(ts, &key, ins_op), + (Some((_cnt, ins_op)), None) => self.do_post_insert_steps(ts, &key, ins_op), (Some((cnt1, ins_op)), Some((cnt2, ..))) if cnt1 > cnt2 => { - self.post_insert(ts, &key, ins_op) + self.do_post_insert_steps(ts, &key, ins_op) } (_, Some((_cnt, old_entry, upd_op))) => { - self.post_update(ts, key, old_entry, upd_op, &self.pending_op_ch_snd) + self.do_post_update_steps(ts, key, old_entry, upd_op, &self.interrupted_op_ch_snd) .await } (None, None) => unreachable!(), } } - fn post_insert( + fn do_post_insert_steps( &self, ts: Instant, key: &Arc, @@ -559,13 +561,13 @@ where (ins_op, ts) } - async fn post_update<'a>( + async fn do_post_update_steps<'a>( &self, ts: Instant, key: Arc, old_info: OldEntryInfo, upd_op: WriteOp, - pending_op_ch: &'a Sender>, + interrupted_op_ch: &'a Sender>, ) -> (WriteOp, Instant) { use futures_util::FutureExt; @@ -593,12 +595,18 @@ where old_info.last_modified, ) .shared(); - let mut drop_guard = PendingOpGuard::new(pending_op_ch, ts); - drop_guard.set_future_and_op(future.clone(), upd_op.clone()); + // Async Cancellation Safety: To ensure the above future should be + // executed even if our caller async task is cancelled, we create a + // cancel guard for the future (and the upd_op). If our caller is + // cancelled while we are awaiting for the future, the cancel guard will + // save the future and the upd_op to the interrupted_op_ch channel, so + // that we can resume/retry later. + let mut cancel_guard = CancelGuard::new(interrupted_op_ch, ts); + cancel_guard.set_future_and_op(future.clone(), upd_op.clone()); // Notify the eviction listener. future.await; - drop_guard.clear(); + cancel_guard.clear(); } crossbeam_epoch::pin().flush(); @@ -612,7 +620,19 @@ where op: WriteOp, ts: Instant, housekeeper: Option<&HouseKeeperArc>, + // Used only for testing. + _should_block: bool, ) -> Result<(), TrySendError>> { + // Testing stuff. + #[cfg(test)] + if _should_block { + // We are going to do a dead-lock here to simulate a full channel. + let mutex = Mutex::new(()); + let _guard = mutex.lock().await; + // This should dead-lock. + mutex.lock().await; + } + let mut op = op; let mut spin_count = 0u8; loop { @@ -650,34 +670,43 @@ where } } - pub(crate) async fn process_pending_ops(&self) { - while let Ok(op) = self.pending_op_ch_rcv.try_recv() { - let mut drop_guard; + pub(crate) async fn retry_interrupted_ops(&self) { + while let Ok(op) = self.interrupted_op_ch_rcv.try_recv() { + // Async Cancellation Safety: Remember that we are in an async task here. + // If our caller is cancelled while we are awaiting for the future, we + // will be cancelled too at the await point. In that case, the cancel + // guard below will save the future and the op to the interrupted_op_ch + // channel, so that we can resume/retry later. + let mut cancel_guard; + // Resume an interrupted future if there is one. match op { - PendingOp::CallEvictionListener { ts, future, op } => { - drop_guard = PendingOpGuard::new(&self.pending_op_ch_snd, ts); - drop_guard.set_future_and_op(future.clone(), op); - // Resume the interrupted future. + InterruptedOp::CallEvictionListener { ts, future, op } => { + cancel_guard = CancelGuard::new(&self.interrupted_op_ch_snd, ts); + cancel_guard.set_future_and_op(future.clone(), op); + // Resume the interrupted future (which will notify an eviction + // to the eviction listener). future.await; // If we are here, it means the above future has been completed. - drop_guard.unset_future(); + cancel_guard.unset_future(); } - PendingOp::SendWriteOp { ts, op } => { - drop_guard = PendingOpGuard::new(&self.pending_op_ch_snd, ts); - drop_guard.set_op(op); + InterruptedOp::SendWriteOp { ts, op } => { + cancel_guard = CancelGuard::new(&self.interrupted_op_ch_snd, ts); + cancel_guard.set_op(op); } } - let ts = drop_guard.ts; - let op = drop_guard.op.as_ref().cloned().unwrap(); + // Retry to schedule the write op. + let ts = cancel_guard.ts; + let op = cancel_guard.op.as_ref().cloned().unwrap(); let hk = self.housekeeper.as_ref(); - Self::schedule_write_op(&self.inner, &self.write_op_ch, op, ts, hk) + Self::schedule_write_op(&self.inner, &self.write_op_ch, op, ts, hk, false) .await .expect("Failed to reschedule a write op"); // If we are here, it means the above write op has been scheduled. - drop_guard.clear(); + // We are all good now. Clear the cancel guard. + cancel_guard.clear(); } } @@ -2406,7 +2435,7 @@ where } #[inline] - fn notify_invalidate_future( + fn notify_invalidate( &self, key: &Arc, entry: &TrioArc>, diff --git a/src/future/cache.rs b/src/future/cache.rs index 28e5951c..101ab3a2 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -1,7 +1,7 @@ use super::{ base_cache::BaseCache, value_initializer::{GetOrInsert, InitResult, ValueInitializer}, - CacheBuilder, Iter, OwnedKeyEntrySelector, PendingOpGuard, PredicateId, RefKeyEntrySelector, + CacheBuilder, CancelGuard, Iter, OwnedKeyEntrySelector, PredicateId, RefKeyEntrySelector, WriteOp, }; use crate::{ @@ -24,6 +24,9 @@ use std::{ sync::Arc, }; +#[cfg(test)] +use std::sync::atomic::{AtomicBool, Ordering}; + /// A thread-safe, futures-aware concurrent in-memory cache. /// /// `Cache` supports full concurrency of retrievals and a high expected concurrency @@ -644,6 +647,9 @@ use std::{ pub struct Cache { base: BaseCache, value_initializer: Arc>, + + #[cfg(test)] + schedule_write_op_should_block: AtomicBool, } // TODO: https://github.com/moka-rs/moka/issues/54 @@ -674,6 +680,11 @@ impl Clone for Cache { Self { base: self.base.clone(), value_initializer: Arc::clone(&self.value_initializer), + + #[cfg(test)] + schedule_write_op_should_block: AtomicBool::new( + self.schedule_write_op_should_block.load(Ordering::Acquire), + ), } } } @@ -837,6 +848,9 @@ where invalidator_enabled, ), value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)), + + #[cfg(test)] + schedule_write_op_should_block: Default::default(), // false } } @@ -1379,7 +1393,7 @@ where { use futures_util::FutureExt; - self.base.process_pending_ops().await; + self.base.retry_interrupted_ops().await; // Lock the key for removal if blocking removal notification is enabled. let mut kl = None; @@ -1417,7 +1431,14 @@ where }; let op = WriteOp::Remove(kv.clone()); - let mut drop_guard = PendingOpGuard::new(&self.base.pending_op_ch_snd, now); + + // Async Cancellation Safety: To ensure the below future should be + // executed even if our caller async task is cancelled, we create a + // cancel guard for the future (and the op). If our caller is + // cancelled while we are awaiting for the future, the cancel guard + // will save the future and the op to the interrupted_op_ch channel, + // so that we can resume/retry later. + let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, now); if self.base.is_removal_notifier_enabled() { let future = self @@ -1425,12 +1446,12 @@ where .notify_invalidate(&kv.key, &kv.entry) .boxed() .shared(); - drop_guard.set_future_and_op(future.clone(), op.clone()); + cancel_guard.set_future_and_op(future.clone(), op.clone()); // Send notification to the eviction listener. future.await; - drop_guard.unset_future(); + cancel_guard.unset_future(); } else { - drop_guard.set_op(op.clone()); + cancel_guard.set_op(op.clone()); } // Drop the locks before scheduling write op to avoid a potential @@ -1441,16 +1462,28 @@ where std::mem::drop(kl); let hk = self.base.housekeeper.as_ref(); + + let should_block; + #[cfg(not(test))] + { + should_block = false; + } + #[cfg(test)] + { + should_block = self.schedule_write_op_should_block.load(Ordering::Acquire); + } + BaseCache::::schedule_write_op( &self.base.inner, &self.base.write_op_ch, op, now, hk, + should_block, ) .await .expect("Failed to schedule write op for remove"); - drop_guard.clear(); + cancel_guard.clear(); crossbeam_epoch::pin().flush(); maybe_v @@ -1568,7 +1601,7 @@ where pub async fn run_pending_tasks(&self) { if let Some(hk) = &self.base.housekeeper { - self.base.process_pending_ops().await; + self.base.retry_interrupted_ops().await; hk.run_pending_tasks(Arc::clone(&self.base.inner)).await; } } @@ -1896,8 +1929,18 @@ where let (op, ts) = self.base.do_insert_with_hash(key, hash, value).await; let hk = self.base.housekeeper.as_ref(); - let mut drop_guard = PendingOpGuard::new(&self.base.pending_op_ch_snd, ts); - drop_guard.set_op(op.clone()); + let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, ts); + cancel_guard.set_op(op.clone()); + + let should_block; + #[cfg(not(test))] + { + should_block = false; + } + #[cfg(test)] + { + should_block = self.schedule_write_op_should_block.load(Ordering::Acquire); + } BaseCache::::schedule_write_op( &self.base.inner, @@ -1905,10 +1948,11 @@ where op, ts, hk, + should_block, ) .await .expect("Failed to schedule write op for insert"); - drop_guard.clear(); + cancel_guard.clear(); } } @@ -1968,7 +2012,6 @@ where } fn run_pending_tasks_initiation_count(&self) -> usize { - use std::sync::atomic::Ordering; self.base .housekeeper .as_ref() @@ -1977,7 +2020,6 @@ where } fn run_pending_tasks_completion_count(&self) -> usize { - use std::sync::atomic::Ordering; self.base .housekeeper .as_ref() @@ -4477,6 +4519,187 @@ mod tests { assert_eq!(listener_completion_count.load(Ordering::Acquire), 1); } + #[tokio::test] + async fn cancel_future_while_calling_eviction_listener() { + use crate::future::FutureExt; + use futures_util::future::poll_immediate; + use tokio::task::yield_now; + + let listener_initiation_count: Arc = Default::default(); + let listener_completion_count: Arc = Default::default(); + + let listener = { + // Variables to capture. + let init_count = Arc::clone(&listener_initiation_count); + let comp_count = Arc::clone(&listener_completion_count); + + // Our eviction listener closure. + move |_k, _v, _r| { + init_count.fetch_add(1, Ordering::AcqRel); + let comp_count1 = Arc::clone(&comp_count); + + async move { + yield_now().await; + comp_count1.fetch_add(1, Ordering::AcqRel); + } + .boxed() + } + }; + + let mut cache: Cache = Cache::builder() + .time_to_live(Duration::from_millis(10)) + .async_eviction_listener(listener) + .build(); + + cache.reconfigure_for_testing().await; + + // Make the cache exterior immutable. + let cache = cache; + + // ------------------------------------------------------------ + // Interrupt the eviction listener while calling `insert` + // ------------------------------------------------------------ + + cache.insert(1, 1).await; + + let fut = cache.insert(1, 2); + // Poll the fut only once, and drop it. The fut should not be completed (so + // it is cancelled) because the eviction listener performed a yield_now(). + assert!(poll_immediate(fut).await.is_none()); + + // The listener is initiated but not completed. + assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1); + assert_eq!(listener_completion_count.load(Ordering::Acquire), 0); + + // This will call retry_interrupted_ops() and resume the interrupted + // listener. + cache.run_pending_tasks().await; + assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1); + assert_eq!(listener_completion_count.load(Ordering::Acquire), 1); + + // ------------------------------------------------------------ + // Interrupt the eviction listener while calling `invalidate` + // ------------------------------------------------------------ + + let fut = cache.invalidate(&1); + // Cancel the fut after one poll. + assert!(poll_immediate(fut).await.is_none()); + // The listener is initiated but not completed. + assert_eq!(listener_initiation_count.load(Ordering::Acquire), 2); + assert_eq!(listener_completion_count.load(Ordering::Acquire), 1); + + // This will call retry_interrupted_ops() and resume the interrupted + // listener. + cache.get(&99).await; + assert_eq!(listener_initiation_count.load(Ordering::Acquire), 2); + assert_eq!(listener_completion_count.load(Ordering::Acquire), 2); + + // ------------------------------------------------------------ + // Ensure retry_interrupted_ops() is called + // ------------------------------------------------------------ + + // Repeat the same test with `insert`, but this time, call different methods + // to ensure retry_interrupted_ops() is called. + let prepare = || async { + cache.invalidate(&1).await; + + // Reset the counters. + listener_initiation_count.store(0, Ordering::Release); + listener_completion_count.store(0, Ordering::Release); + + cache.insert(1, 1).await; + + let fut = cache.insert(1, 2); + // Poll the fut only once, and drop it. The fut should not be completed (so + // it is cancelled) because the eviction listener performed a yield_now(). + assert!(poll_immediate(fut).await.is_none()); + + // The listener is initiated but not completed. + assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1); + assert_eq!(listener_completion_count.load(Ordering::Acquire), 0); + }; + + // Methods to test: + // + // - run_pending_tasks (Already tested in a previous test) + // - get (Already tested in a previous test) + // - insert + // - invalidate + + prepare().await; + cache.insert(99, 99).await; + assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1); + assert_eq!(listener_completion_count.load(Ordering::Acquire), 1); + + prepare().await; + cache.invalidate(&88).await; + assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1); + assert_eq!(listener_completion_count.load(Ordering::Acquire), 1); + } + + #[tokio::test] + async fn cancel_future_while_scheduling_write_op() { + use futures_util::future::poll_immediate; + + let mut cache: Cache = Cache::builder().build(); + cache.reconfigure_for_testing().await; + + // Make the cache exterior immutable. + let cache = cache; + + // -------------------------------------------------------------- + // Interrupt `insert` while blocking in `schedule_write_op` + // -------------------------------------------------------------- + + cache + .schedule_write_op_should_block + .store(true, Ordering::Release); + let fut = cache.insert(1, 1); + // Poll the fut only once, and drop it. The fut should not be completed (so + // it is cancelled) because schedule_write_op should be awaiting for a lock. + assert!(poll_immediate(fut).await.is_none()); + + assert_eq!(cache.base.interrupted_op_ch_snd.len(), 1); + assert_eq!(cache.base.write_op_ch.len(), 0); + + // This should retry the interrupted operation. + cache + .schedule_write_op_should_block + .store(false, Ordering::Release); + cache.get(&99).await; + assert_eq!(cache.base.interrupted_op_ch_snd.len(), 0); + assert_eq!(cache.base.write_op_ch.len(), 1); + + cache.run_pending_tasks().await; + assert_eq!(cache.base.write_op_ch.len(), 0); + + // -------------------------------------------------------------- + // Interrupt `invalidate` while blocking in `schedule_write_op` + // -------------------------------------------------------------- + + cache + .schedule_write_op_should_block + .store(true, Ordering::Release); + let fut = cache.invalidate(&1); + // Poll the fut only once, and drop it. The fut should not be completed (so + // it is cancelled) because schedule_write_op should be awaiting for a lock. + assert!(poll_immediate(fut).await.is_none()); + + assert_eq!(cache.base.interrupted_op_ch_snd.len(), 1); + assert_eq!(cache.base.write_op_ch.len(), 0); + + // This should retry the interrupted operation. + cache + .schedule_write_op_should_block + .store(false, Ordering::Release); + cache.get(&99).await; + assert_eq!(cache.base.interrupted_op_ch_snd.len(), 0); + assert_eq!(cache.base.write_op_ch.len(), 1); + + cache.run_pending_tasks().await; + assert_eq!(cache.base.write_op_ch.len(), 0); + } + // This test ensures that the `contains_key`, `get` and `invalidate` can use // borrowed form `&[u8]` for key with type `Vec`. // https://github.com/moka-rs/moka/issues/166 diff --git a/src/future/housekeeper.rs b/src/future/housekeeper.rs index 06e6c340..f70648f7 100644 --- a/src/future/housekeeper.rs +++ b/src/future/housekeeper.rs @@ -101,13 +101,15 @@ impl Housekeeper { let now = cache.now(); + // Async Cancellation Safety: Our maintenance task is cancellable as we save + // it in the lock. If it is canceled, we will resume it in the next run. + if let Some(task) = &*current_task { - // This task was being resolved, but did not complete. This means the - // previous run was canceled due to the enclosing Future was dropped. - // Resume the task now by awaiting. + // This task was cancelled in the previous run due to the enclosing + // Future was dropped. Resume the task now by awaiting. task.clone().await; } else { - // Create a new maintenance task and resolve it. + // Create a new maintenance task and await it. let task = async move { cache.run_pending_tasks(MAX_SYNC_REPEATS).await } .boxed() .shared(); @@ -119,7 +121,7 @@ impl Housekeeper { task.await; } - // If we are here, it means that the maintenance task has been resolved. + // If we are here, it means that the maintenance task has been completed. // We can remove it from the lock. *current_task = None; self.run_after.set_instant(Self::sync_after(now)); diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index d1a461aa..ffd86610 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -553,16 +553,18 @@ where ); match (op1, op2) { - (Some((_cnt, ins_op)), None) => self.post_insert(ts, &key, ins_op), + (Some((_cnt, ins_op)), None) => self.do_post_insert_steps(ts, &key, ins_op), (Some((cnt1, ins_op)), Some((cnt2, ..))) if cnt1 > cnt2 => { - self.post_insert(ts, &key, ins_op) + self.do_post_insert_steps(ts, &key, ins_op) + } + (_, Some((_cnt, old_info, upd_op))) => { + self.do_post_update_steps(ts, key, old_info, upd_op) } - (_, Some((_cnt, old_info, upd_op))) => self.post_update(ts, key, old_info, upd_op), (None, None) => unreachable!(), } } - fn post_insert( + fn do_post_insert_steps( &self, ts: Instant, key: &Arc, @@ -576,7 +578,7 @@ where (ins_op, ts) } - fn post_update( + fn do_post_update_steps( &self, ts: Instant, key: Arc,