Skip to content

Commit

Permalink
Improve async cancellation safety of future::Cache
Browse files Browse the repository at this point in the history
Refactoring on the `PendingOp` and `PendingOpGuard`.
  • Loading branch information
tatsuya6502 committed Aug 26, 2023
1 parent 3fcb35a commit eef7157
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 106 deletions.
42 changes: 40 additions & 2 deletions src/common/concurrent.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::common::{deque::DeqNode, time::Instant};

use parking_lot::Mutex;
use std::{ptr::NonNull, sync::Arc};
use std::{fmt, ptr::NonNull, sync::Arc};
use tagptr::TagNonNull;
use triomphe::Arc as TrioArc;

Expand Down Expand Up @@ -112,6 +112,15 @@ impl<K, V> KvEntry<K, V> {
}
}

impl<K, V> Clone for KvEntry<K, V> {
fn clone(&self) -> Self {
Self {
key: Arc::clone(&self.key),
entry: TrioArc::clone(&self.entry),
}
}
}

impl<K> AccessTime for DeqNode<KeyHashDate<K>> {
#[inline]
fn last_accessed(&self) -> Option<Instant> {
Expand Down Expand Up @@ -308,7 +317,6 @@ pub(crate) enum ReadOp<K, V> {
Miss(u64),
}

#[cfg(feature = "sync")]
pub(crate) enum WriteOp<K, V> {
Upsert {
key_hash: KeyHash<K>,
Expand All @@ -319,6 +327,36 @@ pub(crate) enum WriteOp<K, V> {
Remove(KvEntry<K, V>),
}

/// Cloning a WriteOp is safe and cheap because it uses Arc and TrioArc pointers to
/// the actual data.
impl<K, V> Clone for WriteOp<K, V> {
fn clone(&self) -> Self {
match self {
Self::Upsert {
key_hash,
value_entry,
old_weight,
new_weight,
} => Self::Upsert {
key_hash: key_hash.clone(),
value_entry: TrioArc::clone(value_entry),
old_weight: *old_weight,
new_weight: *new_weight,
},
Self::Remove(kv_hash) => Self::Remove(kv_hash.clone()),
}
}
}

impl<K, V> fmt::Debug for WriteOp<K, V> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Upsert { .. } => f.debug_struct("Upsert").finish(),
Self::Remove(..) => f.debug_tuple("Remove").finish(),
}
}
}

pub(crate) struct OldEntryInfo<K, V> {
pub(crate) entry: TrioArc<ValueEntry<K, V>>,
pub(crate) last_accessed: Option<Instant>,
Expand Down
46 changes: 14 additions & 32 deletions src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use async_lock::Mutex;
use crossbeam_channel::Sender;
use futures_util::future::{BoxFuture, Shared};
use once_cell::sync::Lazy;
use std::{fmt, future::Future, hash::Hash, sync::Arc};
use std::{future::Future, hash::Hash, sync::Arc};
use triomphe::Arc as TrioArc;

use crate::common::{
concurrent::{KeyHash, KvEntry, ValueEntry},
concurrent::{ValueEntry, WriteOp},
time::Instant,
};

Expand Down Expand Up @@ -78,25 +78,30 @@ where
}
}

/// Operation that has been interrupted by async cancellation.
pub(crate) enum PendingOp<K, V> {
// 'static means that the future can capture only owned value and/or static
// references. No non-static references are allowed.
CallEvictionListener {
ts: Instant,
// 'static means that the future can capture only owned value and/or static
// references. No non-static references are allowed.
future: Shared<BoxFuture<'static, ()>>,
op: TrioArc<WriteOp<K, V>>,
op: WriteOp<K, V>,
},
SendWriteOp {
ts: Instant,
op: TrioArc<WriteOp<K, V>>,
op: WriteOp<K, V>,
},
}

/// Drop guard for an operation being performed. If this guard is dropped while it is
/// still having the future or the write op, it will convert them to a PendingOp and
/// send to the pending operation channel, so that the operation can be retried
/// later.
struct PendingOpGuard<'a, K, V> {
pending_op_ch: &'a Sender<PendingOp<K, V>>,
ts: Instant,
future: Option<Shared<BoxFuture<'static, ()>>>,
op: Option<TrioArc<WriteOp<K, V>>>,
op: Option<WriteOp<K, V>>,
}

impl<'a, K, V> PendingOpGuard<'a, K, V> {
Expand All @@ -109,16 +114,12 @@ impl<'a, K, V> PendingOpGuard<'a, K, V> {
}
}

