Skip to content

Commit

Permalink
Improve async cancellation safety of future::Cache
Browse files Browse the repository at this point in the history
Refactoring on the `do_insert_with_hash` of `BaseCache`.
  • Loading branch information
tatsuya6502 committed Aug 26, 2023
1 parent 4194759 commit 3fcb35a
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 217 deletions.
16 changes: 16 additions & 0 deletions src/common/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,19 @@ pub(crate) enum WriteOp<K, V> {
},
Remove(KvEntry<K, V>),
}

pub(crate) struct OldEntryInfo<K, V> {
pub(crate) entry: TrioArc<ValueEntry<K, V>>,
pub(crate) last_accessed: Option<Instant>,
pub(crate) last_modified: Option<Instant>,
}

impl<K, V> OldEntryInfo<K, V> {
pub(crate) fn new(entry: &TrioArc<ValueEntry<K, V>>) -> Self {
Self {
entry: TrioArc::clone(entry),
last_accessed: entry.last_accessed(),
last_modified: entry.last_modified(),
}
}
}
202 changes: 84 additions & 118 deletions src/future/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
},
deques::Deques,
entry_info::EntryInfo,
AccessTime, KeyHash, KeyHashDate, KvEntry, ValueEntry, Weigher,
AccessTime, KeyHash, KeyHashDate, KvEntry, OldEntryInfo, ValueEntry, Weigher,
},
deque::{DeqNode, Deque},
frequency_sketch::FrequencySketch,
Expand Down Expand Up @@ -469,8 +469,6 @@ where
hash: u64,
value: V,
) -> (TrioArc<WriteOp<K, V>>, Instant) {
use futures_util::FutureExt;

self.process_pending_ops().await;

let ts = self.current_time_from_expiration_clock();
Expand All @@ -489,7 +487,7 @@ where
None
};

let mut drop_guard = PendingOpGuard::new(pending_op_ch, ts);
let drop_guard = PendingOpGuard::new(pending_op_ch, ts);

// Since the cache (cht::SegmentedHashMap) employs optimistic locking
// strategy, insert_with_or_modify() may get an insert/modify operation
Expand All @@ -505,140 +503,108 @@ where
// on_insert
|| {
let entry = self.new_value_entry(&key, hash, value.clone(), ts, weight);
let ins_op = WriteOp::Upsert {
key_hash: KeyHash::new(Arc::clone(&key), hash),
value_entry: TrioArc::clone(&entry),
old_weight: 0,
new_weight: weight,
};
let cnt = op_cnt1.fetch_add(1, Ordering::Relaxed);
op1 = Some((
cnt,
WriteOp::Upsert {
key_hash: KeyHash::new(Arc::clone(&key), hash),
value_entry: TrioArc::clone(&entry),
old_weight: 0,
new_weight: weight,
},
));
op1 = Some((cnt, ins_op));
entry
},
// on_modify
|_k, old_entry| {
// NOTES on `new_value_entry_from` method:
// 1. The internal EntryInfo will be shared between the old and new
// ValueEntries.
// 2. This method will set the is_dirty to prevent this new
// ValueEntry from being evicted by an expiration policy.
// 3. This method will update the policy_weight with the new weight.
let old_weight = old_entry.policy_weight();
let old_timestamps = (old_entry.last_accessed(), old_entry.last_modified());

// Create this OldEntryInfo _before_ creating a new ValueEntry, so
// that the OldEntryInfo can preserve the old EntryInfo's
// last_accessed and last_modified timestamps.
let old_info = OldEntryInfo::new(old_entry);
let entry = self.new_value_entry_from(value.clone(), ts, weight, old_entry);
let upd_op = WriteOp::Upsert {
key_hash: KeyHash::new(Arc::clone(&key), hash),
value_entry: TrioArc::clone(&entry),
old_weight,
new_weight: weight,
};
let cnt = op_cnt2.fetch_add(1, Ordering::Relaxed);
op2 = Some((
cnt,
TrioArc::clone(old_entry),
old_timestamps,
WriteOp::Upsert {
key_hash: KeyHash::new(Arc::clone(&key), hash),
value_entry: TrioArc::clone(&entry),
old_weight,
new_weight: weight,
},
));
op2 = Some((cnt, old_info, upd_op));
entry
},
);

