Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

object store metastore #2309

Merged
merged 1 commit into from
Nov 29, 2024
Merged

Conversation

igalshilman
Copy link
Contributor

@igalshilman igalshilman commented Nov 18, 2024

Object store backed Metadata store

This PR introduces a new type of a metastore that uses S3 (any similar) object stores that support conditional updates.

end result

running the following:

export AWS_REGION=eu-central-1
export AWS_ACCESS_KEY_ID=
export AWS_SECRET_ACCESS_KEY=
export AWS_SESSION_TOKEN="

export AWS_ENDPOINT=https://s3.eu-central-1.amazonaws.com

export AWS_PROVIDER_NAME=Static
export RESTATE_METADATA_STORE_CLIENT__TYPE="object-store"
export RESTATE_METADATA_STORE_CLIENT__BUCKET="XXXXXXXX"
export RESTATE_METADATA_STORE_CLIENT__CREDENTIALS__TYPE="aws-env"

cargo run --example three_nodes_and_metadata -- --nocapture

starts a 3 node cluster that uses an s3 bucket for a metadata storage:

[nix-shell:~/work/restate]$ ./source.s3.sh 
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.73s
     Running `target/debug/examples/three_nodes_and_metadata --nocapture`
2024-11-27T14:38:34.545666Z  INFO restate_local_cluster_runner::cluster: Starting cluster test-cluster in /home/igal/work/restate/restate-data
2024-11-27T14:38:34.549226Z  INFO restate_local_cluster_runner::node: Started node metadata-node in /home/igal/work/restate/restate-data/metadata-node (pid 233735)
2024-11-27T14:38:35.825120Z  INFO three_nodes_and_metadata: admin node started: 2024-11-27T14:38:35.824212Z  INFO restate_node: Restate server is ready
2024-11-27T14:38:35.984871Z  INFO restate_local_cluster_runner::node: Node metadata-node admin check is healthy after 6 attempts
2024-11-27T14:38:35.989555Z  INFO restate_local_cluster_runner::node: Started node node-1 in /home/igal/work/restate/restate-data/node-1 (pid 233916)
2024-11-27T14:38:35.994266Z  INFO restate_local_cluster_runner::node: Started node node-2 in /home/igal/work/restate/restate-data/node-2 (pid 233917)
2024-11-27T14:38:35.998782Z  INFO restate_local_cluster_runner::node: Started node node-3 in /home/igal/work/restate/restate-data/node-3 (pid 233918)

listing the bucket yields the possible keys

[nix-shell:~]$ aws s3 ls s3://XXXX
2024-11-27 15:38:40       1221 bifrost_config
2024-11-27 15:38:38        950 nodes_config
2024-11-27 15:35:54        385 partition_table
2024-11-27 15:38:41        112 pp_epoch_0
2024-11-27 15:38:41        112 pp_epoch_1
2024-11-27 15:38:41        112 pp_epoch_2
2024-11-27 15:38:41        112 pp_epoch_3
2024-11-27 15:38:40        519 scheduling_plan

Additional configuration keys

[metadata-store-client]    
type = "object-store"    
bucket = "restate-metadata"    
    
[metadata-store-client.credentials]    
type = "aws-env"  

How does this work?

This is basically a client, each node runs an instance of this client, and the clients are using S3's optimistic concurrency control to move forward the different keys.

currently the only support cred type are AWS_X env variables, a followup referenced here in the conversation would be to plug in additional cred providers (unify the work done by @pcholakov )

@igalshilman igalshilman force-pushed the s3metastore branch 5 times, most recently from be3e74c to 638607a Compare November 19, 2024 16:45
@tillrohrmann tillrohrmann self-requested a review November 20, 2024 14:20
@igalshilman igalshilman force-pushed the s3metastore branch 3 times, most recently from ab8beca to 2f76ae8 Compare November 22, 2024 13:44
Copy link
Contributor

@tillrohrmann tillrohrmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for creating our object store based metadata store @igalshilman :-) It looks really nice! The implementation looks correct to me :-)

I left a few minor comments and questions. The one thing that is not fully clear to me is whether we want to clean up older versions or not. If not, would this be problematic for the get_latest_version which needs to list more and more versions?


let mut shutdown = pin::pin!(cancellation_watcher());

let delegate = match builder.build().await {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this await take a long time? If yes, then maybe select over the shutdown to also support shutdowns during this time.

Copy link
Contributor Author

@igalshilman igalshilman Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this await take a long time?

not any more.

}
};

