Implement partition store restore-from-snapshot #2353

Open · wants to merge 7 commits into base: feat/snapshot-upload

46 changes: 27 additions & 19 deletions crates/partition-store/src/partition_store_manager.rs
@@ -15,7 +15,7 @@ use std::sync::Arc;

use rocksdb::ExportImportFilesMetaData;
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};
use tracing::{debug, info, warn};

use crate::cf_options;
use crate::snapshots::LocalPartitionSnapshot;
@@ -86,9 +86,11 @@ impl PartitionStoreManager {
})
}

pub async fn has_partition(&self, partition_id: PartitionId) -> bool {
let guard = self.lookup.lock().await;
guard.live.contains_key(&partition_id)
/// Check whether we have a partition store for the given partition id, irrespective of whether
/// the store is open or not.
pub async fn has_partition_store(&self, partition_id: PartitionId) -> bool {
let cf_name = cf_for_partition(partition_id);
self.rocksdb.inner().cf_handle(&cf_name).is_some()
}

pub async fn get_partition_store(&self, partition_id: PartitionId) -> Option<PartitionStore> {
@@ -147,8 +149,7 @@ impl PartitionStoreManager {
let mut guard = self.lookup.lock().await;
if guard.live.contains_key(&partition_id) {
warn!(
?partition_id,
?snapshot,
%partition_id,
"The partition store is already open, refusing to import snapshot"
);
return Err(RocksError::AlreadyOpen);
Expand All @@ -158,32 +159,39 @@ impl PartitionStoreManager {
let cf_exists = self.rocksdb.inner().cf_handle(&cf_name).is_some();
if cf_exists {
warn!(
?partition_id,
?cf_name,
?snapshot,
%partition_id,
%cf_name,
"The column family for partition already exists in the database, cannot import snapshot"
);
return Err(RocksError::ColumnFamilyExists);
}

if snapshot.key_range.start() > partition_key_range.start()
|| snapshot.key_range.end() < partition_key_range.end()
{
warn!(
%partition_id,
snapshot_range = ?snapshot.key_range,
partition_range = ?partition_key_range,
"The snapshot key range does not fully cover the partition key range"
);
return Err(RocksError::SnapshotKeyRangeMismatch);
}
Comment on lines +169 to +179
Contributor: That's a good check :-)
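For context, here is a minimal standalone sketch of the containment rule this check enforces (plain u64 stand-ins for PartitionKey; the function name is illustrative and not from the codebase): a snapshot may only be imported if its key range fully covers the partition's key range.

use std::ops::RangeInclusive;

// A snapshot covers a partition iff it starts at or before the partition's
// first key and ends at or after its last key.
fn snapshot_covers_partition(snapshot: &RangeInclusive<u64>, partition: &RangeInclusive<u64>) -> bool {
    snapshot.start() <= partition.start() && snapshot.end() >= partition.end()
}

fn main() {
    // A wider snapshot is acceptable...
    assert!(snapshot_covers_partition(&(0..=1000), &(0..=499)));
    // ...but a snapshot of a narrower key range must be rejected.
    assert!(!snapshot_covers_partition(&(0..=499), &(0..=1000)));
}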


let mut import_metadata = ExportImportFilesMetaData::default();
import_metadata.set_db_comparator_name(snapshot.db_comparator_name.as_str());
import_metadata.set_files(&snapshot.files);

info!(
?partition_id,
min_applied_lsn = ?snapshot.min_applied_lsn,
"Initializing partition store from snapshot"
%partition_id,
min_lsn = %snapshot.min_applied_lsn,
path = ?snapshot.base_dir,
"Importing partition store snapshot"
);

if let Err(e) = self
.rocksdb
self.rocksdb
.import_cf(cf_name.clone(), opts, import_metadata)
.await
{
error!(?partition_id, "Failed to import snapshot");
return Err(e);
}
.await?;

assert!(self.rocksdb.inner().cf_handle(&cf_name).is_some());
let partition_store = PartitionStore::new(

3 changes: 3 additions & 0 deletions crates/rocksdb/src/error.rs
@@ -30,6 +30,9 @@ pub enum RocksError {
#[error("already exists")]
#[code(unknown)]
ColumnFamilyExists,
#[error("invalid key range for partition")]
#[code(unknown)]
SnapshotKeyRangeMismatch,
#[error(transparent)]
#[code(unknown)]
Other(#[from] rocksdb::Error),

4 changes: 3 additions & 1 deletion crates/rocksdb/src/rock_access.rs
@@ -46,6 +46,8 @@ pub trait RocksAccess {
default_cf_options: rocksdb::Options,
cf_patterns: Arc<[(BoxedCfMatcher, BoxedCfOptionUpdater)]>,
) -> Result<(), RocksError>;
/// Create a column family from a snapshot. The data files referenced by
/// `metadata` will be moved into the RocksDB data directory.
fn import_cf(
&self,
name: CfName,
@@ -163,7 +165,7 @@ impl RocksAccess for rocksdb::DB {
let options = prepare_cf_options(&cf_patterns, default_cf_options, &name)?;

let mut import_opts = ImportColumnFamilyOptions::default();
import_opts.set_move_files(false); // keep the snapshot files intact
import_opts.set_move_files(true);

Ok(Self::create_column_family_with_import(
self,

3 changes: 2 additions & 1 deletion crates/worker/Cargo.toml
@@ -67,10 +67,11 @@ serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
strum = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tokio-util = { workspace = true, features = ["io-util"] }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
ulid = { workspace = true }

150 changes: 144 additions & 6 deletions crates/worker/src/partition/snapshots/repository.rs
@@ -21,11 +21,18 @@ use object_store::aws::AmazonS3Builder;
use object_store::{MultipartUpload, ObjectStore, PutMode, PutOptions, PutPayload};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use tempfile::TempDir;
use tokio::io;
use tokio::io::AsyncReadExt;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tokio_util::io::StreamReader;
use tracing::{debug, info, instrument, trace};
use url::Url;

use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion};
use restate_partition_store::snapshots::{
LocalPartitionSnapshot, PartitionSnapshotMetadata, SnapshotFormatVersion,
};
use restate_types::config::SnapshotsOptions;
use restate_types::identifiers::{PartitionId, SnapshotId};
use restate_types::logs::Lsn;
@@ -49,8 +56,16 @@ pub struct SnapshotRepository {
object_store: Arc<dyn ObjectStore>,
destination: Url,
prefix: String,
base_dir: PathBuf,
}

/// S3 and other stores require a certain minimum size for the parts of a multipart upload. It is an
/// API error to attempt a multipart put below this size, apart from the final segment.
const MULTIPART_UPLOAD_CHUNK_SIZE_BYTES: usize = 5 * 1024 * 1024;

/// Maximum number of concurrent downloads when getting snapshots from the repository.
const DOWNLOAD_CONCURRENCY_LIMIT: usize = 8;
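A tiny self-contained illustration of the part-size rule described above (the helper is illustrative and not part of this PR): every part except the final one is emitted at exactly the 5 MiB minimum, and only the tail may be smaller.

const MULTIPART_UPLOAD_CHUNK_SIZE_BYTES: usize = 5 * 1024 * 1024;

// Split a payload of `total` bytes into part sizes where only the final part
// may fall below the store's minimum part size.
fn part_sizes(total: usize) -> Vec<usize> {
    let mut sizes = Vec::new();
    let mut remaining = total;
    while remaining > MULTIPART_UPLOAD_CHUNK_SIZE_BYTES {
        sizes.push(MULTIPART_UPLOAD_CHUNK_SIZE_BYTES);
        remaining -= MULTIPART_UPLOAD_CHUNK_SIZE_BYTES;
    }
    sizes.push(remaining);
    sizes
}

fn main() {
    // A 12 MiB snapshot file becomes two full 5 MiB parts plus a 2 MiB tail.
    assert_eq!(
        part_sizes(12 * 1024 * 1024),
        vec![5 * 1024 * 1024, 5 * 1024 * 1024, 2 * 1024 * 1024]
    );
}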

#[serde_as]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LatestSnapshot {
@@ -92,7 +107,7 @@ impl SnapshotRepository {
.into_os_string()
.into_string()
.map(|path| format!("file://{path}"))
.map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?
.map_err(|path| anyhow!("Unable to convert path to string: {:?}", path))?
};
let destination =
Url::parse(&destination).context("Failed parsing snapshot repository URL")?;
@@ -138,6 +153,7 @@ impl SnapshotRepository {
object_store,
destination,
prefix,
base_dir,
})
}

@@ -306,11 +322,133 @@ impl SnapshotRepository {

Ok(())
}
}

/// S3 and other stores require a certain minimum size for the parts of a multipart upload. It is an
/// API error to attempt a multipart put below this size, apart from the final segment.
const MULTIPART_UPLOAD_CHUNK_SIZE_BYTES: usize = 5 * 1024 * 1024;
/// Discover and download the latest snapshot available. Dropping the returned
/// `LocalPartitionSnapshot` will delete the local snapshot data files.
Comment on lines +326 to +327
Contributor: Is it because the files are stored in a temp directory? On LocalPartitionSnapshot itself I couldn't find how the files are deleted when dropping it.
Contributor: Is the temp dir also the mechanism to clean things up if downloading it failed?
Contributor: It seems that TempDir::with_prefix_in takes care of it, since it deletes the files when it gets dropped. This is a nice solution!
Contributor (Author): This is exactly right! I'm not 100% in love with it; it works, but it's a bit magical, as deletion happens implicitly when LocalPartitionSnapshot is dropped, and that could be quite far removed from the find_latest call. Something that's hard to do with this approach is retaining the snapshot files if importing them fails, which could be useful for troubleshooting.
Contributor: The description seems off, though. If I understand the code correctly, dropping LocalPartitionSnapshot won't delete the files. What happens via TempDir is that if an error occurs before we call snapshot_dir.into_path(), the snapshot_dir will be deleted. Afterwards it is the responsibility of whoever owns the LocalPartitionSnapshot to clean things up.
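To make the cleanup semantics discussed above concrete, here is a minimal sketch using only the tempfile crate (independent of this PR's types; the prefix and paths are illustrative): dropping a TempDir removes the directory, while into_path() detaches it, after which the owner of the returned PathBuf is responsible for deleting it, as the last comment in this thread describes.

use tempfile::TempDir;

fn main() -> std::io::Result<()> {
    let base = std::env::temp_dir();

    // Case 1: the TempDir guard is dropped (e.g. a download error bails out
    // early), so the directory and any partially written files are removed.
    let dir = TempDir::with_prefix_in("snap-", &base)?;
    let dropped = dir.path().to_path_buf();
    drop(dir);
    assert!(!dropped.exists());

    // Case 2: into_path() detaches the directory from the guard; from here on
    // it is the caller's job to delete it once the snapshot has been consumed.
    let kept = TempDir::with_prefix_in("snap-", &base)?.into_path();
    assert!(kept.exists());
    std::fs::remove_dir_all(&kept)?;
    Ok(())
}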

#[instrument(
level = "debug",
skip_all,
err,
fields(%partition_id),
)]
pub(crate) async fn get_latest(
&self,
partition_id: PartitionId,
) -> anyhow::Result<Option<LocalPartitionSnapshot>> {
Contributor: Maybe something for the future: it feels as if callers might be interested in why get_latest failed. I could imagine that different errors are handled differently (e.g. retried because the connection to S3 failed vs. an unsupported snapshot format). So anyhow::Result (while convenient) might not be the perfect fit here.
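A hedged sketch of what such a typed error might look like, assuming thiserror (already a dependency of this crate); the type and variant names are hypothetical and only illustrate separating likely-transient failures from fatal ones.

use restate_partition_store::snapshots::SnapshotFormatVersion;

// Hypothetical error type sketching the reviewer's suggestion; none of these
// variants exist in the PR as written.
#[derive(Debug, thiserror::Error)]
pub enum GetSnapshotError {
    // Likely transient; callers may choose to retry.
    #[error("object store error: {0}")]
    ObjectStore(#[from] object_store::Error),
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    // Fatal for this node; retrying will not help.
    #[error("unsupported snapshot format version: {0:?}")]
    UnsupportedFormatVersion(SnapshotFormatVersion),
    #[error("malformed snapshot metadata: {0}")]
    Metadata(#[from] serde_json::Error),
}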

let latest_path = object_store::path::Path::from(format!(
"{prefix}{partition_id}/latest.json",
prefix = self.prefix,
partition_id = partition_id,
));

let latest = self.object_store.get(&latest_path).await;

let latest = match latest {
Ok(result) => result,
Err(object_store::Error::NotFound { .. }) => {
debug!("Latest snapshot data not found in repository");
return Ok(None);
}
Err(e) => return Err(e.into()),
};

let latest: LatestSnapshot =
serde_json::from_slice(latest.bytes().await?.iter().as_slice())?;
Contributor: What does iter().as_slice() do? Would &latest.bytes().await? work instead?

trace!("Latest snapshot metadata: {:?}", latest);

let snapshot_metadata_path = object_store::path::Path::from(format!(
"{prefix}{partition_id}/{path}/metadata.json",
Contributor: These paths are probably used on the write and read path. Should we share them through a function? That makes it easier to keep them in sync between the two paths. (A sketch of such a helper follows this call.)

prefix = self.prefix,
partition_id = partition_id,
path = latest.path,
));
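A sketch of the shared path helpers suggested above; these functions are hypothetical (not part of this PR) and simply centralize the two key formats so the upload and download paths cannot drift apart.

use object_store::path::Path as ObjectPath;
use restate_types::identifiers::PartitionId;

// Hypothetical helpers for the repository layout:
//   {prefix}{partition_id}/latest.json
//   {prefix}{partition_id}/{snapshot_path}/metadata.json
fn latest_snapshot_key(prefix: &str, partition_id: PartitionId) -> ObjectPath {
    ObjectPath::from(format!("{prefix}{partition_id}/latest.json"))
}

fn snapshot_metadata_key(prefix: &str, partition_id: PartitionId, snapshot_path: &str) -> ObjectPath {
    ObjectPath::from(format!("{prefix}{partition_id}/{snapshot_path}/metadata.json"))
}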
let snapshot_metadata = self.object_store.get(&snapshot_metadata_path).await;

let snapshot_metadata = match snapshot_metadata {
Ok(result) => result,
Err(object_store::Error::NotFound { .. }) => {
info!("Latest snapshot points to a snapshot that was not found in the repository!");
return Ok(None); // arguably this could also be an error
Comment on lines +370 to +371
Contributor: I am wondering whether this denotes a "corruption" of our snapshots and therefore might even warrant a panic? We might still be lucky and not encounter a trim gap because (a) we haven't trimmed yet, or (b) our applied index is still after the trim point. I guess this might have been the motivation for returning None here? That is actually more resilient than panicking in some cases. The downside is that we might be stuck in a retry loop if we encounter a trim gap. Maybe raise the log level to warn so that this becomes more visible?

}
Err(e) => return Err(e.into()),
};

let mut snapshot_metadata: PartitionSnapshotMetadata =
serde_json::from_slice(snapshot_metadata.bytes().await?.iter().as_slice())?;
Contributor: Probably works as well:
Suggested change:
-serde_json::from_slice(snapshot_metadata.bytes().await?.iter().as_slice())?;
+serde_json::from_slice(&snapshot_metadata.bytes().await?)?;

if snapshot_metadata.version != SnapshotFormatVersion::V1 {
return Err(anyhow!(
"Unsupported snapshot format version: {:?}",
snapshot_metadata.version
));
}

// The snapshot ingest directory should be on the same filesystem as the partition store
// to minimize IO and disk space usage during import.
let snapshot_dir = TempDir::with_prefix_in(
format!("{}-", snapshot_metadata.snapshot_id),
&self.base_dir,
)?;
debug!(
snapshot_id = %snapshot_metadata.snapshot_id,
path = ?snapshot_dir.path(),
"Getting snapshot data",
);

let directory = snapshot_dir.path().to_string_lossy().to_string();
let concurrency_limiter = Arc::new(Semaphore::new(DOWNLOAD_CONCURRENCY_LIMIT));
Contributor: DOWNLOAD_CONCURRENCY_LIMIT might be something we want to make configurable in the future.

let mut downloads = JoinSet::new();
for file in &mut snapshot_metadata.files {
let filename = file.name.trim_start_matches("/");
let key = object_store::path::Path::from(format!(
"{prefix}{partition_id}/{path}/{filename}",
prefix = self.prefix,
partition_id = partition_id,
path = latest.path,
filename = filename,
));
let file_path = snapshot_dir.path().join(filename);
let concurrency_limiter = Arc::clone(&concurrency_limiter);
let object_store = Arc::clone(&self.object_store);

downloads.spawn(async move {
let _permit = concurrency_limiter.acquire().await?;
let mut file_data = StreamReader::new(object_store.get(&key).await?.into_stream());
let mut snapshot_file = tokio::fs::File::create_new(&file_path).await?;
let size = io::copy(&mut file_data, &mut snapshot_file).await?;
Contributor: Maybe add a sanity check that the downloaded file size matches what is expected from the snapshot metadata. (A sketch follows the download loop below.)
Contributor: I like this solution for copying the file in streaming fashion :-)

trace!(%key, ?size, "Downloaded snapshot data file to {:?}", file_path);
anyhow::Ok(())
});
// patch the directory path to reflect the actual location on the restoring node
file.directory = directory.clone();
}
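A hedged sketch of the size check suggested above, pulled out as a standalone function; expected_size_bytes is a hypothetical parameter, since the snapshot file metadata is not shown to carry a size field in this diff.

use std::path::PathBuf;
use std::sync::Arc;

use object_store::ObjectStore;
use tokio_util::io::StreamReader;

// Stream one snapshot file to disk and verify that the number of bytes written
// matches the size recorded in the snapshot metadata (hypothetical field).
async fn download_and_verify(
    object_store: Arc<dyn ObjectStore>,
    key: object_store::path::Path,
    file_path: PathBuf,
    expected_size_bytes: u64,
) -> anyhow::Result<()> {
    let mut file_data = StreamReader::new(object_store.get(&key).await?.into_stream());
    let mut snapshot_file = tokio::fs::File::create_new(&file_path).await?;
    let size = tokio::io::copy(&mut file_data, &mut snapshot_file).await?;
    anyhow::ensure!(
        size == expected_size_bytes,
        "size mismatch for {key}: expected {expected_size_bytes} bytes, got {size}"
    );
    Ok(())
}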

loop {
match downloads.join_next().await {
None => {
break;
}
Some(Err(e)) => {
downloads.abort_all();
return Err(e.into());
Comment on lines +431 to +432
Contributor: While I think it does not make a difference right now for correctness, I would still recommend to drain downloads after aborting all, because abort_all does not guarantee that tasks have completely finished (e.g. if something calls spawn_blocking).
}
Some(Ok(_)) => {}
Contributor: Is it intentional not to handle errors returned by the download routine? The _ here is actually an anyhow::Result, which itself can be an error. Check the suggestion below.
Contributor: I think you are right and we need to handle the inner error case as well. Right now, we might accept an incomplete snapshot as complete if any of the file downloads fails with an error rather than a panic.
Contributor: We probably also want to include in the error message which file failed the download process.
}
}
Comment on lines +425 to +436
Contributor: Suggested change (drain the JoinSet and handle both the join error and the inner download error):

while let Some(result) = downloads.join_next().await {
    match result {
        Err(join_error) => {
            // task panicked or was cancelled
        }
        Ok(Err(download_error)) => {
            // the download itself returned an anyhow error
        }
        Ok(Ok(_)) => {
            // download succeeded
        }
    }
}


info!(
snapshot_id = %snapshot_metadata.snapshot_id,
path = ?snapshot_dir.path(),
"Downloaded partition snapshot",
);
Ok(Some(LocalPartitionSnapshot {
base_dir: snapshot_dir.into_path(),
min_applied_lsn: snapshot_metadata.min_applied_lsn,
db_comparator_name: snapshot_metadata.db_comparator_name,
files: snapshot_metadata.files,
key_range: snapshot_metadata.key_range.clone(),
}))
}
}

async fn put_snapshot_object(
snapshot_path: &Path,

1 change: 1 addition & 0 deletions crates/worker/src/partition_processor_manager.rs
@@ -869,6 +869,7 @@ impl PartitionProcessorManager {
self.updateable_config.clone(),
self.bifrost.clone(),
self.partition_store_manager.clone(),
self.snapshot_repository.clone(),
)
}
