From ec779f88b8395d335b7f293ad6cd85c0295e8d81 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sat, 26 Aug 2023 00:02:58 +0100 Subject: [PATCH] Implementing a setting cache ghstack-source-id: c906f41fd517a0021667280bf1bed438bf93e9e2 Pull Request resolved: https://github.com/devtari-io/cronback/pull/19 --- Cargo.lock | 224 ++++++++++++++++++++++++++++++- cronback-lib/Cargo.toml | 4 +- cronback-lib/lib.rs | 2 + cronback-lib/project_settings.rs | 115 ++++++++++++++++ 4 files changed, 343 insertions(+), 2 deletions(-) create mode 100644 cronback-lib/project_settings.rs diff --git a/Cargo.lock b/Cargo.lock index 6a6d3aa..7bb687a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,6 +142,35 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix 0.37.23", + "slab", + "socket2", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7" +dependencies = [ + "event-listener", +] + [[package]] name = "async-recursion" version = "1.0.4" @@ -452,6 +481,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "bytecount" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" + [[package]] name = "byteorder" version = "1.4.3" @@ -464,6 +499,37 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +[[package]] +name = "camino" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo-platform" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cfa25e60aea747ec7e1124f238816749faa93759c6ff5b31f1ccdda137f4479" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] + [[package]] name = "cc" version = "1.0.79" @@ -667,6 +733,15 @@ dependencies = [ "yansi", ] +[[package]] +name = "concurrent-queue" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "config" version = "0.13.3" @@ -882,6 +957,7 @@ dependencies = [ "ipext", "iso8601-duration", "metrics", + "moka", "notify", "notify-debouncer-mini", "rand", @@ -1247,6 +1323,15 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + [[package]] name = "etcetera" version = "0.8.0" @@ -1398,7 +1483,7 @@ checksum = "a604f7a68fbf8103337523b1fadc8ade7361ee3f112f7c680ad179651616aed5" dependencies = [ "futures-core", "lock_api", - "parking_lot", + "parking_lot 0.11.2", ] [[package]] @@ -1407,6 +1492,21 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.28" @@ -2073,6 +2173,31 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "moka" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206bf83f415b0579fd885fe0804eb828e727636657dc1bf73d80d2f1218e14a1" +dependencies = [ + "async-io", + "async-lock", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "futures-util", + "once_cell", + "parking_lot 0.12.1", + "quanta", + "rustc_version", + "scheduled-thread-pool", + "skeptic", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + [[package]] name = "monostate" version = "0.1.8" @@ -2327,6 +2452,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "parking" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" + [[package]] name = "parking_lot" version = "0.11.2" @@ -2338,6 +2469,16 @@ dependencies = [ "parking_lot_core 0.8.6", ] +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core 0.9.8", +] + [[package]] name = "parking_lot_core" version = "0.8.6" @@ -2515,6 +2656,22 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + [[package]] name = "portable-atomic" version = "1.4.0" @@ -2677,6 +2834,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "pulldown-cmark" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a1a2f1f0a7ecff9c31abbe177637be0e97a0aef46cf8738ece09327985d998" +dependencies = [ + "bitflags 1.3.2", + "memchr", + "unicase", +] + [[package]] name = "quanta" version = "0.11.1" @@ -3014,6 +3182,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot 0.12.1", +] + [[package]] name = "scopeguard" version = "1.1.0" @@ -3217,6 +3394,9 @@ name = "semver" version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" +dependencies = [ + "serde", +] [[package]] name = "serde" @@ -3364,6 +3544,21 @@ version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", +] + [[package]] name = "sketches-ddsketch" version = "0.2.1" @@ -3627,6 +3822,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tap" version = "1.0.1" @@ -4054,6 +4255,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "triomphe" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee8098afad3fb0c54a9007aab6804558410503ad676d4633f9c2559a00ac0f" + [[package]] name = "try-lock" version = "0.2.4" @@ -4091,6 +4298,15 @@ dependencies = [ "rand", ] +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.13" @@ -4218,6 +4434,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "walkdir" version = "2.3.3" diff --git a/cronback-lib/Cargo.toml b/cronback-lib/Cargo.toml index 3481920..bc42645 100644 --- a/cronback-lib/Cargo.toml +++ b/cronback-lib/Cargo.toml @@ -50,7 +50,7 @@ validator = { workspace = true } # Unique Dependencies config = { version = "0.13", default-features = false, features = ["toml"] } notify = { version = "6.0.1" } -notify-debouncer-mini = { version = "0.3.0" } +notify-debouncer-mini = { version = "0.3.0" } hyper = "0.14.24" tonic-reflection = "0.9.0" url = { workspace = true, features = ["serde"] } @@ -65,3 +65,5 @@ sea-orm-migration = { version = "0.11.0", features = [ "sqlx-sqlite", # `DATABASE_DRIVER` feature "sqlx-postgres", # `DATABASE_DRIVER` feature ] } + +moka = { version = "0.11.2", features = ["future"] } diff --git a/cronback-lib/lib.rs b/cronback-lib/lib.rs index e9eb844..2d09441 100644 --- a/cronback-lib/lib.rs +++ b/cronback-lib/lib.rs @@ -4,6 +4,7 @@ mod database; mod grpc_client_provider; mod grpc_helpers; mod model; +mod project_settings; mod rpc_middleware; mod shutdown; mod types; @@ -28,6 +29,7 @@ pub mod prelude { pub use crate::ext::*; pub use crate::grpc_helpers::*; pub use crate::model::*; + pub use crate::project_settings::*; pub use crate::service::*; pub use crate::types::*; } diff --git a/cronback-lib/project_settings.rs b/cronback-lib/project_settings.rs new file mode 100644 index 0000000..ad7810e --- /dev/null +++ b/cronback-lib/project_settings.rs @@ -0,0 +1,115 @@ +use std::sync::Arc; +use std::time::Duration; + +use futures::Future; +use moka::future::Cache; +use moka::Expiry; +use thiserror::Error; + +use crate::clients::ScopedMetadataSvcClient; +use crate::model::ValidShardedId; +use crate::types::{ProjectId, RequestId}; +use crate::GrpcClientFactory; + +type MetadataClientFactory = Arc< + Box + Send>, +>; + +#[derive(Error, Debug, Clone)] +pub enum ProjectSettingError { + // Optimally this should wrap a `GrpcClientError` but unfortauntly, it's + // not `Clone` and it's not possible to make it Clone. Error being `Clone` + // is a requirement for the cache though. + #[error("failed to create metadata svc client: {0}")] + Client(String), + #[error("setting grpc called failed with code {0}")] + Server(#[from] tonic::Status), +} + +/// A TTL-based read-through async project setting cache. A single instance of +/// this struct caches only a single setting type. +/// +/// Reading from the cache will return the value immediately if it exists, and +/// will fetch it if it doesn't exist. Notes: +/// 1. If multiple concurrent callers request the same uncached value, only one +/// of them will fetch it and the rest will wait for the result. +/// 2. If the cached value exceeded the TTL, it will get refetched on the next +/// read. +/// 3. If the fetch from source fails, waiting callers will get fulfilled with +/// the failed Result. However, the value will get refetched on the next read +/// attempt (even if it didn't exceed the TTL). +/// 4. Cache evictions happen in the background in a separate thread pool. +pub struct ProjectSetting { + client_factory: MetadataClientFactory, + fetcher: F, + cache: Cache>, +} + +impl ProjectSetting +where + V: Send + Clone + Sync + 'static, + F: Fn(&ValidShardedId, ScopedMetadataSvcClient) -> Fut, + Fut: Future>, +{ + pub fn new( + client_factory: MetadataClientFactory, + fetcher: F, + ttl: Duration, + ) -> Self { + Self { + client_factory, + fetcher, + cache: Cache::builder() + .expire_after(ExpirationPolicy { ttl }) + .build(), + } + } + + /// Returns the setting value if it's cached, and requests it if it's not in + /// the cache. If multiple concurrent calls attempt to read the same key, + /// it'll only be fetched once and returned to all the callers. + pub async fn get( + &self, + project_id: &ValidShardedId, + ) -> Result { + self.cache + .get_with_by_ref(project_id.inner(), async move { + let client = self + .client_factory + .get_client(&RequestId::new(), project_id) + .await + .map_err(|e| ProjectSettingError::Client(e.to_string()))?; + Ok((self.fetcher)(project_id, client).await?) + }) + .await + } + + /// Returns the value if it exists in the cache and didn't expire, and None + /// otherwise. + pub async fn get_cached( + &self, + project_id: &ValidShardedId, + ) -> Option> { + self.cache.get(project_id) + } +} + +struct ExpirationPolicy { + ttl: Duration, +} + +impl Expiry> for ExpirationPolicy { + fn expire_after_create( + &self, + _key: &ProjectId, + value: &Result, + _current_time: std::time::Instant, + ) -> Option { + if value.is_ok() { + Some(self.ttl) + } else { + // If the cached value is an error, let it expire asap. + Some(Duration::default()) + } + } +}