Skip to content

Commit

Permalink
HTTP store
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin committed Oct 3, 2023
1 parent 7e3f37e commit ce5f99a
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 22 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Read only `ZipStorageAdapter` behind `zip` feature
- Readable and listable `ZipStorageAdapter` behind `zip` feature
- Readable `HTTPStore` behind `http` feature
- Add `StorageValueIO`, a [`std::io::Read`] interface to a storage value.

### Changed
Expand Down
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ categories = ["encoding"]
exclude = [".dockerignore", ".github", ".editorconfig", "Dockerfile", "coverage.sh", "TODO.md"]

[features]
default = ["transpose", "blosc", "gzip", "sharding", "crc32c", "zstd", "raw_bits", "float16", "bfloat16", "zip", "ndarray"]
default = ["transpose", "blosc", "gzip", "sharding", "crc32c", "zstd", "raw_bits", "float16", "bfloat16", "http", "zip", "ndarray"]
# Codecs
transpose = ["dep:ndarray"]
blosc = ["dep:blosc-sys"]
Expand All @@ -24,6 +24,7 @@ raw_bits = []
float16 = ["dep:half"]
bfloat16 = ["dep:half"]
# Stores
http = ["dep:reqwest", "dep:url"]
zip = ["dep:zip"]
# Utilities
ndarray = ["dep:ndarray"] # Adds ndarray utility functions to Array
Expand All @@ -45,10 +46,12 @@ num = { version = "0.4" }
parking_lot = "0.12"
pathdiff = "0.2"
rayon = "1.6"
reqwest = { version = "0.11.21", features = ["blocking"], optional = true }
safe-transmute = "0.11"
serde = { version = "1.0.100", features = ["derive"] }
serde_json = { version = "1.0.71", features = ["preserve_order"] }
thiserror = "1.0.7"
url = { version = "2.4.1", optional = true }
walkdir = "2.3.2"
zip = { version = "0.6", optional = true }
zstd = { version = "0.12", optional = true }
Expand Down
6 changes: 6 additions & 0 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ pub enum StorageError {
Other(String),
}

/// Build a [`StorageError`] from a plain string slice.
///
/// The text is copied into an owned `String` and stored in the
/// [`StorageError::Other`] variant.
impl From<&str> for StorageError {
    fn from(message: &str) -> Self {
        let owned = String::from(message);
        Self::Other(owned)
    }
}

/// Return the metadata key given a node path.
#[must_use]
pub fn meta_key(path: &NodePath) -> StoreKey {
Expand Down
11 changes: 8 additions & 3 deletions src/storage/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ pub use key::{StoreKey, StoreKeyError, StoreKeys};
pub use memory::MemoryStore;
pub use prefix::{StorePrefix, StorePrefixError, StorePrefixes};

#[cfg(feature = "http")]
mod http;
#[cfg(feature = "http")]
pub use http::{HTTPStore, HTTPStoreCreateError};

// pub use store_plugin::{StorePlugin, StorePluginCreateError}; // Currently disabled.

use std::sync::Arc;
Expand Down Expand Up @@ -49,10 +54,10 @@ pub type ReadableWritableStore = Arc<dyn ReadableWritableStoreExtension>;
// pub type ReadableWritableStorePlugin = StorePlugin<ReadableWritableStore>;
// inventory::collect!(ReadableWritableStorePlugin);

/// A store with [`Send`], [`Sync`] and an optional [`uri_scheme`](StoreExtension::uri_scheme).
/// A store extension, which must be [`Send`] and [`Sync`].
pub trait StoreExtension: Send + Sync {
/// The URI scheme of the store, if it has one.
fn uri_scheme(&self) -> Option<&'static str>;
// /// The URI scheme of the store, if it has one.
// fn uri_scheme(&self) -> Option<&'static str>;
}

/// A readable store extension.
Expand Down
6 changes: 1 addition & 5 deletions src/storage/store/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,7 @@ impl ListableStoreExtension for FilesystemStore {}

impl ReadableWritableStoreExtension for FilesystemStore {}

impl StoreExtension for FilesystemStore {
fn uri_scheme(&self) -> Option<&'static str> {
Some("file")
}
}
impl StoreExtension for FilesystemStore {}

