Commit

Precomputing index maps

grimbough committed Sep 20, 2023
1 parent 684541a commit a2daa32

Showing 2 changed files with 35 additions and 4 deletions.
23 changes: 22 additions & 1 deletion R/read_data.R
@@ -72,11 +72,29 @@ read_zarr_array <- function(zarr_array_path, index, s3_client) {
return(res$output)
}

.find_index_in_result <- function(index, shape, chunk_dim, required_chunk) {

    ## number of chunks along this dimension, counting any overhanging chunk
    nchunks <- (shape %/% chunk_dim) + ((shape %% chunk_dim) > 0)
    ## chunk boundaries in 1-based coordinates: 1, 1 + chunk_dim, ...
    breaks <- seq.int(from = 1, length.out = nchunks + 1, by = chunk_dim)
    ## 0-based chunk id for each requested coordinate
    tmp <- .bincode(index, breaks = breaks, right = FALSE, include.lowest = TRUE) - 1
    which(tmp == required_chunk)
}
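
## Illustrative sketch (hypothetical values, not from the package): for a
## dimension of length 10 split into chunks of 4 there are three chunks with
## 0-based ids 0-2, and the bin-based helper agrees with the integer division
## used in .create_index_map() below:
idx <- c(1, 4, 5, 9, 10)
which((idx - 1) %/% 4 == 2)                                   ## 4 5
.find_index_in_result(idx, shape = 10, chunk_dim = 4, required_chunk = 2)  ## 4 5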

.create_index_map <- function(index, metadata) {

    ## precompute, for each dimension, the 0-based chunk id that every
    ## requested coordinate falls into
    map <- list()
    for (j in seq_along(metadata$shape)) {
        map[[j]] <- (index[[j]] - 1) %/% metadata$chunks[[j]]
    }
    return(map)
}
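
## Illustrative sketch (hypothetical metadata): for a 10 x 6 selection with
## 4 x 5 chunks the map holds one 0-based chunk id per requested coordinate,
## computed once rather than once per chunk inside the read loop:
meta <- list(shape = list(10L, 6L), chunks = list(4L, 5L))
.create_index_map(list(1:10, 1:6), meta)
## [[1]] 0 0 0 0 1 1 1 1 2 2
## [[2]] 0 0 0 0 0 1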

#' @importFrom R.utils extract
read_data <- function(required_chunks, zarr_array_path, s3_client,
index, metadata) {

warn <- 0L
index_mapping <- .create_index_map(index, metadata)

## hopefully we can eventually do this in parallel
chunk_selections <- lapply(seq_len(nrow(required_chunks)), FUN = function(i) {
@@ -85,7 +103,10 @@ read_data <- function(required_chunks, zarr_array_path, s3_client,
alt_chunk_dim <- unlist(metadata$chunks)

for (j in seq_len(ncol(required_chunks))) {
-index_in_result[[j]] <- which((index[[j]] - 1) %/% metadata$chunks[[j]] == required_chunks[i, j])
+index_in_result[[j]] <- which(index_mapping[[j]] == required_chunks[i, j])
+#index_in_result[[j]] <- .find_index_in_result(index[[j]], metadata$shape[[j]],
+#                                              metadata$chunks[[j]], required_chunks[i, j])
+#index_in_result[[j]] <- which((index[[j]] - 1) %/% metadata$chunks[[j]] == required_chunks[i, j])
## are we requesting values outside the array due to overhanging chunks?
outside_extent <- index_in_result[[j]] > metadata$shape[[j]]
if (any(outside_extent))
16 changes: 13 additions & 3 deletions R/write_data.R
@@ -168,23 +168,33 @@ write_zarr_array <- function(x,

chunk_names <- .generate_chunk_names(x_dim = dim(x), chunk_dim = chunk_dim)
chunk_ids <- apply(chunk_names, 1, paste0, collapse = dimension_separator)
array_index_map <- .create_array_index_map(dim(x), metadata)

## iterate over each chunk
## TODO: maybe this can be done in parallel with bplapply()? (see the sketch after this function)
res <- lapply(chunk_ids,
FUN = .write_chunk, x = x, path = path,
-metadata = metadata
+metadata = metadata, array_index_map = array_index_map
)

return(invisible(all(unlist(res))))
}
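
## A possible parallel variant of the lapply() above, following the TODO note.
## A sketch only, assuming BiocParallel is available and .write_chunk() is
## safe to run concurrently (not verified here):
## res <- BiocParallel::bplapply(chunk_ids,
##                               FUN = .write_chunk, x = x, path = path,
##                               metadata = metadata,
##                               array_index_map = array_index_map)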

.create_array_index_map <- function(dim_x, metadata) {
    ## precompute, once per array, the 0-based chunk id of every position
    ## along each dimension
    map <- list()
    for (j in seq_along(dim_x)) {
        map[[j]] <- (seq_len(dim_x[j]) - 1) %/% metadata$chunks[[j]]
    }
    return(map)
}
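
## The map is built once per write_zarr_array() call and reused by every
## .write_chunk() invocation; e.g. for a 10 x 6 array with 4 x 5 chunks
## (hypothetical sizes), the rows belonging to the overhanging final chunk
## along dimension 1 come from a plain lookup:
map <- .create_array_index_map(c(10L, 6L), list(chunks = list(4L, 5L)))
which(map[[1]] == 2)   ## 9 10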


.generate_chunk_names <- function(x_dim, chunk_dim) {
    ## 0-based grid coordinates of every chunk, one row per chunk;
    ## overhanging chunks are counted via the modulo test
    n_chunks_in_dim <- (x_dim %/% chunk_dim) + as.logical(x_dim %% chunk_dim)
    expand.grid(lapply(n_chunks_in_dim, seq_len)) - 1
}
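
## Illustrative sketch (hypothetical sizes): a 10 x 6 array with 4 x 5 chunks
## has a 3 x 2 chunk grid, and pasting the rows with the dimension separator
## yields the chunk ids used as object keys:
nms <- .generate_chunk_names(x_dim = c(10L, 6L), chunk_dim = c(4L, 5L))
apply(nms, 1, paste0, collapse = ".")
## "0.0" "1.0" "2.0" "0.1" "1.1" "2.1"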

-.write_chunk <- function(chunk_id, x, path, metadata) {
+.write_chunk <- function(chunk_id, x, path, metadata, array_index_map) {

chunk_dim <- unlist(metadata$chunks)
dim_sep <- metadata$dimension_separator
@@ -194,7 +204,7 @@ write_zarr_array <- function(x,

idx_in_array <- list()
for (j in seq_along(dim(x))) {
-idx_in_array[[j]] <- which((seq_len(dim(x)[j]) - 1) %/% chunk_dim[j] == chunk_id_split[j])
+idx_in_array[[j]] <- which(array_index_map[[j]] == chunk_id_split[j])
}

chunk_in_mem <- R.utils::extract(x, indices = idx_in_array)
