Skip to content

Commit

Permalink
Code review
Browse files Browse the repository at this point in the history
  • Loading branch information
igalshilman committed Nov 29, 2024
1 parent 91b0966 commit 6fd9cf2
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 119 deletions.
8 changes: 1 addition & 7 deletions crates/core/src/metadata_store/providers/objstore/glue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use std::pin;

use bytestring::ByteString;
use restate_types::Version;
use tokio::select;
use tokio::sync::oneshot::Sender;
use tracing::warn;

Expand Down Expand Up @@ -73,7 +72,7 @@ impl Server {
};

loop {
select! {
tokio::select! {
_ = &mut shutdown => {
// stop accepting messages
return Ok(());
Expand All @@ -95,13 +94,8 @@ impl Server {
}
}
}
else => {
tracing::info!("input channel is closed, exiting the metadata polling loop.");
break;
}
}
}
Ok(())
}
}

Expand Down
8 changes: 3 additions & 5 deletions crates/core/src/metadata_store/providers/objstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use crate::metadata_store::providers::objstore::object_store_version_repository::ObjectStoreVersionRepository;
use crate::metadata_store::providers::objstore::optimistic_store::OptimisticLockingMetadataStoreBuilder;
use crate::metadata_store::providers::objstore::version_repository::VersionRepository;
Expand All @@ -27,10 +25,10 @@ pub async fn create_object_store_based_meta_store(
configuration: MetadataStoreClient,
) -> Result<impl MetadataStore, GenericError> {
// obtain an instance of a version repository from the configuration.
// we use an [[object_store]] backed version repository.
let version_repository = Arc::new(ObjectStoreVersionRepository::from_configuration(
// we use an object_store backed version repository.
let version_repository = Box::new(ObjectStoreVersionRepository::from_configuration(
configuration.clone(),
)?) as Arc<dyn VersionRepository>;
)?) as Box<dyn VersionRepository>;

// postpone the building of the store to the background task,
// the runs at the task center.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,21 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use crate::metadata_store::providers::objstore::version_repository::{
Tag, TaggedValue, VersionRepository, VersionRepositoryError,
};
use bytes::Bytes;
use bytestring::ByteString;
use object_store::aws::AmazonS3Builder;
use object_store::aws::S3ConditionalPut::ETagMatch;
use object_store::path::Path;
use object_store::{Error, ObjectStore, PutMode, PutOptions, PutPayload, UpdateVersion};

use crate::metadata_store::providers::objstore::version_repository::{
Tag, TaggedValue, VersionRepository, VersionRepositoryError,
};
use restate_types::config::{MetadataStoreClient, ObjectStoreCredentials};

#[derive(Debug)]
pub(crate) struct ObjectStoreVersionRepository {
object_store: Arc<dyn ObjectStore>,
object_store: Box<dyn ObjectStore>,
}

impl ObjectStoreVersionRepository {
Expand All @@ -48,15 +47,15 @@ impl ObjectStoreVersionRepository {
.map_err(|e| anyhow::anyhow!("Unable to build an S3 object store: {}", e))?;

Ok(Self {
object_store: Arc::new(store),
object_store: Box::new(store),
})
}

#[cfg(test)]
pub(crate) fn new_for_testing() -> Self {
let store = object_store::memory::InMemory::new();
Self {
object_store: Arc::new(store),
object_store: Box::new(store),
}
}
}
Expand All @@ -79,7 +78,9 @@ impl VersionRepository for ObjectStoreVersionRepository {
match self.object_store.put_opts(&path, payload, opts).await {
Ok(res) => {
let etag = res.e_tag.ok_or_else(|| {
VersionRepositoryError::Network("expecting an ETag to be present".into())
VersionRepositoryError::UnexpectedCondition(
"expecting an ETag to be present".into(),
)
})?;
Ok(etag.into())
}
Expand All @@ -93,7 +94,7 @@ impl VersionRepository for ObjectStoreVersionRepository {
.await
.map_err(|e| VersionRepositoryError::Network(e.into()))?;
let etag = get_result.meta.e_tag.as_ref().ok_or_else(|| {
VersionRepositoryError::Network("was expecting an etag".into())
VersionRepositoryError::UnexpectedCondition("was expecting an etag".into())
})?;
let tag: Tag = etag.to_owned().into();
let bytes = get_result
Expand All @@ -115,7 +116,9 @@ impl VersionRepository for ObjectStoreVersionRepository {
match self.object_store.get(&path).await {
Ok(res) => {
let etag = res.meta.e_tag.as_ref().ok_or_else(|| {
VersionRepositoryError::Network("expecting an ETag to be present".into())
VersionRepositoryError::UnexpectedCondition(
"expecting an ETag to be present".into(),
)
})?;
let tag = etag.to_owned().into();
let mut buf = res
Expand Down Expand Up @@ -157,9 +160,9 @@ impl VersionRepository for ObjectStoreVersionRepository {
.await
{
Ok(res) => {
let etag = res
.e_tag
.ok_or_else(|| VersionRepositoryError::Network("expecting an etag".into()))?;
let etag = res.e_tag.ok_or_else(|| {
VersionRepositoryError::UnexpectedCondition("expecting an etag".into())
})?;
let tag: Tag = etag.into();
Ok(tag)
}
Expand All @@ -180,9 +183,9 @@ impl VersionRepository for ObjectStoreVersionRepository {
.await
{
Ok(res) => {
let etag = res
.e_tag
.ok_or_else(|| VersionRepositoryError::Network("expecting an etag".into()))?;
let etag = res.e_tag.ok_or_else(|| {
VersionRepositoryError::UnexpectedCondition("expecting an etag".into())
})?;
let tag: Tag = etag.into();
Ok(tag)
}
Expand Down Expand Up @@ -239,6 +242,7 @@ mod tests {
use bytes::{Buf, Bytes};
use bytestring::ByteString;
use std::sync::Arc;
use tokio::task::JoinSet;

const KEY_1: ByteString = ByteString::from_static("1");
const HELLO_WORLD: Bytes = Bytes::from_static(b"hello world");
Expand Down Expand Up @@ -353,7 +357,7 @@ mod tests {
}
}

#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn concurrency_test() {
let store = Arc::new(ObjectStoreVersionRepository::new_for_testing());

Expand All @@ -366,54 +370,22 @@ mod tests {
// first task
//

let store1 = store.clone();
let t1 = tokio::spawn(async move {
for _ in 0..1024 {
loop {
let mut tv = store1.get(KEY_1.clone()).await.unwrap();

let mut n = tv.bytes.get_u64();
n += 1;

match store1
.put_if_tag_matches(
KEY_1.clone(),
tv.tag,
Bytes::copy_from_slice(&n.to_be_bytes()),
)
.await
{
Ok(_) => {
break;
}
Err(VersionRepositoryError::PreconditionFailed) => {
continue;
}
Err(e) => {
panic!("should not happend: {}", e);
}
}
}
}
});

//
// second task
//
let mut futures = JoinSet::new();

let store2 = store.clone();
let t2 = tokio::spawn(async move {
for _ in 0..1024 {
for _ in 0..2048 {
let cloned_store = store.clone();
futures.spawn(async move {
loop {
let mut tv = store2.get(KEY_1.clone()).await.unwrap();
let (tag, mut bytes) =
cloned_store.get(KEY_1.clone()).await.unwrap().into_inner();

let mut n = tv.bytes.get_u64();
let mut n = bytes.get_u64();
n += 1;

match store2
match cloned_store
.put_if_tag_matches(
KEY_1.clone(),
tv.tag,
tag,
Bytes::copy_from_slice(&n.to_be_bytes()),
)
.await
Expand All @@ -425,18 +397,17 @@ mod tests {
continue;
}
Err(e) => {
panic!("should not happen: {}", e);
panic!("should not happened: {}", e);
}
}
}
}
});
});
}

t1.await.unwrap();
t2.await.unwrap();
futures.join_all().await;

let tv = store.get(KEY_1).await.unwrap();
let (_, mut bytes) = store.get(KEY_1).await.unwrap().into_inner();

assert_eq!(tv.bytes, Bytes::copy_from_slice(&2048u64.to_be_bytes()));
assert_eq!(bytes.get_u64(), 2048u64);
}
}
Loading

0 comments on commit 6fd9cf2

Please sign in to comment.