match (op1, op2) {
(Some((_cnt, ins_op)), None) => {
if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) =
(&self.inner.expiration_policy.expiry(), &ins_op)
{
Self::expire_after_create(expiry, &key, value_entry, ts, self.inner.clocks());
}
(TrioArc::new(ins_op), ts)
(Some((_cnt, ins_op)), None) => self.post_insert(ts, &key, ins_op),
(Some((cnt1, ins_op)), Some((cnt2, ..))) if cnt1 > cnt2 => {
self.post_insert(ts, &key, ins_op)
}
(None, Some((_cnt, old_entry, (old_last_accessed, old_last_modified), upd_op))) => {
if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) =
(&self.inner.expiration_policy.expiry(), &upd_op)
{
Self::expire_after_read_or_update(
|k, v, t, d| expiry.expire_after_update(k, v, t, d),
&key,
value_entry,
self.inner.expiration_policy.time_to_live(),
self.inner.expiration_policy.time_to_idle(),
ts,
self.inner.clocks(),
);
}
(_, Some((_cnt, old_entry, upd_op))) => {
self.post_update(ts, key, old_entry, upd_op, drop_guard)
.await
}
(None, None) => unreachable!(),
}
}

let upd_op = TrioArc::new(upd_op);
fn post_insert(
&self,
ts: Instant,
key: &Arc<K>,
ins_op: WriteOp<K, V>,
) -> (TrioArc<WriteOp<K, V>>, Instant) {
if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) =
(&self.inner.expiration_policy.expiry(), &ins_op)
{
Self::expire_after_create(expiry, key, value_entry, ts, self.inner.clocks());
}
(TrioArc::new(ins_op), ts)
}

if self.is_removal_notifier_enabled() {
let future = self
.inner
.notify_upsert_future(key, &old_entry, old_last_accessed, old_last_modified)
.shared();
drop_guard.set_future_and_op(future.clone(), TrioArc::clone(&upd_op));
async fn post_update<'a>(
&self,
ts: Instant,
key: Arc<K>,
old_info: OldEntryInfo<K, V>,
upd_op: WriteOp<K, V>,
mut drop_guard: PendingOpGuard<'a, K, V>,
) -> (TrioArc<WriteOp<K, V>>, Instant) {
use futures_util::FutureExt;

future.await;
drop_guard.clear();
}
if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) =
(&self.inner.expiration_policy.expiry(), &upd_op)
{
Self::expire_after_read_or_update(
|k, v, t, d| expiry.expire_after_update(k, v, t, d),
&key,
value_entry,
self.inner.expiration_policy.time_to_live(),
self.inner.expiration_policy.time_to_idle(),
ts,
self.inner.clocks(),
);
}

crossbeam_epoch::pin().flush();
(upd_op, ts)
}
(
Some((cnt1, ins_op)),
Some((cnt2, old_entry, (old_last_accessed, old_last_modified), upd_op)),
) => {
if cnt1 > cnt2 {
if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) =
(&self.inner.expiration_policy.expiry(), &ins_op)
{
Self::expire_after_create(
expiry,
&key,
value_entry,
ts,
self.inner.clocks(),
);
}
(TrioArc::new(ins_op), ts)
} else {
if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) =
(&self.inner.expiration_policy.expiry(), &upd_op)
{
Self::expire_after_read_or_update(
|k, v, t, d| expiry.expire_after_update(k, v, t, d),
&key,
value_entry,
self.inner.expiration_policy.time_to_live(),
self.inner.expiration_policy.time_to_idle(),
ts,
self.inner.clocks(),
);
}
let upd_op = TrioArc::new(upd_op);

let upd_op = TrioArc::new(upd_op);

if self.is_removal_notifier_enabled() {
let future = self
.inner
.notify_upsert_future(
key,
&old_entry,
old_last_accessed,
old_last_modified,
)
.shared();
drop_guard.set_future_and_op(future.clone(), TrioArc::clone(&upd_op));

future.await;
drop_guard.clear();
}
crossbeam_epoch::pin().flush();
(upd_op, ts)
}
}
(None, None) => unreachable!(),
if self.is_removal_notifier_enabled() {
let future = self
.inner
.notify_upsert_future(
key,
&old_info.entry,
old_info.last_accessed,
old_info.last_modified,
)
.shared();
drop_guard.set_future_and_op(future.clone(), TrioArc::clone(&upd_op));

future.await;
drop_guard.clear();
}

crossbeam_epoch::pin().flush();
(upd_op, ts)
}

#[inline]
Expand Down
Loading

0 comments on commit 3fcb35a

Please sign in to comment.