Skip to content

Commit

Permalink
fix: add get_buf to caclulate admit freq
Browse files Browse the repository at this point in the history
  • Loading branch information
Millione committed Apr 18, 2023
1 parent 549a608 commit 30122af
Show file tree
Hide file tree
Showing 9 changed files with 382 additions and 54 deletions.
10 changes: 5 additions & 5 deletions examples/async_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() {

// when we get the value, we will get a ValueRef, which contains a RwLockReadGuard
// so when we finish use this value, we must release the ValueRef
let v = c.get(&"a").unwrap();
let v = c.get(&"a").await.unwrap();
assert_eq!(v.value(), &"a");
// release the value
v.release(); // or drop(v)
Expand All @@ -25,17 +25,17 @@ async fn main() {
{
// when we get the value, we will get a ValueRef, which contains a RwLockWriteGuard
// so when we finish use this value, we must release the ValueRefMut
let mut v = c.get_mut(&"a").unwrap();
let mut v = c.get_mut(&"a").await.unwrap();
v.write("aa");
assert_eq!(v.value(), &"aa");
// release the value
}

// if you just want to do one operation
let v = c.get_mut(&"a").unwrap();
let v = c.get_mut(&"a").await.unwrap();
v.write_once("aaa");

let v = c.get(&"a").unwrap();
let v = c.get(&"a").await.unwrap();
println!("{}", v);
assert_eq!(v.value(), &"aaa");
v.release();
Expand All @@ -45,5 +45,5 @@ async fn main() {
// wait all the operations are finished
c.wait().await.unwrap();

assert!(c.get(&"a").is_none());
assert!(c.get(&"a").await.is_none());
}
221 changes: 217 additions & 4 deletions src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ macro_rules! impl_builder {
}
}

/// Set the insert buffer items for the Cache.
///
/// `buffer_items` determines the size of Get buffers.
///
/// Unless you have a rare use case, using `64` as the BufferItems value
/// results in good performance.
#[inline]
pub fn set_buffer_items(self, sz: usize) -> Self {
Self {
inner: self.inner.set_buffer_items(sz),
}
}

/// Set whether record the metrics or not.
///
/// Metrics is true when you want real-time logging of a variety of stats.
Expand Down Expand Up @@ -213,6 +226,8 @@ macro_rules! impl_cache {

let (index, conflict) = self.key_to_hash.build_key(key);

self.get_buf.push(index);

match self.store.get(&index, conflict) {
None => {
self.metrics.add(MetricType::Miss, index, 1);
Expand All @@ -237,6 +252,9 @@ macro_rules! impl_cache {
}

let (index, conflict) = self.key_to_hash.build_key(key);

self.get_buf.push(index);

match self.store.get_mut(&index, conflict) {
None => {
self.metrics.add(MetricType::Miss, index, 1);
Expand Down Expand Up @@ -357,6 +375,7 @@ macro_rules! impl_cache {
Self {
store: self.store.clone(),
policy: self.policy.clone(),
get_buf: self.get_buf.clone(),
insert_buf_tx: self.insert_buf_tx.clone(),
stop_tx: self.stop_tx.clone(),
clear_tx: self.clear_tx.clone(),
Expand Down Expand Up @@ -494,6 +513,200 @@ macro_rules! impl_cache_processor {
};
}

#[cfg(feature = "async")]
macro_rules! impl_async_cache {
($cache: ident, $builder: ident, $item: ident) => {
use crate::store::UpdateResult;
use crate::{ValueRef, ValueRefMut};

impl<K, V, KH, C, U, CB, S> $cache<K, V, KH, C, U, CB, S>
where
K: Hash + Eq,
V: Send + Sync + 'static,
KH: KeyBuilder<Key = K>,
C: Coster<Value = V>,
U: UpdateValidator<Value = V>,
CB: CacheCallback<Value = V>,
S: BuildHasher + Clone + 'static + Send,
{
/// `get` returns a `Option<ValueRef<V, SS>>` (if any) representing whether the
/// value was found or not.
pub async fn get<Q>(&self, key: &Q) -> Option<ValueRef<V, S>>
where
K: core::borrow::Borrow<Q>,
Q: core::hash::Hash + Eq + ?Sized,
{
if self.is_closed.load(Ordering::SeqCst) {
return None;
}

let (index, conflict) = self.key_to_hash.build_key(key);

self.get_buf.push(index).await;

match self.store.get(&index, conflict) {
None => {
self.metrics.add(MetricType::Miss, index, 1);
None
}
Some(v) => {
self.metrics.add(MetricType::Hit, index, 1);
Some(v)
}
}
}

/// `get_mut` returns a `Option<ValueRefMut<V, SS>>` (if any) representing whether the
/// value was found or not.
pub async fn get_mut<Q>(&self, key: &Q) -> Option<ValueRefMut<V, S>>
where
K: core::borrow::Borrow<Q>,
Q: core::hash::Hash + Eq + ?Sized,
{
if self.is_closed.load(Ordering::SeqCst) {
return None;
}

let (index, conflict) = self.key_to_hash.build_key(key);

self.get_buf.push(index).await;

match self.store.get_mut(&index, conflict) {
None => {
self.metrics.add(MetricType::Miss, index, 1);
None
}
Some(v) => {
self.metrics.add(MetricType::Hit, index, 1);
Some(v)
}
}
}

/// Returns the TTL for the specified key if the
/// item was found and is not expired.
pub fn get_ttl<Q>(&self, key: &Q) -> Option<Duration>
where
K: core::borrow::Borrow<Q>,
Q: core::hash::Hash + Eq + ?Sized,
{
let (index, conflict) = self.key_to_hash.build_key(key);
self.store
.get(&index, conflict)
.and_then(|_| self.store.expiration(&index).map(|time| time.get_ttl()))
}

/// `max_cost` returns the max cost of the cache.
#[inline]
pub fn max_cost(&self) -> i64 {
self.policy.max_cost()
}

/// `update_max_cost` updates the maxCost of an existing cache.
#[inline]
pub fn update_max_cost(&self, max_cost: i64) {
self.policy.update_max_cost(max_cost)
}

/// Returns the number of items in the Cache
#[inline]
pub fn len(&self) -> usize {
self.store.len()
}

/// Returns true if the cache is empty
#[inline]
pub fn is_empty(&self) -> bool {
self.store.len() == 0
}

#[inline]
fn try_update(
&self,
key: K,
val: V,
cost: i64,
ttl: Duration,
only_update: bool,
) -> Result<Option<(u64, $item<V>)>, CacheError> {
let expiration = if ttl.is_zero() {
Time::now()
} else {
Time::now_with_expiration(ttl)
};

let (index, conflict) = self.key_to_hash.build_key(&key);

// cost is eventually updated. The expiration must also be immediately updated
// to prevent items from being prematurely removed from the map.
let external_cost = if cost == 0 { self.coster.cost(&val) } else { 0 };
match self.store.try_update(index, val, conflict, expiration)? {
UpdateResult::NotExist(v)
| UpdateResult::Reject(v)
| UpdateResult::Conflict(v) => {
if only_update {
Ok(None)
} else {
Ok(Some((
index,
$item::new(index, conflict, cost + external_cost, v, expiration),
)))
}
}
UpdateResult::Update(v) => {
self.callback.on_exit(Some(v));
Ok(Some((index, $item::update(index, cost, external_cost))))
}
}
}
}

impl<K, V, KH, C, U, CB, S> AsRef<$cache<K, V, KH, C, U, CB, S>>
for $cache<K, V, KH, C, U, CB, S>
where
K: Hash + Eq,
V: Send + Sync + 'static,
KH: KeyBuilder<Key = K>,
C: Coster<Value = V>,
U: UpdateValidator<Value = V>,
CB: CacheCallback<Value = V>,
S: BuildHasher + Clone + 'static,
{
fn as_ref(&self) -> &$cache<K, V, KH, C, U, CB, S> {
self
}
}

impl<K, V, KH, C, U, CB, S> Clone for $cache<K, V, KH, C, U, CB, S>
where
K: Hash + Eq,
V: Send + Sync + 'static,
KH: KeyBuilder<Key = K>,
C: Coster<Value = V>,
U: UpdateValidator<Value = V>,
CB: CacheCallback<Value = V>,
S: BuildHasher + Clone + 'static,
{
fn clone(&self) -> Self {
Self {
store: self.store.clone(),
policy: self.policy.clone(),
get_buf: self.get_buf.clone(),
insert_buf_tx: self.insert_buf_tx.clone(),
stop_tx: self.stop_tx.clone(),
clear_tx: self.clear_tx.clone(),
callback: self.callback.clone(),
key_to_hash: self.key_to_hash.clone(),
is_closed: self.is_closed.clone(),
coster: self.coster.clone(),
metrics: self.metrics.clone(),
_marker: self._marker,
}
}
}
};
}

macro_rules! impl_cache_cleaner {
($cleaner: ident, $processor: ident, $item: ident) => {
impl<'a, V, U, CB, S> $cleaner<'a, V, U, CB, S>
Expand Down Expand Up @@ -548,12 +761,12 @@ pub use sync::{Cache, CacheBuilder};

#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
mod axync;
mod r#async;
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub use axync::{AsyncCache, AsyncCacheBuilder};
pub use r#async::{AsyncCache, AsyncCacheBuilder};

// TODO: find the optimal value for this
const DEFAULT_INSERT_BUF_SIZE: usize = 32 * 1024;
// const DEFAULT_BUFFER_ITEMS: usize = 64;
const DEFAULT_CLEANUP_DURATION: Duration = Duration::from_millis(500);
pub(crate) const DEFAULT_BUFFER_ITEMS: usize = 64;
const DEFAULT_CLEANUP_DURATION: Duration = Duration::from_secs(2);
9 changes: 7 additions & 2 deletions src/cache/axync.rs → src/cache/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::axync::{
};
use crate::cache::builder::CacheBuilderCore;
use crate::policy::AsyncLFUPolicy;
use crate::ring::AsyncRingStripe;
use crate::store::ShardedMap;
use crate::ttl::{ExpirationMap, Time};
use crate::{
Expand Down Expand Up @@ -209,7 +210,6 @@ where
self.inner.update_validator.unwrap(),
hasher.clone(),
));

let mut policy = AsyncLFUPolicy::with_hasher(num_counters, max_cost, hasher, spawner)?;

let coster = Arc::new(self.inner.coster.unwrap());
Expand Down Expand Up @@ -239,9 +239,12 @@ where
spawner(fut);
}));

let buffer_items = self.inner.buffer_items;
let get_buf = AsyncRingStripe::new(policy.clone(), buffer_items);
let this = AsyncCache {
store,
policy,
get_buf: Arc::new(get_buf),
insert_buf_tx: buf_tx,
callback,
key_to_hash: Arc::new(self.inner.key_to_hash),
Expand Down Expand Up @@ -373,6 +376,8 @@ pub struct AsyncCache<
/// contention.
pub(crate) insert_buf_tx: Sender<Item<V>>,

pub(crate) get_buf: Arc<AsyncRingStripe<S>>,

pub(crate) stop_tx: Sender<()>,

pub(crate) clear_tx: Sender<()>,
Expand Down Expand Up @@ -776,6 +781,6 @@ where
}

impl_builder!(AsyncCacheBuilder);
impl_cache!(AsyncCache, AsyncCacheBuilder, Item);
impl_async_cache!(AsyncCache, AsyncCacheBuilder, Item);
impl_cache_processor!(CacheProcessor, Item);
impl_cache_cleaner!(CacheCleaner, CacheProcessor, Item);
Loading

0 comments on commit 30122af

Please sign in to comment.