fn set_future_and_op(
&mut self,
future: Shared<BoxFuture<'static, ()>>,
op: TrioArc<WriteOp<K, V>>,
) {
fn set_future_and_op(&mut self, future: Shared<BoxFuture<'static, ()>>, op: WriteOp<K, V>) {
self.future = Some(future);
self.op = Some(op);
}

fn set_op(&mut self, op: TrioArc<WriteOp<K, V>>) {
fn set_op(&mut self, op: WriteOp<K, V>) {
self.op = Some(op);
}

Expand Down Expand Up @@ -170,22 +171,3 @@ pub(crate) enum ReadOp<K, V> {
// u64 is the hash of the key.
Miss(u64),
}

pub(crate) enum WriteOp<K, V> {
Upsert {
key_hash: KeyHash<K>,
value_entry: TrioArc<ValueEntry<K, V>>,
old_weight: u32,
new_weight: u32,
},
Remove(TrioArc<KvEntry<K, V>>),
}

impl<K, V> fmt::Debug for WriteOp<K, V> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Upsert { .. } => f.debug_struct("Upsert").finish(),
Self::Remove(..) => f.debug_tuple("Remove").finish(),
}
}
}
108 changes: 42 additions & 66 deletions src/future/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub(crate) type HouseKeeperArc = Arc<Housekeeper>;
pub(crate) struct BaseCache<K, V, S = RandomState> {
pub(crate) inner: Arc<Inner<K, V, S>>,
read_op_ch: Sender<ReadOp<K, V>>,
pub(crate) write_op_ch: Sender<TrioArc<WriteOp<K, V>>>,
pub(crate) write_op_ch: Sender<WriteOp<K, V>>,
pub(crate) pending_op_ch_snd: Sender<PendingOp<K, V>>,
pub(crate) pending_op_ch_rcv: Receiver<PendingOp<K, V>>,
pub(crate) housekeeper: Option<HouseKeeperArc>,
Expand Down Expand Up @@ -119,7 +119,7 @@ impl<K, V, S> BaseCache<K, V, S> {
self.inner.current_time_from_expiration_clock()
}

pub(crate) fn notify_invalidate_future(
pub(crate) fn notify_invalidate(
&self,
key: &Arc<K>,
entry: &TrioArc<ValueEntry<K, V>>,
Expand Down Expand Up @@ -371,7 +371,7 @@ where
#[inline]
pub(crate) async fn apply_reads_writes_if_needed(
inner: Arc<impl InnerSync + Send + Sync + 'static>,
ch: &Sender<TrioArc<WriteOp<K, V>>>,
ch: &Sender<WriteOp<K, V>>,
ts: Instant,
housekeeper: Option<&HouseKeeperArc>,
) {
Expand Down Expand Up @@ -468,7 +468,7 @@ where
key: Arc<K>,
hash: u64,
value: V,
) -> (TrioArc<WriteOp<K, V>>, Instant) {
) -> (WriteOp<K, V>, Instant) {
self.process_pending_ops().await;

let ts = self.current_time_from_expiration_clock();
Expand All @@ -477,7 +477,6 @@ where
let op_cnt2 = Arc::clone(&op_cnt1);
let mut op1 = None;
let mut op2 = None;
let pending_op_ch = &self.pending_op_ch_snd;

// Lock the key for update if blocking removal notification is enabled.
let kl = self.maybe_key_lock(&key);
Expand All @@ -487,8 +486,6 @@ where
None
};

let drop_guard = PendingOpGuard::new(pending_op_ch, ts);

// Since the cache (cht::SegmentedHashMap) employs optimistic locking
// strategy, insert_with_or_modify() may get an insert/modify operation
// conflicted with other concurrent hash table operations. In that case, it
Expand Down Expand Up @@ -540,7 +537,7 @@ where
self.post_insert(ts, &key, ins_op)
}
(_, Some((_cnt, old_entry, upd_op))) => {
self.post_update(ts, key, old_entry, upd_op, drop_guard)
self.post_update(ts, key, old_entry, upd_op, &self.pending_op_ch_snd)
.await
}
(None, None) => unreachable!(),
Expand All @@ -552,13 +549,13 @@ where
ts: Instant,
key: &Arc<K>,
ins_op: WriteOp<K, V>,
) -> (TrioArc<WriteOp<K, V>>, Instant) {
) -> (WriteOp<K, V>, Instant) {
if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) =
(&self.inner.expiration_policy.expiry(), &ins_op)
{
Self::expire_after_create(expiry, key, value_entry, ts, self.inner.clocks());
}
(TrioArc::new(ins_op), ts)
(ins_op, ts)
}

async fn post_update<'a>(
Expand All @@ -567,8 +564,8 @@ where
key: Arc<K>,
old_info: OldEntryInfo<K, V>,
upd_op: WriteOp<K, V>,
mut drop_guard: PendingOpGuard<'a, K, V>,
) -> (TrioArc<WriteOp<K, V>>, Instant) {
pending_op_ch: &'a Sender<PendingOp<K, V>>,
) -> (WriteOp<K, V>, Instant) {
use futures_util::FutureExt;

if let (Some(expiry), WriteOp::Upsert { value_entry, .. }) =
Expand All @@ -585,20 +582,20 @@ where
);
}