let mut refresh = tokio::time::interval(Duration::from_secs(2));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably something we want to make configurable at some point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

outdated, it doesn't exists any more.


#[derive(Debug, Clone)]
pub struct Client {
sender: tokio::sync::mpsc::UnboundedSender<Commands>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a bounded sender also work?

Comment on lines 169 to 163
.map_err(|_| WriteError::Internal("Object store fetch channel ".into()))?;

rx.await.map_err(|_| {
WriteError::Internal("Object store fetch channel disconnected".to_string())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error messages seem to be copied from the get call.

Comment on lines 41 to 44
// serialize as json
serde_json::to_vec(self).map(Into::into).map_err(Into::into)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the json encoding would be more helpful if the values were also encoded in json?

}

#[async_trait::async_trait]
impl MetadataStore for OptimisticLockingMetadataStore {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does the OptimisticLockingMetadataStore implements the MetadataStore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, it is not any more.


pub struct OptimisticLockingMetadataStore {
version_repository: Arc<dyn VersionRepository>,
latest_store_cache: ArcSwap<Option<CachedStore>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this also be a CachedStore (w/o ArcSwap<Option<>>) and change the methods to take a mutable borrow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it won't implement the metastore trait then I can do that.
Let me try this out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worked out beautifully 🧑‍🍳

Comment on lines 187 to 188
// refresh and try again
let delay = {
let mut rng = rand::thread_rng();
rng.gen_range(50..300)
};
tokio::time::sleep(Duration::from_millis(delay)).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might be able to replace this with a RetryPolicy.


match self
.version_repository
.try_create_version(new_store.current_version(), new_store_bytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The correctness of the implementation relies on the fact that our version space won't have any wholes, right? I guess this makes cleaning older versions up a bit more tricky.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe assert that new_store.current_version() == cached_store.store.version().next() to state that there mustn't be any wholes.

Copy link
Contributor

@pcholakov pcholakov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might have a better path to implement this, using Object Versioning. A quick search shows that this is supported across MinIO / Azure / GCP, here's the S3 documentation: https://docs.aws.amazon.com/AmazonS3/latest/userguide/Versioning.html.

Here's a sketch of the conditional PUT path:

async fn put(
    &self,
    key: ByteString,
    value: VersionedValue,
    precondition: Precondition,
) -> Result<(), WriteError> {
    let key = object_store::path::Path::from(key.to_string());

    match precondition {
        Precondition::MatchesVersion(version) => {
            // If we have a cached version, we can also add an if-modified-since condition to the GET.
            // Unfortunately we can't set the version explicitly, and a HEAD request doesn't return enough
            // information to determine our own metadata value version.
            let get_result = self.object_store.get(&key).await.map_err(|e| {
                WriteError::Internal(format!("Failed to check precondition: {}", e))
            })?;

            if extract_object_version(&get_result.payload) != version {
                return Err(WriteError::FailedPrecondition(
                    "Version mismatch".to_string(),
                ));
            }

            self.object_store
                .put_opts(
                    &key,
                    PutPayload::from_bytes(serialize_versioned_value(value)),
                    PutOptions::from(PutMode::Update(UpdateVersion::from(&get_result))),
                )
                .await
                .map_err(|e| WriteError::Internal(format!("Failed to update value: {}", e)))?;

            Ok(())
        }
        _ => todo!(),
    }
}

Unfortunately we don't control the versions; in S3 they are a monotonic numbered sequence but we don't get to set them directly - rather, S3 will set them for us. Our own API relies on explicit versions so I'm assuming we'll just serialize them along with the rest of the payload as the object body.

A normal GET request implicitly returns the latest version, no further tricks required. It's also possible to request a particular previous version, but that's not needed for our API. Might be useful for troubleshooting though! (And I'd seriously consider serializing the values as JSON for operations friendliness.)

The only requirement for this all to work is that we must use a bucket with object versioning enabled, which is not the default but is trivial to set. S3 and other stores also support automatic cleanup of old versions using object lifecycle policies, so we don't need to implement that ourselves.

return Ok(());
};

if maybe_cached_version.is_none() || maybe_cached_version.unwrap() < global_latest_version {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer the !is_some_and(...) version personally.

Comment on lines 59 to 78
let mut stream = self.object_store.list(None);

// TODO: we should switch to a create internal Version
let mut current = 0;
while let Some(version) = stream
.try_next()
.await
.map_err(|e| VersionRepositoryError::Network(e.into()))?
{
if let Some(name) = version.location.filename() {
if !name.starts_with("v") {
continue;
}
if let Ok(v) = name[1..].parse::<u32>() {
if v > current {
current = v;
}
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can do this without LIST operations but rather using object versioning.

@tillrohrmann
Copy link
Contributor

tillrohrmann commented Nov 23, 2024

I think we might have a better path to implement this, using Object Versioning. A quick search shows that this is supported across MinIO / Azure / GCP, here's the S3 documentation: https://docs.aws.amazon.com/AmazonS3/latest/userguide/Versioning.html.

Here's a sketch of the conditional PUT path:

async fn put(
    &self,
    key: ByteString,
    value: VersionedValue,
    precondition: Precondition,
) -> Result<(), WriteError> {
    let key = object_store::path::Path::from(key.to_string());

    match precondition {
        Precondition::MatchesVersion(version) => {
            // If we have a cached version, we can also add an if-modified-since condition to the GET.
            // Unfortunately we can't set the version explicitly, and a HEAD request doesn't return enough
            // information to determine our own metadata value version.
            let get_result = self.object_store.get(&key).await.map_err(|e| {
                WriteError::Internal(format!("Failed to check precondition: {}", e))
            })?;

            if extract_object_version(&get_result.payload) != version {
                return Err(WriteError::FailedPrecondition(
                    "Version mismatch".to_string(),
                ));
            }

            self.object_store
                .put_opts(
                    &key,
                    PutPayload::from_bytes(serialize_versioned_value(value)),
                    PutOptions::from(PutMode::Update(UpdateVersion::from(&get_result))),
                )
                .await
                .map_err(|e| WriteError::Internal(format!("Failed to update value: {}", e)))?;

            Ok(())
        }
        _ => todo!(),
    }
}

Unfortunately we don't control the versions; in S3 they are a monotonic numbered sequence but we don't get to set them directly - rather, S3 will set them for us. Our own API relies on explicit versions so I'm assuming we'll just serialize them along with the rest of the payload as the object body.

A normal GET request implicitly returns the latest version, no further tricks required. It's also possible to request a particular previous version, but that's not needed for our API. Might be useful for troubleshooting though! (And I'd seriously consider serializing the values as JSON for operations friendliness.)

The only requirement for this all to work is that we must use a bucket with object versioning enabled, which is not the default but is trivial to set. S3 and other stores also support automatic cleanup of old versions using object lifecycle policies, so we don't need to implement that ourselves.

Wouldn't the PutMode::Update require an If-Match header support from S3? I thought that it currently only supports If-None-Match natively and for PutMode::Update one would have to use DynamoDB? Or is this different when using versioned buckets?

@pcholakov
Copy link
Contributor

@tillrohrmann you are right, I was wrong; this should work on Minio and Cloudflare R2, but not with S3. I got excited when I saw how elegant the object versioning API looked but it requires If-Match support by the underlying object store, which is a no-go.

Copy link
Contributor

@pcholakov pcholakov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a couple more minor questions I had while unpacking this :-)

update_fn(&mut new_map);

Self {
global_version: self.global_version.next(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if approaching this as a into_builder() / build_next_version() might not be a safer API. Chaining multiple update_map calls will create gaps in the version sequence, which we want to avoid.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure what do you mean, it is an internal impl detail, the only way to get a new version is by using an update_map.
A builder would be implemented exactly like that and kept internal.

/// a version that is increased with every mutating operation.
/// do not confuse with the per-key versions.
global_version: Version,
/// The collection of versioned values.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would we not be better off doing this per metadata store key, rather than as a monolithic blob? We don't have a global linearizability requirement and writing key-value pairs as individual objects would reduce conflicts. Not too worried about the value sizes but there's still some nontrivial write amplification going on with the current implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The blob is tiny ~ kb , mutations are rare. with that we can amortize API calls like listing etc'

@ScmTble
Copy link

ScmTble commented Nov 26, 2024

@tillrohrmann you are right, I was wrong; this should work on Minio and Cloudflare R2, but not with S3. I got excited when I saw how elegant the object versioning API looked but it requires If-Match support by the underlying object store, which is a no-go.

https://aws.amazon.com/about-aws/whats-new/2024/11/amazon-s3-functionality-conditional-writes/ It seems support has been added

@tillrohrmann
Copy link
Contributor

https://aws.amazon.com/about-aws/whats-new/2024/11/amazon-s3-functionality-conditional-writes/ It seems support has been added

Nice, this was quick 😄

@pcholakov
Copy link
Contributor

Finally! 🎉😆

@igalshilman igalshilman force-pushed the s3metastore branch 2 times, most recently from 6788660 to 91b0966 Compare November 26, 2024 21:04
@igalshilman igalshilman changed the title [wip] object store metastore object store metastore Nov 27, 2024
@tillrohrmann tillrohrmann self-requested a review November 28, 2024 23:04
Copy link
Contributor

@tillrohrmann tillrohrmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for updating this PR to use S3's new conditional put functionality @igalshilman :-) The changes look really nice :-) I left a few minor comments and suggestions. Did you try it out with a S3 bucket? +1 for merging.

I gues you could switch this PR from draft to normal.

};

loop {
select! {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
select! {
tokio::select! {

to make it a tad bit clearer that we using Tokio's select statement.

Comment on lines 98 to 101
else => {
tracing::info!("input channel is closed, exiting the metadata polling loop.");
break;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this else branch will never be executed because the first branch will never be disabled. If you want to break out of the loop when receiver closes, then I would suggest to do have a result = receiver.recv() => {} and do the pattern matching in the body of this arm.

configuration: MetadataStoreClient,
) -> Result<impl MetadataStore, GenericError> {
// obtain an instance of a version repository from the configuration.
// we use an [[object_store]] backed version repository.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To link single brackets are enough [object_store]. Those are, however, only respected in rust doc and not normal comments.


#[derive(Debug)]
pub(crate) struct ObjectStoreVersionRepository {
object_store: Arc<dyn ObjectStore>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is object_store behind an Arc? It seems that ObjectStoreVersionRepository won't be cloned. Then it could also be a simple Box.

Comment on lines +31 to +42
let MetadataStoreClient::ObjectStore {
credentials,
bucket,
..
} = configuration
else {
anyhow::bail!("unexpected configuration value");
};

let builder = match credentials {
ObjectStoreCredentials::AwsEnv => AmazonS3Builder::from_env(),
};

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we talked about this with @pcholakov and agreed that we do this alignment as a followup once both of these prs are in, otherwise we'll be copy pasting code between PRs right now.

Comment on lines 356 to 361
#[tokio::test]
async fn concurrency_test() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also spawn a multi threaded runtime via tokio::test(flavor = "multi-thread"). That way we also test parallelism. This is probably not super important, though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is very good idea, i'm going to change that.
my original intention was to see how it behaves with parallelism

fn versioned_value_to_bytes(v: &VersionedValue) -> anyhow::Result<Bytes> {
let on_disk = OnDiskValue::V1(Cow::Borrowed(v));

let mut buf = vec![];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we pass in this buffer (using a BytesMut), then it could live in the OptimisticLockingMetadataStore and be reused across multiple serializations.

pub enum ObjectStoreCredentials {
/// # Use standard AWS environment variables
///
/// configure the object store by setting the standard AWS env variables (prefixed with AWS_)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// configure the object store by setting the standard AWS env variables (prefixed with AWS_)
/// Configure the object store by setting the standard AWS env variables (prefixed with AWS_)

Comment on lines 501 to 506
/// for example, to test with minio:
/// * AWS_ALLOW_HTTP="true"
/// * AWS_PROVIDER_NAME=Static
/// * AWS_ENDPOINT=http://127.0.0.1:9000
/// * AWS_ACCESS_KEY_ID="minioadmin"
/// * AWS_SECRET_ACCESS_KEY="minioadmin"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part will be added to our docs once we generate the config docs. Is this something we want to keep or is it for internal testing purposes?

credentials: ObjectStoreCredentials,

/// # The bucket name to use for storage
#[cfg_attr(feature = "schemars", schemars(with = "String"))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably don't have to add this given that it's an identity function (String -> String).

@igalshilman
Copy link
Contributor Author

Thanks for updating this PR to use S3's new conditional put functionality @igalshilman :-) The changes look really nice :-) I left a few minor comments and suggestions. Did you try it out with a S3 bucket? +1 for merging.

I gues you could switch this PR from draft to normal.

@tillrohrmann yes, the PR description shows a real AWS S3 interaction.

@igalshilman igalshilman marked this pull request as ready for review November 29, 2024 15:58
@igalshilman igalshilman merged commit 72596f6 into restatedev:main Nov 29, 2024
12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants