diff --git a/src/future/base_cache.rs b/src/future/base_cache.rs index a746af05..683bc4bb 100644 --- a/src/future/base_cache.rs +++ b/src/future/base_cache.rs @@ -335,6 +335,15 @@ where } } + pub(crate) fn get_key_with_hash(&self, key: &Q, hash: u64) -> Option> + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + self.inner + .get_key_value_and(key, hash, |k, _entry| Arc::clone(k)) + } + #[inline] pub(crate) fn remove_entry(&self, key: &Q, hash: u64) -> Option> where @@ -538,6 +547,8 @@ where } if self.is_removal_notifier_enabled() { + // TODO: Make this one resumable. (Pass `kl`, `_klg`, `upd_op` + // and `ts`) self.inner .notify_upsert(key, &old_entry, old_last_accessed, old_last_modified) .await; @@ -578,6 +589,8 @@ where } if self.is_removal_notifier_enabled() { + // TODO: Make this one resumable. (Pass `kl`, `_klg`, `upd_op` + // and `ts`) self.inner .notify_upsert(key, &old_entry, old_last_accessed, old_last_modified) .await; diff --git a/src/future/cache.rs b/src/future/cache.rs index 83b3ff1d..ff1d5e89 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -1383,22 +1383,58 @@ where K: Borrow, Q: Hash + Eq + ?Sized, { + // Lock the key for removal if blocking removal notification is enabled. + let mut kl = None; + let mut klg = None; + if self.base.is_removal_notifier_enabled() { + // To lock the key, we have to get Arc for key (&Q). + // + // TODO: Enhance this if possible. This is rather hack now because + // it cannot prevent race conditions like this: + // + // 1. We miss the key because it does not exist. So we do not lock + // the key. + // 2. Somebody else (other thread) inserts the key. + // 3. We remove the entry for the key, but without the key lock!
+ // + if let Some(arc_key) = self.base.get_key_with_hash(key, hash) { + kl = self.base.maybe_key_lock(&arc_key); + klg = if let Some(lock) = &kl { + Some(lock.lock().await) + } else { + None + }; + } + } + match self.base.remove_entry(key, hash) { None => None, Some(kv) => { + let now = self.base.current_time_from_expiration_clock(); + + if self.base.is_removal_notifier_enabled() { + // TODO: Make this one resumable. (Pass `kl`, `klg`, `kv` and + // `now`) + self.base.notify_invalidate(&kv.key, &kv.entry).await + } + // Drop the locks before scheduling write op to avoid a potential + // dead lock. (Scheduling write can do spin lock when the queue is + // full, and queue will be drained by the housekeeping thread that + // can lock the same key) + std::mem::drop(klg); + std::mem::drop(kl); + let maybe_v = if need_value { Some(kv.entry.value.clone()) } else { None }; - if self.base.is_removal_notifier_enabled() { - self.base.notify_invalidate(&kv.key, &kv.entry).await - } - let op = WriteOp::Remove(kv); - let now = self.base.current_time_from_expiration_clock(); let hk = self.base.housekeeper.as_ref(); + // TODO: If enclosing future is being dropped, save `op` and `now` so + // that we can resume later. (maybe we can send to an unbound mpsc + // channel) Self::schedule_write_op(&self.base.inner, &self.base.write_op_ch, op, now, hk) .await .expect("Failed to remove"); @@ -1845,6 +1881,8 @@ where let (op, now) = self.base.do_insert_with_hash(key, hash, value).await; let hk = self.base.housekeeper.as_ref(); + // TODO: If enclosing future is being dropped, save `op` and `now` so that + // we can resume later.
(maybe we can send to an unbound mpsc channel) Self::schedule_write_op(&self.base.inner, &self.base.write_op_ch, op, now, hk) .await .expect("Failed to insert"); diff --git a/src/sync/cache.rs b/src/sync/cache.rs index 2e3dd2c2..2a5e6da6 100644 --- a/src/sync/cache.rs +++ b/src/sync/cache.rs @@ -1805,12 +1805,15 @@ where match self.base.remove_entry(key, hash) { None => None, Some(kv) => { + let now = self.base.current_time_from_expiration_clock(); + if self.base.is_removal_notifier_enabled() { self.base.notify_invalidate(&kv.key, &kv.entry) } - // Drop the locks before scheduling write op to avoid a potential dead lock. - // (Scheduling write can do spin lock when the queue is full, and queue will - // be drained by the housekeeping thread that can lock the same key) + // Drop the locks before scheduling write op to avoid a potential + // dead lock. (Scheduling write can do spin lock when the queue is + // full, and queue will be drained by the housekeeping thread that + // can lock the same key) std::mem::drop(klg); std::mem::drop(kl); @@ -1821,7 +1824,6 @@ where }; let op = WriteOp::Remove(kv); - let now = self.base.current_time_from_expiration_clock(); let hk = self.base.housekeeper.as_ref(); Self::schedule_write_op( self.base.inner.as_ref(), diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index c19b4267..6b6df9c8 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -109,7 +109,6 @@ impl BaseCache { } #[inline] - #[cfg(feature = "sync")] pub(crate) fn is_blocking_removal_notification(&self) -> bool { self.inner.is_blocking_removal_notification() } @@ -378,7 +377,6 @@ where } } - #[cfg(feature = "sync")] pub(crate) fn get_key_with_hash(&self, key: &Q, hash: u64) -> Option> where K: Borrow, @@ -997,7 +995,6 @@ impl Inner { } #[inline] - #[cfg(feature = "sync")] fn is_blocking_removal_notification(&self) -> bool { self.removal_notifier .as_ref()