Skip to content

Commit

Permalink
Remove the thread pool from future::Cache
Browse files Browse the repository at this point in the history
Add a missing key-level lock to `invalidate` method.
  • Loading branch information
tatsuya6502 committed Aug 22, 2023
1 parent 1da63c8 commit 6a9ba9a
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 12 deletions.
13 changes: 13 additions & 0 deletions src/future/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,15 @@ where
}
}

pub(crate) fn get_key_with_hash<Q>(&self, key: &Q, hash: u64) -> Option<Arc<K>>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.inner
.get_key_value_and(key, hash, |k, _entry| Arc::clone(k))
}

#[inline]
pub(crate) fn remove_entry<Q>(&self, key: &Q, hash: u64) -> Option<KvEntry<K, V>>
where
Expand Down Expand Up @@ -538,6 +547,8 @@ where
}

if self.is_removal_notifier_enabled() {
// TODO: Make this one resumable. (Pass `kl`, `_klg`, `upd_op`
// and `ts`)
self.inner
.notify_upsert(key, &old_entry, old_last_accessed, old_last_modified)
.await;
Expand Down Expand Up @@ -578,6 +589,8 @@ where
}

if self.is_removal_notifier_enabled() {
// TODO: Make this one resumable. (Pass `kl`, `_klg`, `upd_op`
// and `ts`)
self.inner
.notify_upsert(key, &old_entry, old_last_accessed, old_last_modified)
.await;
Expand Down
48 changes: 43 additions & 5 deletions src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1383,22 +1383,58 @@ where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
// Lock the key for removal if blocking removal notification is enabled.
let mut kl = None;
let mut klg = None;
if self.base.is_removal_notifier_enabled() {
// To lock the key, we have to get Arc<K> for key (&Q).
//
// TODO: Enhance this if possible. This is rather hack now because
// it cannot prevent race conditions like this:
//
// 1. We miss the key because it does not exist. So we do not lock
// the key.
// 2. Somebody else (other thread) inserts the key.
// 3. We remove the entry for the key, but without the key lock!
//
if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
kl = self.base.maybe_key_lock(&arc_key);
klg = if let Some(lock) = &kl {
Some(lock.lock().await)
} else {
None
};
}
}

match self.base.remove_entry(key, hash) {
None => None,
Some(kv) => {
let now = self.base.current_time_from_expiration_clock();

if self.base.is_removal_notifier_enabled() {
// TODO: Make this one resumable. (Pass `kl`, `klg`, `kv` and
// `now`)
self.base.notify_invalidate(&kv.key, &kv.entry).await
}
// Drop the locks before scheduling write op to avoid a potential
// dead lock. (Scheduling write can do spin lock when the queue is
// full, and queue will be drained by the housekeeping thread that
// can lock the same key)
std::mem::drop(klg);
std::mem::drop(kl);

let maybe_v = if need_value {
Some(kv.entry.value.clone())
} else {
None
};

if self.base.is_removal_notifier_enabled() {
self.base.notify_invalidate(&kv.key, &kv.entry).await
}

let op = WriteOp::Remove(kv);
let now = self.base.current_time_from_expiration_clock();
let hk = self.base.housekeeper.as_ref();
// TODO: If enclosing future is being dropped, save `op` and `now` so
// that we can resume later. (maybe we can send to an unbound mpsc
// channel)
Self::schedule_write_op(&self.base.inner, &self.base.write_op_ch, op, now, hk)
.await
.expect("Failed to remove");
Expand Down Expand Up @@ -1845,6 +1881,8 @@ where

let (op, now) = self.base.do_insert_with_hash(key, hash, value).await;
let hk = self.base.housekeeper.as_ref();
// TODO: If enclosing future is being dropped, save `op` and `now` so that
// we can resume later. (maybe we can send to an unbound mpsc channel)
Self::schedule_write_op(&self.base.inner, &self.base.write_op_ch, op, now, hk)
.await
.expect("Failed to insert");
Expand Down
10 changes: 6 additions & 4 deletions src/sync/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1805,12 +1805,15 @@ where
match self.base.remove_entry(key, hash) {
None => None,
Some(kv) => {
let now = self.base.current_time_from_expiration_clock();

if self.base.is_removal_notifier_enabled() {
self.base.notify_invalidate(&kv.key, &kv.entry)
}
// Drop the locks before scheduling write op to avoid a potential dead lock.
// (Scheduling write can do spin lock when the queue is full, and queue will
// be drained by the housekeeping thread that can lock the same key)
// Drop the locks before scheduling write op to avoid a potential
// dead lock. (Scheduling write can do spin lock when the queue is
// full, and queue will be drained by the housekeeping thread that
// can lock the same key)
std::mem::drop(klg);
std::mem::drop(kl);

Expand All @@ -1821,7 +1824,6 @@ where
};

let op = WriteOp::Remove(kv);
let now = self.base.current_time_from_expiration_clock();
let hk = self.base.housekeeper.as_ref();
Self::schedule_write_op(
self.base.inner.as_ref(),
Expand Down
3 changes: 0 additions & 3 deletions src/sync_base/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ impl<K, V, S> BaseCache<K, V, S> {
}

#[inline]
#[cfg(feature = "sync")]
pub(crate) fn is_blocking_removal_notification(&self) -> bool {
self.inner.is_blocking_removal_notification()
}
Expand Down Expand Up @@ -378,7 +377,6 @@ where
}
}

#[cfg(feature = "sync")]
pub(crate) fn get_key_with_hash<Q>(&self, key: &Q, hash: u64) -> Option<Arc<K>>
where
K: Borrow<Q>,
Expand Down Expand Up @@ -997,7 +995,6 @@ impl<K, V, S> Inner<K, V, S> {
}

#[inline]
#[cfg(feature = "sync")]
fn is_blocking_removal_notification(&self) -> bool {
self.removal_notifier
.as_ref()
Expand Down

0 comments on commit 6a9ba9a

Please sign in to comment.