Skip to content

Commit

Permalink
Support parallel encode/decode with blosc codec
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin committed Oct 4, 2023
1 parent eefafb6 commit c301e18
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 21 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

- Add `From<&str>` and `From<String>` for `Other` variants of `CodecError`, `StorageError`, `ChunkGridShapeError`, `StorePluginCreateError`
- Support true partial decoding with the `blosc` codec, previously the whole chunk was always decoded
- Support parallel encoding, parallel decoding, and partial decoding with the `blosc` codec
- Previously the blosc codec would read the whole chunk when partial decoding

## [0.4.1] - 2023-10-04

Expand Down
18 changes: 13 additions & 5 deletions src/array/codec/bytes_to_bytes/blosc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ pub use blosc_configuration::{BloscCodecConfiguration, BloscCodecConfigurationV1
use blosc_sys::{
blosc_cbuffer_metainfo, blosc_cbuffer_sizes, blosc_cbuffer_validate, blosc_compress_ctx,
blosc_decompress_ctx, blosc_getitem, BLOSC_BITSHUFFLE, BLOSC_BLOSCLZ_COMPNAME,
BLOSC_LZ4HC_COMPNAME, BLOSC_LZ4_COMPNAME, BLOSC_MAX_OVERHEAD, BLOSC_NOSHUFFLE, BLOSC_SHUFFLE,
BLOSC_SNAPPY_COMPNAME, BLOSC_ZLIB_COMPNAME, BLOSC_ZSTD_COMPNAME,
BLOSC_LZ4HC_COMPNAME, BLOSC_LZ4_COMPNAME, BLOSC_MAX_OVERHEAD, BLOSC_MAX_THREADS,
BLOSC_NOSHUFFLE, BLOSC_SHUFFLE, BLOSC_SNAPPY_COMPNAME, BLOSC_ZLIB_COMPNAME,
BLOSC_ZSTD_COMPNAME,
};
use derive_more::From;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -120,7 +121,9 @@ fn blosc_compress_bytes(
typesize: usize,
compressor: BloscCompressor,
blocksize: usize,
numinternalthreads: usize,
) -> Result<Vec<u8>, BloscError> {
let numinternalthreads = std::cmp::min(numinternalthreads, BLOSC_MAX_THREADS as usize);
// let mut dest = vec![0; src.len() + BLOSC_MAX_OVERHEAD as usize];
let destsize = src.len() + BLOSC_MAX_OVERHEAD as usize;
let mut dest: Vec<u8> = Vec::with_capacity(destsize);
Expand All @@ -135,7 +138,7 @@ fn blosc_compress_bytes(
destsize,
compressor.as_cstr().cast::<c_char>(),
blocksize,
1,
i32::try_from(numinternalthreads).unwrap(),
)
};
if destsize > 0 {
Expand Down Expand Up @@ -210,14 +213,19 @@ fn blosc_nbytes(src: &[u8]) -> Option<usize> {
}
}

fn blosc_decompress_bytes(src: &[u8], destsize: usize) -> Result<Vec<u8>, BloscError> {
fn blosc_decompress_bytes(
src: &[u8],
destsize: usize,
numinternalthreads: usize,
) -> Result<Vec<u8>, BloscError> {
let numinternalthreads = std::cmp::min(numinternalthreads, BLOSC_MAX_THREADS as usize);
let mut dest: Vec<u8> = Vec::with_capacity(destsize);
let destsize = unsafe {
blosc_decompress_ctx(
src.as_ptr().cast::<c_void>(),
dest.as_mut_ptr().cast::<c_void>(),
destsize,
1,
i32::try_from(numinternalthreads).unwrap(),
)
};
if destsize > 0 {
Expand Down
53 changes: 38 additions & 15 deletions src/array/codec/bytes_to_bytes/blosc/blosc_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,28 @@ impl BloscCodec {
Some(configuration.typesize),
)
}

fn do_encode(&self, decoded_value: &[u8], n_threads: usize) -> Result<Vec<u8>, CodecError> {
blosc_compress_bytes(
decoded_value,
self.configuration.clevel,
self.configuration.shuffle,
self.configuration.typesize,
self.configuration.cname,
self.configuration.blocksize.unwrap_or(0),
n_threads,
)
.map_err(|err: BloscError| CodecError::Other(err.to_string()))
}

fn do_decode(encoded_value: &[u8], n_threads: usize) -> Result<Vec<u8>, CodecError> {
if let Some(destsize) = blosc_validate(encoded_value) {
blosc_decompress_bytes(encoded_value, destsize, n_threads)
.map_err(|e| CodecError::from(e.to_string()))
} else {
Err(CodecError::from("blosc encoded value is invalid"))
}
}
}

impl CodecTraits for BloscCodec {
Expand All @@ -130,28 +152,29 @@ impl CodecTraits for BloscCodec {

impl BytesToBytesCodecTraits for BloscCodec {
fn encode(&self, decoded_value: Vec<u8>) -> Result<Vec<u8>, CodecError> {
blosc_compress_bytes(
&decoded_value,
self.configuration.clevel,
self.configuration.shuffle,
self.configuration.typesize,
self.configuration.cname,
self.configuration.blocksize.unwrap_or(0),
)
.map_err(|err: BloscError| CodecError::Other(err.to_string()))
self.do_encode(&decoded_value, 1)
}

fn par_encode(&self, decoded_value: Vec<u8>) -> Result<Vec<u8>, CodecError> {
let n_threads = std::thread::available_parallelism().unwrap().get();
self.do_encode(&decoded_value, n_threads)
}

fn decode(
&self,
encoded_value: Vec<u8>,
_decoded_representation: &BytesRepresentation,
) -> Result<Vec<u8>, CodecError> {
if let Some(destsize) = blosc_validate(&encoded_value) {
blosc_decompress_bytes(&encoded_value, destsize)
.map_err(|e| CodecError::from(e.to_string()))
} else {
Err(CodecError::from("blosc encoded value is invalid"))
}
Self::do_decode(&encoded_value, 1)
}

fn par_decode(
&self,
encoded_value: Vec<u8>,
_decoded_representation: &BytesRepresentation,
) -> Result<Vec<u8>, CodecError> {
let n_threads = std::thread::available_parallelism().unwrap().get();
Self::do_decode(&encoded_value, n_threads)
}

fn partial_decoder<'a>(
Expand Down

0 comments on commit c301e18

Please sign in to comment.