let upd_op = TrioArc::new(upd_op);

if self.is_removal_notifier_enabled() {
let future = self
.inner
.notify_upsert_future(
.notify_upsert(
key,
&old_info.entry,
old_info.last_accessed,
old_info.last_modified,
)
.shared();
drop_guard.set_future_and_op(future.clone(), TrioArc::clone(&upd_op));
let mut drop_guard = PendingOpGuard::new(pending_op_ch, ts);
drop_guard.set_future_and_op(future.clone(), upd_op.clone());

// Notify the eviction listener.
future.await;
drop_guard.clear();
}
Expand All @@ -610,11 +607,11 @@ where
#[inline]
pub(crate) async fn schedule_write_op(
inner: &Arc<impl InnerSync + Send + Sync + 'static>,
ch: &Sender<TrioArc<WriteOp<K, V>>>,
op: TrioArc<WriteOp<K, V>>,
ch: &Sender<WriteOp<K, V>>,
op: WriteOp<K, V>,
ts: Instant,
housekeeper: Option<&HouseKeeperArc>,
) -> Result<(), TrySendError<TrioArc<WriteOp<K, V>>>> {
) -> Result<(), TrySendError<WriteOp<K, V>>> {
let mut op = op;
let mut spin_count = 0u8;
loop {
Expand Down Expand Up @@ -672,7 +669,7 @@ where
}

let ts = drop_guard.ts;
let op = drop_guard.op.as_ref().map(TrioArc::clone).unwrap();
let op = drop_guard.op.as_ref().cloned().unwrap();
let hk = self.housekeeper.as_ref();
Self::schedule_write_op(&self.inner, &self.write_op_ch, op, ts, hk)
.await
Expand Down Expand Up @@ -984,7 +981,7 @@ pub(crate) struct Inner<K, V, S> {
frequency_sketch: RwLock<FrequencySketch>,
frequency_sketch_enabled: AtomicBool,
read_op_ch: Receiver<ReadOp<K, V>>,
write_op_ch: Receiver<TrioArc<WriteOp<K, V>>>,
write_op_ch: Receiver<WriteOp<K, V>>,
expiration_policy: ExpirationPolicy<K, V>,
valid_after: AtomicInstant,
weigher: Option<Weigher<K, V>>,
Expand Down Expand Up @@ -1128,7 +1125,7 @@ where
weigher: Option<Weigher<K, V>>,
eviction_listener: Option<AsyncEvictionListener<K, V>>,
read_op_ch: Receiver<ReadOp<K, V>>,
write_op_ch: Receiver<TrioArc<WriteOp<K, V>>>,
write_op_ch: Receiver<WriteOp<K, V>>,
expiration_policy: ExpirationPolicy<K, V>,
invalidator_enabled: bool,
) -> Self {
Expand Down Expand Up @@ -1548,49 +1545,28 @@ where

for _ in 0..count {
match ch.try_recv() {
Err(_) => break, // for loop
Ok(mut arc_op) => loop {
match TrioArc::try_unwrap(arc_op) {
Ok(op) => {
match op {
Upsert {
key_hash: kh,
value_entry: entry,
old_weight,
new_weight,
} => {
self.handle_upsert(
kh,
entry,
old_weight,
new_weight,
deqs,
timer_wheel,
&freq,
eviction_state,
)
.await
}
Remove(kv_entry) => {
let KvEntry { key: _key, entry } = &*kv_entry;
Self::handle_remove(
deqs,
timer_wheel,
TrioArc::clone(entry),
&mut eviction_state.counters,
)
}
}
break; // inner loop
}
Err(op) => {
arc_op = op;
for _ in 0..8 {
std::hint::spin_loop();
}
}
}
},
Ok(Upsert {
key_hash: kh,
value_entry: entry,
old_weight,
new_weight,
}) => {
self.handle_upsert(
kh,
entry,
old_weight,
new_weight,
deqs,
timer_wheel,
&freq,
eviction_state,
)
.await
}
Ok(Remove(KvEntry { key: _key, entry })) => {
Self::handle_remove(deqs, timer_wheel, entry, &mut eviction_state.counters)
}
Err(_) => break,
};
}
}
Expand Down Expand Up @@ -2388,7 +2364,7 @@ where
}

#[inline]
fn notify_upsert_future(
fn notify_upsert(
&self,
key: Arc<K>,
entry: &TrioArc<ValueEntry<K, V>>,
Expand Down
Loading

0 comments on commit eef7157

Please sign in to comment.