diff --git a/.github/workflows/actions.yaml b/.github/workflows/actions.yaml
index 004473b..5bf8b49 100644
--- a/.github/workflows/actions.yaml
+++ b/.github/workflows/actions.yaml
@@ -38,6 +38,8 @@ jobs:
           version: "8"
           run_install: true
       # build, test, and package sqlsync
+      - name: Lint & Fmt
+        run: just lint
       - name: Build all
         run: just build
       - name: Unit tests
diff --git a/.rustfmt.toml b/.rustfmt.toml
index 82cc2ba..3f21afb 100644
--- a/.rustfmt.toml
+++ b/.rustfmt.toml
@@ -1,4 +1,3 @@
 # https://rust-lang.github.io/rustfmt
-overflow_delimited_expr = true
-max_width = 80
-struct_lit_width = 80
+max_width = 100
+struct_lit_width = 40
diff --git a/demo/cloudflare-backend/src/coordinator.rs b/demo/cloudflare-backend/src/coordinator.rs
index 696d542..dee422f 100644
--- a/demo/cloudflare-backend/src/coordinator.rs
+++ b/demo/cloudflare-backend/src/coordinator.rs
@@ -37,8 +37,7 @@ impl Coordinator {
         console_log!("creating new document with id {}", id);
 
-        let mut storage = MemoryJournal::open(id)
-            .map_err(|e| Error::RustError(e.to_string()))?;
+        let mut storage = MemoryJournal::open(id).map_err(|e| Error::RustError(e.to_string()))?;
 
         // load the persistence layer
         let persistence = Persistence::init(state.storage()).await?;
@@ -55,7 +54,11 @@ impl Coordinator {
         Ok((
             Self { accept_queue: accept_queue_tx },
-            CoordinatorTask { accept_queue: accept_queue_rx, persistence, doc },
+            CoordinatorTask {
+                accept_queue: accept_queue_rx,
+                persistence,
+                doc,
+            },
         ))
     }
 
@@ -189,10 +192,7 @@ impl Client {
         (Self { protocol, writer }, reader)
     }
 
-    async fn start_replication(
-        &mut self,
-        doc: &Document,
-    ) -> anyhow::Result<()> {
+    async fn start_replication(&mut self, doc: &Document) -> anyhow::Result<()> {
         let msg = self.protocol.start(doc);
         self.send_msg(msg).await
     }
@@ -223,12 +223,9 @@ impl Client {
         match msg {
             Ok(Message::Bytes(bytes)) => {
                 let mut cursor = Cursor::new(bytes);
-                let msg: ReplicationMsg =
-                    bincode::deserialize_from(&mut cursor)?;
+                let msg: ReplicationMsg = bincode::deserialize_from(&mut cursor)?;
                 console_log!("received message {:?}", msg);
-                if let Some(resp) =
-                    self.protocol.handle(doc, msg, &mut cursor)?
-                {
+                if let Some(resp) = self.protocol.handle(doc, msg, &mut cursor)? {
                     self.send_msg(resp).await?;
                 }
                 Ok(())
diff --git a/demo/cloudflare-backend/src/lib.rs b/demo/cloudflare-backend/src/lib.rs
index cbed093..1f6500e 100644
--- a/demo/cloudflare-backend/src/lib.rs
+++ b/demo/cloudflare-backend/src/lib.rs
@@ -27,8 +27,7 @@ impl DurableObject for DocumentCoordinator {
     async fn fetch(&mut self, req: Request) -> Result {
         // check that the Upgrade header is set and == "websocket"
-        let is_upgrade_req =
-            req.headers().get("Upgrade")?.unwrap_or("".into()) == "websocket";
+        let is_upgrade_req = req.headers().get("Upgrade")?.unwrap_or("".into()) == "websocket";
         if !is_upgrade_req {
             return Response::error("Bad Request", 400);
         }
@@ -37,11 +36,10 @@ impl DurableObject for DocumentCoordinator {
         if self.coordinator.is_none() {
             // retrieve the reducer digest from the request url
             let url = req.url()?;
-            let reducer_digest =
-                match url.query_pairs().find(|(k, _)| k == "reducer") {
-                    Some((_, v)) => v,
-                    None => return Response::error("Bad Request", 400),
-                };
+            let reducer_digest = match url.query_pairs().find(|(k, _)| k == "reducer") {
+                Some((_, v)) => v,
+                None => return Response::error("Bad Request", 400),
+            };
             let bucket = self.env.bucket(REDUCER_BUCKET)?;
             let object = bucket
                 .get(format!("{}.wasm", reducer_digest))
@@ -51,27 +49,19 @@ impl DurableObject for DocumentCoordinator {
                 Some(object) => {
                     object
                         .body()
-                        .ok_or_else(|| {
-                            Error::RustError(
-                                "reducer not found in bucket".to_string(),
-                            )
-                        })?
+                        .ok_or_else(|| Error::RustError("reducer not found in bucket".to_string()))?
                        .bytes()
                        .await?
                 }
                 None => {
                     return Response::error(
-                        format!(
-                            "reducer {} not found in bucket",
-                            reducer_digest
-                        ),
+                        format!("reducer {} not found in bucket", reducer_digest),
                         404,
                     )
                 }
             };
 
-            let (coordinator, task) =
-                Coordinator::init(&self.state, reducer_bytes).await?;
+            let (coordinator, task) = Coordinator::init(&self.state, reducer_bytes).await?;
             spawn_local(task.into_task());
             self.coordinator = Some(coordinator);
         }
@@ -106,11 +96,10 @@ async fn main(req: Request, env: Env, _ctx: Context) -> Result {
             // upload a reducer to the bucket
             let bucket = ctx.bucket(REDUCER_BUCKET)?;
-            let data_len: u64 =
-                match req.headers().get("Content-Length")?.map(|s| s.parse()) {
-                    Some(Ok(len)) => len,
-                    _ => return Response::error("Bad Request", 400),
-                };
+            let data_len: u64 = match req.headers().get("Content-Length")?.map(|s| s.parse()) {
+                Some(Ok(len)) => len,
+                _ => return Response::error("Bad Request", 400),
+            };
             if data_len > 10 * 1024 * 1024 {
                 return Response::error("Payload Too Large", 413);
             }
@@ -130,10 +119,8 @@ async fn main(req: Request, env: Env, _ctx: Context) -> Result {
                 .subtle();
 
             // sha256 sum the data and convert to bs58
-            let digest = JsFuture::from(
-                subtle.digest_with_str_and_buffer_source("SHA-256", &data)?,
-            )
-            .await?;
+            let digest =
+                JsFuture::from(subtle.digest_with_str_and_buffer_source("SHA-256", &data)?).await?;
 
             // convert digest to base58
             let digest = bs58::encode(Uint8Array::new(&digest).to_vec())
@@ -164,8 +151,7 @@ async fn main(req: Request, env: Env, _ctx: Context) -> Result {
             if let Some(name) = ctx.param("name") {
                 let namespace = ctx.durable_object(DURABLE_OBJECT_NAME)?;
                 // until SQLSync is stable, named doc resolution will periodically break when we increment this counter
-                let id =
-                    namespace.id_from_name(&format!("sqlsync-1-{}", name))?;
+                let id = namespace.id_from_name(&format!("sqlsync-1-{}", name))?;
                 let id = object_id_to_journal_id(id)?;
                 Response::ok(id.to_base58())
             } else {
@@ -176,15 +162,11 @@ async fn main(req: Request, env: Env, _ctx: Context) -> Result {
             if let Some(id) = ctx.param("id") {
                 console_log!("forwarding request to document with id: {}", id);
                 let namespace = ctx.durable_object(DURABLE_OBJECT_NAME)?;
-                let id = JournalId::from_base58(&id)
-                    .map_err(|e| Error::RustError(e.to_string()))?;
+                let id = JournalId::from_base58(id).map_err(|e| Error::RustError(e.to_string()))?;
                 let id = match namespace.id_from_string(&id.to_hex()) {
                     Ok(id) => id,
                     Err(e) => {
-                        return Response::error(
-                            format!("Invalid Durable Object ID: {}", e),
-                            400,
-                        )
+                        return Response::error(format!("Invalid Durable Object ID: {}", e), 400)
                     }
                 };
                 let stub = id.get_stub()?;
diff --git a/examples/reducer-guestbook/Cargo.toml b/examples/reducer-guestbook/Cargo.toml
index da34ca6..e7efe9d 100644
--- a/examples/reducer-guestbook/Cargo.toml
+++ b/examples/reducer-guestbook/Cargo.toml
@@ -10,11 +10,6 @@ version.workspace = true
 [lib]
 crate-type = ["cdylib"]
 
-[profile.release]
-lto = true
-strip = "debuginfo"
-codegen-units = 1
-
 [dependencies]
 sqlsync-reducer = "0.2"
 serde = { version = "1.0", features = ["derive"] }
diff --git a/justfile b/justfile
index 83a3164..4d1a4cd 100644
--- a/justfile
+++ b/justfile
@@ -3,6 +3,10 @@ SQLSYNC_PROD_URL := "https://sqlsync.orbitinghail.workers.dev"
 
 default:
     @just --choose
 
+lint:
+    cargo clippy --all-targets --all-features -- -D warnings
+    cargo fmt --check
+
 unit-test:
     cargo test
diff --git a/lib/sqlsync-reducer/examples/guest.rs b/lib/sqlsync-reducer/examples/guest.rs
index 5e30f8a..73ad242 100644
--- a/lib/sqlsync-reducer/examples/guest.rs
+++ b/lib/sqlsync-reducer/examples/guest.rs
@@ -17,8 +17,7 @@ async fn reducer(mutation: Vec) -> Result<(), ReducerError> {
     log::info!("running query and execute at the same time");
     let x: Option = None;
-    let query_future =
-        query!("SELECT * FROM foo WHERE bar = ?", "baz", 1, 1.23, x);
+    let query_future = query!("SELECT * FROM foo WHERE bar = ?", "baz", 1, 1.23, x);
     let exec_future = execute!("SELECT * FROM foo WHERE bar = ?", "baz");
 
     let (result, result2) = futures::join!(query_future, exec_future);
diff --git a/lib/sqlsync-reducer/examples/host.rs b/lib/sqlsync-reducer/examples/host.rs
index a41996f..99e8106 100644
--- a/lib/sqlsync-reducer/examples/host.rs
+++ b/lib/sqlsync-reducer/examples/host.rs
@@ -22,9 +22,8 @@ fn main() -> anyhow::Result<()> {
         .init()?;
 
     // build guest.wasm using: `cargo build --target wasm32-unknown-unknown --example guest`
-    let wasm_bytes = include_bytes!(
-        "../../../target/wasm32-unknown-unknown/debug/examples/guest.wasm"
-    );
+    let wasm_bytes =
+        include_bytes!("../../../target/wasm32-unknown-unknown/debug/examples/guest.wasm");
 
     let engine = Engine::default();
     let module = Module::new(&engine, &wasm_bytes[..])?;
@@ -33,12 +32,11 @@ fn main() -> anyhow::Result<()> {
     register_log_handler(&mut linker)?;
 
     let mut store = Store::new(&engine, WasmFFI::uninitialized());
-    let instance =
-        linker.instantiate(&mut store, &module)?.start(&mut store)?;
+    let instance = linker.instantiate(&mut store, &module)?.start(&mut store)?;
 
     // initialize the FFI
     let ffi = WasmFFI::initialized(&store, &instance)?;
-    (*store.data_mut()) = ffi.clone();
+    (*store.data_mut()) = ffi;
 
     // initialize the reducer
     ffi.init_reducer(&mut store)?;
@@ -70,20 +68,16 @@ fn main() -> anyhow::Result<()> {
                 if sql == "FAIL" {
                     let ptr = ffi.encode(
                         &mut store,
-                        &Err::(
-                            ErrorResponse::SqliteError {
-                                code: 1,
-                                message: "error".to_string(),
-                            },
-                        ),
+                        &Err::(ErrorResponse::SqliteError {
+                            code: 1,
+                            message: "error".to_string(),
+                        }),
                     )?;
                     responses.insert(id, ptr);
                 } else {
                     let ptr = ffi.encode(
                         &mut store,
-                        &Ok::<_, ErrorResponse>(ExecResponse {
-                            changes: 1,
-                        }),
+                        &Ok::<_, ErrorResponse>(ExecResponse { changes: 1 }),
                     )?;
                     responses.insert(id, ptr);
                 }
diff --git a/lib/sqlsync-reducer/src/guest_ffi.rs b/lib/sqlsync-reducer/src/guest_ffi.rs
index 2c35dc1..9315bbf 100644
--- a/lib/sqlsync-reducer/src/guest_ffi.rs
+++ b/lib/sqlsync-reducer/src/guest_ffi.rs
@@ -13,25 +13,20 @@ pub fn fbm() -> &'static mut FFIBufManager {
     static ONCE: Once = Once::new();
     unsafe {
         ONCE.call_once(|| {
-            let singleton = FFIBufManager::new();
+            let singleton = FFIBufManager::default();
             SINGLETON.write(singleton);
         });
         SINGLETON.assume_init_mut()
     }
 }
 
+#[derive(Default)]
 pub struct FFIBufManager {
     // map from pointer to buffer to length of buffer
     bufs: BTreeMap,
 }
 
 impl FFIBufManager {
-    pub fn new() -> Self {
-        Self {
-            bufs: BTreeMap::new(),
-        }
-    }
-
     pub fn alloc(&mut self, len: FFIBufLen) -> FFIBufPtr {
         let mut buf = Vec::with_capacity(len as usize);
         let ptr = buf.as_mut_ptr();
@@ -40,7 +35,11 @@ impl FFIBufManager {
         ptr
     }
 
-    pub fn dealloc(&mut self, ptr: FFIBufPtr) {
+    /// frees the memory pointed to by ptr
+    ///
+    /// # Safety
+    /// The pointer must have been allocated by FFIBufManager::alloc.
+    pub unsafe fn dealloc(&mut self, ptr: FFIBufPtr) {
         self.consume(ptr);
         // immediately drops the vec, freeing the memory
     }
@@ -49,9 +48,13 @@ impl FFIBufManager {
         *self.bufs.get(&ptr).unwrap()
     }
 
-    pub fn consume(&mut self, ptr: FFIBufPtr) -> FFIBuf {
+    /// consumes the buffer pointed to by ptr and returns a Vec with the same contents.
+    ///
+    /// # Safety
+    /// The pointer must have been allocated by FFIBufManager::alloc.
+    pub unsafe fn consume(&mut self, ptr: FFIBufPtr) -> FFIBuf {
         let len = self.bufs.remove(&ptr).unwrap();
-        unsafe { Vec::from_raw_parts(ptr, len as usize, len as usize) }
+        Vec::from_raw_parts(ptr, len as usize, len as usize)
     }
 
     pub fn encode(&mut self, data: &T) -> Result {
@@ -62,7 +65,20 @@
         Ok(ptr)
     }
 
-    pub fn decode(&mut self, ptr: FFIBufPtr) -> Result {
+    /// decode will consume the raw memory pointed to by ptr and return a deserialized object.
+    /// After calling decode, manually deallocating the ptr is no longer needed.
+    ///
+    /// # Errors
+    ///
+    /// This function will return an error if deserialization fails. If this
+    /// happens the memory pointed to by the ptr will also be dropped.
+    ///
+    /// # Safety
+    /// The pointer must have been allocated by FFIBufManager::alloc.
+    pub unsafe fn decode(
+        &mut self,
+        ptr: FFIBufPtr,
+    ) -> Result {
         let buf = self.consume(ptr);
         bincode::deserialize(&buf)
     }
@@ -73,8 +89,12 @@ pub fn ffi_buf_allocate(length: FFIBufLen) -> FFIBufPtr {
     fbm().alloc(length)
 }
 
+/// ffi_buf_deallocate will immediately drop the buffer pointed to by the pointer, freeing the memory
+///
+/// # Safety
+/// The pointer must have been allocated by ffi_buf_allocate or FFIBufManager::alloc.
#[no_mangle] -pub fn ffi_buf_deallocate(ptr: FFIBufPtr) { +pub unsafe fn ffi_buf_deallocate(ptr: FFIBufPtr) { fbm().dealloc(ptr) } diff --git a/lib/sqlsync-reducer/src/guest_reactor.rs b/lib/sqlsync-reducer/src/guest_reactor.rs index eafe46b..df08f81 100644 --- a/lib/sqlsync-reducer/src/guest_reactor.rs +++ b/lib/sqlsync-reducer/src/guest_reactor.rs @@ -12,8 +12,8 @@ use serde::de::DeserializeOwned; use crate::{ guest_ffi::{fbm, FFIBufPtr}, types::{ - ErrorResponse, ExecResponse, QueryResponse, ReducerError, Request, - RequestId, Requests, Responses, SqliteValue, + ErrorResponse, ExecResponse, QueryResponse, ReducerError, Request, RequestId, Requests, + Responses, SqliteValue, }, }; @@ -31,6 +31,7 @@ pub fn reactor() -> &'static mut Reactor { type ReducerTask = Pin>>>; +#[derive(Default)] pub struct Reactor { task: Option, request_id_generator: RequestId, @@ -43,12 +44,7 @@ pub struct Reactor { impl Reactor { pub fn new() -> Self { - Self { - task: None, - request_id_generator: 0, - requests: None, - responses: None, - } + Reactor::default() } fn queue_request(&mut self, request: Request) -> RequestId { @@ -60,14 +56,14 @@ impl Reactor { id } - fn get_response( - &mut self, - id: RequestId, - ) -> Option { + fn get_response(&mut self, id: RequestId) -> Option { self.responses .as_mut() .and_then(|b| b.remove(&id)) - .map(|ptr| fbm().decode(ptr as *mut u8).unwrap()) + .map(|ptr| { + let f = fbm(); + unsafe { f.decode(ptr as *mut u8).unwrap() } + }) } pub fn spawn(&mut self, task: ReducerTask) { @@ -77,10 +73,7 @@ impl Reactor { self.task = Some(task); } - pub fn step( - &mut self, - responses: Responses, - ) -> Result { + pub fn step(&mut self, responses: Responses) -> Result { if let Some(ref mut previous) = self.responses { // if we still have previous responses, merge new responses in // this replaces keys in previous with those in next - as long @@ -166,8 +159,14 @@ macro_rules! execute { macro_rules! init_reducer { // fn should be (Vec) -> Future> ($fn:ident) => { + /// ffi_reduce is called by the host to cause the reducer to start processing a new mutation. + /// + /// # Panics + /// Panics if the host passes in an invalid pointer. + /// # Safety + /// The host must pass in a valid pointer to a Mutation buffer. #[no_mangle] - pub fn ffi_reduce( + pub unsafe fn ffi_reduce( mutation_ptr: sqlsync_reducer::guest_ffi::FFIBufPtr, ) -> sqlsync_reducer::guest_ffi::FFIBufPtr { let reactor = sqlsync_reducer::guest_reactor::reactor(); @@ -191,8 +190,15 @@ macro_rules! init_reducer { }; } +/// ffi_reactor_step is called by the host to advance the reactor forward. +/// +/// # Panics +/// Panics if the host passes in an invalid pointer. +/// +/// # Safety +/// The host must pass in a valid pointer to a serialized Responses object. 
#[no_mangle] -pub fn ffi_reactor_step(responses_ptr: FFIBufPtr) -> FFIBufPtr { +pub unsafe fn ffi_reactor_step(responses_ptr: FFIBufPtr) -> FFIBufPtr { let fbm = fbm(); let responses = fbm.decode(responses_ptr).unwrap(); let out = reactor().step(responses); diff --git a/lib/sqlsync-reducer/src/host_ffi.rs b/lib/sqlsync-reducer/src/host_ffi.rs index 2e21e59..6687d1a 100644 --- a/lib/sqlsync-reducer/src/host_ffi.rs +++ b/lib/sqlsync-reducer/src/host_ffi.rs @@ -31,31 +31,19 @@ impl WasmFFI { Self::Uninitialized } - pub fn initialized( - store: &impl AsContext, - instance: &Instance, - ) -> Result { + pub fn initialized(store: &impl AsContext, instance: &Instance) -> Result { let memory = instance .get_memory(store, "memory") .ok_or(WasmFFIError::MemoryNotFound)?; - let ffi_buf_allocate = instance - .get_typed_func::( - store, - "ffi_buf_allocate", - )?; - let ffi_buf_deallocate = instance - .get_typed_func::(store, "ffi_buf_deallocate")?; - let ffi_buf_len = instance - .get_typed_func::(store, "ffi_buf_len")?; - let ffi_init_reducer = - instance.get_typed_func::<(), ()>(store, "ffi_init_reducer")?; - let ffi_reduce = instance - .get_typed_func::(store, "ffi_reduce")?; - let ffi_reactor_step = instance - .get_typed_func::( - store, - "ffi_reactor_step", - )?; + let ffi_buf_allocate = + instance.get_typed_func::(store, "ffi_buf_allocate")?; + let ffi_buf_deallocate = + instance.get_typed_func::(store, "ffi_buf_deallocate")?; + let ffi_buf_len = instance.get_typed_func::(store, "ffi_buf_len")?; + let ffi_init_reducer = instance.get_typed_func::<(), ()>(store, "ffi_init_reducer")?; + let ffi_reduce = instance.get_typed_func::(store, "ffi_reduce")?; + let ffi_reactor_step = + instance.get_typed_func::(store, "ffi_reactor_step")?; Ok(Self::Initialized { memory, @@ -76,10 +64,7 @@ impl WasmFFI { match self { Self::Uninitialized => Err(WasmFFIError::Uninitialized), Self::Initialized { - memory, - ffi_buf_deallocate, - ffi_buf_len, - .. + memory, ffi_buf_deallocate, ffi_buf_len, .. } => { let len = ffi_buf_len.call(&mut store, ptr)?; let mem = memory.data(&store); @@ -90,11 +75,7 @@ impl WasmFFI { } } - fn persist( - &self, - mut store: impl AsContextMut, - buf: &[u8], - ) -> Result { + fn persist(&self, mut store: impl AsContextMut, buf: &[u8]) -> Result { match self { Self::Uninitialized => Err(WasmFFIError::Uninitialized), Self::Initialized { memory, ffi_buf_allocate, .. } => { @@ -125,15 +106,10 @@ impl WasmFFI { self.persist(&mut store, &bytes) } - pub fn init_reducer( - &self, - mut ctx: impl AsContextMut, - ) -> Result<(), WasmFFIError> { + pub fn init_reducer(&self, mut ctx: impl AsContextMut) -> Result<(), WasmFFIError> { match self { Self::Uninitialized => Err(WasmFFIError::Uninitialized), - Self::Initialized { ffi_init_reducer, .. } => { - Ok(ffi_init_reducer.call(&mut ctx, ())?) - } + Self::Initialized { ffi_init_reducer, .. } => Ok(ffi_init_reducer.call(&mut ctx, ())?), } } @@ -163,8 +139,7 @@ impl WasmFFI { Self::Uninitialized => Err(WasmFFIError::Uninitialized), Self::Initialized { ffi_reactor_step, .. } => { let responses_ptr = self.encode(&mut ctx, responses)?; - let requests_ptr = - ffi_reactor_step.call(&mut ctx, responses_ptr)?; + let requests_ptr = ffi_reactor_step.call(&mut ctx, responses_ptr)?; let requests: Result = self.decode(&mut ctx, requests_ptr)?; Ok(requests?) 
@@ -205,9 +180,7 @@ impl From for WasmFFIError { } } -pub fn register_log_handler( - linker: &mut Linker, -) -> Result<(), LinkerError> { +pub fn register_log_handler(linker: &mut Linker) -> Result<(), LinkerError> { linker.func_wrap( "env", "host_log", diff --git a/lib/sqlsync-reducer/src/types.rs b/lib/sqlsync-reducer/src/types.rs index 32f7b74..8a698dc 100644 --- a/lib/sqlsync-reducer/src/types.rs +++ b/lib/sqlsync-reducer/src/types.rs @@ -107,7 +107,7 @@ impl LogRecord { log::logger().log( &log::Record::builder() .level(Level::from_str(&self.level).unwrap_or(Level::Error)) - .file(self.file.as_ref().map(|s| s.as_str())) + .file(self.file.as_deref()) .line(self.line) .module_path(Some("wasm guest")) .args(format_args!("{}", self.message)) @@ -148,10 +148,7 @@ impl Row { T::try_from(self.get_value(idx)) } - pub fn maybe_get<'a, T>( - &'a self, - idx: usize, - ) -> Result, ReducerError> + pub fn maybe_get<'a, T>(&'a self, idx: usize) -> Result, ReducerError> where T: TryFrom<&'a SqliteValue, Error = ReducerError>, { diff --git a/lib/sqlsync-worker/sqlsync-wasm/Cargo.toml b/lib/sqlsync-worker/sqlsync-wasm/Cargo.toml index 805c9e4..2b68307 100644 --- a/lib/sqlsync-worker/sqlsync-wasm/Cargo.toml +++ b/lib/sqlsync-worker/sqlsync-wasm/Cargo.toml @@ -1,5 +1,6 @@ [package] name = "sqlsync-wasm" +description = "SQLSync is a collaborative offline-first wrapper around SQLite. It is designed to synchronize web application state between users, devices, and the edge." version.workspace = true authors.workspace = true diff --git a/lib/sqlsync-worker/sqlsync-wasm/LICENSE b/lib/sqlsync-worker/sqlsync-wasm/LICENSE new file mode 100644 index 0000000..c4b3ec1 --- /dev/null +++ b/lib/sqlsync-worker/sqlsync-wasm/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2023 Orbitinghail Systems Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
diff --git a/lib/sqlsync-worker/sqlsync-wasm/src/api.rs b/lib/sqlsync-worker/sqlsync-wasm/src/api.rs index 4bc92cd..0e34076 100644 --- a/lib/sqlsync-worker/sqlsync-wasm/src/api.rs +++ b/lib/sqlsync-worker/sqlsync-wasm/src/api.rs @@ -1,3 +1,6 @@ +// this is needed due to an issue with Tsify emitting non-snake_case names without the correct annotations +#![allow(non_snake_case)] + use std::{collections::HashMap, fmt::Debug}; use anyhow::anyhow; @@ -177,11 +180,12 @@ pub struct WorkerApi { #[wasm_bindgen] impl WorkerApi { #[wasm_bindgen(constructor)] - pub fn new( - ports: PortRouter, - coordinator_url: Option, - ) -> WorkerApi { - WorkerApi { coordinator_url, ports, inboxes: HashMap::new() } + pub fn new(ports: PortRouter, coordinator_url: Option) -> WorkerApi { + WorkerApi { + coordinator_url, + ports, + inboxes: HashMap::new(), + } } #[wasm_bindgen(skip_typescript)] @@ -198,10 +202,8 @@ impl WorkerApi { inbox.send(msg).await?; } else { // open the doc - self.spawn_doc_task(msg.doc_id, &reducer_url).await?; - let _ = self - .ports - .send_one(msg.port_id, msg.reply(DocReply::Ack)); + self.spawn_doc_task(msg.doc_id, reducer_url).await?; + let _ = self.ports.send_one(msg.port_id, msg.reply(DocReply::Ack)); } } @@ -210,10 +212,7 @@ impl WorkerApi { None => { let _ = self.ports.send_one( msg.port_id, - msg.reply_err(WasmError(anyhow!( - "no document with id {}", - msg.doc_id - ))), + msg.reply_err(WasmError(anyhow!("no document with id {}", msg.doc_id))), ); } }, @@ -240,8 +239,7 @@ impl WorkerApi { let (tx, rx) = mpsc::unbounded(); - let task = - DocTask::new(doc_id, doc_url, reducer, rx, self.ports.clone())?; + let task = DocTask::new(doc_id, doc_url, reducer, rx, self.ports.clone())?; wasm_bindgen_futures::spawn_local(task.into_task()); diff --git a/lib/sqlsync-worker/sqlsync-wasm/src/doc_task.rs b/lib/sqlsync-worker/sqlsync-wasm/src/doc_task.rs index 50d8376..fa48f75 100644 --- a/lib/sqlsync-worker/sqlsync-wasm/src/doc_task.rs +++ b/lib/sqlsync-worker/sqlsync-wasm/src/doc_task.rs @@ -2,15 +2,11 @@ use anyhow::anyhow; use futures::{channel::mpsc, select, FutureExt, StreamExt}; use rand::thread_rng; use sqlsync::{ - local::LocalDocument, sqlite::params_from_iter, JournalId, MemoryJournal, - WasmReducer, + local::LocalDocument, sqlite::params_from_iter, JournalId, MemoryJournal, WasmReducer, }; use crate::{ - api::{ - DocEvent, DocReply, DocRequest, HostToWorkerMsg, PortRouter, - WorkerToHostMsg, - }, + api::{DocEvent, DocReply, DocRequest, HostToWorkerMsg, PortRouter, WorkerToHostMsg}, net::{ConnectionTask, CoordinatorClient}, reactive::ReactiveQueries, signal::{SignalEmitter, SignalRouter}, @@ -60,14 +56,18 @@ impl DocTask { signals.emitter(Signal::CanRebase), )?; - let queries = - ReactiveQueries::new(signals.emitter(Signal::HasDirtyQueries)); - let coordinator_client = CoordinatorClient::new( - doc_url, - signals.emitter(Signal::ConnectionStateChanged), - ); - - Ok(Self { doc, inbox, signals, ports, queries, coordinator_client }) + let queries = ReactiveQueries::new(signals.emitter(Signal::HasDirtyQueries)); + let coordinator_client = + CoordinatorClient::new(doc_url, signals.emitter(Signal::ConnectionStateChanged)); + + Ok(Self { + doc, + inbox, + signals, + ports, + queries, + coordinator_client, + }) } pub async fn into_task(mut self) { @@ -102,9 +102,7 @@ impl DocTask { async fn handle_signals(&mut self, signals: Vec) { for signal in signals { match signal { - Signal::ConnectionStateChanged => { - self.handle_connection_state_changed() - } + Signal::ConnectionStateChanged => 
self.handle_connection_state_changed(), Signal::TimelineChanged => self.handle_timeline_changed().await, Signal::HasDirtyQueries => self.handle_dirty_queries(), @@ -124,11 +122,9 @@ impl DocTask { } fn handle_connection_state_changed(&mut self) { - let _ = self.ports.send_all(WorkerToHostMsg::Event { + self.ports.send_all(WorkerToHostMsg::Event { doc_id: self.doc.doc_id(), - evt: DocEvent::ConnectionStatus { - status: self.coordinator_client.status(), - }, + evt: DocEvent::ConnectionStatus { status: self.coordinator_client.status() }, }); } @@ -147,15 +143,14 @@ impl DocTask { fn handle_dirty_queries(&mut self) { if let Some(query) = self.queries.next_dirty_query() { - let result = - query.refresh(self.doc.sqlite_readonly(), |columns, row| { - let mut out = Vec::with_capacity(columns.len()); - for i in 0..columns.len() { - let val: SqlValue = row.get_ref(i)?.into(); - out.push(val); - } - Ok::<_, WasmError>(out) - }); + let result = query.refresh(self.doc.sqlite_readonly(), |columns, row| { + let mut out = Vec::with_capacity(columns.len()); + for i in 0..columns.len() { + let val: SqlValue = row.get_ref(i)?.into(); + out.push(val); + } + Ok::<_, WasmError>(out) + }); let msg = match result { Ok((columns, rows)) => WorkerToHostMsg::Event { @@ -197,22 +192,16 @@ impl DocTask { } } - async fn process_request( - &mut self, - msg: &HostToWorkerMsg, - ) -> WasmResult { + async fn process_request(&mut self, msg: &HostToWorkerMsg) -> WasmResult { log::info!("DocTask::process_request: {:?}", msg.req); match &msg.req { - DocRequest::Open { .. } => { - Err(WasmError(anyhow!("doc is already open"))) - } + DocRequest::Open { .. } => Err(WasmError(anyhow!("doc is already open"))), DocRequest::Query { sql, params } => self.doc.query(|conn| { let params = params_from_iter(params.iter()); let mut stmt = conn.prepare(sql)?; - let columns: Vec<_> = - stmt.column_names().iter().map(|&s| s.to_owned()).collect(); + let columns: Vec<_> = stmt.column_names().iter().map(|&s| s.to_owned()).collect(); let rows = stmt .query_and_then(params, |row| { @@ -235,7 +224,7 @@ impl DocTask { } DocRequest::QueryUnsubscribe { key } => { - self.queries.unsubscribe(msg.port_id, &key); + self.queries.unsubscribe(msg.port_id, key); Ok(DocReply::Ack) } diff --git a/lib/sqlsync-worker/sqlsync-wasm/src/net.rs b/lib/sqlsync-worker/sqlsync-wasm/src/net.rs index fc5cbfd..73202ab 100644 --- a/lib/sqlsync-worker/sqlsync-wasm/src/net.rs +++ b/lib/sqlsync-worker/sqlsync-wasm/src/net.rs @@ -1,3 +1,6 @@ +// this is needed due to an issue with Tsify emitting non-snake_case names without the correct annotations +#![allow(non_snake_case)] + use std::{ fmt::Debug, io::{self, Cursor}, @@ -12,10 +15,7 @@ use gloo::net::websocket::{futures::WebSocket, Message}; use serde::Serialize; use sqlsync::{ local::Signal, - replication::{ - ReplicationDestination, ReplicationMsg, ReplicationProtocol, - ReplicationSource, - }, + replication::{ReplicationDestination, ReplicationMsg, ReplicationProtocol, ReplicationSource}, }; use tsify::Tsify; @@ -56,9 +56,7 @@ impl CoordinatorClient { pub async fn poll(&mut self) -> ConnectionTask { match self.state { Some(ref mut state) => state.poll().await, - None => unreachable!( - "CoordinatorClient: invalid concurrent call to poll" - ), + None => unreachable!("CoordinatorClient: invalid concurrent call to poll"), } } @@ -66,18 +64,13 @@ impl CoordinatorClient { pub fn status(&self) -> ConnectionStatus { match self.state { Some(ref state) => state.status(), - None => unreachable!( - "CoordinatorClient: invalid concurrent 
call to status" - ), + None => unreachable!("CoordinatorClient: invalid concurrent call to status"), } } // SAFETY: poll, status, and handle can not be called concurrently on the same CoordinatorClient - pub async fn handle<'a, R, D>( - &mut self, - doc: &'a mut D, - task: ConnectionTask, - ) where + pub async fn handle<'a, R, D>(&mut self, doc: &'a mut D, task: ConnectionTask) + where R: io::Read, D: ReplicationDestination + ReplicationSource = R>, { @@ -175,18 +168,18 @@ impl ConnectionState { backoff.wait().await; ConnectionTask::Connect } - ConnectionState::Connecting { conn, .. } => { - conn.recv().await.map_or_else( - |e| ConnectionTask::Error(e), - |(msg, buf)| ConnectionTask::Recv(msg, buf), - ) - } - ConnectionState::Connected { conn } => { - conn.recv().await.map_or_else( - |e| ConnectionTask::Error(e), - |(msg, buf)| ConnectionTask::Recv(msg, buf), - ) - } + ConnectionState::Connecting { conn, .. } => conn + .recv() + .await + .map_or_else(ConnectionTask::Error, |(msg, buf)| { + ConnectionTask::Recv(msg, buf) + }), + ConnectionState::Connected { conn } => conn + .recv() + .await + .map_or_else(ConnectionTask::Error, |(msg, buf)| { + ConnectionTask::Recv(msg, buf) + }), } } @@ -225,15 +218,13 @@ impl ConnectionState { match (self, task) { // disabled ignores all tasks except for Connect - (Disabled, Connect) => { - match CoordinatorConnection::open(url, doc).await { - Ok(conn) => ConnectionState::Connecting { - conn, - backoff: Backoff::new(MIN_BACKOFF_MS, MAX_BACKOFF_MS), - }, - Err(e) => handle_err!(e), - } - } + (Disabled, Connect) => match CoordinatorConnection::open(url, doc).await { + Ok(conn) => ConnectionState::Connecting { + conn, + backoff: Backoff::new(MIN_BACKOFF_MS, MAX_BACKOFF_MS), + }, + Err(e) => handle_err!(e), + }, (s @ Disabled, _) => s, // the disable task universally disables @@ -279,12 +270,10 @@ impl ConnectionState { (s @ Connected { .. }, Connect) => s, - (Connected { mut conn }, Recv(msg, buf)) => { - match conn.handle(doc, msg, buf).await { - Ok(()) => Connected { conn }, - Err(e) => handle_err!(e), - } - } + (Connected { mut conn }, Recv(msg, buf)) => match conn.handle(doc, msg, buf).await { + Ok(()) => Connected { conn }, + Err(e) => handle_err!(e), + }, (Connected { mut conn }, Sync) => match conn.sync(doc).await { Ok(()) => Connected { conn }, @@ -303,10 +292,7 @@ struct CoordinatorConnection { } impl CoordinatorConnection { - async fn open( - url: &str, - doc: &D, - ) -> anyhow::Result + async fn open(url: &str, doc: &D) -> anyhow::Result where D: ReplicationSource, { @@ -332,9 +318,7 @@ impl CoordinatorConnection { Ok(self.writer.send(Message::Bytes(msg)).await?) 
} - async fn recv( - &mut self, - ) -> anyhow::Result<(ReplicationMsg, Cursor>)> { + async fn recv(&mut self) -> anyhow::Result<(ReplicationMsg, Cursor>)> { let msg = self.reader.select_next_some().await?; match msg { Message::Bytes(bytes) => { diff --git a/lib/sqlsync-worker/sqlsync-wasm/src/reactive.rs b/lib/sqlsync-worker/sqlsync-wasm/src/reactive.rs index 66a7b27..9ff36b3 100644 --- a/lib/sqlsync-worker/sqlsync-wasm/src/reactive.rs +++ b/lib/sqlsync-worker/sqlsync-wasm/src/reactive.rs @@ -47,7 +47,10 @@ pub struct ReactiveQueries { impl ReactiveQueries { pub fn new(has_dirty_queries: S) -> Self { - Self { queries: BTreeMap::new(), has_dirty_queries } + Self { + queries: BTreeMap::new(), + has_dirty_queries, + } } pub fn handle_storage_change(&mut self, change: &StorageChange) { @@ -61,21 +64,15 @@ impl ReactiveQueries { } } - pub fn subscribe( - &mut self, - port: PortId, - key: &QueryKey, - sql: &str, - params: Vec, - ) { - let tracker = - self.queries - .entry(key.clone()) - .or_insert_with(|| QueryTracker { - query_key: key.clone(), - query: ReactiveQuery::new(sql.to_owned(), params), - ports: Vec::new(), - }); + pub fn subscribe(&mut self, port: PortId, key: &QueryKey, sql: &str, params: Vec) { + let tracker = self + .queries + .entry(key.clone()) + .or_insert_with(|| QueryTracker { + query_key: key.clone(), + query: ReactiveQuery::new(sql.to_owned(), params), + ports: Vec::new(), + }); // store the port, if it's not already subscribed if !tracker.ports.contains(&port) { @@ -97,7 +94,7 @@ impl ReactiveQueries { } } - pub fn unsubscribe_all(&mut self, ports: &Vec) { + pub fn unsubscribe_all(&mut self, ports: &[PortId]) { for tracker in self.queries.values_mut() { tracker.ports.retain(|p| !ports.contains(p)); } diff --git a/lib/sqlsync-worker/sqlsync-wasm/src/signal.rs b/lib/sqlsync-worker/sqlsync-wasm/src/signal.rs index 5df4960..735a714 100644 --- a/lib/sqlsync-worker/sqlsync-wasm/src/signal.rs +++ b/lib/sqlsync-worker/sqlsync-wasm/src/signal.rs @@ -10,7 +10,9 @@ pub struct SignalRouter { impl SignalRouter { pub fn new() -> Self { - Self { shared: Rc::new(RefCell::new(Shared::new())) } + Self { + shared: Rc::new(RefCell::new(Shared::new())), + } } pub fn emitter(&self, signal: S) -> SignalEmitter { @@ -48,7 +50,10 @@ struct Shared { impl Shared { fn new() -> Self { - Self { signals: HashSet::new(), event: Event::new() } + Self { + signals: HashSet::new(), + event: Event::new(), + } } fn emit(&mut self, signal: S) { diff --git a/lib/sqlsync-worker/sqlsync-wasm/src/utils.rs b/lib/sqlsync-worker/sqlsync-wasm/src/utils.rs index 955b686..cf27730 100644 --- a/lib/sqlsync-worker/sqlsync-wasm/src/utils.rs +++ b/lib/sqlsync-worker/sqlsync-wasm/src/utils.rs @@ -1,9 +1,7 @@ use std::{convert::TryFrom, fmt::Display, io}; use anyhow::anyhow; -use gloo::{ - net::http::Request, timers::future::TimeoutFuture, utils::errors::JsError, -}; +use gloo::{net::http::Request, timers::future::TimeoutFuture, utils::errors::JsError}; use js_sys::{Reflect, Uint8Array}; use log::Level; use sha2::{Digest, Sha256}; @@ -104,9 +102,7 @@ impl_from_error!( futures::channel::mpsc::SendError, ); -pub async fn fetch_reducer( - reducer_url: &str, -) -> Result<(WasmReducer, Vec), WasmError> { +pub async fn fetch_reducer(reducer_url: &str) -> Result<(WasmReducer, Vec), WasmError> { let resp = Request::get(reducer_url).send().await?; if !resp.ok() { return Err(WasmError(anyhow!( @@ -134,10 +130,9 @@ pub async fn fetch_reducer( // sha256 sum the data // TODO: it would be much better to stream the data through the hash 
function // but afaik that's not doable with the crypto.subtle api - let digest = JsFuture::from(subtle.digest_with_str_and_u8_array( - "SHA-256", - &mut reducer_wasm_bytes, - )?) + let digest = JsFuture::from( + subtle.digest_with_str_and_u8_array("SHA-256", &mut reducer_wasm_bytes)?, + ) .await?; Uint8Array::new(&digest).to_vec() }; @@ -155,7 +150,11 @@ pub struct Backoff { impl Backoff { pub fn new(start_ms: u32, max_ms: u32) -> Self { - Self { current_ms: start_ms, max_ms, future: None } + Self { + current_ms: start_ms, + max_ms, + future: None, + } } /// increase the backoff time if needed diff --git a/lib/sqlsync/examples/counter-reducer.rs b/lib/sqlsync/examples/counter-reducer.rs index f039d78..4d4f69d 100644 --- a/lib/sqlsync/examples/counter-reducer.rs +++ b/lib/sqlsync/examples/counter-reducer.rs @@ -21,9 +21,7 @@ async fn reducer(mutation: Vec) -> Result<(), ReducerError> { value INTEGER )" ); - let init_counter = execute!( - "INSERT OR IGNORE INTO counter (id, value) VALUES (0, 0)" - ); + let init_counter = execute!("INSERT OR IGNORE INTO counter (id, value) VALUES (0, 0)"); create_table.await?; init_counter.await?; diff --git a/lib/sqlsync/examples/end-to-end-local-net.rs b/lib/sqlsync/examples/end-to-end-local-net.rs index 957d707..304f5e6 100644 --- a/lib/sqlsync/examples/end-to-end-local-net.rs +++ b/lib/sqlsync/examples/end-to-end-local-net.rs @@ -146,9 +146,7 @@ fn handle_client( let msg = receive_msg(&mut socket_reader)?; log::info!("server: received {:?}", msg); - if let Some(resp) = - unlock!(|doc| protocol.handle(doc, msg, &mut socket_reader)?) - { + if let Some(resp) = unlock!(|doc| protocol.handle(doc, msg, &mut socket_reader)?) { log::info!("server: sending {:?}", resp); send_msg(socket_writer, &resp)?; } @@ -235,18 +233,18 @@ fn start_client( let mut protocol = ReplicationProtocol::new(); // send start message - let start_msg = protocol.start(&mut doc); + let start_msg = protocol.start(&doc); log::info!("client({}): sending {:?}", timeline_id, start_msg); send_msg(socket_writer, &start_msg)?; log::info!("client({}): connected to server", timeline_id); // the amount of mutations we will send the server - let total_mutations = 10 as usize; + let total_mutations = 10; let mut remaining_mutations = total_mutations; // the total number of sync attempts we will make - let total_syncs = 100 as usize; + let total_syncs = 100; let mut syncs = 0; loop { @@ -258,9 +256,7 @@ fn start_client( let msg = receive_msg(&mut socket_reader)?; log::info!("client({}): received {:?}", timeline_id, msg); - if let Some(resp) = - protocol.handle(&mut doc, msg, &mut socket_reader)? - { + if let Some(resp) = protocol.handle(&mut doc, msg, &mut socket_reader)? { log::info!("client({}): sending {:?}", timeline_id, resp); send_msg(socket_writer, &resp)?; } @@ -275,7 +271,7 @@ fn start_client( } // sync pending mutations to the server - if let Some((msg, mut reader)) = protocol.sync(&mut doc)? { + if let Some((msg, mut reader)) = protocol.sync(&doc)? 
{ log::info!("client({}): syncing to server: {:?}", timeline_id, msg); send_msg(socket_writer, &msg)?; // write the frame @@ -284,19 +280,11 @@ fn start_client( log::info!("client({}): QUERYING STATE", timeline_id); let current_value = doc.query(|conn| { - let value = conn.query_row( - "select value from counter where id = 0", - [], - |row| { - let value: Option = row.get(0)?; - log::info!( - "client({}): counter value: {:?}", - timeline_id, - value - ); - Ok(value) - }, - )?; + let value = conn.query_row("select value from counter where id = 0", [], |row| { + let value: Option = row.get(0)?; + log::info!("client({}): counter value: {:?}", timeline_id, value); + Ok(value) + })?; Ok::<_, anyhow::Error>(value) })?; @@ -320,47 +308,35 @@ fn start_client( // final query, value should be total_mutations * num_clients doc.query(|conn| { - conn.query_row_and_then( - "select value from counter where id = 0", - [], - |row| { - let value: Option = row.get(0)?; - log::info!( - "client({}): FINAL counter value: {:?}", - timeline_id, - value - ); - if value != Some(total_mutations * num_clients) { - return Err(anyhow::anyhow!( + conn.query_row_and_then("select value from counter where id = 0", [], |row| { + let value: Option = row.get(0)?; + log::info!("client({}): FINAL counter value: {:?}", timeline_id, value); + if value != Some(total_mutations * num_clients) { + return Err(anyhow::anyhow!( "client({}): counter value is incorrect: {:?}, expected {}", timeline_id, value, total_mutations * num_clients )); - } - Ok(()) - }, - )?; - conn.query_row_and_then( - "select value from counter where id = 1", - [], - |row| { - let value: Option = row.get(0)?; - log::info!( - "client({}): FINAL server counter value: {:?}", - timeline_id, - value - ); - if value.is_none() || value == Some(0) { - return Err(anyhow::anyhow!( + } + Ok(()) + })?; + conn.query_row_and_then("select value from counter where id = 1", [], |row| { + let value: Option = row.get(0)?; + log::info!( + "client({}): FINAL server counter value: {:?}", + timeline_id, + value + ); + if value.is_none() || value == Some(0) { + return Err(anyhow::anyhow!( "client({}): server counter value is incorrect: {:?}, expected non-zero value", timeline_id, value, )); - } - Ok(()) - }, - )?; + } + Ok(()) + })?; Ok::<_, anyhow::Error>(()) })?; @@ -394,17 +370,13 @@ fn main() -> anyhow::Result<()> { thread::scope(|s| { let num_clients = 2; - s.spawn(move || { - start_server(listener, doc_id, num_clients, s) - .expect("server failed") - }); + s.spawn(move || start_server(listener, doc_id, num_clients, s).expect("server failed")); for _ in 0..num_clients { // create separate rngs for each client seeded by the root rng let client_rng = StdRng::seed_from_u64(rng.gen()); s.spawn(move || { - start_client(client_rng, addr, num_clients, doc_id) - .expect("client failed") + start_client(client_rng, addr, num_clients, doc_id).expect("client failed") }); } }); diff --git a/lib/sqlsync/examples/end-to-end-local.rs b/lib/sqlsync/examples/end-to-end-local.rs index 5e6a59c..56f309b 100644 --- a/lib/sqlsync/examples/end-to-end-local.rs +++ b/lib/sqlsync/examples/end-to-end-local.rs @@ -1,7 +1,7 @@ -///! This example demonstrates setting up sqlsync in process between two clients -///! and a server. There is no networking in this example so it's easy to follow -///! the sync & rebase logic between the different nodes. -/// +//! This example demonstrates setting up sqlsync in process between two clients +//! and a server. 
There is no networking in this example so it's easy to follow +//! the sync & rebase logic between the different nodes. + use std::{collections::BTreeMap, format, io}; use rand::{rngs::StdRng, Rng, SeedableRng}; @@ -151,11 +151,7 @@ fn main() -> anyhow::Result<()> { ($client:ident) => { $client.query(|conn| { let tasks = query_tasks(conn)?; - log::info!( - "{} has {} tasks:", - std::stringify!($client), - tasks.len() - ); + log::info!("{} has {} tasks:", std::stringify!($client), tasks.len()); for task in tasks { log::info!(" {:?}", task); } @@ -335,8 +331,8 @@ fn main() -> anyhow::Result<()> { print_tasks!(local2)?; // get both sets of tasks and make sure they are the same - let tasks1 = local.query(|conn| query_tasks(conn))?; - let tasks2 = local2.query(|conn| query_tasks(conn))?; + let tasks1 = local.query(query_tasks)?; + let tasks2 = local2.query(query_tasks)?; // compare the two Vec objects assert_eq!(tasks1.len(), tasks2.len(), "different number of tasks",); diff --git a/lib/sqlsync/examples/task-reducer.rs b/lib/sqlsync/examples/task-reducer.rs index 19197b2..2559226 100644 --- a/lib/sqlsync/examples/task-reducer.rs +++ b/lib/sqlsync/examples/task-reducer.rs @@ -47,7 +47,7 @@ async fn query_sort_after(id: i64) -> Result { ) .await?; - if response.rows.len() == 0 { + if response.rows.is_empty() { query_max_sort().await } else { let row = &response.rows[0]; @@ -108,8 +108,7 @@ async fn reducer(mutation: Vec) -> Result<(), ReducerError> { Mutation::MoveTask { id, after } => { let new_sort = query_sort_after(after).await?; - execute!("update tasks set sort = ? where id = ?", new_sort, id) - .await?; + execute!("update tasks set sort = ? where id = ?", new_sort, id).await?; } } diff --git a/lib/sqlsync/src/coordinator.rs b/lib/sqlsync/src/coordinator.rs index 1f444ee..62e0d8d 100644 --- a/lib/sqlsync/src/coordinator.rs +++ b/lib/sqlsync/src/coordinator.rs @@ -9,9 +9,7 @@ use rusqlite::Transaction; use crate::db::{open_with_vfs, run_in_tx, ConnectionPair}; use crate::error::Result; use crate::reducer::Reducer; -use crate::replication::{ - ReplicationDestination, ReplicationError, ReplicationSource, -}; +use crate::replication::{ReplicationDestination, ReplicationError, ReplicationSource}; use crate::timeline::{apply_timeline_range, run_timeline_migration}; use crate::Lsn; use crate::{ @@ -44,11 +42,7 @@ impl Debug for CoordinatorDocument { } impl CoordinatorDocument { - pub fn open( - storage: J, - timeline_factory: J::Factory, - reducer: R, - ) -> Result { + pub fn open(storage: J, timeline_factory: J::Factory, reducer: R) -> Result { let (mut sqlite, mut storage) = open_with_vfs(storage)?; // TODO: this feels awkward here @@ -65,15 +59,10 @@ impl CoordinatorDocument { }) } - fn get_or_create_timeline_mut( - &mut self, - id: JournalId, - ) -> io::Result<&mut J> { + fn get_or_create_timeline_mut(&mut self, id: JournalId) -> io::Result<&mut J> { match self.timelines.entry(id) { Entry::Occupied(entry) => Ok(entry.into_mut()), - Entry::Vacant(entry) => { - Ok(entry.insert(self.timeline_factory.open(id)?)) - } + Entry::Vacant(entry) => Ok(entry.insert(self.timeline_factory.open(id)?)), } } @@ -90,10 +79,9 @@ impl CoordinatorDocument { } } // otherwise, just push a new entry - _ => self.timeline_receive_queue.push_back(ReceiveQueueEntry { - id, - range: LsnRange::new(lsn, lsn), - }), + _ => self + .timeline_receive_queue + .push_back(ReceiveQueueEntry { id, range: LsnRange::new(lsn, lsn) }), } } @@ -112,11 +100,7 @@ impl CoordinatorDocument { let entry = 
self.timeline_receive_queue.pop_front(); if let Some(entry) = entry { - log::debug!( - "applying range {} to timeline {}", - entry.range, - entry.id - ); + log::debug!("applying range {} to timeline {}", entry.range, entry.id); // get the timeline let timeline = self @@ -141,9 +125,7 @@ impl CoordinatorDocument { } /// CoordinatorDocument knows how to replicate it's storage journal -impl ReplicationSource - for CoordinatorDocument -{ +impl ReplicationSource for CoordinatorDocument { type Reader<'a> = ::Reader<'a> where Self: 'a; @@ -156,10 +138,7 @@ impl ReplicationSource self.storage.source_range() } - fn read_lsn<'a>( - &'a self, - lsn: crate::Lsn, - ) -> io::Result>> { + fn read_lsn(&self, lsn: crate::Lsn) -> io::Result>> { self.storage.read_lsn(lsn) } } @@ -168,10 +147,7 @@ impl ReplicationSource impl ReplicationDestination for CoordinatorDocument { - fn range( - &mut self, - id: JournalId, - ) -> std::result::Result { + fn range(&mut self, id: JournalId) -> std::result::Result { let timeline = self.get_or_create_timeline_mut(id)?; ReplicationDestination::range(timeline, id) } diff --git a/lib/sqlsync/src/db.rs b/lib/sqlsync/src/db.rs index 1542058..50135ec 100644 --- a/lib/sqlsync/src/db.rs +++ b/lib/sqlsync/src/db.rs @@ -6,9 +6,7 @@ use rusqlite::{ }; use sqlite_vfs::FilePtr; -use crate::{ - journal::Journal, page::PAGESIZE, storage::Storage, vfs::StorageVfs, -}; +use crate::{journal::Journal, page::PAGESIZE, storage::Storage, vfs::StorageVfs}; pub struct ConnectionPair { pub readwrite: Connection, @@ -26,8 +24,7 @@ pub fn open_with_vfs( // register the vfs globally let vfs = StorageVfs::new(storage_ptr); - sqlite_vfs::register(&vfs_name, vfs) - .expect("failed to register local-vfs with sqlite"); + sqlite_vfs::register(&vfs_name, vfs).expect("failed to register local-vfs with sqlite"); let sqlite = Connection::open_with_flags_and_vfs( "main.db", @@ -64,7 +61,10 @@ pub fn open_with_vfs( })); Ok(( - ConnectionPair { readwrite: sqlite, readonly: sqlite_readonly }, + ConnectionPair { + readwrite: sqlite, + readonly: sqlite_readonly, + }, storage, )) } diff --git a/lib/sqlsync/src/error.rs b/lib/sqlsync/src/error.rs index e9f7c98..523388e 100644 --- a/lib/sqlsync/src/error.rs +++ b/lib/sqlsync/src/error.rs @@ -3,8 +3,8 @@ use std::io; use thiserror::Error; use crate::{ - reducer::ReducerError, replication::ReplicationError, - timeline::TimelineError, JournalIdParseError, + reducer::ReducerError, replication::ReplicationError, timeline::TimelineError, + JournalIdParseError, }; #[derive(Error, Debug)] diff --git a/lib/sqlsync/src/iter.rs b/lib/sqlsync/src/iter.rs index c23f33d..06c6e30 100644 --- a/lib/sqlsync/src/iter.rs +++ b/lib/sqlsync/src/iter.rs @@ -1,6 +1,6 @@ /// has_sorted_intersection returns true if the two iterators have an intersection /// requires that both iterators are sorted -pub fn has_sorted_intersection<'a, I, J, T>(a: I, b: J) -> bool +pub fn has_sorted_intersection(a: I, b: J) -> bool where T: Ord + Eq, I: IntoIterator, diff --git a/lib/sqlsync/src/journal/cursor.rs b/lib/sqlsync/src/journal/cursor.rs index efaa495..70499ae 100644 --- a/lib/sqlsync/src/journal/cursor.rs +++ b/lib/sqlsync/src/journal/cursor.rs @@ -14,10 +14,10 @@ pub trait Scannable: Sized { where Self: 'a; - fn scan<'a>(&'a self) -> Cursor<'a, Self, LsnIter>; - fn scan_range<'a>(&'a self, range: LsnRange) -> Cursor<'a, Self, LsnIter>; + fn scan(&self) -> Cursor<'_, Self, LsnIter>; + fn scan_range(&self, range: LsnRange) -> Cursor<'_, Self, LsnIter>; - fn get<'a>(&'a self, lsn: Lsn) -> Result>>; + fn 
get(&self, lsn: Lsn) -> Result>>; } pub struct Cursor<'a, S: Scannable, I> { @@ -56,7 +56,11 @@ impl<'a, S: Scannable, I: DoubleEndedIterator> Cursor<'a, S, I> { /// reverse this cursor pub fn into_rev(self) -> Cursor<'a, S, Rev> { - Cursor { inner: self.inner, lsn_iter: self.lsn_iter.rev(), state: None } + Cursor { + inner: self.inner, + lsn_iter: self.lsn_iter.rev(), + state: None, + } } } diff --git a/lib/sqlsync/src/journal/journal.rs b/lib/sqlsync/src/journal/journal.rs deleted file mode 100644 index 38086ff..0000000 --- a/lib/sqlsync/src/journal/journal.rs +++ /dev/null @@ -1,30 +0,0 @@ -use std::fmt::Debug; -use std::io; - -use crate::Serializable; -use crate::{ - lsn::{Lsn, LsnRange}, - JournalId, -}; - -use super::Scannable; - -pub trait Journal: Scannable + Debug + Sized { - type Factory: JournalFactory; - - /// this journal's id - fn id(&self) -> JournalId; - - /// this journal's range - fn range(&self) -> LsnRange; - - /// append a new journal entry, and then write to it - fn append(&mut self, obj: impl Serializable) -> io::Result<()>; - - /// drop the journal's prefix - fn drop_prefix(&mut self, up_to: Lsn) -> io::Result<()>; -} - -pub trait JournalFactory { - fn open(&self, id: JournalId) -> io::Result; -} diff --git a/lib/sqlsync/src/journal/memory.rs b/lib/sqlsync/src/journal/memory.rs index cd78f11..5627d44 100644 --- a/lib/sqlsync/src/journal/memory.rs +++ b/lib/sqlsync/src/journal/memory.rs @@ -5,9 +5,7 @@ use crate::lsn::{Lsn, LsnIter, LsnRange}; use crate::{JournalFactory, Serializable}; use super::{Cursor, Journal, JournalId, Scannable}; -use crate::replication::{ - ReplicationDestination, ReplicationError, ReplicationSource, -}; +use crate::replication::{ReplicationDestination, ReplicationError, ReplicationSource}; pub struct MemoryJournal { id: JournalId, @@ -26,7 +24,11 @@ impl Debug for MemoryJournal { impl MemoryJournal { pub fn open(id: JournalId) -> io::Result { - Ok(MemoryJournal { id, range: LsnRange::empty(), data: vec![] }) + Ok(MemoryJournal { + id, + range: LsnRange::empty(), + data: vec![], + }) } } @@ -75,16 +77,16 @@ impl Scannable for MemoryJournal { where Self: 'a; - fn scan<'a>(&'a self) -> Cursor<'a, Self, LsnIter> { + fn scan(&self) -> Cursor<'_, Self, LsnIter> { Cursor::new(self, self.range.iter()) } - fn scan_range<'a>(&'a self, range: LsnRange) -> Cursor<'a, Self, LsnIter> { + fn scan_range(&self, range: LsnRange) -> Cursor<'_, Self, LsnIter> { let intersection = self.range.intersect(&range); Cursor::new(self, intersection.iter()) } - fn get<'a>(&'a self, lsn: Lsn) -> io::Result>> { + fn get(&self, lsn: Lsn) -> io::Result>> { Ok(self .range .offset(lsn) @@ -105,10 +107,7 @@ impl ReplicationSource for MemoryJournal { self.range() } - fn read_lsn<'a>( - &'a self, - lsn: Lsn, - ) -> io::Result>> { + fn read_lsn(&self, lsn: Lsn) -> io::Result>> { match self.range.offset(lsn) { None => Ok(None), Some(offset) => Ok(Some(&self.data[offset][..])), @@ -164,10 +163,7 @@ impl ReplicationDestination for MemoryJournal { Ok(()) } else { - Err(ReplicationError::NonContiguousLsn { - received: lsn, - range: accepted_range, - }) + Err(ReplicationError::NonContiguousLsn { received: lsn, range: accepted_range }) } } } diff --git a/lib/sqlsync/src/journal/mod.rs b/lib/sqlsync/src/journal/mod.rs index c66e26e..651faca 100644 --- a/lib/sqlsync/src/journal/mod.rs +++ b/lib/sqlsync/src/journal/mod.rs @@ -1,10 +1,34 @@ mod cursor; -mod journal; mod journalid; mod memory; pub use cursor::{Cursor, Scannable}; -pub use journal::*; pub use journalid::{JournalId, 
JournalIdParseError}; pub use memory::{MemoryJournal, MemoryJournalFactory}; + +use std::fmt::Debug; +use std::io; + +use crate::lsn::{Lsn, LsnRange}; +use crate::Serializable; + +pub trait Journal: Scannable + Debug + Sized { + type Factory: JournalFactory; + + /// this journal's id + fn id(&self) -> JournalId; + + /// this journal's range + fn range(&self) -> LsnRange; + + /// append a new journal entry, and then write to it + fn append(&mut self, obj: impl Serializable) -> io::Result<()>; + + /// drop the journal's prefix + fn drop_prefix(&mut self, up_to: Lsn) -> io::Result<()>; +} + +pub trait JournalFactory { + fn open(&self, id: JournalId) -> io::Result; +} diff --git a/lib/sqlsync/src/local.rs b/lib/sqlsync/src/local.rs index da46405..c7c20d0 100644 --- a/lib/sqlsync/src/local.rs +++ b/lib/sqlsync/src/local.rs @@ -8,9 +8,7 @@ use crate::{ journal::{Journal, JournalId}, lsn::LsnRange, reducer::WasmReducer, - replication::{ - ReplicationDestination, ReplicationError, ReplicationSource, - }, + replication::{ReplicationDestination, ReplicationError, ReplicationSource}, storage::{Storage, StorageChange}, timeline::{apply_mutation, rebase_timeline, run_timeline_migration}, Lsn, @@ -111,9 +109,7 @@ where } pub fn rebase(&mut self) -> Result<()> { - if self.storage.has_committed_pages() - && self.storage.has_invisible_pages() - { + if self.storage.has_committed_pages() && self.storage.has_invisible_pages() { self.storage.reset()?; rebase_timeline( &mut self.timeline, @@ -148,22 +144,14 @@ impl ReplicationSource for LocalDocument { self.timeline.source_range() } - fn read_lsn<'a>( - &'a self, - lsn: crate::Lsn, - ) -> io::Result>> { + fn read_lsn(&self, lsn: crate::Lsn) -> io::Result>> { self.timeline.read_lsn(lsn) } } /// LocalDocument knows how to receive a storage journal from elsewhere -impl ReplicationDestination - for LocalDocument -{ - fn range( - &mut self, - id: JournalId, - ) -> std::result::Result { +impl ReplicationDestination for LocalDocument { + fn range(&mut self, id: JournalId) -> std::result::Result { self.storage.range(id) } diff --git a/lib/sqlsync/src/lsn.rs b/lib/sqlsync/src/lsn.rs index 87747eb..a6536d9 100644 --- a/lib/sqlsync/src/lsn.rs +++ b/lib/sqlsync/src/lsn.rs @@ -100,13 +100,10 @@ impl LsnRange { pub fn immediately_preceeds(&self, other: &Self) -> bool { match (self, other) { (_, LsnRange::Empty { .. }) => false, - (LsnRange::Empty { nextlsn }, LsnRange::NonEmpty { first, .. }) => { - *nextlsn == *first + (LsnRange::Empty { nextlsn }, LsnRange::NonEmpty { first, .. }) => *nextlsn == *first, + (LsnRange::NonEmpty { last, .. }, LsnRange::NonEmpty { first, .. }) => { + last + 1 == *first } - ( - LsnRange::NonEmpty { last, .. }, - LsnRange::NonEmpty { first, .. }, - ) => last + 1 == *first, } } @@ -118,9 +115,7 @@ impl LsnRange { if self.contains(lsn) { match self { LsnRange::Empty { .. } => None, - LsnRange::NonEmpty { first, .. } => { - Some(lsn.saturating_sub(*first) as usize) - } + LsnRange::NonEmpty { first, .. 
} => Some(lsn.saturating_sub(*first) as usize), } } else { None @@ -134,10 +129,8 @@ impl LsnRange { LsnRange::NonEmpty { first: self_first, last: self_last }, LsnRange::NonEmpty { first: other_first, last: other_last }, ) => { - let start = - std::cmp::max(*self_first, *other_first) - self_first; - let end = - std::cmp::min(*self_last, *other_last) - self_first + 1; + let start = std::cmp::max(*self_first, *other_first) - self_first; + let end = std::cmp::min(*self_last, *other_last) - self_first + 1; start as usize..end as usize } (_, _) => 0..0, @@ -200,12 +193,8 @@ impl LsnRange { pub fn extend_by(&self, len: u64) -> LsnRange { assert!(len > 0, "len must be > 0"); match self { - LsnRange::Empty { nextlsn } => { - LsnRange::new(*nextlsn, nextlsn + len - 1) - } - LsnRange::NonEmpty { first, last } => { - LsnRange::new(*first, last + len) - } + LsnRange::Empty { nextlsn } => LsnRange::new(*nextlsn, nextlsn + len - 1), + LsnRange::NonEmpty { first, last } => LsnRange::new(*first, last + len), } } @@ -288,9 +277,7 @@ impl LsnRange { impl Debug for LsnRange { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - LsnRange::Empty { nextlsn } => { - f.debug_tuple("LsnRange::E").field(nextlsn).finish() - } + LsnRange::Empty { nextlsn } => f.debug_tuple("LsnRange::E").field(nextlsn).finish(), LsnRange::NonEmpty { first, last } => { f.debug_tuple("LsnRange").field(first).field(last).finish() } diff --git a/lib/sqlsync/src/page.rs b/lib/sqlsync/src/page.rs index dad1da8..44c0868 100644 --- a/lib/sqlsync/src/page.rs +++ b/lib/sqlsync/src/page.rs @@ -23,9 +23,7 @@ pub struct SparsePages { impl SparsePages { pub fn new() -> SparsePages { - Self { - pages: BTreeMap::new(), - } + Self { pages: BTreeMap::new() } } pub fn num_pages(&self) -> usize { @@ -66,7 +64,7 @@ impl SparsePages { impl Serializable for SparsePages { fn serialize_into(&self, writer: &mut W) -> io::Result<()> { assert!( - self.pages.len() > 0, + !self.pages.is_empty(), "cannot serialize empty sparse pages obj" ); @@ -134,15 +132,19 @@ impl SerializedPagesReader { let mid_idx = PageIdx::from_le_bytes(page_idx_buf); - if mid_idx == page_idx { - let page_offset = (num_pages * PAGE_IDX_SIZE) + (mid * PAGESIZE); - return Ok(Some(page_offset)); - } else if mid_idx < page_idx { - // pages are sorted in descending order, so we need to search left - right = mid; - } else { - // pages are sorted in descending order, so we need to search right - left = mid + 1; + match mid_idx.cmp(&page_idx) { + std::cmp::Ordering::Equal => { + let page_offset = (num_pages * PAGE_IDX_SIZE) + (mid * PAGESIZE); + return Ok(Some(page_offset)); + } + std::cmp::Ordering::Less => { + // pages are sorted in descending order, so we need to search left + right = mid; + } + std::cmp::Ordering::Greater => { + // pages are sorted in descending order, so we need to search right + left = mid + 1; + } } } diff --git a/lib/sqlsync/src/reactive_query.rs b/lib/sqlsync/src/reactive_query.rs index 2cc6d68..8685a2d 100644 --- a/lib/sqlsync/src/reactive_query.rs +++ b/lib/sqlsync/src/reactive_query.rs @@ -29,7 +29,12 @@ pub struct ReactiveQuery { impl ReactiveQuery
{ pub fn new(sql: String, params: Vec
) -> Self { let explain_sql = format!("EXPLAIN {}", &sql); - Self { sql, explain_sql, params, state: State::Dirty } + Self { + sql, + explain_sql, + params, + state: State::Dirty, + } } // handle_storage_change checks if the storage change affects this query @@ -38,21 +43,16 @@ impl ReactiveQuery
{ pub fn handle_storage_change(&mut self, change: &StorageChange) -> bool { match self.state { State::Dirty => {} - State::Monitoring { root_pages_sorted: ref root_pages } => { - match change { - StorageChange::Full => self.state = State::Dirty, - StorageChange::Tables { - root_pages_sorted: ref changed_root_pages, - } => { - if has_sorted_intersection( - root_pages, - changed_root_pages, - ) { - self.state = State::Dirty; - } + State::Monitoring { root_pages_sorted: ref root_pages } => match change { + StorageChange::Full => self.state = State::Dirty, + StorageChange::Tables { + root_pages_sorted: ref changed_root_pages, + } => { + if has_sorted_intersection(root_pages, changed_root_pages) { + self.state = State::Dirty; } } - } + }, State::Error => self.state = State::Dirty, } self.is_dirty() @@ -85,12 +85,11 @@ impl ReactiveQuery
{ self.refresh_state(conn)?; let mut stmt = conn.prepare_cached(&self.sql)?; - let columns: Vec<_> = - stmt.column_names().iter().map(|&s| s.to_owned()).collect(); + let columns: Vec<_> = stmt.column_names().iter().map(|&s| s.to_owned()).collect(); let mut rows = stmt.query(params_from_iter(&self.params))?; let mut out = Vec::new(); while let Some(row) = rows.next()? { - let mapped = f(&columns, &row)?; + let mapped = f(&columns, row)?; out.push(mapped); } diff --git a/lib/sqlsync/src/reducer.rs b/lib/sqlsync/src/reducer.rs index 8fd44df..dad3e78 100644 --- a/lib/sqlsync/src/reducer.rs +++ b/lib/sqlsync/src/reducer.rs @@ -7,9 +7,7 @@ use rusqlite::{ }; use sqlsync_reducer::{ host_ffi::{register_log_handler, WasmFFI, WasmFFIError}, - types::{ - ErrorResponse, ExecResponse, QueryResponse, Request, Row, SqliteValue, - }, + types::{ErrorResponse, ExecResponse, QueryResponse, Request, Row, SqliteValue}, }; use thiserror::Error; use wasmi::{errors::LinkerError, Engine, Linker, Module, Store}; @@ -60,12 +58,11 @@ impl WasmReducer { register_log_handler(&mut linker)?; let mut store = Store::new(&engine, WasmFFI::uninitialized()); - let instance = - linker.instantiate(&mut store, &module)?.start(&mut store)?; + let instance = linker.instantiate(&mut store, &module)?.start(&mut store)?; // initialize the FFI let ffi = WasmFFI::initialized(&store, &instance)?; - (*store.data_mut()) = ffi.clone(); + (*store.data_mut()) = ffi; // initialize the reducer ffi.init_reducer(&mut store)?; @@ -73,11 +70,7 @@ impl WasmReducer { Ok(Self { store }) } - pub fn apply( - &mut self, - tx: &mut Transaction, - mutation: &[u8], - ) -> Result<()> { + pub fn apply(&mut self, tx: &mut Transaction, mutation: &[u8]) -> Result<()> { let ffi = self.store.data().to_owned(); // start the reducer @@ -115,10 +108,8 @@ impl WasmReducer { params: Vec, ) -> SqlResult { log::info!("received query req: {}, {:?}", sql, params); - let params = - params_from_iter(params.into_iter().map(from_sqlite_value)); - let mut stmt = - tx.prepare(&sql).map_err(rusqlite_err_to_response_err)?; + let params = params_from_iter(params.into_iter().map(from_sqlite_value)); + let mut stmt = tx.prepare(sql).map_err(rusqlite_err_to_response_err)?; let columns: Vec = stmt .column_names() @@ -152,13 +143,12 @@ impl WasmReducer { params: Vec, ) -> SqlResult { log::info!("received exec req: {}, {:?}", sql, params); - let params = - params_from_iter(params.into_iter().map(from_sqlite_value)); + let params = params_from_iter(params.into_iter().map(from_sqlite_value)); let start = unix_timestamp_milliseconds(); let changes = tx - .execute(&sql, params) + .execute(sql, params) .map_err(rusqlite_err_to_response_err)?; let end = unix_timestamp_milliseconds(); @@ -185,9 +175,7 @@ fn to_sqlite_value(v: ValueRef) -> SqliteValue { ValueRef::Null => SqliteValue::Null, ValueRef::Integer(i) => SqliteValue::Integer(i), ValueRef::Real(f) => SqliteValue::Real(f), - r @ ValueRef::Text(_) => { - SqliteValue::Text(r.as_str().unwrap().to_owned()) - } + r @ ValueRef::Text(_) => SqliteValue::Text(r.as_str().unwrap().to_owned()), ValueRef::Blob(b) => SqliteValue::Blob(b.to_vec()), } } diff --git a/lib/sqlsync/src/replication.rs b/lib/sqlsync/src/replication.rs index 6341442..2cec7e2 100644 --- a/lib/sqlsync/src/replication.rs +++ b/lib/sqlsync/src/replication.rs @@ -38,7 +38,7 @@ pub enum ReplicationError { NonContiguousLsn { received: Lsn, range: LsnRange }, } -#[derive(Debug)] +#[derive(Debug, Default)] pub struct ReplicationProtocol { // outstanding lsn frames sent to the 
destination but awaiting acknowledgement // this is an Option because we need the to initialize it from the initial RangeRequest @@ -47,7 +47,7 @@ pub struct ReplicationProtocol { impl ReplicationProtocol { pub fn new() -> Self { - Self { outstanding_range: None } + Self::default() } /// start replication, must be called on both sides of the connection @@ -127,18 +127,14 @@ impl ReplicationProtocol { // subsequent range response, update outstanding range |outstanding_range| { let next = range.next(); - assert!( - next > 0, - "subsequent range responses should never be empty" - ); + assert!(next > 0, "subsequent range responses should never be empty"); Some(outstanding_range.trim_prefix(next - 1)) }, ); Ok(None) } ReplicationMsg::Frame { id, lsn, len } => { - let mut reader = - LimitedReader { limit: len, inner: connection }; + let mut reader = LimitedReader { limit: len, inner: connection }; doc.write_lsn(id, lsn, &mut reader)?; Ok(Some(ReplicationMsg::Range { range: doc.range(id)? })) } @@ -158,8 +154,7 @@ pub trait ReplicationSource { fn source_range(&self) -> LsnRange; /// read the given lsn from the source journal if it exists - fn read_lsn<'a>(&'a self, lsn: Lsn) - -> io::Result>>; + fn read_lsn(&self, lsn: Lsn) -> io::Result>>; } pub trait ReplicationDestination { diff --git a/lib/sqlsync/src/storage.rs b/lib/sqlsync/src/storage.rs index 192538f..b72abc5 100644 --- a/lib/sqlsync/src/storage.rs +++ b/lib/sqlsync/src/storage.rs @@ -87,8 +87,7 @@ impl Storage { self.journal.append(std::mem::take(&mut self.pending))?; // calculate the LsnRange between the current visible range and the committed range - let new_lsns = - self.journal.range().difference(&self.visible_lsn_range); + let new_lsns = self.journal.range().difference(&self.visible_lsn_range); // clear the changed pages list (update_changed_root_pages will scan the new lsns) self.changed_pages.clear(); @@ -136,11 +135,9 @@ impl Storage { for page_idx in pages.page_idxs()?.iter() { // we need to resolve each page_idx to it's root page by only // looking at ptrmap pages that existed as of this lsn - if let Some(root_page_idx) = self.resolve_root_page( - LsnRange::new(0, lsn), - false, - *page_idx, - )? { + if let Some(root_page_idx) = + self.resolve_root_page(LsnRange::new(0, lsn), false, *page_idx)? 
+ { self.changed_root_pages.insert(root_page_idx); } } @@ -187,8 +184,7 @@ impl Storage { // effectively taking into account the ptrmap page itself // math mostly copied from: // https://github.com/sqlite/sqlite/blob/1eca330a08e18fd0930491302802141f5ce6298e/src/btree.c#L989C1-L1001C2 - const PAGES_PER_PTRMAP: u64 = - (USABLE_PAGE_SIZE / PTRMAP_ENTRY_SIZE) + 1; + const PAGES_PER_PTRMAP: u64 = (USABLE_PAGE_SIZE / PTRMAP_ENTRY_SIZE) + 1; if page_idx == 1 { // page 1 is the schema root page @@ -217,19 +213,12 @@ impl Storage { } // calculate the offset of the page_idx within the ptrmap page - let page_idx_offset = - (page_idx - ptrmap_page_idx - 1) * PTRMAP_ENTRY_SIZE; + let page_idx_offset = (page_idx - ptrmap_page_idx - 1) * PTRMAP_ENTRY_SIZE; // convert the relative offset to an absolute offset within the file - let page_idx_pos = - ((ptrmap_page_idx - 1) * (PAGESIZE as u64)) + page_idx_offset; + let page_idx_pos = ((ptrmap_page_idx - 1) * (PAGESIZE as u64)) + page_idx_offset; // read the ptrmap_entry for this page - self.read_at_range( - range, - include_pending, - page_idx_pos, - &mut ptrmap_entry, - )?; + self.read_at_range(range, include_pending, page_idx_pos, &mut ptrmap_entry)?; match ptrmap_entry[0] { 0 => { // page is missing, this can happen while we are rebasing @@ -272,8 +261,7 @@ impl Storage { pub fn has_changes(&self) -> bool { // it's not possible for the schema to change without also modifying pages // so we don't have to check the schema cookie here - return self.changed_pages.len() > 0 - || self.changed_root_pages.len() > 0; + !(self.changed_pages.is_empty() && self.changed_root_pages.is_empty()) } pub fn changes(&mut self) -> io::Result { @@ -297,8 +285,7 @@ impl Storage { self.update_changed_root_pages(LsnRange::empty())?; // gather changed root pages into sorted vec - let mut root_pages_sorted: Vec<_> = - self.changed_root_pages.iter().copied().collect(); + let mut root_pages_sorted: Vec<_> = self.changed_root_pages.iter().copied().collect(); root_pages_sorted.sort(); // reset variables @@ -343,8 +330,7 @@ impl Storage { { // if pos = 0, then this should be FILE_CHANGE_COUNTER_OFFSET // if pos = FILE_CHANGE_COUNTER_OFFSET, this this should be 0 - let file_change_buf_offset = - FILE_CHANGE_COUNTER_OFFSET - page_offset; + let file_change_buf_offset = FILE_CHANGE_COUNTER_OFFSET - page_offset; buf[file_change_buf_offset..(file_change_buf_offset + 4)] .copy_from_slice(&self.file_change_counter.to_be_bytes()); @@ -370,10 +356,7 @@ impl ReplicationSource for Storage { self.journal.source_range() } - fn read_lsn<'a>( - &'a self, - lsn: crate::Lsn, - ) -> io::Result>> { + fn read_lsn(&self, lsn: crate::Lsn) -> io::Result>> { self.journal.read_lsn(lsn) } } @@ -408,8 +391,7 @@ impl sqlite_vfs::File for Storage { let mut cursor = self.journal.scan_range(self.visible_lsn_range); while cursor.advance().map_err(|_| SQLITE_IOERR)? 
{ let pages = SerializedPagesReader(&cursor); - max_page_idx = max_page_idx - .max(Some(pages.max_page_idx().map_err(|_| SQLITE_IOERR)?)); + max_page_idx = max_page_idx.max(Some(pages.max_page_idx().map_err(|_| SQLITE_IOERR)?)); } Ok(max_page_idx @@ -441,11 +423,7 @@ impl sqlite_vfs::File for Storage { Ok(buf.len()) } - fn read( - &mut self, - pos: u64, - buf: &mut [u8], - ) -> sqlite_vfs::VfsResult { + fn read(&mut self, pos: u64, buf: &mut [u8]) -> sqlite_vfs::VfsResult { self.read_at_range(self.visible_lsn_range, true, pos, buf) .map_err(|_| SQLITE_IOERR) } diff --git a/lib/sqlsync/src/timeline.rs b/lib/sqlsync/src/timeline.rs index 8cdf669..34e3cd0 100644 --- a/lib/sqlsync/src/timeline.rs +++ b/lib/sqlsync/src/timeline.rs @@ -55,7 +55,7 @@ pub fn apply_mutation( reducer: &mut WasmReducer, mutation: &[u8], ) -> Result<()> { - run_in_tx(sqlite, |tx| reducer.apply(tx, &mutation))?; + run_in_tx(sqlite, |tx| reducer.apply(tx, mutation))?; timeline.append(mutation)?; Ok(()) }