From 9871aca5ab6156662d7511940f8e8906f3fce0e2 Mon Sep 17 00:00:00 2001 From: 0xmountaintop <37070449+0xmountaintop@users.noreply.github.com> Date: Sat, 9 Nov 2024 02:21:51 +1100 Subject: [PATCH] feat: add task cache (#49) --- .gitignore | 1 + Cargo.lock | 161 +++++++++++++++++- Cargo.toml | 1 + src/config.rs | 16 +- src/coordinator_handler/coordinator_client.rs | 2 +- src/db.rs | 64 +++++++ src/prover/builder.rs | 2 + src/prover/mod.rs | 28 +++ 8 files changed, 265 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 5c45118..7a4f418 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ target/ config.json keys/ +db/ # Helm chart related *.lock diff --git a/Cargo.lock b/Cargo.lock index 337a04c..194fc60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -516,6 +516,26 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.64.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4243e6031260db77ede97ad86c27e501d646a27ab57b59a574f725d98ab1fb4" +dependencies = [ + "bitflags 1.3.2", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "peeking_take_while", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 1.0.109", +] + [[package]] name = "bit-set" version = "0.5.3" @@ -693,6 +713,17 @@ dependencies = [ "serde", ] +[[package]] +name = "bzip2-sys" +version = "0.1.11+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "c-kzg" version = "1.0.2" @@ -709,13 +740,22 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.98" +version = "1.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41c270e7540d725e65ac7f1b212ac8ce349719624d7bcff99f8e2e488e8cf03f" +checksum = "baee610e9452a8f6f0a1b6194ec09ff9e2d85dea54432acdae41aa0761c95d70" dependencies = [ "jobserver", "libc", - "once_cell", + "shlex", +] + +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", ] [[package]] @@ -754,6 +794,17 @@ dependencies = [ "inout", ] +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "4.5.4" @@ -2484,18 +2535,61 @@ dependencies = [ "spin 0.5.2", ] +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +[[package]] +name = "libloading" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" +dependencies = [ + "cfg-if 1.0.0", + "windows-targets 0.52.5", +] + [[package]] name = "libm" version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "librocksdb-sys" +version = "0.10.0+7.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fe4d5874f5ff2bc616e55e8c6086d478fcda13faf9495768a4aa1c22042d30b" +dependencies = [ + "bindgen", + "bzip2-sys", + "cc", + "glob", + "libc", + "libz-sys", + "lz4-sys", + "zstd-sys 2.0.13+zstd.1.5.6", +] + +[[package]] +name = "libz-sys" +version = "1.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2d16453e800a8cf6dd2fc3eb4bc99b786a9b90c663b8559a5b1a041bf89e472" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.4.14" @@ -2545,6 +2639,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "lz4-sys" +version = "1.11.1+lz4-1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "matchers" version = "0.1.0" @@ -2582,6 +2686,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.7.3" @@ -2665,6 +2775,16 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -3032,6 +3152,12 @@ dependencies = [ "hmac", ] +[[package]] +name = "peeking_take_while" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" + [[package]] name = "percent-encoding" version = "2.3.1" @@ -3670,6 +3796,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "rocksdb" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "015439787fce1e75d55f279078d33ff14b4af5d93d995e8838ee4631301c8a99" +dependencies = [ + "libc", + "librocksdb-sys", +] + [[package]] name = "ruint" version = "1.12.1" @@ -3905,6 +4041,7 @@ dependencies = [ "reqwest-middleware", "reqwest-retry", "rlp", + "rocksdb", "serde", "serde_json", "sled", @@ -4204,6 +4341,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -5418,7 +5561,7 @@ name = "zstd-safe" version = "7.0.0" source = "git+https://github.com/scroll-tech/zstd-rs?branch=hack/mul-block#5c0892b6567dab31394d701477183ce9d6a32aca" dependencies = [ - "zstd-sys", + "zstd-sys 2.0.9+zstd.1.5.5", ] [[package]] @@ -5429,3 +5572,13 @@ dependencies = [ "cc", "pkg-config", ] + +[[package]] +name = "zstd-sys" +version = "2.0.13+zstd.1.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38ff0f21cfee8f97d94cef41359e0c89aa6113028ab0291aa8ca0038995a95aa" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index fdbaa87..5d5ec8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,3 +46,4 @@ tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } axum = "0.6.0" dotenv = "0.15" +rocksdb = "0.20" diff --git a/src/config.rs b/src/config.rs index 361c57c..9076bcd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -8,6 +8,7 @@ use std::fs::File; pub struct Config { pub prover_name_prefix: String, pub keys_dir: String, + pub db_path: String, pub coordinator: CoordinatorConfig, pub l2geth: Option, pub prover: ProverConfig, @@ -81,11 +82,13 @@ impl Config { } fn get_env_var(key: &str) -> anyhow::Result> { - Ok(std::env::var_os(key).map(|val| { - val.to_str() - .ok_or_else(|| anyhow::anyhow!("{key} env var is not valid UTF-8")) - .map(String::from) - }).transpose()?) + Ok(std::env::var_os(key) + .map(|val| { + val.to_str() + .ok_or_else(|| anyhow::anyhow!("{key} env var is not valid UTF-8")) + .map(String::from) + }) + .transpose()?) } fn override_with_env(&mut self) -> anyhow::Result<()> { @@ -118,6 +121,9 @@ impl Config { cloud.api_key = val; } } + if let Some(val) = Self::get_env_var("DB_PATH")? { + self.db_path = val; + } Ok(()) } diff --git a/src/coordinator_handler/coordinator_client.rs b/src/coordinator_handler/coordinator_client.rs index 594654e..82c8cbb 100644 --- a/src/coordinator_handler/coordinator_client.rs +++ b/src/coordinator_handler/coordinator_client.rs @@ -10,7 +10,7 @@ pub struct CoordinatorClient { vks: Vec, circuit_version: String, pub prover_name: String, - key_signer: KeySigner, + pub key_signer: KeySigner, api: Api, token: Mutex>, } diff --git a/src/db.rs b/src/db.rs index 8b13789..ad90f76 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1 +1,65 @@ +use crate::coordinator_handler::GetTaskResponseData; +use rocksdb::DB; +pub struct Db { + db: DB, +} + +impl Db { + pub fn new(path: &str) -> anyhow::Result { + let db = DB::open_default(path)?; + Ok(Self { db }) + } + + pub fn get_coordinator_task_by_public_key( + &self, + public_key: String, + ) -> Option { + self.db + .get(fmt_coordinator_task_key(public_key)) + .ok()? + .as_ref() + .map(|v| serde_json::from_slice(v).ok()) + .flatten() + } + + pub fn get_proving_task_id_by_public_key(&self, public_key: String) -> Option { + self.db + .get(fmt_proving_task_id_key(public_key)) + .ok()? + .map(|v| String::from_utf8(v).ok()) + .flatten() + } + + pub fn set_coordinator_task_by_public_key( + &self, + public_key: String, + coordinator_task: &GetTaskResponseData, + ) { + let _ = serde_json::to_vec(coordinator_task) + .map(|bytes| self.db.put(fmt_coordinator_task_key(public_key), bytes)); + } + + pub fn set_proving_task_id_by_public_key(&self, public_key: String, proving_task_id: String) { + let _ = self.db.put( + fmt_proving_task_id_key(public_key), + proving_task_id.as_bytes(), + ); + } + + pub fn delete_coordinator_task_by_public_key(&self, public_key: String) { + let _ = self.db.delete(fmt_coordinator_task_key(public_key)); + } + + pub fn delete_proving_task_id_by_public_key(&self, public_key: String) { + let _ = self.db.delete(fmt_proving_task_id_key(public_key)); + } +} + +fn fmt_coordinator_task_key(public_key: String) -> String { + format!("last_coordinator_task_{}", public_key) +} + +fn fmt_proving_task_id_key(public_key: String) -> String { + format!("last_proving_task_id_{}", public_key) +} diff --git a/src/prover/builder.rs b/src/prover/builder.rs index 80678a0..aa1f2d1 100644 --- a/src/prover/builder.rs +++ b/src/prover/builder.rs @@ -2,6 +2,7 @@ use super::CircuitType; use crate::{ config::Config, coordinator_handler::{CoordinatorClient, KeySigner}, + db::Db, prover::{ proving_service::{GetVkRequest, ProvingService}, Prover, @@ -93,6 +94,7 @@ impl ProverBuilder { proving_service: self.proving_service.unwrap(), n_workers: self.cfg.prover.n_workers, health_listener_addr: self.cfg.health_listener_addr, + db: Db::new(&self.cfg.db_path)?, }) } } diff --git a/src/prover/mod.rs b/src/prover/mod.rs index e262254..9c8ce73 100644 --- a/src/prover/mod.rs +++ b/src/prover/mod.rs @@ -6,6 +6,7 @@ use crate::{ ChunkTaskDetail, CoordinatorClient, ErrorCode, GetTaskRequest, GetTaskResponseData, ProofFailureType, ProofStatus, SubmitProofRequest, }, + db::Db, tracing_handler::L2gethClient, }; use axum::{routing::get, Router}; @@ -28,6 +29,7 @@ pub struct Prover { proving_service: Box, n_workers: usize, health_listener_addr: String, + db: Db, } impl Prover { @@ -84,6 +86,17 @@ impl Prover { } async fn handle_task(&self, coordinator_client: &CoordinatorClient) -> anyhow::Result<()> { + let public_key = coordinator_client.key_signer.get_public_key(); + if let (Some(coordinator_task), Some(proving_task_id)) = ( + self.db + .get_coordinator_task_by_public_key(public_key.clone()), + self.db.get_proving_task_id_by_public_key(public_key), + ) { + return self + .handle_proving_progress(coordinator_client, &coordinator_task, proving_task_id) + .await; + } + let coordinator_task = self.get_coordinator_task(coordinator_client).await?; let proving_task = self.request_proving(&coordinator_task).await?; self.handle_proving_progress(coordinator_client, &coordinator_task, proving_task.task_id) @@ -137,6 +150,7 @@ impl Prover { proving_service_task_id: String, ) -> anyhow::Result<()> { let prover_name = &coordinator_client.prover_name; + let public_key = &coordinator_client.key_signer.get_public_key(); let task_type = coordinator_task.task_type; let coordinator_task_uuid = &coordinator_task.uuid; let coordinator_task_id = &coordinator_task.task_id; @@ -160,6 +174,12 @@ impl Prover { status = ?task.status, "Task status update" ); + self.db + .set_coordinator_task_by_public_key(public_key.clone(), coordinator_task); + self.db.set_proving_task_id_by_public_key( + public_key.clone(), + proving_service_task_id.clone(), + ); sleep(Duration::from_secs(WORKER_SLEEP_SEC)).await; } TaskStatus::Success => { @@ -179,6 +199,10 @@ impl Prover { None, ) .await?; + self.db + .delete_coordinator_task_by_public_key(public_key.clone()); + self.db + .delete_proving_task_id_by_public_key(public_key.clone()); break; } TaskStatus::Failed => { @@ -200,6 +224,10 @@ impl Prover { Some(task_err), ) .await?; + self.db + .delete_coordinator_task_by_public_key(public_key.clone()); + self.db + .delete_proving_task_id_by_public_key(public_key.clone()); break; } }