From c301e18516344996f2190e95032b955d39ea2cc8 Mon Sep 17 00:00:00 2001 From: Lachlan Deakin Date: Thu, 5 Oct 2023 10:04:49 +1100 Subject: [PATCH] Support parallel encode/decode with blosc codec --- CHANGELOG.md | 3 +- src/array/codec/bytes_to_bytes/blosc.rs | 18 +++++-- .../codec/bytes_to_bytes/blosc/blosc_codec.rs | 53 +++++++++++++------ 3 files changed, 53 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bc900e8a..e77c192f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - Add `From<&str>` and `From<String>` for `Other` variants of `CodecError`, `StorageError`, `ChunkGridShapeError`, `StorePluginCreateError` - - Support true partial decoding with the `blosc` codec, previously the whole chunk was always decoded + - Support parallel encoding, parallel decoding, and partial decoding with the `blosc` codec + - Previously the blosc codec would read the whole chunk when partial decoding ## [0.4.1] - 2023-10-04 diff --git a/src/array/codec/bytes_to_bytes/blosc.rs b/src/array/codec/bytes_to_bytes/blosc.rs index d1d062bc..8c999c6d 100644 --- a/src/array/codec/bytes_to_bytes/blosc.rs +++ b/src/array/codec/bytes_to_bytes/blosc.rs @@ -21,8 +21,9 @@ pub use blosc_configuration::{BloscCodecConfiguration, BloscCodecConfigurationV1 use blosc_sys::{ blosc_cbuffer_metainfo, blosc_cbuffer_sizes, blosc_cbuffer_validate, blosc_compress_ctx, blosc_decompress_ctx, blosc_getitem, BLOSC_BITSHUFFLE, BLOSC_BLOSCLZ_COMPNAME, - BLOSC_LZ4HC_COMPNAME, BLOSC_LZ4_COMPNAME, BLOSC_MAX_OVERHEAD, BLOSC_NOSHUFFLE, BLOSC_SHUFFLE, - BLOSC_SNAPPY_COMPNAME, BLOSC_ZLIB_COMPNAME, BLOSC_ZSTD_COMPNAME, + BLOSC_LZ4HC_COMPNAME, BLOSC_LZ4_COMPNAME, BLOSC_MAX_OVERHEAD, BLOSC_MAX_THREADS, + BLOSC_NOSHUFFLE, BLOSC_SHUFFLE, BLOSC_SNAPPY_COMPNAME, BLOSC_ZLIB_COMPNAME, + BLOSC_ZSTD_COMPNAME, }; use derive_more::From; use serde::{Deserialize, Serialize}; @@ -120,7 +121,9 @@ fn
blosc_compress_bytes( typesize: usize, compressor: BloscCompressor, blocksize: usize, + numinternalthreads: usize, ) -> Result, BloscError> { + let numinternalthreads = std::cmp::min(numinternalthreads, BLOSC_MAX_THREADS as usize); // let mut dest = vec![0; src.len() + BLOSC_MAX_OVERHEAD as usize]; let destsize = src.len() + BLOSC_MAX_OVERHEAD as usize; let mut dest: Vec = Vec::with_capacity(destsize); @@ -135,7 +138,7 @@ fn blosc_compress_bytes( destsize, compressor.as_cstr().cast::(), blocksize, - 1, + i32::try_from(numinternalthreads).unwrap(), ) }; if destsize > 0 { @@ -210,14 +213,19 @@ fn blosc_nbytes(src: &[u8]) -> Option { } } -fn blosc_decompress_bytes(src: &[u8], destsize: usize) -> Result, BloscError> { +fn blosc_decompress_bytes( + src: &[u8], + destsize: usize, + numinternalthreads: usize, +) -> Result, BloscError> { + let numinternalthreads = std::cmp::min(numinternalthreads, BLOSC_MAX_THREADS as usize); let mut dest: Vec = Vec::with_capacity(destsize); let destsize = unsafe { blosc_decompress_ctx( src.as_ptr().cast::(), dest.as_mut_ptr().cast::(), destsize, - 1, + i32::try_from(numinternalthreads).unwrap(), ) }; if destsize > 0 { diff --git a/src/array/codec/bytes_to_bytes/blosc/blosc_codec.rs b/src/array/codec/bytes_to_bytes/blosc/blosc_codec.rs index 6b9e0205..536f04ea 100644 --- a/src/array/codec/bytes_to_bytes/blosc/blosc_codec.rs +++ b/src/array/codec/bytes_to_bytes/blosc/blosc_codec.rs @@ -110,6 +110,28 @@ impl BloscCodec { Some(configuration.typesize), ) } + + fn do_encode(&self, decoded_value: &[u8], n_threads: usize) -> Result, CodecError> { + blosc_compress_bytes( + decoded_value, + self.configuration.clevel, + self.configuration.shuffle, + self.configuration.typesize, + self.configuration.cname, + self.configuration.blocksize.unwrap_or(0), + n_threads, + ) + .map_err(|err: BloscError| CodecError::Other(err.to_string())) + } + + fn do_decode(encoded_value: &[u8], n_threads: usize) -> Result, CodecError> { + if let Some(destsize) = 
blosc_validate(encoded_value) { + blosc_decompress_bytes(encoded_value, destsize, n_threads) + .map_err(|e| CodecError::from(e.to_string())) + } else { + Err(CodecError::from("blosc encoded value is invalid")) + } + } } impl CodecTraits for BloscCodec { @@ -130,15 +152,12 @@ impl CodecTraits for BloscCodec { impl BytesToBytesCodecTraits for BloscCodec { fn encode(&self, decoded_value: Vec) -> Result, CodecError> { - blosc_compress_bytes( - &decoded_value, - self.configuration.clevel, - self.configuration.shuffle, - self.configuration.typesize, - self.configuration.cname, - self.configuration.blocksize.unwrap_or(0), - ) - .map_err(|err: BloscError| CodecError::Other(err.to_string())) + self.do_encode(&decoded_value, 1) + } + + fn par_encode(&self, decoded_value: Vec) -> Result, CodecError> { + let n_threads = std::thread::available_parallelism().unwrap().get(); + self.do_encode(&decoded_value, n_threads) } fn decode( @@ -146,12 +165,16 @@ impl BytesToBytesCodecTraits for BloscCodec { encoded_value: Vec, _decoded_representation: &BytesRepresentation, ) -> Result, CodecError> { - if let Some(destsize) = blosc_validate(&encoded_value) { - blosc_decompress_bytes(&encoded_value, destsize) - .map_err(|e| CodecError::from(e.to_string())) - } else { - Err(CodecError::from("blosc encoded value is invalid")) - } + Self::do_decode(&encoded_value, 1) + } + + fn par_decode( + &self, + encoded_value: Vec, + _decoded_representation: &BytesRepresentation, + ) -> Result, CodecError> { + let n_threads = std::thread::available_parallelism().unwrap().get(); + Self::do_decode(&encoded_value, n_threads) } fn partial_decoder<'a>(