impl FilesystemStore {
/// Create a new file system store at a given `base_path`.
Expand Down
59 changes: 52 additions & 7 deletions src/storage/store/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,39 +131,84 @@ pub enum HTTPStoreCreateError {
mod tests {
use crate::{
array::{Array, DataType},
array_subset::ArraySubset,
node::NodePath,
storage::meta_key,
};

use super::*;

const HTTP_TEST_PATH: &'static str =
const HTTP_TEST_PATH_REF: &'static str =
"https://raw.githubusercontent.com/LDeakin/zarrs/main/tests/data/hierarchy.zarr";
const ARRAY_PATH: &'static str = "/a/baz";
const ARRAY_PATH_REF: &'static str = "/a/baz";

#[test]
fn http_store_size() {
let store = HTTPStore::new(HTTP_TEST_PATH).unwrap();
let store = HTTPStore::new(HTTP_TEST_PATH_REF).unwrap();
let len = store
.size_key(&meta_key(&NodePath::new(ARRAY_PATH).unwrap()))
.size_key(&meta_key(&NodePath::new(ARRAY_PATH_REF).unwrap()))
.unwrap();
assert_eq!(len, 691);
}

#[test]
fn http_store_get() {
let store = HTTPStore::new(HTTP_TEST_PATH).unwrap();
let store = HTTPStore::new(HTTP_TEST_PATH_REF).unwrap();
let metadata = store
.get(&meta_key(&NodePath::new(ARRAY_PATH).unwrap()))
.get(&meta_key(&NodePath::new(ARRAY_PATH_REF).unwrap()))
.unwrap();
let metadata: crate::array::ArrayMetadataV3 = serde_json::from_slice(&metadata).unwrap();
assert_eq!(metadata.data_type.name(), "float64");
}

#[test]
fn http_store_array() {
    // Open the array at `ARRAY_PATH_REF` on top of an HTTP store rooted at
    // `HTTP_TEST_PATH_REF`, then check the data type it reports.
    let http_store = HTTPStore::new(HTTP_TEST_PATH_REF).unwrap();
    let opened = Array::new(http_store.into(), ARRAY_PATH_REF).unwrap();
    let reported = opened.data_type();
    assert_eq!(reported, &DataType::Float64);
}

#[cfg(feature = "gzip")]
#[test]
fn http_store_array_get() {
const HTTP_TEST_PATH: &'static str =
"https://raw.githubusercontent.com/LDeakin/zarrs/main/tests/data/array_write_read.zarr";
const ARRAY_PATH: &'static str = "/group/array";

let store = HTTPStore::new(HTTP_TEST_PATH).unwrap();
let array = Array::new(store.into(), ARRAY_PATH).unwrap();
assert_eq!(array.data_type(), &DataType::Float64);
assert_eq!(array.data_type(), &DataType::Float32);

// Read the central 2x2 subset of the array
let subset_2x2 = ArraySubset::new_with_start_shape(vec![3, 3], vec![2, 2]).unwrap(); // the center 2x2 region
let data_2x2 = array
.retrieve_array_subset_elements::<f32>(&subset_2x2)
.unwrap();
assert_eq!(data_2x2, &[0.1, 0.2, 0.4, 0.5]);

// let data = array.retrieve_array_subset_ndarray::<f32>(&ArraySubset::new_with_shape(array.shape().to_vec())).unwrap();
// println!("{data:?}");
}

#[cfg(all(feature = "sharding", feature = "gzip", feature = "crc32c"))]
#[test]
fn http_store_sharded_array_get() {
    // Remote hierarchy and node path used by this test only.
    const STORE_URL: &str =
        "https://raw.githubusercontent.com/LDeakin/zarrs/main/tests/data/sharded_array_write_read.zarr";
    const NODE: &str = "/group/array";

    let sharded = Array::new(HTTPStore::new(STORE_URL).unwrap().into(), NODE).unwrap();
    assert_eq!(sharded.data_type(), &DataType::UInt16);

    // 2x2 subset with start [3, 3] and shape [2, 2]; the original comment
    // describes this as the central region of the array.
    let window = ArraySubset::new_with_start_shape(vec![3, 3], vec![2, 2]).unwrap();
    let elements = sharded
        .retrieve_array_subset_elements::<u16>(&window)
        .unwrap();
    assert_eq!(elements, &[27, 28, 35, 36]);

    // Debug aid: uncomment to print the whole array.
    // let data = sharded.retrieve_array_subset_ndarray::<u16>(&ArraySubset::new_with_shape(sharded.shape().to_vec())).unwrap();
    // println!("{data:?}");
}
}
6 changes: 1 addition & 5 deletions src/storage/store/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@ impl Default for MemoryStore {
}
}

impl StoreExtension for MemoryStore {
fn uri_scheme(&self) -> Option<&'static str> {
None
}
}
impl StoreExtension for MemoryStore {}

impl ReadableStoreExtension for MemoryStore {}

Expand Down

0 comments on commit ce5f99a

Please sign in to comment.