Skip to content

Commit

Permalink
Remove Arc storage type lifetimes and enforce storage transformers …
Browse files Browse the repository at this point in the history
…be `Arc` wrapped
  • Loading branch information
LDeakin committed Feb 12, 2024
1 parent ceb1018 commit 5eab757
Show file tree
Hide file tree
Showing 18 changed files with 214 additions and 226 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **Breaking**: use `IncompatibleArraySubsetAndShapeError` in `ArrayStoreBytesError::InvalidArrayShape`
- Increase coverage for:
- `array_subset_iterators.rs`
- **Major breaking**: storage transformers must be `Arc` wrapped as `StorageTransformerExtension` trait methods now take `self: Arc<Self>`
- Removed lifetimes from `{Async}{Readable,Writable,ReadableWritable,Listable,ReadableListable}Storage`
- **Breaking**: `Group` and `Array` methods generic on storage now require the storage to have a `'static` lifetime

### Removed
- **Breaking**: remove `InvalidArraySubsetError` and `ArrayExtractElementsError`
Expand Down
4 changes: 2 additions & 2 deletions examples/http_array_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ fn http_array_read() -> Result<(), Box<dyn std::error::Error>> {
std::io::stdout(),
// )
));
let usage_log = UsageLogStorageTransformer::new(log_writer, || {
let usage_log = Arc::new(UsageLogStorageTransformer::new(log_writer, || {
chrono::Utc::now().format("[%T%.3f] ").to_string()
});
}));
let store = usage_log.create_readable_transformer(store);

// Init the existing array, reading metadata
Expand Down
12 changes: 8 additions & 4 deletions examples/sharded_array_write_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@ fn sharded_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
std::io::stdout(),
// )
));
let usage_log = UsageLogStorageTransformer::new(log_writer, || {
let usage_log = Arc::new(UsageLogStorageTransformer::new(log_writer, || {
chrono::Utc::now().format("[%T%.3f] ").to_string()
});
let store_readable_listable = usage_log.create_readable_listable_transformer(store.clone());
let store = usage_log.create_readable_writable_transformer(store);
}));
let store_readable_listable = usage_log
.clone()
.create_readable_listable_transformer(store.clone());
let store = usage_log
.clone()
.create_readable_writable_transformer(store);

