diff --git a/CHANGELOG.md b/CHANGELOG.md index ba4ed026..00c93a23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Readable and writable stores include a `new_with_locks` method to choose the store lock implementation - Added `ArraySubset::new_with_ranges` - Added `ByteRange::offset` + - Added `object_store` feature with `AsyncObjectStore` store (wraps `object_store::ObjectStore`) + - **Breaking** Removes the explicit `object_store`-based stores (e.g. `AsyncAmazonS3Store`, `AsyncHTTPStore`) + - **Breaking** Removes the `object_store_impl` macro + - **Breaking** Removes the `s3`, `gcp`, and `azure` crate features + - Added `opendal` feature + - `OpendalStore` store (wraps `opendal::BlockingOperator`) + - `AsyncOpendalStore` store (wraps `opendal::Operator`) ### Changed - **Breaking** `ReadableStorageTraits` is no longer a supertrait of `WritableStorageTraits` @@ -27,11 +34,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Breaking** Remove `http` and `zip` from default features - Locking functionality for arrays is moved into stores - Improved `Array` documentation - - Add `object_store` feature and revise `object_store` support - - Add an `ObjectStore` store, which wraps any `object_store`-based store implementing `object_store::ObjectStore`. - - **Breaking** Removes the explicit `object_store`-based stores (e.g. 
`AsyncAmazonS3Store`, `AsyncHTTPStore`) - - **Breaking** Removes the `object_store_impl` macro - - **Breaking** Removes the `s3`, `gcp`, and `azure` crate features - Add store testing utility functions for unified store testing ### Fixed diff --git a/Cargo.toml b/Cargo.toml index da1f48c2..9a91983e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ zip = ["dep:zip"] # Enable the zip storage adapter ndarray = ["dep:ndarray"] # Adds ndarray utility functions to Array async = ["dep:async-trait", "dep:async-recursion", "dep:async-lock", "dep:futures"] # Enable experimental async API object_store = ["dep:object_store"] # Enable object_store asynchronous stores support +opendal = ["dep:opendal"] # Enable opendal asynchronous stores support [package.metadata.docs.rs] all-features = true @@ -52,6 +53,7 @@ itertools = "0.12" ndarray = { version = "0.15", optional = true } num = { version = "0.4" } object_store = { version = "0.8.0", optional = true } +opendal = { version = "0.43.0", optional = true } parking_lot = "0.12" pathdiff = "0.2" rayon = "1.6" diff --git a/TODO.md b/TODO.md index 382183f3..8ce451ff 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,6 @@ ## TODO +- Support [OpenDAL](https://crates.io/crates/opendal) stores - Review documentation - Increase test coverage - URI support [see ZEP0008](https://github.com/zarr-developers/zeps/pull/48) -- Add a performance comparison to other Zarr implementations +- Add a performance comparison to other Zarr implementations \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index ecb0dbaf..8a629ccb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,9 +15,16 @@ //! - [x] [ZEP0001 - Zarr specification version 3](https://zarr.dev/zeps/draft/ZEP0001.html). //! - [x] [ZEP0002 - Sharding codec](https://zarr.dev/zeps/draft/ZEP0002.html) ([under review](https://github.com/zarr-developers/zarr-specs/issues/254)). //! 
- [x] [ZEP0003 - Variable chunking](https://zarr.dev/zeps/draft/ZEP0003.html) ([draft](https://github.com/orgs/zarr-developers/discussions/52)). -//! - [x] Stores: -//! - Sync: [filesystem](crate::storage::store::FilesystemStore), [in memory](crate::storage::store::MemoryStore), [HTTP](crate::storage::store::HTTPStore), [ZIP](crate::storage::storage_adapter::ZipStorageAdapter). -//! - Async: [`AsyncObjectStore`](crate::storage::store::AsyncObjectStore) (supports all [`object_store::ObjectStore`] stores, e.g. in-memory, HTTP, Google Cloud Storage, Amazon S3, Microsoft Azure Storage, etc.). +//! - [x] Stores and storage adapters: +//! - Sync: +//! - [`FilesystemStore`](crate::storage::store::FilesystemStore). +//! - [`MemoryStore`](crate::storage::store::MemoryStore). +//! - [`HTTPStore`](crate::storage::store::HTTPStore). +//! - [`ZipStorageAdapter`](crate::storage::storage_adapter::ZipStorageAdapter). +//! - [`OpendalStore`](crate::storage::store::OpendalStore) (supports all [`opendal` services](https://docs.rs/opendal/latest/opendal/services/index.html)). +//! - Async: +//! - [`AsyncObjectStore`](crate::storage::store::AsyncObjectStore) (supports all [`object_store` stores](https://docs.rs/object_store/latest/object_store/index.html#modules)). +//! - [`AsyncOpendalStore`](crate::storage::store::AsyncOpendalStore) (supports all [`opendal` services](https://docs.rs/opendal/latest/opendal/services/index.html)). //! - [x] Data types: [core data types](crate::array::data_type::DataType), [raw bits](crate::array::data_type::DataType::RawBits), [float16](crate::array::data_type::DataType::Float16), [bfloat16](crate::array::data_type::DataType::BFloat16) [(spec issue)](https://github.com/zarr-developers/zarr-specs/issues/130). //! - [x] Chunk grids: [regular](crate::array::chunk_grid::RegularChunkGrid), [rectangular](crate::array::chunk_grid::RectangularChunkGrid) ([draft](https://github.com/orgs/zarr-developers/discussions/52)). //! 
- [x] Chunk key encoding: [default](crate::array::chunk_key_encoding::DefaultChunkKeyEncoding), [v2](crate::array::chunk_key_encoding::V2ChunkKeyEncoding). @@ -33,9 +40,10 @@ //! - Codecs: `blosc`, `gzip`, `transpose`, `zstd`, `sharding`, `crc32c`. //! //! The following features are disabled by default: -//! - `async`: asynchronous stores and associated storage, array, and group methods. //! - Codecs: `bitround`, `zfp`. -//! - Stores: `http`, `zip`, `s3` (Amazon S3) `gcp` (Google Cloud), `azure` (Microsoft Azure). +//! - `async`: asynchronous API for stores, arrays, and groups. +//! - `object_store`: support for [`object_store`] stores. +//! - `opendal`: support for [`opendal`] stores. //! //! ## Examples //! Examples can be run with `cargo run --example EXAMPLE_NAME`. diff --git a/src/storage.rs b/src/storage.rs index 78ea667c..e6165daa 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -204,6 +204,13 @@ impl From for StorageError { } } +#[cfg(feature = "opendal")] +impl From for StorageError { + fn from(err: opendal::Error) -> Self { + Self::Other(err.to_string()) + } +} + /// Return the metadata key given a node path. #[must_use] pub fn meta_key(path: &NodePath) -> StoreKey { diff --git a/src/storage/store.rs b/src/storage/store.rs index ba1e95e0..88d9aa80 100644 --- a/src/storage/store.rs +++ b/src/storage/store.rs @@ -1,7 +1,5 @@ //! Zarr stores. //! -//! All stores must be Send and Sync with internally managed synchronisation. -//! //! 
See #[cfg(feature = "async")] @@ -16,9 +14,14 @@ pub use store_sync::memory_store::MemoryStore; #[cfg(feature = "http")] pub use store_sync::http_store::{HTTPStore, HTTPStoreCreateError}; -#[cfg(all(feature = "async", feature = "object_store"))] +#[cfg(feature = "object_store")] pub use store_async::object_store::AsyncObjectStore; +#[cfg(feature = "opendal")] +pub use store_async::opendal::AsyncOpendalStore; +#[cfg(feature = "opendal")] +pub use store_sync::opendal::OpendalStore; + // pub use store_plugin::{StorePlugin, StorePluginCreateError}; // Currently disabled. use std::sync::Arc; diff --git a/src/storage/store/store_async.rs b/src/storage/store/store_async.rs index c7fb9e79..326f7507 100644 --- a/src/storage/store/store_async.rs +++ b/src/storage/store/store_async.rs @@ -1,6 +1,9 @@ #[cfg(feature = "object_store")] pub mod object_store; +#[cfg(feature = "opendal")] +pub mod opendal; + #[cfg(test)] mod test_util { use std::error::Error; diff --git a/src/storage/store/store_async/opendal.rs b/src/storage/store/store_async/opendal.rs new file mode 100644 index 00000000..5714bc6b --- /dev/null +++ b/src/storage/store/store_async/opendal.rs @@ -0,0 +1,253 @@ +use std::sync::Arc; + +use opendal::Operator; + +use crate::{ + array::MaybeBytes, + byte_range::{ByteRange, InvalidByteRangeError}, + storage::{ + store_lock::{AsyncDefaultStoreLocks, AsyncStoreKeyMutex, AsyncStoreLocks}, + AsyncListableStorageTraits, AsyncReadableStorageTraits, AsyncReadableWritableStorageTraits, + AsyncWritableStorageTraits, StorageError, StoreKey, StoreKeyRange, StoreKeyStartValue, + StoreKeys, StoreKeysPrefixes, StorePrefix, + }, +}; + +/// An asynchronous store backed by an [`Operator`]. +pub struct AsyncOpendalStore { + operator: Operator, + locks: AsyncStoreLocks, +} + +impl AsyncOpendalStore { + /// Create a new [`AsyncOpendalStore`]. 
+    #[must_use]
+    pub fn new(operator: Operator) -> Self {
+        Self::new_with_locks(operator, Arc::new(AsyncDefaultStoreLocks::default()))
+    }
+
+    /// Create a new [`AsyncOpendalStore`] with non-default store locks.
+    #[must_use]
+    pub fn new_with_locks(operator: Operator, store_locks: AsyncStoreLocks) -> Self {
+        Self {
+            operator,
+            locks: store_locks,
+        }
+    }
+}
+
+/// Map [`opendal::ErrorKind::NotFound`] to None, pass through other errors
+fn handle_result<T>(result: Result<T, opendal::Error>) -> Result<Option<T>, StorageError> {
+    match result {
+        Ok(result) => Ok(Some(result)),
+        Err(err) => {
+            if err.kind() == opendal::ErrorKind::NotFound {
+                Ok(None)
+            } else {
+                Err(err.into())
+            }
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncReadableStorageTraits for AsyncOpendalStore {
+    async fn get(&self, key: &StoreKey) -> Result<MaybeBytes, StorageError> {
+        handle_result(self.operator.read(key.as_str()).await)
+    }
+
+    async fn get_partial_values_key(
+        &self,
+        key: &StoreKey,
+        byte_ranges: &[ByteRange],
+    ) -> Result<Option<Vec<Vec<u8>>>, StorageError> {
+        use futures::FutureExt;
+        // FIXME: Does opendal offer a better way of retrieving multiple byte ranges?
+        // FIXME: Coalesce like object_store?
+        if byte_ranges
+            .iter()
+            .all(|byte_range| matches!(byte_range, ByteRange::FromEnd(_, _)))
+        {
+            let futures = byte_ranges
+                .iter()
+                .map(|byte_range| {
+                    self.operator
+                        .read_with(key.as_str())
+                        .range(byte_range.offset()..)
+                })
+                .collect::<Vec<_>>();
+            handle_result(futures::future::try_join_all(futures).await)
+        } else {
+            let size = self
+                .size_key(key)
+                .await?
+                .ok_or(StorageError::UnknownKeySize(key.clone()))?;
+            let futures = byte_ranges
+                .iter()
+                .map(|byte_range| {
+                    let start = byte_range.start(size);
+                    let end = byte_range.end(size);
+                    self.operator
+                        .read_with(key.as_str())
+                        .range(start..end)
+                        .map(move |bytes| match bytes {
+                            Ok(bytes) => {
+                                if (end - start) == bytes.len() as u64 {
+                                    Ok(bytes)
+                                } else {
+                                    Err(InvalidByteRangeError.into())
+                                }
+                            }
+                            Err(err) => Err(StorageError::from(err.to_string())),
+                        })
+                })
+                .collect::<Vec<_>>();
+            futures::future::try_join_all(futures).await.map(Some)
+        }
+    }
+
+    async fn get_partial_values(
+        &self,
+        key_ranges: &[StoreKeyRange],
+    ) -> Result<Vec<MaybeBytes>, StorageError> {
+        self.get_partial_values_batched_by_key(key_ranges).await
+    }
+
+    async fn size_prefix(&self, prefix: &StorePrefix) -> Result<u64, StorageError> {
+        let list = self
+            .operator
+            .list_with(prefix.as_str())
+            .recursive(true)
+            .metakey(opendal::Metakey::ContentLength)
+            .await?;
+        let size = list
+            .into_iter()
+            .map(|entry| entry.metadata().content_length())
+            .sum::<u64>();
+        Ok(size)
+    }
+
+    async fn size_key(&self, key: &StoreKey) -> Result<Option<u64>, StorageError> {
+        Ok(handle_result(self.operator.stat(key.as_str()).await)?
+            .map(|metadata| metadata.content_length()))
+    }
+
+    async fn size(&self) -> Result<u64, StorageError> {
+        self.size_prefix(&StorePrefix::root()).await
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncWritableStorageTraits for AsyncOpendalStore {
+    async fn set(&self, key: &StoreKey, value: &[u8]) -> Result<(), StorageError> {
+        // FIXME: Can this copy be avoided?
+        let bytes = bytes::Bytes::copy_from_slice(value);
+        Ok(self.operator.write(key.as_str(), bytes).await?)
+    }
+
+    async fn set_partial_values(
+        &self,
+        key_start_values: &[StoreKeyStartValue],
+    ) -> Result<(), StorageError> {
+        crate::storage::async_store_set_partial_values(self, key_start_values).await
+    }
+
+    async fn erase(&self, key: &StoreKey) -> Result<bool, StorageError> {
+        // FIXME: Make erase return ()?
+        self.operator.remove(vec![key.to_string()]).await?;
+        Ok(true)
+    }
+
+    async fn erase_prefix(&self, prefix: &StorePrefix) -> Result<bool, StorageError> {
+        // FIXME: Make erase_prefix return ()?
+        self.operator.remove_all(prefix.as_str()).await?;
+        Ok(true)
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncReadableWritableStorageTraits for AsyncOpendalStore {
+    async fn mutex(&self, key: &StoreKey) -> Result<AsyncStoreKeyMutex, StorageError> {
+        Ok(self.locks.mutex(key).await)
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncListableStorageTraits for AsyncOpendalStore {
+    async fn list(&self) -> Result<StoreKeys, StorageError> {
+        self.list_prefix(&StorePrefix::root()).await
+    }
+
+    async fn list_prefix(&self, prefix: &StorePrefix) -> Result<StoreKeys, StorageError> {
+        let mut list = self
+            .operator
+            .list_with(prefix.as_str())
+            .recursive(true)
+            .await?
+            .into_iter()
+            .filter_map(|entry| {
+                if entry.metadata().mode() == opendal::EntryMode::FILE {
+                    Some(StoreKey::try_from(entry.path()))
+                } else {
+                    None
+                }
+            })
+            .collect::<Result<StoreKeys, _>>()?;
+        list.sort();
+        Ok(list)
+    }
+
+    async fn list_dir(&self, prefix: &StorePrefix) -> Result<StoreKeysPrefixes, StorageError> {
+        let entries = self
+            .operator
+            .list_with(prefix.as_str())
+            .recursive(false)
+            .await?;
+        let mut prefixes = Vec::<StorePrefix>::with_capacity(entries.len());
+        let mut keys = Vec::<StoreKey>::with_capacity(entries.len());
+        for entry in entries {
+            match entry.metadata().mode() {
+                opendal::EntryMode::FILE => {
+                    keys.push(StoreKey::try_from(entry.path())?);
+                }
+                opendal::EntryMode::DIR => {
+                    prefixes.push(StorePrefix::try_from(entry.path())?);
+                }
+                opendal::EntryMode::Unknown => {}
+            }
+        }
+        keys.sort();
+        prefixes.sort();
+        Ok(StoreKeysPrefixes { keys, prefixes })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::error::Error;
+
+    #[tokio::test]
+    async fn memory() -> Result<(), Box<dyn Error>> {
+        let builder = opendal::services::Memory::default();
+        let op = Operator::new(builder)?.finish();
+        let store = AsyncOpendalStore::new(op);
+        super::super::test_util::store_write(&store).await?;
+        super::super::test_util::store_read(&store).await?;
super::super::test_util::store_list(&store).await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn filesystem() -> Result<(), Box<dyn Error>> {
+        let path = tempfile::TempDir::new()?;
+        let mut builder = opendal::services::Fs::default();
+        builder.root(&path.path().to_string_lossy());
+        let op = Operator::new(builder)?.finish();
+        let store = AsyncOpendalStore::new(op);
+        super::super::test_util::store_write(&store).await?;
+        super::super::test_util::store_read(&store).await?;
+        super::super::test_util::store_list(&store).await?;
+        Ok(())
+    }
+}
diff --git a/src/storage/store/store_sync.rs b/src/storage/store/store_sync.rs
index f95a633e..ec1debbc 100644
--- a/src/storage/store/store_sync.rs
+++ b/src/storage/store/store_sync.rs
@@ -4,6 +4,9 @@ pub mod memory_store;
 #[cfg(feature = "http")]
 pub mod http_store;
 
+#[cfg(feature = "opendal")]
+pub mod opendal;
+
 #[cfg(test)]
 mod test_util {
     use std::error::Error;
diff --git a/src/storage/store/store_sync/opendal.rs b/src/storage/store/store_sync/opendal.rs
new file mode 100644
index 00000000..d676800c
--- /dev/null
+++ b/src/storage/store/store_sync/opendal.rs
@@ -0,0 +1,259 @@
+use std::sync::Arc;
+
+use opendal::BlockingOperator;
+
+use crate::{
+    array::MaybeBytes,
+    byte_range::ByteRange,
+    storage::{
+        store_lock::{DefaultStoreLocks, StoreKeyMutex, StoreLocks},
+        ListableStorageTraits, ReadableStorageTraits, ReadableWritableStorageTraits, StorageError,
+        StoreKey, StoreKeyRange, StoreKeyStartValue, StoreKeys, StoreKeysPrefixes, StorePrefix,
+        WritableStorageTraits,
+    },
+};
+
+/// A synchronous store backed by a [`BlockingOperator`].
+pub struct OpendalStore {
+    operator: BlockingOperator,
+    locks: StoreLocks,
+}
+
+impl OpendalStore {
+    /// Create a new [`OpendalStore`].
+    #[must_use]
+    pub fn new(operator: BlockingOperator) -> Self {
+        Self::new_with_locks(operator, Arc::new(DefaultStoreLocks::default()))
+    }
+
+    /// Create a new [`OpendalStore`] with non-default store locks.
+    #[must_use]
+    pub fn new_with_locks(operator: BlockingOperator, store_locks: StoreLocks) -> Self {
+        Self {
+            operator,
+            locks: store_locks,
+        }
+    }
+}
+
+/// Map [`opendal::ErrorKind::NotFound`] to None, pass through other errors
+fn handle_result<T>(result: Result<T, opendal::Error>) -> Result<Option<T>, StorageError> {
+    match result {
+        Ok(result) => Ok(Some(result)),
+        Err(err) => {
+            if err.kind() == opendal::ErrorKind::NotFound {
+                Ok(None)
+            } else {
+                Err(err.into())
+            }
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl ReadableStorageTraits for OpendalStore {
+    fn get(&self, key: &StoreKey) -> Result<MaybeBytes, StorageError> {
+        handle_result(self.operator.read(key.as_str()))
+    }
+
+    fn get_partial_values_key(
+        &self,
+        key: &StoreKey,
+        byte_ranges: &[ByteRange],
+    ) -> Result<Option<Vec<Vec<u8>>>, StorageError> {
+        // FIXME: Does opendal offer a better way of retrieving multiple byte ranges?
+        // FIXME: Coalesce like object_store?
+        if byte_ranges
+            .iter()
+            .all(|byte_range| matches!(byte_range, ByteRange::FromEnd(_, _)))
+        {
+            let bytes = byte_ranges
+                .iter()
+                .map(|byte_range| {
+                    self.operator
+                        .read_with(key.as_str())
+                        .range(byte_range.offset()..)
+                        .call()
+                })
+                .collect::<Result<Vec<_>, _>>();
+            handle_result(bytes)
+        } else {
+            let size = self
+                .size_key(key)?
+                .ok_or(StorageError::UnknownKeySize(key.clone()))?;
+            let bytes = byte_ranges
+                .iter()
+                .map(|byte_range| {
+                    let start = byte_range.start(size);
+                    let end = byte_range.end(size);
+                    match self
+                        .operator
+                        .read_with(key.as_str())
+                        .range(start..end)
+                        .call()
+                    {
+                        Ok(bytes) => {
+                            if (end - start) == bytes.len() as u64 {
+                                Ok(bytes)
+                            } else {
+                                Err(opendal::Error::new(
+                                    opendal::ErrorKind::InvalidInput,
+                                    "InvalidByteRangeError",
+                                ))
+                            }
+                        }
+                        Err(err) => Err(err),
+                    }
+                })
+                .collect::<Result<Vec<_>, _>>();
+            handle_result(bytes)
+        }
+    }
+
+    fn get_partial_values(
+        &self,
+        key_ranges: &[StoreKeyRange],
+    ) -> Result<Vec<MaybeBytes>, StorageError> {
+        self.get_partial_values_batched_by_key(key_ranges)
+    }
+
+    fn size_prefix(&self, prefix: &StorePrefix) -> Result<u64, StorageError> {
+        let list = self
+            .operator
+            .list_with(prefix.as_str())
+            .recursive(true)
+            .metakey(opendal::Metakey::ContentLength)
+            .call()?;
+        let size = list
+            .into_iter()
+            .map(|entry| entry.metadata().content_length())
+            .sum::<u64>();
+        Ok(size)
+    }
+
+    fn size_key(&self, key: &StoreKey) -> Result<Option<u64>, StorageError> {
+        Ok(handle_result(self.operator.stat(key.as_str()))?
+            .map(|metadata| metadata.content_length()))
+    }
+
+    fn size(&self) -> Result<u64, StorageError> {
+        self.size_prefix(&StorePrefix::root())
+    }
+}
+
+#[async_trait::async_trait]
+impl WritableStorageTraits for OpendalStore {
+    fn set(&self, key: &StoreKey, value: &[u8]) -> Result<(), StorageError> {
+        // FIXME: Can this copy be avoided?
+        let bytes = bytes::Bytes::copy_from_slice(value);
+        Ok(self.operator.write(key.as_str(), bytes)?)
+    }
+
+    fn set_partial_values(
+        &self,
+        key_start_values: &[StoreKeyStartValue],
+    ) -> Result<(), StorageError> {
+        crate::storage::store_set_partial_values(self, key_start_values)
+    }
+
+    fn erase(&self, key: &StoreKey) -> Result<bool, StorageError> {
+        // FIXME: Make erase return ()?
+        self.operator.remove(vec![key.to_string()])?;
+        Ok(true)
+    }
+
+    fn erase_prefix(&self, prefix: &StorePrefix) -> Result<bool, StorageError> {
+        // FIXME: Make erase_prefix return ()?
+        self.operator.remove_all(prefix.as_str())?;
+        Ok(true)
+    }
+}
+
+#[async_trait::async_trait]
+impl ReadableWritableStorageTraits for OpendalStore {
+    fn mutex(&self, key: &StoreKey) -> Result<StoreKeyMutex, StorageError> {
+        Ok(self.locks.mutex(key))
+    }
+}
+
+#[async_trait::async_trait]
+impl ListableStorageTraits for OpendalStore {
+    fn list(&self) -> Result<StoreKeys, StorageError> {
+        self.list_prefix(&StorePrefix::root())
+    }
+
+    fn list_prefix(&self, prefix: &StorePrefix) -> Result<StoreKeys, StorageError> {
+        let mut list = self
+            .operator
+            .list_with(prefix.as_str())
+            .recursive(true)
+            .call()?
+            .into_iter()
+            .filter_map(|entry| {
+                if entry.metadata().mode() == opendal::EntryMode::FILE {
+                    Some(StoreKey::try_from(entry.path()))
+                } else {
+                    None
+                }
+            })
+            .collect::<Result<StoreKeys, _>>()?;
+        list.sort();
+        Ok(list)
+    }
+
+    fn list_dir(&self, prefix: &StorePrefix) -> Result<StoreKeysPrefixes, StorageError> {
+        let entries = self
+            .operator
+            .list_with(prefix.as_str())
+            .recursive(false)
+            .call()?;
+        let mut prefixes = Vec::<StorePrefix>::with_capacity(entries.len());
+        let mut keys = Vec::<StoreKey>::with_capacity(entries.len());
+        for entry in entries {
+            match entry.metadata().mode() {
+                opendal::EntryMode::FILE => {
+                    keys.push(StoreKey::try_from(entry.path())?);
+                }
+                opendal::EntryMode::DIR => {
+                    prefixes.push(StorePrefix::try_from(entry.path())?);
+                }
+                opendal::EntryMode::Unknown => {}
+            }
+        }
+        keys.sort();
+        prefixes.sort();
+        Ok(StoreKeysPrefixes { keys, prefixes })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use opendal::Operator;
+
+    use super::*;
+    use std::error::Error;
+
+    #[test]
+    fn memory() -> Result<(), Box<dyn Error>> {
+        let builder = opendal::services::Memory::default();
+        let op = Operator::new(builder)?.finish().blocking();
+        let store = OpendalStore::new(op);
+        super::super::test_util::store_write(&store)?;
+        super::super::test_util::store_read(&store)?;
+        super::super::test_util::store_list(&store)?;
+        Ok(())
+    }
+
+    #[test]
+    fn filesystem() -> Result<(), Box<dyn Error>> {
+        let path = tempfile::TempDir::new()?;
+        let mut builder = opendal::services::Fs::default();
builder.root(&path.path().to_string_lossy()); + let op = Operator::new(builder)?.finish().blocking(); + let store = OpendalStore::new(op); + super::super::test_util::store_write(&store)?; + super::super::test_util::store_read(&store)?; + super::super::test_util::store_list(&store)?; + Ok(()) + } +}