diff --git a/crates/core/src/metadata_store/providers/objstore/glue.rs b/crates/core/src/metadata_store/providers/objstore/glue.rs index 23515aa9e..940840b17 100644 --- a/crates/core/src/metadata_store/providers/objstore/glue.rs +++ b/crates/core/src/metadata_store/providers/objstore/glue.rs @@ -12,7 +12,6 @@ use std::pin; use bytestring::ByteString; use restate_types::Version; -use tokio::select; use tokio::sync::oneshot::Sender; use tracing::warn; @@ -73,7 +72,7 @@ impl Server { }; loop { - select! { + tokio::select! { _ = &mut shutdown => { // stop accepting messages return Ok(()); @@ -95,13 +94,8 @@ impl Server { } } } - else => { - tracing::info!("input channel is closed, exiting the metadata polling loop."); - break; - } } } - Ok(()) } } diff --git a/crates/core/src/metadata_store/providers/objstore/mod.rs b/crates/core/src/metadata_store/providers/objstore/mod.rs index d0ac37477..625c930b6 100644 --- a/crates/core/src/metadata_store/providers/objstore/mod.rs +++ b/crates/core/src/metadata_store/providers/objstore/mod.rs @@ -8,8 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::sync::Arc; - use crate::metadata_store::providers::objstore::object_store_version_repository::ObjectStoreVersionRepository; use crate::metadata_store::providers::objstore::optimistic_store::OptimisticLockingMetadataStoreBuilder; use crate::metadata_store::providers::objstore::version_repository::VersionRepository; @@ -27,10 +25,10 @@ pub async fn create_object_store_based_meta_store( configuration: MetadataStoreClient, ) -> Result { // obtain an instance of a version repository from the configuration. - // we use an [[object_store]] backed version repository. - let version_repository = Arc::new(ObjectStoreVersionRepository::from_configuration( + // we use an object_store backed version repository. + let version_repository = Box::new(ObjectStoreVersionRepository::from_configuration( configuration.clone(), - )?) as Arc; + )?) as Box; // postpone the building of the store to the background task, // the runs at the task center. diff --git a/crates/core/src/metadata_store/providers/objstore/object_store_version_repository.rs b/crates/core/src/metadata_store/providers/objstore/object_store_version_repository.rs index 22d898ddc..ddbd1e281 100644 --- a/crates/core/src/metadata_store/providers/objstore/object_store_version_repository.rs +++ b/crates/core/src/metadata_store/providers/objstore/object_store_version_repository.rs @@ -8,22 +8,21 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::sync::Arc; - -use crate::metadata_store::providers::objstore::version_repository::{ - Tag, TaggedValue, VersionRepository, VersionRepositoryError, -}; use bytes::Bytes; use bytestring::ByteString; use object_store::aws::AmazonS3Builder; use object_store::aws::S3ConditionalPut::ETagMatch; use object_store::path::Path; use object_store::{Error, ObjectStore, PutMode, PutOptions, PutPayload, UpdateVersion}; + +use crate::metadata_store::providers::objstore::version_repository::{ + Tag, TaggedValue, VersionRepository, VersionRepositoryError, +}; use restate_types::config::{MetadataStoreClient, ObjectStoreCredentials}; #[derive(Debug)] pub(crate) struct ObjectStoreVersionRepository { - object_store: Arc, + object_store: Box, } impl ObjectStoreVersionRepository { @@ -48,7 +47,7 @@ impl ObjectStoreVersionRepository { .map_err(|e| anyhow::anyhow!("Unable to build an S3 object store: {}", e))?; Ok(Self { - object_store: Arc::new(store), + object_store: Box::new(store), }) } @@ -56,7 +55,7 @@ impl ObjectStoreVersionRepository { pub(crate) fn new_for_testing() -> Self { let store = object_store::memory::InMemory::new(); Self { - object_store: Arc::new(store), + object_store: Box::new(store), } } } @@ -79,7 +78,9 @@ impl VersionRepository for ObjectStoreVersionRepository { match self.object_store.put_opts(&path, payload, opts).await { Ok(res) => { let etag = res.e_tag.ok_or_else(|| { - VersionRepositoryError::Network("expecting an ETag to be present".into()) + VersionRepositoryError::UnexpectedCondition( + "expecting an ETag to be present".into(), + ) })?; Ok(etag.into()) } @@ -93,7 +94,7 @@ impl VersionRepository for ObjectStoreVersionRepository { .await .map_err(|e| VersionRepositoryError::Network(e.into()))?; let etag = get_result.meta.e_tag.as_ref().ok_or_else(|| { - VersionRepositoryError::Network("was expecting an etag".into()) + VersionRepositoryError::UnexpectedCondition("was expecting an etag".into()) })?; let tag: Tag = etag.to_owned().into(); let bytes = get_result @@ -115,7 +116,9 @@ impl VersionRepository for ObjectStoreVersionRepository { match self.object_store.get(&path).await { Ok(res) => { let etag = res.meta.e_tag.as_ref().ok_or_else(|| { - VersionRepositoryError::Network("expecting an ETag to be present".into()) + VersionRepositoryError::UnexpectedCondition( + "expecting an ETag to be present".into(), + ) })?; let tag = etag.to_owned().into(); let mut buf = res @@ -157,9 +160,9 @@ impl VersionRepository for ObjectStoreVersionRepository { .await { Ok(res) => { - let etag = res - .e_tag - .ok_or_else(|| VersionRepositoryError::Network("expecting an etag".into()))?; + let etag = res.e_tag.ok_or_else(|| { + VersionRepositoryError::UnexpectedCondition("expecting an etag".into()) + })?; let tag: Tag = etag.into(); Ok(tag) } @@ -180,9 +183,9 @@ impl VersionRepository for ObjectStoreVersionRepository { .await { Ok(res) => { - let etag = res - .e_tag - .ok_or_else(|| VersionRepositoryError::Network("expecting an etag".into()))?; + let etag = res.e_tag.ok_or_else(|| { + VersionRepositoryError::UnexpectedCondition("expecting an etag".into()) + })?; let tag: Tag = etag.into(); Ok(tag) } @@ -239,6 +242,7 @@ mod tests { use bytes::{Buf, Bytes}; use bytestring::ByteString; use std::sync::Arc; + use tokio::task::JoinSet; const KEY_1: ByteString = ByteString::from_static("1"); const HELLO_WORLD: Bytes = Bytes::from_static(b"hello world"); @@ -353,7 +357,7 @@ mod tests { } } - #[tokio::test] + #[tokio::test(flavor = "multi_thread")] async fn concurrency_test() { let store = Arc::new(ObjectStoreVersionRepository::new_for_testing()); @@ -366,54 +370,22 @@ mod tests { // first task // - let store1 = store.clone(); - let t1 = tokio::spawn(async move { - for _ in 0..1024 { - loop { - let mut tv = store1.get(KEY_1.clone()).await.unwrap(); - - let mut n = tv.bytes.get_u64(); - n += 1; - - match store1 - .put_if_tag_matches( - KEY_1.clone(), - tv.tag, - Bytes::copy_from_slice(&n.to_be_bytes()), - ) - .await - { - Ok(_) => { - break; - } - Err(VersionRepositoryError::PreconditionFailed) => { - continue; - } - Err(e) => { - panic!("should not happend: {}", e); - } - } - } - } - }); - - // - // second task - // + let mut futures = JoinSet::new(); - let store2 = store.clone(); - let t2 = tokio::spawn(async move { - for _ in 0..1024 { + for _ in 0..2048 { + let cloned_store = store.clone(); + futures.spawn(async move { loop { - let mut tv = store2.get(KEY_1.clone()).await.unwrap(); + let (tag, mut bytes) = + cloned_store.get(KEY_1.clone()).await.unwrap().into_inner(); - let mut n = tv.bytes.get_u64(); + let mut n = bytes.get_u64(); n += 1; - match store2 + match cloned_store .put_if_tag_matches( KEY_1.clone(), - tv.tag, + tag, Bytes::copy_from_slice(&n.to_be_bytes()), ) .await @@ -425,18 +397,17 @@ mod tests { continue; } Err(e) => { - panic!("should not happen: {}", e); + panic!("should not happened: {}", e); } } } - } - }); + }); + } - t1.await.unwrap(); - t2.await.unwrap(); + futures.join_all().await; - let tv = store.get(KEY_1).await.unwrap(); + let (_, mut bytes) = store.get(KEY_1).await.unwrap().into_inner(); - assert_eq!(tv.bytes, Bytes::copy_from_slice(&2048u64.to_be_bytes())); + assert_eq!(bytes.get_u64(), 2048u64); } } diff --git a/crates/core/src/metadata_store/providers/objstore/optimistic_store.rs b/crates/core/src/metadata_store/providers/objstore/optimistic_store.rs index e1debd750..0800a2c0a 100644 --- a/crates/core/src/metadata_store/providers/objstore/optimistic_store.rs +++ b/crates/core/src/metadata_store/providers/objstore/optimistic_store.rs @@ -9,9 +9,8 @@ // by the Apache License, Version 2.0. use std::borrow::Cow; -use std::sync::Arc; -use bytes::Bytes; +use bytes::{BufMut, Bytes, BytesMut}; use bytestring::ByteString; use crate::metadata_store::providers::objstore::version_repository::VersionRepositoryError::PreconditionFailed; @@ -23,23 +22,22 @@ use restate_types::config::MetadataStoreClient; use restate_types::Version; pub(crate) struct OptimisticLockingMetadataStoreBuilder { - pub(crate) version_repository: Arc, + pub(crate) version_repository: Box, pub(crate) configuration: MetadataStoreClient, } impl OptimisticLockingMetadataStoreBuilder { - pub(crate) async fn build(&self) -> anyhow::Result { + pub(crate) async fn build(self) -> anyhow::Result { let MetadataStoreClient::ObjectStore { .. } = self.configuration else { anyhow::bail!("unexpected configuration value"); }; - Ok(OptimisticLockingMetadataStore::new( - self.version_repository.clone(), - )) + Ok(OptimisticLockingMetadataStore::new(self.version_repository)) } } pub struct OptimisticLockingMetadataStore { - version_repository: Arc, + version_repository: Box, + arena: BytesMut, } #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] @@ -48,16 +46,6 @@ enum OnDiskValue<'a> { V1(Cow<'a, VersionedValue>), } -fn versioned_value_to_bytes(v: &VersionedValue) -> anyhow::Result { - let on_disk = OnDiskValue::V1(Cow::Borrowed(v)); - - let mut buf = vec![]; - - ciborium::into_writer(&on_disk, &mut buf) - .map(|_| Bytes::from(buf)) - .map_err(Into::into) -} - fn tagged_value_to_versioned_value(tagged_value: &TaggedValue) -> anyhow::Result { let on_disk: OnDiskValue<'static> = ciborium::from_reader(tagged_value.bytes.as_ref())?; match on_disk { @@ -66,8 +54,11 @@ fn tagged_value_to_versioned_value(tagged_value: &TaggedValue) -> anyhow::Result } impl OptimisticLockingMetadataStore { - fn new(version_repository: Arc) -> Self { - Self { version_repository } + fn new(version_repository: Box) -> Self { + Self { + version_repository, + arena: BytesMut::with_capacity(8196), + } } pub(crate) async fn get( @@ -96,13 +87,27 @@ impl OptimisticLockingMetadataStore { } } + fn serialize_versioned_value( + &mut self, + versioned_value: &VersionedValue, + ) -> Result { + self.arena.clear(); + let buf = self.arena.split_off(0); + let mut writer = buf.writer(); + + let on_disk = OnDiskValue::V1(Cow::Borrowed(versioned_value)); + ciborium::into_writer(&on_disk, &mut writer) + .map(|_| writer.into_inner().freeze()) + .map_err(|e| WriteError::Codec(e.into())) + } + pub(crate) async fn put( &mut self, key: ByteString, value: VersionedValue, precondition: Precondition, ) -> Result<(), WriteError> { - let buf = versioned_value_to_bytes(&value).map_err(|e| WriteError::Codec(e.into()))?; + let buf = self.serialize_versioned_value(&value)?; match precondition { Precondition::None => { self.version_repository @@ -225,16 +230,15 @@ mod tests { use bytes::Bytes; use bytestring::ByteString; use restate_types::Version; - use std::sync::Arc; const KEY_1: ByteString = ByteString::from_static("1"); const HELLO: Bytes = Bytes::from_static(b"hello"); #[tokio::test] async fn basic_example() { - let mut store = OptimisticLockingMetadataStore { - version_repository: Arc::new(ObjectStoreVersionRepository::new_for_testing()), - }; + let mut store = OptimisticLockingMetadataStore::new(Box::new( + ObjectStoreVersionRepository::new_for_testing(), + )); store .put( @@ -248,9 +252,9 @@ mod tests { #[tokio::test] async fn put_if_absent() { - let mut store = OptimisticLockingMetadataStore { - version_repository: Arc::new(ObjectStoreVersionRepository::new_for_testing()), - }; + let mut store = OptimisticLockingMetadataStore::new(Box::new( + ObjectStoreVersionRepository::new_for_testing(), + )); store .put( @@ -264,9 +268,9 @@ mod tests { #[tokio::test] async fn put_if_absent_should_fail() { - let mut store = OptimisticLockingMetadataStore { - version_repository: Arc::new(ObjectStoreVersionRepository::new_for_testing()), - }; + let mut store = OptimisticLockingMetadataStore::new(Box::new( + ObjectStoreVersionRepository::new_for_testing(), + )); store .put( @@ -296,9 +300,9 @@ mod tests { #[tokio::test] async fn put_if_absent_on_deleted_value() { - let mut store = OptimisticLockingMetadataStore { - version_repository: Arc::new(ObjectStoreVersionRepository::new_for_testing()), - }; + let mut store = OptimisticLockingMetadataStore::new(Box::new( + ObjectStoreVersionRepository::new_for_testing(), + )); store .put( diff --git a/crates/core/src/metadata_store/providers/objstore/version_repository.rs b/crates/core/src/metadata_store/providers/objstore/version_repository.rs index ffd5840f7..12cc6c271 100644 --- a/crates/core/src/metadata_store/providers/objstore/version_repository.rs +++ b/crates/core/src/metadata_store/providers/objstore/version_repository.rs @@ -23,6 +23,8 @@ pub enum VersionRepositoryError { NotFound, #[error("Network {0}")] Network(GenericError), + #[error("Unexpected condition {0}")] + UnexpectedCondition(String), } #[derive(Debug, Eq, PartialEq, Hash, Clone)] @@ -46,6 +48,13 @@ pub(crate) struct TaggedValue { pub bytes: Bytes, } +impl TaggedValue { + #[cfg(test)] + pub(crate) fn into_inner(self) -> (Tag, Bytes) { + (self.tag, self.bytes) + } +} + #[async_trait::async_trait] pub(crate) trait VersionRepository: Sync + Send + 'static { async fn create(&self, key: ByteString, content: Bytes) -> Result; diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index b0c92eb86..63f33a3f4 100644 --- a/crates/types/src/config/common.rs +++ b/crates/types/src/config/common.rs @@ -497,13 +497,7 @@ pub struct MetadataStoreClientOptions { pub enum ObjectStoreCredentials { /// # Use standard AWS environment variables /// - /// configure the object store by setting the standard AWS env variables (prefixed with AWS_) - /// for example, to test with minio: - /// * AWS_ALLOW_HTTP="true" - /// * AWS_PROVIDER_NAME=Static - /// * AWS_ENDPOINT=http://127.0.0.1:9000 - /// * AWS_ACCESS_KEY_ID="minioadmin" - /// * AWS_SECRET_ACCESS_KEY="minioadmin" + /// Configure the object store by setting the standard AWS env variables (prefixed with AWS_) AwsEnv, }