// Create a group
let group_path = "/group";
Expand Down
4 changes: 2 additions & 2 deletions examples/zip_array_write_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use zarrs::{
// const ARRAY_PATH: &'static str = "/array";
const ARRAY_PATH: &str = "/";

fn write_array_to_storage<TStorage: ReadableWritableStorageTraits>(
fn write_array_to_storage<TStorage: ReadableWritableStorageTraits + 'static>(
storage: Arc<TStorage>,
) -> Result<Array<TStorage>, Box<dyn std::error::Error>> {
use zarrs::array::{chunk_grid::ChunkGridTraits, codec, DataType, FillValue};
Expand Down Expand Up @@ -87,7 +87,7 @@ fn write_array_to_storage<TStorage: ReadableWritableStorageTraits>(
Ok(array)
}

fn read_array_from_store<TStorage: ReadableStorageTraits>(
fn read_array_from_store<TStorage: ReadableStorageTraits + 'static>(
array: Array<TStorage>,
) -> Result<(), Box<dyn std::error::Error>> {
// Read the whole array
Expand Down
10 changes: 5 additions & 5 deletions src/array/array_async_readable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use super::{
#[cfg(feature = "ndarray")]
use super::elements_to_ndarray;

impl<TStorage: ?Sized + AsyncReadableStorageTraits> Array<TStorage> {
impl<TStorage: ?Sized + AsyncReadableStorageTraits + 'static> Array<TStorage> {
/// Create an array in `storage` at `path`. The metadata is read from the store.
///
/// # Errors
Expand Down Expand Up @@ -54,7 +54,7 @@ impl<TStorage: ?Sized + AsyncReadableStorageTraits> Array<TStorage> {
&self,
chunk_indices: &[u64],
) -> Result<Option<Vec<u8>>, ArrayError> {
let storage_handle = Arc::new(StorageHandle::new(&*self.storage));
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
let storage_transformer = self
.storage_transformers()
.create_async_readable_transformer(storage_handle);
Expand Down Expand Up @@ -438,7 +438,7 @@ impl<TStorage: ?Sized + AsyncReadableStorageTraits> Array<TStorage> {
overlap.relative_to_unchecked(chunk_subset_in_array.start())
};

let storage_handle = Arc::new(StorageHandle::new(&*self.storage));
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
let storage_transformer = self
.storage_transformers()
.create_async_readable_transformer(storage_handle);
Expand Down Expand Up @@ -565,7 +565,7 @@ impl<TStorage: ?Sized + AsyncReadableStorageTraits> Array<TStorage> {
));
}

let storage_handle = Arc::new(StorageHandle::new(&*self.storage));
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
let storage_transformer = self
.storage_transformers()
.create_async_readable_transformer(storage_handle);
Expand Down Expand Up @@ -646,7 +646,7 @@ impl<TStorage: ?Sized + AsyncReadableStorageTraits> Array<TStorage> {
chunk_indices: &[u64],
parallel: bool,
) -> Result<Box<dyn AsyncArrayPartialDecoderTraits + 'a>, ArrayError> {
let storage_handle = Arc::new(StorageHandle::new(&*self.storage));
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
let storage_transformer = self
.storage_transformers()
.create_async_readable_transformer(storage_handle);
Expand Down
2 changes: 1 addition & 1 deletion src/array/array_async_readable_writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{

use super::{Array, ArrayError};

impl<TStorage: ?Sized + AsyncReadableWritableStorageTraits> Array<TStorage> {
impl<TStorage: ?Sized + AsyncReadableWritableStorageTraits + 'static> Array<TStorage> {
/// Encode `subset_bytes` and store in `array_subset`.
///
/// If `parallel` is true, chunks intersecting the array subset are retrieved in parallel.
Expand Down
10 changes: 5 additions & 5 deletions src/array/array_async_writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use crate::{

use super::{codec::ArrayCodecTraits, Array, ArrayError};

impl<TStorage: ?Sized + AsyncWritableStorageTraits> Array<TStorage> {
impl<TStorage: ?Sized + AsyncWritableStorageTraits + 'static> Array<TStorage> {
/// Store metadata.
///
/// # Errors
/// Returns [`StorageError`] if there is an underlying store error.
pub async fn async_store_metadata(&self) -> Result<(), StorageError> {
let storage_handle = Arc::new(StorageHandle::new(&*self.storage));
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
let storage_transformer = self
.storage_transformers()
.create_async_writable_transformer(storage_handle);
Expand Down Expand Up @@ -52,7 +52,7 @@ impl<TStorage: ?Sized + AsyncWritableStorageTraits> Array<TStorage> {
self.async_erase_chunk(chunk_indices).await?;
Ok(())
} else {
let storage_handle = Arc::new(StorageHandle::new(&*self.storage));
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
let storage_transformer = self
.storage_transformers()
.create_async_writable_transformer(storage_handle);
Expand Down Expand Up @@ -234,7 +234,7 @@ impl<TStorage: ?Sized + AsyncWritableStorageTraits> Array<TStorage> {
/// # Errors
/// Returns a [`StorageError`] if there is an underlying store error.
pub async fn async_erase_chunk(&self, chunk_indices: &[u64]) -> Result<(), StorageError> {
let storage_handle = Arc::new(StorageHandle::new(&*self.storage));
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
let storage_transformer = self
.storage_transformers()
.create_async_writable_transformer(storage_handle);
Expand All @@ -252,7 +252,7 @@ impl<TStorage: ?Sized + AsyncWritableStorageTraits> Array<TStorage> {
/// # Errors
/// Returns a [`StorageError`] if there is an underlying store error.
pub async fn async_erase_chunks(&self, chunks: &ArraySubset) -> Result<(), StorageError> {
let storage_handle = Arc::new(StorageHandle::new(&*self.storage));
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
let storage_transformer = self
.storage_transformers()
.create_async_writable_transformer(storage_handle);
Expand Down
8 changes: 4 additions & 4 deletions src/array/array_sync_readable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use super::{
#[cfg(feature = "ndarray")]
use super::elements_to_ndarray;

impl<TStorage: ?Sized + ReadableStorageTraits> Array<TStorage> {
impl<TStorage: ?Sized + ReadableStorageTraits + 'static> Array<TStorage> {
/// Create an array in `storage` at `path`. The metadata is read from the store.
///
/// # Errors
Expand Down Expand Up @@ -51,7 +51,7 @@ impl<TStorage: ?Sized + ReadableStorageTraits> Array<TStorage> {
&self,
chunk_indices: &[u64],
) -> Result<Option<Vec<u8>>, ArrayError> {
let storage_handle = Arc::new(StorageHandle::new(&*self.storage));
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
let storage_transformer = self
.storage_transformers()
.create_readable_transformer(storage_handle);
Expand Down Expand Up @@ -641,7 +641,7 @@ impl<TStorage: ?Sized + ReadableStorageTraits> Array<TStorage> {
));
}

let storage_handle = Arc::new(StorageHandle::new(&*self.storage));
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
let storage_transformer = self
.storage_transformers()
.create_readable_transformer(storage_handle);
Expand Down Expand Up @@ -716,7 +716,7 @@ impl<TStorage: ?Sized + ReadableStorageTraits> Array<TStorage> {
chunk_indices: &[u64],
parallel: bool,
) -> Result<Box<dyn ArrayPartialDecoderTraits + 'a>, ArrayError> {
let storage_handle = Arc::new(StorageHandle::new(&*self.storage));
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
let storage_transformer = self
.storage_transformers()
.create_readable_transformer(storage_handle);
Expand Down
2 changes: 1 addition & 1 deletion src/array/array_sync_readable_writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{

use super::{unravel_index, Array, ArrayError};

impl<TStorage: ?Sized + ReadableWritableStorageTraits> Array<TStorage> {
impl<TStorage: ?Sized + ReadableWritableStorageTraits + 'static> Array<TStorage> {
/// Encode `subset_bytes` and store in `array_subset`.
///
/// If `parallel` is true, chunks intersecting the array subset are retrieved in parallel.
Expand Down
10 changes: 5 additions & 5 deletions src/array/array_sync_writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use crate::{

use super::{codec::ArrayCodecTraits, unravel_index, Array, ArrayError};

impl<TStorage: ?Sized + WritableStorageTraits> Array<TStorage> {
impl<TStorage: ?Sized + WritableStorageTraits + 'static> Array<TStorage> {
/// Store metadata.
///
/// # Errors
/// Returns [`StorageError`] if there is an underlying store error.
pub fn store_metadata(&self) -> Result<(), StorageError> {
let storage_handle = Arc::new(StorageHandle::new(&*self.storage));
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
let storage_transformer = self
.storage_transformers()
.create_writable_transformer(storage_handle);
Expand Down Expand Up @@ -51,7 +51,7 @@ impl<TStorage: ?Sized + WritableStorageTraits> Array<TStorage> {
self.erase_chunk(chunk_indices)?;
Ok(())
} else {
let storage_handle = Arc::new(StorageHandle::new(&*self.storage));
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
let storage_transformer = self
.storage_transformers()
.create_writable_transformer(storage_handle);
Expand Down Expand Up @@ -315,7 +315,7 @@ impl<TStorage: ?Sized + WritableStorageTraits> Array<TStorage> {
/// # Errors
/// Returns a [`StorageError`] if there is an underlying store error.
pub fn erase_chunk(&self, chunk_indices: &[u64]) -> Result<(), StorageError> {
let storage_handle = Arc::new(StorageHandle::new(&*self.storage));
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
let storage_transformer = self
.storage_transformers()
.create_writable_transformer(storage_handle);
Expand All @@ -336,7 +336,7 @@ impl<TStorage: ?Sized + WritableStorageTraits> Array<TStorage> {
chunks: &ArraySubset,
parallel: bool,
) -> Result<(), StorageError> {
let storage_handle = Arc::new(StorageHandle::new(&*self.storage));
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));
let storage_transformer = self
.storage_transformers()
.create_writable_transformer(storage_handle);
Expand Down
20 changes: 10 additions & 10 deletions src/array/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,19 +517,19 @@ pub trait AsyncArrayPartialDecoderTraits: Send + Sync {
}

/// A [`ReadableStorage`] partial decoder.
pub struct StoragePartialDecoder<'a> {
storage: ReadableStorage<'a>,
pub struct StoragePartialDecoder {
storage: ReadableStorage,
key: StoreKey,
}

impl<'a> StoragePartialDecoder<'a> {
impl StoragePartialDecoder {
/// Create a new storage partial decoder.
pub fn new(storage: ReadableStorage<'a>, key: StoreKey) -> Self {
pub fn new(storage: ReadableStorage, key: StoreKey) -> Self {
Self { storage, key }
}
}

impl BytesPartialDecoderTraits for StoragePartialDecoder<'_> {
impl BytesPartialDecoderTraits for StoragePartialDecoder {
fn partial_decode_opt(
&self,
decoded_regions: &[ByteRange],
Expand All @@ -543,22 +543,22 @@ impl BytesPartialDecoderTraits for StoragePartialDecoder<'_> {

#[cfg(feature = "async")]
/// A [`ReadableStorage`] partial decoder.
pub struct AsyncStoragePartialDecoder<'a> {
storage: AsyncReadableStorage<'a>,
pub struct AsyncStoragePartialDecoder {
storage: AsyncReadableStorage,
key: StoreKey,
}

#[cfg(feature = "async")]
impl<'a> AsyncStoragePartialDecoder<'a> {
impl AsyncStoragePartialDecoder {
/// Create a new storage partial decoder.
pub fn new(storage: AsyncReadableStorage<'a>, key: StoreKey) -> Self {
pub fn new(storage: AsyncReadableStorage, key: StoreKey) -> Self {
Self { storage, key }
}
}

#[cfg(feature = "async")]
#[async_trait::async_trait]
impl AsyncBytesPartialDecoderTraits for AsyncStoragePartialDecoder<'_> {
impl AsyncBytesPartialDecoderTraits for AsyncStoragePartialDecoder {
async fn partial_decode_opt(
&self,
decoded_regions: &[ByteRange],
Expand Down
6 changes: 3 additions & 3 deletions src/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,14 @@ fn validate_group_metadata(metadata: &GroupMetadataV3) -> Result<(), GroupCreate

impl<TStorage: ?Sized + ReadableStorageTraits> Group<TStorage> {}

impl<TStorage: ?Sized + WritableStorageTraits> Group<TStorage> {
impl<TStorage: ?Sized + WritableStorageTraits + 'static> Group<TStorage> {
/// Store metadata.
///
/// # Errors
///
/// Returns [`StorageError`] if there is an underlying store error.
pub fn store_metadata(&self) -> Result<(), StorageError> {
let storage_handle = StorageHandle::new(&*self.storage);
let storage_handle = StorageHandle::new(self.storage.clone());
crate::storage::create_group(&storage_handle, self.path(), &self.metadata())
}
}
Expand All @@ -213,7 +213,7 @@ impl<TStorage: ?Sized + AsyncWritableStorageTraits> Group<TStorage> {
///
/// Returns [`StorageError`] if there is an underlying store error.
pub async fn async_store_metadata(&self) -> Result<(), StorageError> {
let storage_handle = StorageHandle::new(&*self.storage);
let storage_handle = StorageHandle::new(self.storage.clone());
crate::storage::async_create_group(&storage_handle, self.path(), &self.metadata()).await
}
}
Expand Down
18 changes: 9 additions & 9 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,35 +60,35 @@ pub use self::storage_handle::StorageHandle;
pub use storage_value_io::StorageValueIO;

/// [`Arc`] wrapped readable storage.
pub type ReadableStorage<'a> = Arc<dyn ReadableStorageTraits + 'a>;
pub type ReadableStorage = Arc<dyn ReadableStorageTraits>;

/// [`Arc`] wrapped writable storage.
pub type WritableStorage<'a> = Arc<dyn WritableStorageTraits + 'a>;
pub type WritableStorage = Arc<dyn WritableStorageTraits>;

/// [`Arc`] wrapped readable and writable storage.
pub type ReadableWritableStorage<'a> = Arc<dyn ReadableWritableStorageTraits + 'a>;
pub type ReadableWritableStorage = Arc<dyn ReadableWritableStorageTraits>;

/// [`Arc`] wrapped listable storage.
pub type ListableStorage<'a> = Arc<dyn ListableStorageTraits + 'a>;
pub type ListableStorage = Arc<dyn ListableStorageTraits>;

/// [`Arc`] wrapped readable and listable storage.
pub type ReadableListableStorage<'a> = Arc<dyn ReadableListableStorageTraits + 'a>;
pub type ReadableListableStorage = Arc<dyn ReadableListableStorageTraits>;

#[cfg(feature = "async")]
/// [`Arc`] wrapped asynchronous readable storage.
pub type AsyncReadableStorage<'a> = Arc<dyn AsyncReadableStorageTraits + 'a>;
pub type AsyncReadableStorage = Arc<dyn AsyncReadableStorageTraits>;

#[cfg(feature = "async")]
/// [`Arc`] wrapped asynchronous writable storage.
pub type AsyncWritableStorage<'a> = Arc<dyn AsyncWritableStorageTraits + 'a>;
pub type AsyncWritableStorage = Arc<dyn AsyncWritableStorageTraits>;

#[cfg(feature = "async")]
/// [`Arc`] wrapped asynchronous listable storage.
pub type AsyncListableStorage<'a> = Arc<dyn AsyncListableStorageTraits + 'a>;
pub type AsyncListableStorage = Arc<dyn AsyncListableStorageTraits>;

#[cfg(feature = "async")]
/// [`Arc`] wrapped asynchronous readable and listable storage.
pub type AsyncReadableListableStorage<'a> = Arc<dyn AsyncReadableListableStorageTraits + 'a>;
pub type AsyncReadableListableStorage = Arc<dyn AsyncReadableListableStorageTraits>;

/// A [`StoreKey`] and [`ByteRange`].
#[derive(Debug, Clone)]
Expand Down
Loading

0 comments on commit 5eab757

Please sign in to comment.