Skip to content

Commit

Permalink
Remove the thread pool from future::Cache
Browse files Browse the repository at this point in the history
Make `run_pending_tasks` to be cancelling safe.
  • Loading branch information
tatsuya6502 committed Aug 20, 2023
1 parent 4f8eff7 commit 1c8e0eb
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/common/concurrent/housekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl BlockingHousekeeper {

#[inline]
fn should_apply(&self, ch_len: usize, ch_flush_point: usize, now: Instant) -> bool {
ch_len >= ch_flush_point || self.sync_after.instant().unwrap() >= now
ch_len >= ch_flush_point || now >= self.sync_after.instant().unwrap()
}

fn try_sync<T: InnerSync>(&self, cache: &T) -> bool {
Expand Down
12 changes: 10 additions & 2 deletions src/future/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ where

if let Some(hk) = housekeeper {
if Self::should_apply_writes(hk, w_len, now) {
hk.try_sync(inner).await;
hk.try_run_pending_tasks(inner).await;
}
}
}
Expand Down Expand Up @@ -595,7 +595,7 @@ where

if let Some(hk) = &self.housekeeper {
if Self::should_apply_reads(hk, len, now) {
hk.try_sync(inner).await;
hk.try_run_pending_tasks(inner).await;
}
}
}
Expand Down Expand Up @@ -717,10 +717,18 @@ where
pub(crate) async fn reconfigure_for_testing(&mut self) {
// Enable the frequency sketch.
self.inner.enable_frequency_sketch_for_testing().await;
// Disable auto clean up of pending tasks.
if let Some(hk) = &self.housekeeper {
hk.disable_auto_run();
}
}

pub(crate) async fn set_expiration_clock(&self, clock: Option<Clock>) {
self.inner.set_expiration_clock(clock).await;
if let Some(hk) = &self.housekeeper {
let now = self.current_time_from_expiration_clock();
hk.reset_run_after(now);
}
}

pub(crate) fn key_locks_map_is_empty(&self) -> bool {
Expand Down
109 changes: 106 additions & 3 deletions src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::{
};
use crate::{
common::{
concurrent::{constants::MAX_SYNC_REPEATS, Weigher, WriteOp},
concurrent::{Weigher, WriteOp},
time::Instant,
},
notification::AsyncEvictionListener,
Expand Down Expand Up @@ -1512,7 +1512,9 @@ where
}

pub async fn run_pending_tasks(&self) {
self.base.inner.run_pending_tasks(MAX_SYNC_REPEATS).await;
if let Some(hk) = &self.base.housekeeper {
hk.run_pending_tasks(Arc::clone(&self.base.inner)).await;
}
}
}

Expand Down Expand Up @@ -1943,6 +1945,24 @@ where
fn key_locks_map_is_empty(&self) -> bool {
self.base.key_locks_map_is_empty()
}

fn run_pending_tasks_initiation_count(&self) -> usize {
use std::sync::atomic::Ordering;
self.base
.housekeeper
.as_ref()
.map(|hk| hk.start_count.load(Ordering::Acquire))
.expect("housekeeper is not set")
}

fn run_pending_tasks_completion_count(&self) -> usize {
use std::sync::atomic::Ordering;
self.base
.housekeeper
.as_ref()
.map(|hk| hk.complete_count.load(Ordering::Acquire))
.expect("housekeeper is not set")
}
}

// AS of Rust 1.71, we cannot make this function into a `const fn` because mutable
Expand All @@ -1968,7 +1988,10 @@ mod tests {
use async_lock::{Barrier, Mutex};
use std::{
convert::Infallible,
sync::Arc,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::{Duration, Instant as StdInstant},
};
use tokio::time::sleep;
Expand Down Expand Up @@ -4353,6 +4376,86 @@ mod tests {
verify_notification_vec(&cache, actual, &expected).await;
}

#[tokio::test]
async fn cancel_future_while_running_pending_tasks() {
use crate::future::FutureExt;
use futures_util::future::poll_immediate;

Check failure on line 4382 in src/future/cache.rs

View workflow job for this annotation

GitHub Actions / test (nightly)

unresolved import `futures_util::future::poll_immediate`

Check failure on line 4382 in src/future/cache.rs

View workflow job for this annotation

GitHub Actions / test (nightly)

unresolved import `futures_util::future::poll_immediate`

Check failure on line 4382 in src/future/cache.rs

View workflow job for this annotation

GitHub Actions / test (nightly)

unresolved import `futures_util::future::poll_immediate`

Check failure on line 4382 in src/future/cache.rs

View workflow job for this annotation

GitHub Actions / test (nightly)

unresolved import `futures_util::future::poll_immediate`
use tokio::task::yield_now;

let listener_initiation_count: Arc<AtomicU32> = Default::default();
let listener_completion_count: Arc<AtomicU32> = Default::default();

let listener = {
// Variables to capture.
let init_count = Arc::clone(&listener_initiation_count);
let comp_count = Arc::clone(&listener_completion_count);

// Our eviction listener closure.
move |_k, _v, _r| {
init_count.fetch_add(1, Ordering::AcqRel);
let comp_count1 = Arc::clone(&comp_count);

async move {
yield_now().await;
comp_count1.fetch_add(1, Ordering::AcqRel);
}
.boxed()
}
};

let mut cache: Cache<u32, u32> = Cache::builder()
.time_to_live(Duration::from_millis(10))
.async_eviction_listener(listener)
.build();

cache.reconfigure_for_testing().await;

let (clock, mock) = Clock::mock();
cache.set_expiration_clock(Some(clock)).await;

// Make the cache exterior immutable.
let cache = cache;

cache.insert(1, 1).await;
assert_eq!(cache.run_pending_tasks_initiation_count(), 0);
assert_eq!(cache.run_pending_tasks_completion_count(), 0);

// Key 1 is not yet expired.
mock.increment(Duration::from_millis(7));

cache.run_pending_tasks().await;
assert_eq!(cache.run_pending_tasks_initiation_count(), 1);
assert_eq!(cache.run_pending_tasks_completion_count(), 1);
assert_eq!(listener_initiation_count.load(Ordering::Acquire), 0);
assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);

// Now key 1 is expired, so the eviction listener should be called when we
// call run_pending_tasks() and poll the returned future.
mock.increment(Duration::from_millis(7));

let fut = cache.run_pending_tasks();
// Poll the fut only once, and drop it. The fut should not be completed (so
// it is cancelled) because the eviction listener performed a yield_now().
assert!(poll_immediate(fut).await.is_none());

// The task is initiated but not completed.
assert_eq!(cache.run_pending_tasks_initiation_count(), 2);
assert_eq!(cache.run_pending_tasks_completion_count(), 1);
// The listener is initiated but not completed.
assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);

// This will resume the task and the listener, and continue polling
// until complete.
cache.run_pending_tasks().await;
// Now the task is completed.
assert_eq!(cache.run_pending_tasks_initiation_count(), 2);
assert_eq!(cache.run_pending_tasks_completion_count(), 2);
// Now the listener is completed.
assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
}

// This test ensures that the `contains_key`, `get` and `invalidate` can use
// borrowed form `&[u8]` for key with type `Vec<u8>`.
// https://github.com/moka-rs/moka/issues/166
Expand Down
111 changes: 81 additions & 30 deletions src/future/housekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,16 @@ use crate::common::{
time::{CheckedTimeOps, Instant},
};

use std::{sync::Arc, time::Duration};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};

#[cfg(test)]
use std::sync::atomic::AtomicUsize;

use async_lock::Mutex;
use async_trait::async_trait;
Expand All @@ -25,13 +34,23 @@ pub(crate) struct Housekeeper {
/// A shared `Future` of the maintenance task that is currently being resolved.
current_task: Mutex<Option<Shared<BoxFuture<'static, ()>>>>,
run_after: AtomicInstant,
auto_run_enabled: AtomicBool,
#[cfg(test)]
pub(crate) start_count: AtomicUsize,
#[cfg(test)]
pub(crate) complete_count: AtomicUsize,
}

