added unordered parquet reader
marsupialtail committed Jun 30, 2024
1 parent be67623 commit 0cd9ca9
Showing 5 changed files with 82 additions and 63 deletions.
7 changes: 4 additions & 3 deletions bench.py
@@ -3,7 +3,7 @@
from tqdm import tqdm
import polars

metadata = polars.read_parquet("small.parquet")
metadata = polars.read_parquet("bench.parquet")[:10]

# metadatas = []
# filenames = []
@@ -24,6 +24,7 @@
metadata["page_byte_size"].to_list(),
[0] * len(metadata["filename"]),
"aws",
file_metadata))
file_metadata,
False))

print(len(result))
print(len(result))
2 changes: 1 addition & 1 deletion python/rottnest/utils.py
@@ -46,7 +46,7 @@ def read_metadata_file(file_path: str):

return polars.from_arrow(table), cache_ranges

def read_columns(file_paths: list, row_groups: list, row_nr: list[list]):
def read_columns(file_paths: list, row_groups: list, row_nr: list):

def read_parquet_file(file, row_group, row_nr):
f = pq.ParquetFile(file.replace("s3://",''), filesystem=get_fs_from_file_path(file))
97 changes: 59 additions & 38 deletions src/formats/parquet.rs
@@ -38,7 +38,8 @@ use crate::{
};

use super::readers::ReaderType;
use serde::{Serialize, Deserialize};
use serde::{Deserialize, Serialize};
use tokio::task::JoinSet;

async fn get_metadata_bytes(
reader: &mut AsyncReader,
@@ -205,13 +206,16 @@
.await
.unwrap();

let metadata = decode_metadata(metadata_bytes.to_byte_slice()).map_err(LavaError::from).unwrap();
let metadata = decode_metadata(metadata_bytes.to_byte_slice())
.map_err(LavaError::from)
.unwrap();
(file_path, metadata)
})
})
.collect::<Vec<_>>()
.await;
let res: Vec<Result<(String, ParquetMetaData), tokio::task::JoinError>> = futures::future::join_all(handles).await;
let res: Vec<Result<(String, ParquetMetaData), tokio::task::JoinError>> =
futures::future::join_all(handles).await;

let mut metadatas = HashMap::new();

@@ -433,6 +437,7 @@ pub async fn read_indexed_pages_async(
dict_page_sizes: Vec<usize>, // 0 means no dict page
reader_type: ReaderType,
file_metadatas: Option<HashMap<String, Bytes>>,
in_order: Option<bool>,
) -> Result<Vec<ArrayData>, LavaError> {
// current implementation might re-read dictionary pages, this should be optimized
// we are assuming that all the files are either on disk or cloud.
@@ -446,17 +451,23 @@
println!("Using provided file metadatas");
let mut metadatas: HashMap<String, ParquetMetaData> = HashMap::new();
for (key, value) in file_metadatas.into_iter() {
metadatas.insert(key, decode_metadata(value.to_byte_slice()).map_err(LavaError::from).unwrap());
metadatas.insert(
key,
decode_metadata(value.to_byte_slice())
.map_err(LavaError::from)
.unwrap(),
);
}
metadatas
},
None => parse_metadatas(&file_paths, reader_type.clone()).await
}
None => parse_metadatas(&file_paths, reader_type.clone()).await,
};

let in_order: bool = in_order.unwrap_or(true);

let mut reader = get_reader(file_paths[0].clone(), reader_type.clone())
.await
.unwrap();
.await
.unwrap();

let iter = izip!(
file_paths,
@@ -468,8 +479,10 @@

let start = std::time::Instant::now();

let mut future_handles: Vec<tokio::task::JoinHandle<ArrayData>> = vec![];
let mut join_set = JoinSet::new();

let iter: Vec<tokio::task::JoinHandle<ArrayData>> = stream::iter(iter)
let iter: Vec<_> = stream::iter(iter)
.map(
|(file_path, row_group, page_offset, page_size, dict_page_size)| {
let column_index = metadatas[&file_path]
@@ -486,7 +499,7 @@
.row_group(row_group)
.schema_descr()
.column(column_index);

let compression_scheme = metadatas[&file_path]
.row_group(row_group)
.column(column_index)
@@ -498,12 +511,11 @@
let mut codec = create_codec(compression_scheme, &codec_options)
.unwrap()
.unwrap();

let mut reader_c = reader.clone();
reader_c.update_filename(file_path).unwrap();

let handle = tokio::spawn(async move {

let future = async move {
let mut pages: Vec<parquet::column::page::Page> = Vec::new();
if dict_page_size > 0 {
let start = dict_page_offset.unwrap() as u64;
@@ -570,32 +582,39 @@
};

data
});
};

handle
if in_order {
let handle = tokio::spawn(future);
future_handles.push(handle);
} else {
join_set.spawn(future);
}
},
)
.collect::<Vec<_>>()
.await;

// it is absolutely crucial to collect results in the same order.
let res: Vec<std::prelude::v1::Result<ArrayData, tokio::task::JoinError>> =
futures::future::join_all(iter).await;
let result: Result<Vec<ArrayData>, tokio::task::JoinError> =
res.into_iter().try_fold(Vec::new(), |mut acc, r| {
r.map(|inner_vec| {
acc.push(inner_vec);
acc
})
});

let result: Vec<ArrayData> = if in_order {
let res: Vec<std::prelude::v1::Result<ArrayData, tokio::task::JoinError>> =
futures::future::join_all(future_handles).await;
res.into_iter().map(|res| res.unwrap()).collect()
} else {
let mut result_inner: Vec<ArrayData> = vec![];
while let Some(res) = join_set.join_next().await {
result_inner.push(res.unwrap());
}
result_inner
};

join_set.shutdown().await;

let end = std::time::Instant::now();
println!("read_indexed_pages_async took {:?}", end - start);

result.map_err(|e| {
// Here, you can convert `e` (a JoinError) into your custom error type.
LavaError::from(ParquetError::General(e.to_string()))
})
Ok(result)
}

pub fn read_indexed_pages(
@@ -606,23 +625,25 @@
page_sizes: Vec<usize>,
dict_page_sizes: Vec<usize>, // 0 means no dict page
reader_type: ReaderType,
file_metadatas: Option<HashMap<String, Bytes>>
file_metadatas: Option<HashMap<String, Bytes>>,
in_order: Option<bool>,
) -> Result<Vec<ArrayData>, LavaError> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

let res = rt.block_on(read_indexed_pages_async(
column_name,
file_paths,
row_groups,
page_offsets,
page_sizes,
dict_page_sizes,
reader_type,
file_metadatas
));
column_name,
file_paths,
row_groups,
page_offsets,
page_sizes,
dict_page_sizes,
reader_type,
file_metadatas,
in_order,
));
rt.shutdown_background();
res
}
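
Note on the change above: the new in_order flag selects between two ways of collecting the spawned page-read tasks. When in_order is true (the default, via in_order.unwrap_or(true)), the handles are joined with futures::future::join_all, which returns results in the order the tasks were spawned; when it is false, a tokio::task::JoinSet hands back each result as soon as its task completes, so pages can come back in any order. The standalone sketch below illustrates only that pattern; it is not code from this repository, and it assumes tokio (with the full feature set) and futures 0.3 as dependencies.

// Hypothetical sketch: ordered vs. unordered collection of spawned tasks.
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    // Ordered path: join_all yields results in spawn order.
    let handles: Vec<_> = (0..4u32)
        .map(|i| tokio::spawn(async move { i * 10 }))
        .collect();
    let ordered: Vec<u32> = futures::future::join_all(handles)
        .await
        .into_iter()
        .map(|r| r.unwrap())
        .collect();

    // Unordered path: JoinSet yields results in completion order.
    let mut set = JoinSet::new();
    for i in 0..4u32 {
        set.spawn(async move { i * 10 });
    }
    let mut unordered: Vec<u32> = Vec::new();
    while let Some(res) = set.join_next().await {
        unordered.push(res.unwrap());
    }

    println!("ordered:   {:?}", ordered);
    println!("unordered: {:?}", unordered);
}

Because the default is true, existing callers keep the previous ordered behavior; only callers that explicitly pass False, as bench.py now does, opt into the unordered path.
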
33 changes: 16 additions & 17 deletions src/lava_py/format.rs
@@ -1,14 +1,14 @@
use crate::formats::{parquet, cache, MatchResult, ParquetLayout};
use crate::formats::{cache, parquet, MatchResult, ParquetLayout};
use crate::lava::error::LavaError;
use bytes::Bytes;
use arrow::array::ArrayData;
use arrow::pyarrow::{PyArrowType, ToPyArrow};
use bytes::Bytes;
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyDict, PyList, PyTuple};
use pyo3::{pyfunction, types::PyString};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use pyo3::types::{PyDict, PyBytes, PyTuple, PyList};
use std::collections::{BTreeMap, HashMap};
use std::hash::{Hash, Hasher};

#[pyclass]
pub struct ParquetLayoutWrapper {
@@ -32,7 +32,7 @@ impl ParquetLayoutWrapper {
fn from_parquet_layout(py: Python, parquet_layout: ParquetLayout) -> Self {
ParquetLayoutWrapper {
num_row_groups: parquet_layout.num_row_groups,
metadata_bytes: PyBytes::new(py, &parquet_layout.metadata_bytes.slice(..)).into_py(py),
metadata_bytes: PyBytes::new(py, &parquet_layout.metadata_bytes.slice(..)).into_py(py),
dictionary_page_sizes: parquet_layout.dictionary_page_sizes,
data_page_sizes: parquet_layout.data_page_sizes,
data_page_offsets: parquet_layout.data_page_offsets,
@@ -95,25 +95,21 @@ impl From<MatchResult> for MatchResultWrapper {
}
}


#[pyfunction]
pub fn populate_cache(
py: Python,
filenames: Vec<&PyString>,
ranges: Vec<Vec<(usize, usize)>>,
reader_type: Option<&PyString>
) -> Result<(), LavaError> {

reader_type: Option<&PyString>,
) -> Result<(), LavaError> {
let reader_type = reader_type.map(|x| x.to_string()).unwrap_or_default();

let mut range_dict: BTreeMap<String, Vec<(usize, usize)>> = BTreeMap::new();
for (i, filename) in filenames.iter().enumerate() {
range_dict.insert(filename.to_string(), ranges[i].clone());
}

py.allow_threads(|| {
cache::populate_cache(range_dict, reader_type.into())
})
py.allow_threads(|| cache::populate_cache(range_dict, reader_type.into()))
}

#[pyfunction]
@@ -145,23 +141,25 @@ pub fn read_indexed_pages(
dict_page_sizes: Vec<usize>,
reader_type: Option<&PyString>,
metadata_bytes: Option<&PyDict>,
in_order: Option<bool>,
) -> Result<Vec<PyArrowType<ArrayData>>, LavaError> {
let column_name = column_name.to_string();
let file_metadata: Option<HashMap<String, Bytes>> = match metadata_bytes {
let file_metadata: Option<HashMap<String, Bytes>> = match metadata_bytes {
Some(dict) => {
let mut metadata_map: HashMap<String, Bytes> = HashMap::new();
if let Some(dict) = metadata_bytes {
for (key, value) in dict.iter() {
let key_str = key.extract::<&PyString>()?.to_string();
let value_bytes = Bytes::copy_from_slice(value.extract::<&PyBytes>()?.as_bytes());
let value_bytes =
Bytes::copy_from_slice(value.extract::<&PyBytes>()?.as_bytes());
metadata_map.insert(key_str, value_bytes);
}
}
Some(metadata_map)
}
None => None
None => None,
};

let file_paths: Vec<String> = file_paths.iter().map(|x| x.to_string()).collect();
let page_offsets: Vec<u64> = page_offsets.iter().map(|x| *x as u64).collect();
let reader_type = reader_type.map(|x| x.to_string()).unwrap_or_default();
@@ -175,6 +173,7 @@
dict_page_sizes, // 0 means no dict page
reader_type.into(),
file_metadata,
in_order,
)
})?;
Ok(match_result.into_iter().map(|x| PyArrowType(x)).collect())
6 changes: 2 additions & 4 deletions src/vamana/access.rs
@@ -1,13 +1,10 @@
use crate::formats::parquet::read_indexed_pages_async;
use crate::formats::readers::ReaderType;
use crate::vamana::vamana::{
Distance, Indexable, VectorAccessMethod,
};
use crate::vamana::vamana::{Distance, Indexable, VectorAccessMethod};
use arrow::array::BinaryArray;
use ndarray::parallel::prelude::*;
use ndarray::{s, Array2};


pub struct Euclidean<T: Indexable> {
t: std::marker::PhantomData<T>,
}
@@ -75,6 +72,7 @@ impl VectorAccessMethod<f32> for ReaderAccessMethodF32<'_> {
vec![dict_page_size], // 0 means no dict page
reader_type,
None,
None,
)
.await
.unwrap()
