diff --git a/src/common/concurrent.rs b/src/common/concurrent.rs index 27344452..7f0e035b 100644 --- a/src/common/concurrent.rs +++ b/src/common/concurrent.rs @@ -1,7 +1,7 @@ use crate::common::{deque::DeqNode, time::Instant}; use parking_lot::Mutex; -use std::{ptr::NonNull, sync::Arc}; +use std::{fmt, ptr::NonNull, sync::Arc}; use tagptr::TagNonNull; use triomphe::Arc as TrioArc; @@ -112,6 +112,15 @@ impl KvEntry { } } +impl Clone for KvEntry { + fn clone(&self) -> Self { + Self { + key: Arc::clone(&self.key), + entry: TrioArc::clone(&self.entry), + } + } +} + impl AccessTime for DeqNode> { #[inline] fn last_accessed(&self) -> Option { @@ -308,7 +317,6 @@ pub(crate) enum ReadOp { Miss(u64), } -#[cfg(feature = "sync")] pub(crate) enum WriteOp { Upsert { key_hash: KeyHash, @@ -319,6 +327,36 @@ pub(crate) enum WriteOp { Remove(KvEntry), } +/// Cloning a WriteOp is safe and cheap because it uses Arc and TrioArc pointers to +/// the actual data. +impl Clone for WriteOp { + fn clone(&self) -> Self { + match self { + Self::Upsert { + key_hash, + value_entry, + old_weight, + new_weight, + } => Self::Upsert { + key_hash: key_hash.clone(), + value_entry: TrioArc::clone(value_entry), + old_weight: *old_weight, + new_weight: *new_weight, + }, + Self::Remove(kv_hash) => Self::Remove(kv_hash.clone()), + } + } +} + +impl fmt::Debug for WriteOp { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Upsert { .. } => f.debug_struct("Upsert").finish(), + Self::Remove(..) => f.debug_tuple("Remove").finish(), + } + } +} + pub(crate) struct OldEntryInfo { pub(crate) entry: TrioArc>, pub(crate) last_accessed: Option, diff --git a/src/future.rs b/src/future.rs index 641a4b1e..15a74532 100644 --- a/src/future.rs +++ b/src/future.rs @@ -7,11 +7,11 @@ use async_lock::Mutex; use crossbeam_channel::Sender; use futures_util::future::{BoxFuture, Shared}; use once_cell::sync::Lazy; -use std::{fmt, future::Future, hash::Hash, sync::Arc}; +use std::{future::Future, hash::Hash, sync::Arc}; use triomphe::Arc as TrioArc; use crate::common::{ - concurrent::{KeyHash, KvEntry, ValueEntry}, + concurrent::{ValueEntry, WriteOp}, time::Instant, }; @@ -78,25 +78,30 @@ where } } +/// Operation that has been interrupted by async cancellation. pub(crate) enum PendingOp { - // 'static means that the future can capture only owned value and/or static - // references. No non-static references are allowed. CallEvictionListener { ts: Instant, + // 'static means that the future can capture only owned value and/or static + // references. No non-static references are allowed. future: Shared>, - op: TrioArc>, + op: WriteOp, }, SendWriteOp { ts: Instant, - op: TrioArc>, + op: WriteOp, }, } +/// Drop guard for an operation being performed. If this guard is dropped while it is +/// still having the future or the write op, it will convert them to a PendingOp and +/// send to the pending operation channel, so that the operation can be retried +/// later. struct PendingOpGuard<'a, K, V> { pending_op_ch: &'a Sender>, ts: Instant, future: Option>>, - op: Option>>, + op: Option>, } impl<'a, K, V> PendingOpGuard<'a, K, V> { @@ -109,16 +114,12 @@ impl<'a, K, V> PendingOpGuard<'a, K, V> { } } - fn set_future_and_op( - &mut self, - future: Shared>, - op: TrioArc>, - ) { + fn set_future_and_op(&mut self, future: Shared>, op: WriteOp) { self.future = Some(future); self.op = Some(op); } - fn set_op(&mut self, op: TrioArc>) { + fn set_op(&mut self, op: WriteOp) { self.op = Some(op); } @@ -170,22 +171,3 @@ pub(crate) enum ReadOp { // u64 is the hash of the key. Miss(u64), } - -pub(crate) enum WriteOp { - Upsert { - key_hash: KeyHash, - value_entry: TrioArc>, - old_weight: u32, - new_weight: u32, - }, - Remove(TrioArc>), -} - -impl fmt::Debug for WriteOp { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Upsert { .. } => f.debug_struct("Upsert").finish(), - Self::Remove(..) => f.debug_tuple("Remove").finish(), - } - } -} diff --git a/src/future/base_cache.rs b/src/future/base_cache.rs index 334d7753..acdc3ec7 100644 --- a/src/future/base_cache.rs +++ b/src/future/base_cache.rs @@ -58,7 +58,7 @@ pub(crate) type HouseKeeperArc = Arc; pub(crate) struct BaseCache { pub(crate) inner: Arc>, read_op_ch: Sender>, - pub(crate) write_op_ch: Sender>>, + pub(crate) write_op_ch: Sender>, pub(crate) pending_op_ch_snd: Sender>, pub(crate) pending_op_ch_rcv: Receiver>, pub(crate) housekeeper: Option, @@ -119,7 +119,7 @@ impl BaseCache { self.inner.current_time_from_expiration_clock() } - pub(crate) fn notify_invalidate_future( + pub(crate) fn notify_invalidate( &self, key: &Arc, entry: &TrioArc>, @@ -371,7 +371,7 @@ where #[inline] pub(crate) async fn apply_reads_writes_if_needed( inner: Arc, - ch: &Sender>>, + ch: &Sender>, ts: Instant, housekeeper: Option<&HouseKeeperArc>, ) { @@ -468,7 +468,7 @@ where key: Arc, hash: u64, value: V, - ) -> (TrioArc>, Instant) { + ) -> (WriteOp, Instant) { self.process_pending_ops().await; let ts = self.current_time_from_expiration_clock(); @@ -477,7 +477,6 @@ where let op_cnt2 = Arc::clone(&op_cnt1); let mut op1 = None; let mut op2 = None; - let pending_op_ch = &self.pending_op_ch_snd; // Lock the key for update if blocking removal notification is enabled. let kl = self.maybe_key_lock(&key); @@ -487,8 +486,6 @@ where None }; - let drop_guard = PendingOpGuard::new(pending_op_ch, ts); - // Since the cache (cht::SegmentedHashMap) employs optimistic locking // strategy, insert_with_or_modify() may get an insert/modify operation // conflicted with other concurrent hash table operations. In that case, it @@ -540,7 +537,7 @@ where self.post_insert(ts, &key, ins_op) } (_, Some((_cnt, old_entry, upd_op))) => { - self.post_update(ts, key, old_entry, upd_op, drop_guard) + self.post_update(ts, key, old_entry, upd_op, &self.pending_op_ch_snd) .await } (None, None) => unreachable!(), @@ -552,13 +549,13 @@ where ts: Instant, key: &Arc, ins_op: WriteOp, - ) -> (TrioArc>, Instant) { + ) -> (WriteOp, Instant) { if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) = (&self.inner.expiration_policy.expiry(), &ins_op) { Self::expire_after_create(expiry, key, value_entry, ts, self.inner.clocks()); } - (TrioArc::new(ins_op), ts) + (ins_op, ts) } async fn post_update<'a>( @@ -567,8 +564,8 @@ where key: Arc, old_info: OldEntryInfo, upd_op: WriteOp, - mut drop_guard: PendingOpGuard<'a, K, V>, - ) -> (TrioArc>, Instant) { + pending_op_ch: &'a Sender>, + ) -> (WriteOp, Instant) { use futures_util::FutureExt; if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) = @@ -585,20 +582,20 @@ where ); } - let upd_op = TrioArc::new(upd_op); - if self.is_removal_notifier_enabled() { let future = self .inner - .notify_upsert_future( + .notify_upsert( key, &old_info.entry, old_info.last_accessed, old_info.last_modified, ) .shared(); - drop_guard.set_future_and_op(future.clone(), TrioArc::clone(&upd_op)); + let mut drop_guard = PendingOpGuard::new(pending_op_ch, ts); + drop_guard.set_future_and_op(future.clone(), upd_op.clone()); + // Notify the eviction listener. future.await; drop_guard.clear(); } @@ -610,11 +607,11 @@ where #[inline] pub(crate) async fn schedule_write_op( inner: &Arc, - ch: &Sender>>, - op: TrioArc>, + ch: &Sender>, + op: WriteOp, ts: Instant, housekeeper: Option<&HouseKeeperArc>, - ) -> Result<(), TrySendError>>> { + ) -> Result<(), TrySendError>> { let mut op = op; let mut spin_count = 0u8; loop { @@ -672,7 +669,7 @@ where } let ts = drop_guard.ts; - let op = drop_guard.op.as_ref().map(TrioArc::clone).unwrap(); + let op = drop_guard.op.as_ref().cloned().unwrap(); let hk = self.housekeeper.as_ref(); Self::schedule_write_op(&self.inner, &self.write_op_ch, op, ts, hk) .await @@ -984,7 +981,7 @@ pub(crate) struct Inner { frequency_sketch: RwLock, frequency_sketch_enabled: AtomicBool, read_op_ch: Receiver>, - write_op_ch: Receiver>>, + write_op_ch: Receiver>, expiration_policy: ExpirationPolicy, valid_after: AtomicInstant, weigher: Option>, @@ -1128,7 +1125,7 @@ where weigher: Option>, eviction_listener: Option>, read_op_ch: Receiver>, - write_op_ch: Receiver>>, + write_op_ch: Receiver>, expiration_policy: ExpirationPolicy, invalidator_enabled: bool, ) -> Self { @@ -1548,49 +1545,28 @@ where for _ in 0..count { match ch.try_recv() { - Err(_) => break, // for loop - Ok(mut arc_op) => loop { - match TrioArc::try_unwrap(arc_op) { - Ok(op) => { - match op { - Upsert { - key_hash: kh, - value_entry: entry, - old_weight, - new_weight, - } => { - self.handle_upsert( - kh, - entry, - old_weight, - new_weight, - deqs, - timer_wheel, - &freq, - eviction_state, - ) - .await - } - Remove(kv_entry) => { - let KvEntry { key: _key, entry } = &*kv_entry; - Self::handle_remove( - deqs, - timer_wheel, - TrioArc::clone(entry), - &mut eviction_state.counters, - ) - } - } - break; // inner loop - } - Err(op) => { - arc_op = op; - for _ in 0..8 { - std::hint::spin_loop(); - } - } - } - }, + Ok(Upsert { + key_hash: kh, + value_entry: entry, + old_weight, + new_weight, + }) => { + self.handle_upsert( + kh, + entry, + old_weight, + new_weight, + deqs, + timer_wheel, + &freq, + eviction_state, + ) + .await + } + Ok(Remove(KvEntry { key: _key, entry })) => { + Self::handle_remove(deqs, timer_wheel, entry, &mut eviction_state.counters) + } + Err(_) => break, }; } } @@ -2388,7 +2364,7 @@ where } #[inline] - fn notify_upsert_future( + fn notify_upsert( &self, key: Arc, entry: &TrioArc>, diff --git a/src/future/cache.rs b/src/future/cache.rs index 9793d6b7..28e5951c 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -23,7 +23,6 @@ use std::{ pin::Pin, sync::Arc, }; -use triomphe::Arc as TrioArc; /// A thread-safe, futures-aware concurrent in-memory cache. /// @@ -1417,19 +1416,21 @@ where None }; - let kv = TrioArc::new(kv); - let op = TrioArc::new(WriteOp::Remove(TrioArc::clone(&kv))); + let op = WriteOp::Remove(kv.clone()); let mut drop_guard = PendingOpGuard::new(&self.base.pending_op_ch_snd, now); if self.base.is_removal_notifier_enabled() { let future = self .base - .notify_invalidate_future(&kv.key, &kv.entry) + .notify_invalidate(&kv.key, &kv.entry) .boxed() .shared(); - drop_guard.set_future_and_op(future.clone(), TrioArc::clone(&op)); + drop_guard.set_future_and_op(future.clone(), op.clone()); + // Send notification to the eviction listener. future.await; drop_guard.unset_future(); + } else { + drop_guard.set_op(op.clone()); } // Drop the locks before scheduling write op to avoid a potential @@ -1896,7 +1897,7 @@ where let (op, ts) = self.base.do_insert_with_hash(key, hash, value).await; let hk = self.base.housekeeper.as_ref(); let mut drop_guard = PendingOpGuard::new(&self.base.pending_op_ch_snd, ts); - drop_guard.set_op(TrioArc::clone(&op)); + drop_guard.set_op(op.clone()); BaseCache::::schedule_write_op( &self.base.inner,