impl Default for Housekeeper {
fn default() -> Self {
Self {
current_task: Default::default(),
run_after: AtomicInstant::new(Self::sync_after(Instant::now())),
auto_run_enabled: AtomicBool::new(true),
#[cfg(test)]
start_count: Default::default(),
#[cfg(test)]
complete_count: Default::default(),
}
}
}
Expand All @@ -47,47 +66,68 @@ impl Housekeeper {

#[inline]
fn should_apply(&self, ch_len: usize, ch_flush_point: usize, now: Instant) -> bool {
ch_len >= ch_flush_point || self.run_after.instant().unwrap() >= now
self.auto_run_enabled.load(Ordering::Relaxed)
&& (ch_len >= ch_flush_point || now >= self.run_after.instant().unwrap())
}

// TODO: Change the name to something that shows some kind of relationship with
// `run_pending_tasks`.
pub(crate) async fn try_sync<T>(&self, cache: Arc<T>) -> bool
pub(crate) async fn run_pending_tasks<T>(&self, cache: Arc<T>)
where
T: InnerSync + Send + Sync + 'static,
{
use futures_util::FutureExt;

// TODO: This will skip to run pending tasks if lock cannot be acquired.
// Change this so that when `try_sync` is explicitly called, it will be
// blocked here until the lock is acquired.
if let Some(mut lock) = self.current_task.try_lock() {
let now = cache.now();

if let Some(task) = &*lock {
// This task was being resolved, but did not complete. This means
// that the enclosing Future was canceled. Try to resolve it.
task.clone().await;
} else {
// Create a new maintenance task and try to resolve it.
let task = async move { cache.run_pending_tasks(MAX_SYNC_REPEATS).await }
.boxed()
.shared();
*lock = Some(task.clone());
task.await;
}

// If we are here, it means that the maintenance task has been completed,
// so we can remove it from the lock.
*lock = None;
self.run_after.set_instant(Self::sync_after(now));
let mut current_task = self.current_task.lock().await;
self.do_run_pending_tasks(cache, &mut current_task).await;
}

pub(crate) async fn try_run_pending_tasks<T>(&self, cache: Arc<T>) -> bool
where
T: InnerSync + Send + Sync + 'static,
{
if let Some(mut current_task) = self.current_task.try_lock() {
self.do_run_pending_tasks(cache, &mut current_task).await;
true
} else {
false
}
}

async fn do_run_pending_tasks<T>(
&self,
cache: Arc<T>,
current_task: &mut Option<Shared<BoxFuture<'static, ()>>>,
) where
T: InnerSync + Send + Sync + 'static,
{
use futures_util::FutureExt;

let now = cache.now();

if let Some(task) = &*current_task {
// This task was being resolved, but did not complete. This means the
// previous run was canceled due to the enclosing Future was dropped.
// Resume the task now by awaiting.
task.clone().await;
} else {
// Create a new maintenance task and resolve it.
let task = async move { cache.run_pending_tasks(MAX_SYNC_REPEATS).await }
.boxed()
.shared();
*current_task = Some(task.clone());

#[cfg(test)]
self.start_count.fetch_add(1, Ordering::AcqRel);

task.await;
}

// If we are here, it means that the maintenance task has been resolved.
// We can remove it from the lock.
*current_task = None;
self.run_after.set_instant(Self::sync_after(now));

#[cfg(test)]
self.complete_count.fetch_add(1, Ordering::AcqRel);
}

fn sync_after(now: Instant) -> Instant {
let dur = Duration::from_millis(PERIODICAL_SYNC_INITIAL_DELAY_MILLIS);
let ts = now.checked_add(dur);
Expand All @@ -96,3 +136,14 @@ impl Housekeeper {
ts.expect("Timestamp overflow")
}
}

#[cfg(test)]
impl Housekeeper {
pub(crate) fn disable_auto_run(&self) {
self.auto_run_enabled.store(false, Ordering::Relaxed);
}

pub(crate) fn reset_run_after(&self, now: Instant) {
self.run_after.set_instant(Self::sync_after(now));
}
}

0 comments on commit 1c8e0eb

Please sign in to comment.