Skip to content

Commit

Permalink
Add an object_store backed metadata store
Browse files Browse the repository at this point in the history
  • Loading branch information
igalshilman committed Nov 29, 2024
1 parent 6d189c1 commit 96c304d
Show file tree
Hide file tree
Showing 11 changed files with 1,112 additions and 3 deletions.
28 changes: 25 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ bitflags = { version = "2.6.0" }
bytes = { version = "1.7", features = ["serde"] }
bytes-utils = "0.1.3"
bytestring = { version = "1.2", features = ["serde"] }
ciborium = { version = "0.2.2" }
chrono = { version = "0.4.38", default-features = false, features = ["clock"] }
comfy-table = { version = "7.1" }
chrono-humanize = { version = "0.2.3" }
Expand All @@ -101,6 +102,7 @@ datafusion = { version = "42.0.0", default-features = false, features = [
"regex_expressions",
"unicode_expressions",
] }
object_store = { version = "0.11.1"}
datafusion-expr = { version = "42.0.0" }
derive_builder = "0.20.0"
derive_more = { version = "1", features = ["full"] }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ assert2 = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true }
ciborium = { workspace = true}
dashmap = { workspace = true }
derive_builder = { workspace = true }
derive_more = { workspace = true }
Expand All @@ -40,6 +41,7 @@ hyper-util = { workspace = true }
metrics = { workspace = true }
opentelemetry = { workspace = true }
once_cell = { workspace = true }
object_store = { workspace = true, features = ["aws"] }
parking_lot = { workspace = true }
pin-project-lite = { workspace = true }
prost = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/metadata_store/providers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@
// by the Apache License, Version 2.0.

mod etcd;
mod objstore;

pub use etcd::EtcdMetadataStore;
pub use objstore::create_object_store_based_meta_store;
175 changes: 175 additions & 0 deletions crates/core/src/metadata_store/providers/objstore/glue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::pin;

use bytestring::ByteString;
use restate_types::Version;
use tokio::sync::oneshot::Sender;
use tracing::warn;

use crate::cancellation_watcher;
use crate::metadata_store::providers::objstore::optimistic_store::OptimisticLockingMetadataStoreBuilder;
use crate::metadata_store::{MetadataStore, Precondition, ReadError, VersionedValue, WriteError};

#[derive(Debug)]
pub(crate) enum Commands {
Get {
key: ByteString,
tx: Sender<Result<Option<VersionedValue>, ReadError>>,
},
GetVersion {
key: ByteString,
tx: Sender<Result<Option<Version>, ReadError>>,
},
Put {
key: ByteString,
value: VersionedValue,
precondition: Precondition,
tx: Sender<Result<(), WriteError>>,
},
Delete {
key: ByteString,
precondition: Precondition,
tx: Sender<Result<(), WriteError>>,
},
}

pub(crate) struct Server {
receiver: tokio::sync::mpsc::UnboundedReceiver<Commands>,
builder: OptimisticLockingMetadataStoreBuilder,
}

impl Server {
pub(crate) fn new(
builder: OptimisticLockingMetadataStoreBuilder,
receiver: tokio::sync::mpsc::UnboundedReceiver<Commands>,
) -> Self {
Self { builder, receiver }
}

pub(crate) async fn run(self) -> anyhow::Result<()> {
let Server {
mut receiver,
builder,
} = self;

let mut shutdown = pin::pin!(cancellation_watcher());

let mut delegate = match builder.build().await {
Ok(delegate) => delegate,
Err(err) => {
warn!(error = ?err, "error while loading latest metastore version.");
return Err(err);
}
};

loop {
tokio::select! {
_ = &mut shutdown => {
// stop accepting messages
return Ok(());
}
Some(cmd) = receiver.recv() => {
match cmd {
Commands::Get{key,tx } => {
let res = delegate.get(key).await;
let _ = tx.send(res);
}
Commands::GetVersion{key,tx } => {
let _ = tx.send(delegate.get_version(key).await);
}
Commands::Put{key,value,precondition,tx } => {
let _ = tx.send(delegate.put(key, value, precondition).await);
}
Commands::Delete{key,precondition,tx } => {
let _ = tx.send(delegate.delete(key, precondition).await);
}
}
}
}
}
}
}

#[derive(Debug, Clone)]
pub struct Client {
sender: tokio::sync::mpsc::UnboundedSender<Commands>,
}

impl Client {
pub fn new(sender: tokio::sync::mpsc::UnboundedSender<Commands>) -> Self {
Self { sender }
}
}

#[async_trait::async_trait]
impl MetadataStore for Client {
async fn get(&self, key: ByteString) -> Result<Option<VersionedValue>, ReadError> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.sender
.send(Commands::Get { key, tx })
.map_err(|_| ReadError::Internal("Object store fetch channel ".into()))?;

rx.await.map_err(|_| {
ReadError::Internal("Object store fetch channel disconnected".to_string())
})?
}

async fn get_version(&self, key: ByteString) -> Result<Option<Version>, ReadError> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.sender
.send(Commands::GetVersion { key, tx })
.map_err(|_| ReadError::Internal("Object store fetch channel ".into()))?;

rx.await.map_err(|_| {
ReadError::Internal("Object store fetch channel disconnected".to_string())
})?
}

async fn put(
&self,
key: ByteString,
value: VersionedValue,
precondition: Precondition,
) -> Result<(), WriteError> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.sender
.send(Commands::Put {
key,
value,
precondition,
tx,
})
.map_err(|_| WriteError::Internal("Object store channel ".into()))?;

rx.await
.map_err(|_| WriteError::Internal("Object store channel disconnected".to_string()))?
}

async fn delete(&self, key: ByteString, precondition: Precondition) -> Result<(), WriteError> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.sender
.send(Commands::Delete {
key,
precondition,
tx,
})
.map_err(|_| WriteError::Internal("Object store fetch channel ".into()))?;

rx.await.map_err(|_| {
WriteError::Internal("Object store fetch channel disconnected".to_string())
})?
}
}
53 changes: 53 additions & 0 deletions crates/core/src/metadata_store/providers/objstore/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::metadata_store::providers::objstore::object_store_version_repository::ObjectStoreVersionRepository;
use crate::metadata_store::providers::objstore::optimistic_store::OptimisticLockingMetadataStoreBuilder;
use crate::metadata_store::providers::objstore::version_repository::VersionRepository;
use crate::metadata_store::MetadataStore;
use crate::{TaskCenter, TaskKind};
use restate_types::config::MetadataStoreClient;
use restate_types::errors::GenericError;

mod glue;
mod object_store_version_repository;
mod optimistic_store;
mod version_repository;

pub async fn create_object_store_based_meta_store(
configuration: MetadataStoreClient,
) -> Result<impl MetadataStore, GenericError> {
// obtain an instance of a version repository from the configuration.
// we use an object_store backed version repository.
let version_repository = Box::new(ObjectStoreVersionRepository::from_configuration(
configuration.clone(),
)?) as Box<dyn VersionRepository>;

// postpone the building of the store to the background task,
// the runs at the task center.
let store_builder = OptimisticLockingMetadataStoreBuilder {
version_repository,
configuration,
};
//
// setup all the glue code, the forwarding client and the event loop server.
//
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let server = glue::Server::new(store_builder, rx);
TaskCenter::spawn(
TaskKind::MetadataStore,
"metadata-store-client",
server.run(),
)
.expect("unable to spawn a task");

let client = glue::Client::new(tx);
Ok(client)
}
Loading

0 comments on commit 96c304d

Please sign in to comment.