Skip to content

Commit

Permalink
Remove the thread pool from future::Cache
Browse files Browse the repository at this point in the history
Try to make `run_pending_tasks` to be cancelling safe.
  • Loading branch information
tatsuya6502 committed Aug 20, 2023
1 parent 16b4f89 commit 4f8eff7
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 81 deletions.
75 changes: 38 additions & 37 deletions src/future/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ where

#[inline]
pub(crate) async fn apply_reads_writes_if_needed(
inner: &impl InnerSync,
inner: Arc<impl InnerSync + Send + Sync + 'static>,
ch: &Sender<WriteOp<K, V>>,
now: Instant,
housekeeper: Option<&HouseKeeperArc>,
Expand Down Expand Up @@ -428,7 +428,8 @@ where
op: ReadOp<K, V>,
now: Instant,
) -> Result<(), TrySendError<ReadOp<K, V>>> {
self.apply_reads_if_needed(&self.inner, now).await;
self.apply_reads_if_needed(Arc::clone(&self.inner), now)
.await;
let ch = &self.read_op_ch;
match ch.try_send(op) {
// Discard the ReadOp when the channel is full.
Expand Down Expand Up @@ -589,7 +590,7 @@ where
}

#[inline]
async fn apply_reads_if_needed(&self, inner: &Inner<K, V, S>, now: Instant) {
async fn apply_reads_if_needed(&self, inner: Arc<Inner<K, V, S>>, now: Instant) {
let len = self.read_op_ch.len();

if let Some(hk) = &self.housekeeper {
Expand Down Expand Up @@ -1236,7 +1237,7 @@ where
S: BuildHasher + Clone + Send + Sync + 'static,
{
async fn run_pending_tasks(&self, max_repeats: usize) {
self.run_pending_tasks(max_repeats).await;
self.do_run_pending_tasks(max_repeats).await;
}

fn now(&self) -> Instant {
Expand All @@ -1250,7 +1251,7 @@ where
V: Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
async fn run_pending_tasks(&self, max_repeats: usize) {
async fn do_run_pending_tasks(&self, max_repeats: usize) {
if self.max_capacity == Some(0) {
return;
}
Expand Down Expand Up @@ -2605,19 +2606,19 @@ mod tests {
($cache:ident, $key:ident, $hash:ident, $mock:ident, $duration_secs:expr) => {
// Increment the time.
$mock.increment(Duration::from_millis($duration_secs * 1000 - 1));
$cache.inner.run_pending_tasks(1).await;
$cache.inner.do_run_pending_tasks(1).await;
assert!($cache.contains_key_with_hash(&$key, $hash));
assert_eq!($cache.entry_count(), 1);

// Increment the time by 1ms (3). The entry should be expired.
$mock.increment(Duration::from_millis(1));
$cache.inner.run_pending_tasks(1).await;
$cache.inner.do_run_pending_tasks(1).await;
assert!(!$cache.contains_key_with_hash(&$key, $hash));

// Increment the time again to ensure the entry has been evicted from the
// cache.
$mock.increment(Duration::from_secs(1));
$cache.inner.run_pending_tasks(1).await;
$cache.inner.do_run_pending_tasks(1).await;
assert_eq!($cache.entry_count(), 0);
};
}
Expand Down Expand Up @@ -2899,7 +2900,7 @@ mod tests {
insert(&cache, key, hash, value).await;
// Run a sync to register the entry to the internal data structures including
// the timer wheel.
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert_eq!(cache.entry_count(), 1);

assert_expiry!(cache, key, hash, mock, 1);
Expand All @@ -2921,12 +2922,12 @@ mod tests {
ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None);
let inserted_at = current_time(&cache);
insert(&cache, key, hash, value).await;
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert_eq!(cache.entry_count(), 1);

// Increment the time.
mock.increment(Duration::from_secs(1));
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert!(cache.contains_key_with_hash(&key, hash));

// Read the entry (2).
Expand All @@ -2946,7 +2947,7 @@ mod tests {
.map(Entry::into_value),
Some(value)
);
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;

assert_expiry!(cache, key, hash, mock, 3);

Expand All @@ -2968,12 +2969,12 @@ mod tests {
ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None);
let inserted_at = current_time(&cache);
insert(&cache, key, hash, value).await;
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert_eq!(cache.entry_count(), 1);

// Increment the time.
mock.increment(Duration::from_secs(1));
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert!(cache.contains_key_with_hash(&key, hash));

// Read the entry (2).
Expand All @@ -2993,11 +2994,11 @@ mod tests {
.map(Entry::into_value),
Some(value)
);
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;

// Increment the time.
mock.increment(Duration::from_secs(2));
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert!(cache.contains_key_with_hash(&key, hash));
assert_eq!(cache.entry_count(), 1);

Expand All @@ -3012,7 +3013,7 @@ mod tests {
Some(3),
);
insert(&cache, key, hash, value).await;
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert_eq!(cache.entry_count(), 1);

assert_expiry!(cache, key, hash, mock, 3);
Expand All @@ -3035,12 +3036,12 @@ mod tests {
ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), None);
let inserted_at = current_time(&cache);
insert(&cache, key, hash, value).await;
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert_eq!(cache.entry_count(), 1);

// Increment the time.
mock.increment(Duration::from_secs(1));
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert!(cache.contains_key_with_hash(&key, hash));
assert_eq!(cache.entry_count(), 1);

Expand All @@ -3061,11 +3062,11 @@ mod tests {
.map(Entry::into_value),
Some(value)
);
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;

// Increment the time.
mock.increment(Duration::from_secs(2));
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert!(cache.contains_key_with_hash(&key, hash));
assert_eq!(cache.entry_count(), 1);

Expand All @@ -3080,7 +3081,7 @@ mod tests {
None,
);
insert(&cache, key, hash, value).await;
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert_eq!(cache.entry_count(), 1);

assert_expiry!(cache, key, hash, mock, 7);
Expand All @@ -3102,12 +3103,12 @@ mod tests {
ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8));
let inserted_at = current_time(&cache);
insert(&cache, key, hash, value).await;
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert_eq!(cache.entry_count(), 1);

// Increment the time.
mock.increment(Duration::from_secs(5));
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert!(cache.contains_key_with_hash(&key, hash));
assert_eq!(cache.entry_count(), 1);

Expand All @@ -3128,7 +3129,7 @@ mod tests {
.map(Entry::into_value),
Some(value)
);
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;

assert_expiry!(cache, key, hash, mock, 7);

Expand All @@ -3150,12 +3151,12 @@ mod tests {
ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(8));
let inserted_at = current_time(&cache);
insert(&cache, key, hash, value).await;
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert_eq!(cache.entry_count(), 1);

// Increment the time.
mock.increment(Duration::from_secs(5));
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert!(cache.contains_key_with_hash(&key, hash));
assert_eq!(cache.entry_count(), 1);

Expand All @@ -3176,11 +3177,11 @@ mod tests {
.map(Entry::into_value),
Some(value)
);
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;

// Increment the time.
mock.increment(Duration::from_secs(6));
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert!(cache.contains_key_with_hash(&key, hash));
assert_eq!(cache.entry_count(), 1);

Expand All @@ -3201,7 +3202,7 @@ mod tests {
.map(Entry::into_value),
Some(value)
);
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;

assert_expiry!(cache, key, hash, mock, 5);

Expand All @@ -3222,12 +3223,12 @@ mod tests {
*expectation.lock().unwrap() =
ExpiryExpectation::after_create(line!(), key, value, current_time(&cache), Some(9));
insert(&cache, key, hash, value).await;
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert_eq!(cache.entry_count(), 1);

// Increment the time.
mock.increment(Duration::from_secs(6));
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert!(cache.contains_key_with_hash(&key, hash));
assert_eq!(cache.entry_count(), 1);

Expand All @@ -3243,12 +3244,12 @@ mod tests {
);
let updated_at = current_time(&cache);
insert(&cache, key, hash, value).await;
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert_eq!(cache.entry_count(), 1);

// Increment the time.
mock.increment(Duration::from_secs(6));
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert!(cache.contains_key_with_hash(&key, hash));
assert_eq!(cache.entry_count(), 1);

Expand All @@ -3269,11 +3270,11 @@ mod tests {
.map(Entry::into_value),
Some(value)
);
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;

// Increment the time.
mock.increment(Duration::from_secs(6));
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;
assert!(cache.contains_key_with_hash(&key, hash));
assert_eq!(cache.entry_count(), 1);

Expand All @@ -3294,7 +3295,7 @@ mod tests {
.map(Entry::into_value),
Some(value)
);
cache.inner.run_pending_tasks(1).await;
cache.inner.do_run_pending_tasks(1).await;

assert_expiry!(cache, key, hash, mock, 4);
}
Expand Down
34 changes: 14 additions & 20 deletions src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1394,15 +1394,9 @@ where
let op = WriteOp::Remove(kv);
let now = self.base.current_time_from_expiration_clock();
let hk = self.base.housekeeper.as_ref();
Self::schedule_write_op(
self.base.inner.as_ref(),
&self.base.write_op_ch,
op,
now,
hk,
)
.await
.expect("Failed to remove");
Self::schedule_write_op(&self.base.inner, &self.base.write_op_ch, op, now, hk)
.await
.expect("Failed to remove");
crossbeam_epoch::pin().flush();
maybe_v
}
Expand Down Expand Up @@ -1844,20 +1838,14 @@ where

let (op, now) = self.base.do_insert_with_hash(key, hash, value).await;
let hk = self.base.housekeeper.as_ref();
Self::schedule_write_op(
self.base.inner.as_ref(),
&self.base.write_op_ch,
op,
now,
hk,
)
.await
.expect("Failed to insert");
Self::schedule_write_op(&self.base.inner, &self.base.write_op_ch, op, now, hk)
.await
.expect("Failed to insert");
}

#[inline]
async fn schedule_write_op(
inner: &impl InnerSync,
inner: &Arc<impl InnerSync + Send + Sync + 'static>,
ch: &Sender<WriteOp<K, V>>,
op: WriteOp<K, V>,
now: Instant,
Expand All @@ -1866,7 +1854,13 @@ where
let mut op = op;
let mut spin_count = 0u8;
loop {
BaseCache::<K, V, S>::apply_reads_writes_if_needed(inner, ch, now, housekeeper).await;
BaseCache::<K, V, S>::apply_reads_writes_if_needed(
Arc::clone(inner),
ch,
now,
housekeeper,
)
.await;
match ch.try_send(op) {
Ok(()) => return Ok(()),
Err(TrySendError::Full(op1)) => {
Expand Down
Loading

0 comments on commit 4f8eff7

Please sign in to comment.