From 0cd9ca9b0956f9f4b1db3c219f6d1e1f76cf82a5 Mon Sep 17 00:00:00 2001
From: Ziheng Wang
Date: Sat, 29 Jun 2024 17:09:10 -0700
Subject: [PATCH] added unordered parquet reader

---
 bench.py                 |  7 +--
 python/rottnest/utils.py |  2 +-
 src/formats/parquet.rs   | 97 ++++++++++++++++++++++++----------
 src/lava_py/format.rs    | 33 +++++++-------
 src/vamana/access.rs     |  6 +--
 5 files changed, 82 insertions(+), 63 deletions(-)

diff --git a/bench.py b/bench.py
index 903a1c0..172ccee 100644
--- a/bench.py
+++ b/bench.py
@@ -3,7 +3,7 @@
 from tqdm import tqdm
 import polars
 
-metadata = polars.read_parquet("small.parquet")
+metadata = polars.read_parquet("bench.parquet")[:10]
 
 # metadatas = []
 # filenames = []
@@ -24,6 +24,7 @@
     metadata["page_byte_size"].to_list(),
     [0] * len(metadata["filename"]),
     "aws",
-    file_metadata))
+    file_metadata,
+    False))
 
-print(len(result))
\ No newline at end of file
+print(len(result))
diff --git a/python/rottnest/utils.py b/python/rottnest/utils.py
index d79a3da..05c836d 100644
--- a/python/rottnest/utils.py
+++ b/python/rottnest/utils.py
@@ -46,7 +46,7 @@ def read_metadata_file(file_path: str):
 
     return polars.from_arrow(table), cache_ranges
 
-def read_columns(file_paths: list, row_groups: list, row_nr: list[list]):
+def read_columns(file_paths: list, row_groups: list, row_nr: list):
 
     def read_parquet_file(file, row_group, row_nr):
         f = pq.ParquetFile(file.replace("s3://",''), filesystem=get_fs_from_file_path(file))
diff --git a/src/formats/parquet.rs b/src/formats/parquet.rs
index 65f2b7e..ace7e3e 100644
--- a/src/formats/parquet.rs
+++ b/src/formats/parquet.rs
@@ -38,7 +38,8 @@ use crate::{
 };
 
 use super::readers::ReaderType;
-use serde::{Serialize, Deserialize};
+use serde::{Deserialize, Serialize};
+use tokio::task::JoinSet;
 
 async fn get_metadata_bytes(
     reader: &mut AsyncReader,
@@ -205,13 +206,16 @@ async fn parse_metadatas(
                     .await
                     .unwrap();
 
-                let metadata = decode_metadata(metadata_bytes.to_byte_slice()).map_err(LavaError::from).unwrap();
+                let metadata = decode_metadata(metadata_bytes.to_byte_slice())
+                    .map_err(LavaError::from)
+                    .unwrap();
                 (file_path, metadata)
             })
         })
         .collect::<Vec<_>>()
         .await;
-    let res: Vec<Result<(String, ParquetMetaData), tokio::task::JoinError>> = futures::future::join_all(handles).await;
+    let res: Vec<Result<(String, ParquetMetaData), tokio::task::JoinError>> =
+        futures::future::join_all(handles).await;
 
     let mut metadatas = HashMap::new();
 
@@ -433,6 +437,7 @@ pub async fn read_indexed_pages_async(
     dict_page_sizes: Vec<usize>, // 0 means no dict page
     reader_type: ReaderType,
     file_metadatas: Option<HashMap<String, Bytes>>,
+    in_order: Option<bool>,
 ) -> Result<Vec<ArrayData>, LavaError> {
     // current implementation might re-read dictionary pages, this should be optimized
     // we are assuming that all the files are either on disk or cloud.
@@ -446,17 +451,23 @@ pub async fn read_indexed_pages_async(
             println!("Using provided file metadatas");
             let mut metadatas: HashMap<String, ParquetMetaData> = HashMap::new();
             for (key, value) in file_metadatas.into_iter() {
-                metadatas.insert(key, decode_metadata(value.to_byte_slice()).map_err(LavaError::from).unwrap());
+                metadatas.insert(
+                    key,
+                    decode_metadata(value.to_byte_slice())
+                        .map_err(LavaError::from)
+                        .unwrap(),
+                );
             }
             metadatas
-        },
-        None => parse_metadatas(&file_paths, reader_type.clone()).await
+        }
+        None => parse_metadatas(&file_paths, reader_type.clone()).await,
     };
 
+    let in_order: bool = in_order.unwrap_or(true);
+
     let mut reader = get_reader(file_paths[0].clone(), reader_type.clone())
-            .await
-            .unwrap();
+        .await
+        .unwrap();
 
     let iter = izip!(
         file_paths,
@@ -468,8 +479,10 @@ pub async fn read_indexed_pages_async(
 
     let start = std::time::Instant::now();
 
+    let mut future_handles: Vec<tokio::task::JoinHandle<ArrayData>> = vec![];
+    let mut join_set = JoinSet::new();
 
-    let iter: Vec<tokio::task::JoinHandle<ArrayData>> = stream::iter(iter)
+    let iter: Vec<_> = stream::iter(iter)
        .map(
            |(file_path, row_group, page_offset, page_size, dict_page_size)| {
                let column_index = metadatas[&file_path]
@@ -486,7 +499,7 @@ pub async fn read_indexed_pages_async(
                    .row_group(row_group)
                    .schema_descr()
                    .column(column_index);
-                    
+
                let compression_scheme = metadatas[&file_path]
                    .row_group(row_group)
                    .column(column_index)
                    .compression();
@@ -498,12 +511,11 @@ pub async fn read_indexed_pages_async(
                let mut codec = create_codec(compression_scheme, &codec_options)
                    .unwrap()
                    .unwrap();
-                    
+
                let mut reader_c = reader.clone();
                reader_c.update_filename(file_path).unwrap();
 
-               let handle = tokio::spawn(async move {
-                    
+               let future = async move {
                    let mut pages: Vec<Page> = Vec::new();
                    if dict_page_size > 0 {
                        let start = dict_page_offset.unwrap() as u64;
@@ -570,32 +582,39 @@ pub async fn read_indexed_pages_async(
                    };
 
                    data
-               });
+               };
 
-               handle
+               if in_order {
+                   let handle = tokio::spawn(future);
+                   future_handles.push(handle);
+               } else {
+                   join_set.spawn(future);
+               }
            },
        )
        .collect::<Vec<_>>()
        .await;
 
     // it is absolutely crucial to collect results in the same order.
-    let res: Vec<Result<ArrayData, tokio::task::JoinError>> =
-        futures::future::join_all(iter).await;
-    let result: Result<Vec<ArrayData>, tokio::task::JoinError> =
-        res.into_iter().try_fold(Vec::new(), |mut acc, r| {
-            r.map(|inner_vec| {
-                acc.push(inner_vec);
-                acc
-            })
-        });
+
+    let result: Vec<ArrayData> = if in_order {
+        let res: Vec<Result<ArrayData, tokio::task::JoinError>> =
+            futures::future::join_all(future_handles).await;
+        res.into_iter().map(|res| res.unwrap()).collect()
+    } else {
+        let mut result_inner: Vec<ArrayData> = vec![];
+        while let Some(res) = join_set.join_next().await {
+            result_inner.push(res.unwrap());
+        }
+        result_inner
+    };
+
+    join_set.shutdown().await;
 
     let end = std::time::Instant::now();
     println!("read_indexed_pages_async took {:?}", end - start);
 
-    result.map_err(|e| {
-        // Here, you can convert `e` (a JoinError) into your custom error type.
-        LavaError::from(ParquetError::General(e.to_string()))
-    })
+    Ok(result)
 }
 
 pub fn read_indexed_pages(
@@ -606,7 +625,8 @@ pub fn read_indexed_pages(
     page_sizes: Vec<usize>,
     dict_page_sizes: Vec<usize>, // 0 means no dict page
     reader_type: ReaderType,
-    file_metadatas: Option<HashMap<String, Bytes>>
+    file_metadatas: Option<HashMap<String, Bytes>>,
+    in_order: Option<bool>,
 ) -> Result<Vec<ArrayData>, LavaError> {
     let rt = tokio::runtime::Builder::new_multi_thread()
         .enable_all()
         .build()
         .unwrap();
 
     let res = rt.block_on(read_indexed_pages_async(
-        column_name,
-        file_paths,
-        row_groups,
-        page_offsets,
-        page_sizes,
-        dict_page_sizes,
-        reader_type,
-        file_metadatas
-    ));
+        column_name,
+        file_paths,
+        row_groups,
+        page_offsets,
+        page_sizes,
+        dict_page_sizes,
+        reader_type,
+        file_metadatas,
+        in_order,
+    ));
     rt.shutdown_background();
     res
 }
diff --git a/src/lava_py/format.rs b/src/lava_py/format.rs
index ba3e31c..5e44444 100644
--- a/src/lava_py/format.rs
+++ b/src/lava_py/format.rs
@@ -1,14 +1,14 @@
-use crate::formats::{parquet, cache, MatchResult, ParquetLayout};
+use crate::formats::{cache, parquet, MatchResult, ParquetLayout};
 use crate::lava::error::LavaError;
-use bytes::Bytes;
 use arrow::array::ArrayData;
 use arrow::pyarrow::{PyArrowType, ToPyArrow};
+use bytes::Bytes;
 use pyo3::prelude::*;
+use pyo3::types::{PyBytes, PyDict, PyList, PyTuple};
 use pyo3::{pyfunction, types::PyString};
 use std::collections::hash_map::DefaultHasher;
-use std::hash::{Hash, Hasher};
-use pyo3::types::{PyDict, PyBytes, PyTuple, PyList};
 use std::collections::{BTreeMap, HashMap};
+use std::hash::{Hash, Hasher};
 
 #[pyclass]
 pub struct ParquetLayoutWrapper {
@@ -32,7 +32,7 @@ impl ParquetLayoutWrapper {
     fn from_parquet_layout(py: Python, parquet_layout: ParquetLayout) -> Self {
         ParquetLayoutWrapper {
             num_row_groups: parquet_layout.num_row_groups,
-            metadata_bytes: PyBytes::new(py, &parquet_layout.metadata_bytes.slice(..)).into_py(py),
+            metadata_bytes: PyBytes::new(py, &parquet_layout.metadata_bytes.slice(..)).into_py(py),
             dictionary_page_sizes: parquet_layout.dictionary_page_sizes,
             data_page_sizes: parquet_layout.data_page_sizes,
             data_page_offsets: parquet_layout.data_page_offsets,
@@ -95,25 +95,21 @@ impl From<MatchResult> for MatchResultWrapper {
     }
 }
 
-
 #[pyfunction]
 pub fn populate_cache(
     py: Python,
     filenames: Vec<&PyString>,
     ranges: Vec<Vec<(usize, usize)>>,
-    reader_type: Option<&PyString>
-) -> Result<(), LavaError> {
-
+    reader_type: Option<&PyString>,
+) -> Result<(), LavaError> {
     let reader_type = reader_type.map(|x| x.to_string()).unwrap_or_default();
-    
+
     let mut range_dict: BTreeMap<String, Vec<(usize, usize)>> = BTreeMap::new();
     for (i, filename) in filenames.iter().enumerate() {
         range_dict.insert(filename.to_string(), ranges[i].clone());
     }
 
-    py.allow_threads(|| {
-        cache::populate_cache(range_dict, reader_type.into())
-    })
+    py.allow_threads(|| cache::populate_cache(range_dict, reader_type.into()))
 }
 
 #[pyfunction]
@@ -145,23 +141,25 @@ pub fn read_indexed_pages(
     dict_page_sizes: Vec<usize>,
     reader_type: Option<&PyString>,
     metadata_bytes: Option<&PyDict>,
+    in_order: Option<bool>,
 ) -> Result<Vec<PyArrowType<ArrayData>>, LavaError> {
     let column_name = column_name.to_string();
-    let file_metadata: Option<HashMap<String, Bytes>> = match metadata_bytes {
+    let file_metadata: Option<HashMap<String, Bytes>> = match metadata_bytes {
         Some(dict) => {
             let mut metadata_map: HashMap<String, Bytes> = HashMap::new();
             if let Some(dict) = metadata_bytes {
                 for (key, value) in dict.iter() {
                     let key_str = key.extract::<&PyString>()?.to_string();
-                    let value_bytes = Bytes::copy_from_slice(value.extract::<&PyBytes>()?.as_bytes());
+                    let value_bytes =
+                        Bytes::copy_from_slice(value.extract::<&PyBytes>()?.as_bytes());
                     metadata_map.insert(key_str, value_bytes);
                 }
             }
             Some(metadata_map)
         }
-        None => None
+        None => None,
     };
-    
+
     let file_paths: Vec<String> = file_paths.iter().map(|x| x.to_string()).collect();
     let page_offsets: Vec<u64> = page_offsets.iter().map(|x| *x as u64).collect();
     let reader_type = reader_type.map(|x| x.to_string()).unwrap_or_default();
@@ -175,6 +173,7 @@ pub fn read_indexed_pages(
             dict_page_sizes, // 0 means no dict page
             reader_type.into(),
             file_metadata,
+            in_order,
         )
     })?;
     Ok(match_result.into_iter().map(|x| PyArrowType(x)).collect())
diff --git a/src/vamana/access.rs b/src/vamana/access.rs
index 491262b..6686f8f 100644
--- a/src/vamana/access.rs
+++ b/src/vamana/access.rs
@@ -1,13 +1,10 @@
 use crate::formats::parquet::read_indexed_pages_async;
 use crate::formats::readers::ReaderType;
-use crate::vamana::vamana::{
-    Distance, Indexable, VectorAccessMethod,
-};
+use crate::vamana::vamana::{Distance, Indexable, VectorAccessMethod};
 use arrow::array::BinaryArray;
 use ndarray::parallel::prelude::*;
 use ndarray::{s, Array2};
 
-
 pub struct Euclidean<T> {
     t: std::marker::PhantomData<T>,
 }
@@ -75,6 +72,7 @@ impl VectorAccessMethod<f32> for ReaderAccessMethodF32<'_> {
             vec![dict_page_size], // 0 means no dict page
             reader_type,
             None,
+            None,
         )
         .await
         .unwrap()
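
For reference, a minimal sketch of driving the new flag from Python, in the spirit of the bench.py change above. The module path `rottnest.rottnest`, the column name, and the page/offset values here are illustrative assumptions and are not taken from the patch; only the argument order and the trailing `in_order` flag come from the `read_indexed_pages` binding changed in this commit.

    # Sketch (assumed module path and values): exercising the new in_order flag.
    # Passing False routes page reads through the JoinSet path added by this patch,
    # so pages come back in completion order rather than input order.
    # Passing True or None keeps the original ordered behavior (the Rust side
    # defaults to ordered via in_order.unwrap_or(true)).
    import rottnest.rottnest as rottnest_rs  # assumed import path for the extension

    results = rottnest_rs.read_indexed_pages(
        "text",                      # column_name (assumed)
        ["s3://bucket/a.parquet"],   # file_paths (assumed)
        [0],                         # row_groups
        [4096],                      # page_offsets
        [1024],                      # page_sizes
        [0],                         # dict_page_sizes, 0 = no dictionary page
        "aws",                       # reader_type, as in bench.py
        None,                        # metadata_bytes (optional pre-fetched footers)
        False,                       # in_order
    )
    print(len(results))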