Skip to content

Commit

Permalink
Add support for opendal stores
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin committed Dec 26, 2023
1 parent 60a6832 commit 72bcbbb
Show file tree
Hide file tree
Showing 10 changed files with 555 additions and 14 deletions.
12 changes: 7 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Readable and writable stores include a `new_with_locks` method to choose the store lock implementation
- Added `ArraySubset::new_with_ranges`
- Added `ByteRange::offset`
- Added `object_store` feature with `AsyncObjectStore` store (wraps `object_store::ObjectStore`)
- **Breaking** Removes the explicit `object_store`-based stores (e.g. `AsyncAmazonS3Store`, `AsyncHTTPStore`)
- **Breaking** Removes the `object_store_impl` macro
- **Breaking** Removes the `s3`, `gcp`, and `azure` crate features
- Added `opendal` feature
- `OpendalStore` store (wraps `opendal::BlockingOperator`)
- `AsyncOpendalStore` store (wraps `opendal::Operator`)

### Changed
- **Breaking** `ReadableStorageTraits` is no longer a supertrait of `WritableStorageTraits`
Expand All @@ -27,11 +34,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **Breaking** Remove `http` and `zip` from default features
- Locking functionality for arrays is moved into stores
- Improved `Array` documentation
- Add `object_store` feature and revise `object_store` support
- Add an `ObjectStore` store, which wraps any `object_store`-based store implementing `object_store::ObjectStore`.
- **Breaking** Removes the explicit `object_store`-based stores (e.g. `AsyncAmazonS3Store`, `AsyncHTTPStore`)
- **Breaking** Removes the `object_store_impl` macro
- **Breaking** Removes the `s3`, `gcp`, and `azure` crate features
- Add store testing utility functions for unified store testing

### Fixed
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ zip = ["dep:zip"] # Enable the zip storage adapter
ndarray = ["dep:ndarray"] # Adds ndarray utility functions to Array
async = ["dep:async-trait", "dep:async-recursion", "dep:async-lock", "dep:futures"] # Enable experimental async API
object_store = ["dep:object_store"] # Enable object_store asynchronous stores support
opendal = ["dep:opendal"] # Enable opendal asynchronous stores support

[package.metadata.docs.rs]
all-features = true
Expand All @@ -52,6 +53,7 @@ itertools = "0.12"
ndarray = { version = "0.15", optional = true }
num = { version = "0.4" }
object_store = { version = "0.8.0", optional = true }
opendal = { version = "0.43.0", optional = true }
parking_lot = "0.12"
pathdiff = "0.2"
rayon = "1.6"
Expand Down
3 changes: 2 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## TODO
- Support [OpenDAL](https://crates.io/crates/opendal) stores
- Review documentation
- Increase test coverage
- URI support [see ZEP0008](https://github.com/zarr-developers/zeps/pull/48)
- Add a performance comparison to other Zarr implementations
- Add a performance comparison to other Zarr implementations
18 changes: 13 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,16 @@
//! - [x] [ZEP0001 - Zarr specification version 3](https://zarr.dev/zeps/draft/ZEP0001.html).
//! - [x] [ZEP0002 - Sharding codec](https://zarr.dev/zeps/draft/ZEP0002.html) ([under review](https://github.com/zarr-developers/zarr-specs/issues/254)).
//! - [x] [ZEP0003 - Variable chunking](https://zarr.dev/zeps/draft/ZEP0003.html) ([draft](https://github.com/orgs/zarr-developers/discussions/52)).
//! - [x] Stores:
//! - Sync: [filesystem](crate::storage::store::FilesystemStore), [in memory](crate::storage::store::MemoryStore), [HTTP](crate::storage::store::HTTPStore), [ZIP](crate::storage::storage_adapter::ZipStorageAdapter).
//! - Async: [`AsyncObjectStore`](crate::storage::store::AsyncObjectStore) (supports all [`object_store::ObjectStore`] stores, e.g. in-memory, HTTP, Google Cloud Storage, Amazon S3, Microsoft Azure Storage, etc.).
//! - [x] Stores and storage adapters:
//! - Sync:
//! - [`FilesystemStore`](crate::storage::store::FilesystemStore).
//! - [`MemoryStore`](crate::storage::store::MemoryStore).
//! - [`HTTPStore`](crate::storage::store::HTTPStore).
//! - [`ZipStorageAdapter`](crate::storage::storage_adapter::ZipStorageAdapter).
//! - [`OpendalStore`](crate::storage::store::OpendalStore) (supports all [`opendal` services](https://docs.rs/opendal/latest/opendal/services/index.html)).
//! - Async:
//! - [`AsyncObjectStore`](crate::storage::store::AsyncObjectStore) (supports all [`object_store` stores](https://docs.rs/object_store/latest/object_store/index.html#modules)).
//! - [`AsyncOpendalStore`](crate::storage::store::AsyncOpendalStore) (supports all [`opendal` services](https://docs.rs/opendal/latest/opendal/services/index.html)).
//! - [x] Data types: [core data types](crate::array::data_type::DataType), [raw bits](crate::array::data_type::DataType::RawBits), [float16](crate::array::data_type::DataType::Float16), [bfloat16](crate::array::data_type::DataType::BFloat16) [(spec issue)](https://github.com/zarr-developers/zarr-specs/issues/130).
//! - [x] Chunk grids: [regular](crate::array::chunk_grid::RegularChunkGrid), [rectangular](crate::array::chunk_grid::RectangularChunkGrid) ([draft](https://github.com/orgs/zarr-developers/discussions/52)).
//! - [x] Chunk key encoding: [default](crate::array::chunk_key_encoding::DefaultChunkKeyEncoding), [v2](crate::array::chunk_key_encoding::V2ChunkKeyEncoding).
Expand All @@ -33,9 +40,10 @@
//! - Codecs: `blosc`, `gzip`, `transpose`, `zstd`, `sharding`, `crc32c`.
//!
//! The following features are disabled by default:
//! - `async`: asynchronous stores and associated storage, array, and group methods.
//! - Codecs: `bitround`, `zfp`.
//! - Stores: `http`, `zip`, `s3` (Amazon S3) `gcp` (Google Cloud), `azure` (Microsoft Azure).
//! - `async`: asynchronous API for stores, arrays, and groups.
//! - `object_store`: support for [`object_store`] stores.
//! - `opendal`: support for [`opendal`] stores.
//!
//! ## Examples
//! Examples can be run with `cargo run --example EXAMPLE_NAME`.
Expand Down
7 changes: 7 additions & 0 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,13 @@ impl From<String> for StorageError {
}
}

#[cfg(feature = "opendal")]
impl From<opendal::Error> for StorageError {
fn from(err: opendal::Error) -> Self {
Self::Other(err.to_string())
}
}

/// Return the metadata key given a node path.
#[must_use]
pub fn meta_key(path: &NodePath) -> StoreKey {
Expand Down
9 changes: 6 additions & 3 deletions src/storage/store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! Zarr stores.
//!
//! All stores must be Send and Sync with internally managed synchronisation.
//!
//! See <https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html#id21>

#[cfg(feature = "async")]
Expand All @@ -16,9 +14,14 @@ pub use store_sync::memory_store::MemoryStore;
#[cfg(feature = "http")]
pub use store_sync::http_store::{HTTPStore, HTTPStoreCreateError};

#[cfg(all(feature = "async", feature = "object_store"))]
#[cfg(feature = "object_store")]
pub use store_async::object_store::AsyncObjectStore;

#[cfg(feature = "opendal")]
pub use store_async::opendal::AsyncOpendalStore;
#[cfg(feature = "opendal")]
pub use store_sync::opendal::OpendalStore;

// pub use store_plugin::{StorePlugin, StorePluginCreateError}; // Currently disabled.

use std::sync::Arc;
Expand Down
3 changes: 3 additions & 0 deletions src/storage/store/store_async.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#[cfg(feature = "object_store")]
pub mod object_store;

#[cfg(feature = "opendal")]
pub mod opendal;

#[cfg(test)]
mod test_util {
use std::error::Error;
Expand Down
253 changes: 253 additions & 0 deletions src/storage/store/store_async/opendal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
use std::sync::Arc;

use opendal::Operator;

use crate::{
array::MaybeBytes,
byte_range::{ByteRange, InvalidByteRangeError},
storage::{
store_lock::{AsyncDefaultStoreLocks, AsyncStoreKeyMutex, AsyncStoreLocks},
AsyncListableStorageTraits, AsyncReadableStorageTraits, AsyncReadableWritableStorageTraits,
AsyncWritableStorageTraits, StorageError, StoreKey, StoreKeyRange, StoreKeyStartValue,
StoreKeys, StoreKeysPrefixes, StorePrefix,
},
};

/// An asynchronous store backed by an [`Operator`].
pub struct AsyncOpendalStore {
operator: Operator,
locks: AsyncStoreLocks,
}

impl AsyncOpendalStore {
/// Create a new [`AsyncOpendalStore`].
#[must_use]
pub fn new(operator: Operator) -> Self {
Self::new_with_locks(operator, Arc::new(AsyncDefaultStoreLocks::default()))
}

/// Create a new [`AsyncOpendalStore`] with non-default store locks.
#[must_use]
pub fn new_with_locks(operator: Operator, store_locks: AsyncStoreLocks) -> Self {
Self {
operator,
locks: store_locks,
}
}
}

/// Map [`opendal::ErrorKind::NotFound`] to None, pass through other errors
fn handle_result<T>(result: Result<T, opendal::Error>) -> Result<Option<T>, StorageError> {
match result {
Ok(result) => Ok(Some(result)),
Err(err) => {
if err.kind() == opendal::ErrorKind::NotFound {
Ok(None)
} else {
Err(err.into())
}
}
}
}

#[async_trait::async_trait]
impl AsyncReadableStorageTraits for AsyncOpendalStore {
async fn get(&self, key: &StoreKey) -> Result<MaybeBytes, StorageError> {
handle_result(self.operator.read(key.as_str()).await)
}

async fn get_partial_values_key(
&self,
key: &StoreKey,
byte_ranges: &[ByteRange],
) -> Result<Option<Vec<Vec<u8>>>, StorageError> {
use futures::FutureExt;
// FIXME: Does opendal offer a better way of retrieving multiple byte ranges?
// FIXME: Coalesce like object_store?
if byte_ranges
.iter()
.all(|byte_range| matches!(byte_range, ByteRange::FromEnd(_, _)))
{
let futures = byte_ranges
.iter()
.map(|byte_range| {
self.operator
.read_with(key.as_str())
.range(byte_range.offset()..)
})
.collect::<Vec<_>>();
handle_result(futures::future::try_join_all(futures).await)
} else {
let size = self
.size_key(key)
.await?
.ok_or(StorageError::UnknownKeySize(key.clone()))?;
let futures = byte_ranges
.iter()
.map(|byte_range| {
let start = byte_range.start(size);
let end = byte_range.end(size);
self.operator
.read_with(key.as_str())
.range(start..end)
.map(move |bytes| match bytes {
Ok(bytes) => {
if (end - start) == bytes.len() as u64 {
Ok(bytes)
} else {
Err(InvalidByteRangeError.into())
}
}
Err(err) => Err(StorageError::from(err.to_string())),
})
})
.collect::<Vec<_>>();
futures::future::try_join_all(futures).await.map(Some)
}
}

async fn get_partial_values(
&self,
key_ranges: &[StoreKeyRange],
) -> Result<Vec<MaybeBytes>, StorageError> {
self.get_partial_values_batched_by_key(key_ranges).await
}

async fn size_prefix(&self, prefix: &StorePrefix) -> Result<u64, StorageError> {
let list = self
.operator
.list_with(prefix.as_str())
.recursive(true)
.metakey(opendal::Metakey::ContentLength)
.await?;
let size = list
.into_iter()
.map(|entry| entry.metadata().content_length())
.sum::<u64>();
Ok(size)
}

async fn size_key(&self, key: &StoreKey) -> Result<Option<u64>, StorageError> {
Ok(handle_result(self.operator.stat(key.as_str()).await)?
.map(|metadata| metadata.content_length()))
}

async fn size(&self) -> Result<u64, StorageError> {
self.size_prefix(&StorePrefix::root()).await
}
}

#[async_trait::async_trait]
impl AsyncWritableStorageTraits for AsyncOpendalStore {
async fn set(&self, key: &StoreKey, value: &[u8]) -> Result<(), StorageError> {
// FIXME: Can this copy be avoided?
let bytes = bytes::Bytes::copy_from_slice(value);
Ok(self.operator.write(key.as_str(), bytes).await?)
}

async fn set_partial_values(
&self,
key_start_values: &[StoreKeyStartValue],
) -> Result<(), StorageError> {
crate::storage::async_store_set_partial_values(self, key_start_values).await
}

async fn erase(&self, key: &StoreKey) -> Result<bool, StorageError> {
// FIXME: Make erase return ()?
self.operator.remove(vec![key.to_string()]).await?;
Ok(true)
}

async fn erase_prefix(&self, prefix: &StorePrefix) -> Result<bool, StorageError> {
// FIXME: Make erase_prefix return ()?
self.operator.remove_all(prefix.as_str()).await?;
Ok(true)
}
}

#[async_trait::async_trait]
impl AsyncReadableWritableStorageTraits for AsyncOpendalStore {
async fn mutex(&self, key: &StoreKey) -> Result<AsyncStoreKeyMutex, StorageError> {
Ok(self.locks.mutex(key).await)
}
}

#[async_trait::async_trait]
impl AsyncListableStorageTraits for AsyncOpendalStore {
async fn list(&self) -> Result<StoreKeys, StorageError> {
self.list_prefix(&StorePrefix::root()).await
}

async fn list_prefix(&self, prefix: &StorePrefix) -> Result<StoreKeys, StorageError> {
let mut list = self
.operator
.list_with(prefix.as_str())
.recursive(true)
.await?
.into_iter()
.filter_map(|entry| {
if entry.metadata().mode() == opendal::EntryMode::FILE {
Some(StoreKey::try_from(entry.path()))
} else {
None
}
})
.collect::<Result<Vec<_>, _>>()?;
list.sort();
Ok(list)
}

async fn list_dir(&self, prefix: &StorePrefix) -> Result<StoreKeysPrefixes, StorageError> {
let entries = self
.operator
.list_with(prefix.as_str())
.recursive(false)
.await?;
let mut prefixes = Vec::<StorePrefix>::with_capacity(entries.len());
let mut keys = Vec::<StoreKey>::with_capacity(entries.len());
for entry in entries {
match entry.metadata().mode() {
opendal::EntryMode::FILE => {
keys.push(StoreKey::try_from(entry.path())?);
}
opendal::EntryMode::DIR => {
prefixes.push(StorePrefix::try_from(entry.path())?);
}
opendal::EntryMode::Unknown => {}
}
}
keys.sort();
prefixes.sort();
Ok(StoreKeysPrefixes { keys, prefixes })
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::error::Error;

#[tokio::test]
async fn memory() -> Result<(), Box<dyn Error>> {
let builder = opendal::services::Memory::default();
let op = Operator::new(builder)?.finish();
let store = AsyncOpendalStore::new(op);
super::super::test_util::store_write(&store).await?;
super::super::test_util::store_read(&store).await?;
super::super::test_util::store_list(&store).await?;
Ok(())
}

#[tokio::test]
async fn filesystem() -> Result<(), Box<dyn Error>> {
let path = tempfile::TempDir::new()?;
let mut builder = opendal::services::Fs::default();
builder.root(&path.path().to_string_lossy());
let op = Operator::new(builder)?.finish();
let store = AsyncOpendalStore::new(op);
super::super::test_util::store_write(&store).await?;
super::super::test_util::store_read(&store).await?;
super::super::test_util::store_list(&store).await?;
Ok(())
}
}
3 changes: 3 additions & 0 deletions src/storage/store/store_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ pub mod memory_store;
#[cfg(feature = "http")]
pub mod http_store;

#[cfg(feature = "opendal")]
pub mod opendal;

#[cfg(test)]
mod test_util {
use std::error::Error;
Expand Down
Loading

0 comments on commit 72bcbbb

Please sign in to comment.