added unordered parquet reader
marsupialtail committed Aug 31, 2024
1 parent 968f9e0 commit 6d862c1
Showing 3 changed files with 37 additions and 18 deletions.
13 changes: 13 additions & 0 deletions python/rottnest/utils.py
@@ -57,6 +57,16 @@ def read_parquet_file(file, row_group, row_nr):

return pyarrow.concat_tables(results)

def read_row_groups(file_paths: list, row_groups: list, row_ranges: list, column: str):

    def read_parquet_file(file, row_group, row_range):
        f = pq.ParquetFile(file.replace("s3://",''), filesystem=get_fs_from_file_path(file))
        return f.read_row_group(row_group, columns=[column])[column].combine_chunks()[row_range[0]:row_range[1]]

    with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:  # Control the number of parallel threads
        results = list(executor.map(read_parquet_file, file_paths, row_groups, row_ranges))

    return results

def get_physical_layout(file_paths: list[str], column_name: str, type = "str", remote = None):

@@ -151,6 +161,9 @@ def get_result_from_index_result(metadata: polars.DataFrame, index_search_result
result = rottnest.read_indexed_pages(column_name, metadata["file_path"].to_list(), metadata["row_groups"].to_list(),
metadata["data_page_offsets"].to_list(), metadata["data_page_sizes"].to_list(), metadata["dictionary_page_sizes"].to_list(),
"aws", file_metadatas)

# magic number 2044 for vectors
# result = read_row_groups(metadata["file_path"].to_list(), metadata["row_groups"].to_list(), [(i, i + 2044) for i in metadata['page_row_offset_in_row_group'].to_list()], column_name)

row_group_rownr = [pyarrow.array(np.arange(metadata['page_row_offset_in_row_group'][i], metadata['page_row_offset_in_row_group'][i] + len(arr))) for i, arr in enumerate(result)]

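For context (not part of the commit): a minimal sketch of how the new read_row_groups helper might be called from user code. The bucket, file names, row-group indices, and row ranges below are hypothetical, and the helper relies on pq (pyarrow.parquet), ThreadPoolExecutor, os, and get_fs_from_file_path already being available in utils.py.

from rottnest.utils import read_row_groups

# One entry per page to fetch: which file, which row group inside it, and which
# slice of rows within that row group (all values hypothetical).
file_paths = ["s3://my-bucket/part-0.parquet", "s3://my-bucket/part-1.parquet"]
row_groups = [0, 3]
row_ranges = [(0, 2044), (2044, 4088)]

# Each result is the requested slice of the column from one row group; the
# slices are fetched in parallel threads and returned in input order.
arrays = read_row_groups(file_paths, row_groups, row_ranges, column="embedding")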
16 changes: 10 additions & 6 deletions src/formats/readers/aws_reader.rs
@@ -55,12 +55,11 @@ impl AsyncAwsReader {

#[async_trait]
impl super::Reader for AsyncAwsReader {

fn update_filename(&mut self, file: String) -> Result<(), LavaError> {
if !file.starts_with("s3://") {
return Err(LavaError::Parse("File scheme not supported".to_string()));
}

let tokens = file[5..].split('/').collect::<Vec<_>>();
let bucket = tokens[0].to_string();
let filename = tokens[1..].join("/");
@@ -87,7 +86,7 @@ impl super::Reader for AsyncAwsReader {
.set_range(Some(format!("bytes={}-{}", from, to - 1)))
.send()
.await;

if let Ok(res) = this_result {
break res;
}
@@ -164,7 +163,9 @@ impl Operator {
}
}

pub(crate) async fn get_file_size_and_reader(file: String) -> Result<(usize, AsyncAwsReader), LavaError> {
pub(crate) async fn get_file_size_and_reader(
file: String,
) -> Result<(usize, AsyncAwsReader), LavaError> {
// Extract filename
let mut reader = get_reader(file.clone()).await?;
// Get the file size
@@ -177,7 +178,6 @@ pub(crate) async fn get_file_size_and_reader(file: String) -> Result<(usize, Asy
Ok((file_size as usize, reader))
}


pub(crate) async fn get_reader(file: String) -> Result<AsyncAwsReader, LavaError> {
// Extract filename
if !file.starts_with("s3://") {
@@ -192,5 +192,9 @@ pub(crate) async fn get_reader(file: String) -> Result<AsyncAwsReader, LavaError
let filename = tokens[1..].join("/");

// Create the reader
Ok(AsyncAwsReader::new(operator.into_inner(), bucket.clone(), filename.clone()))
Ok(AsyncAwsReader::new(
operator.into_inner(),
bucket.clone(),
filename.clone(),
))
}
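Aside (not from this commit): update_filename and get_reader both split an s3:// URI by taking the first path segment after the scheme as the bucket and rejoining the rest as the object key. A rough Python equivalent of that parsing, for reference:

def split_s3_path(path: str) -> tuple[str, str]:
    # Reject non-s3 schemes, then split "s3://bucket/key/with/slashes".
    if not path.startswith("s3://"):
        raise ValueError("File scheme not supported")
    bucket, _, key = path[5:].partition("/")
    return bucket, key

# split_s3_path("s3://bucket/dir/file.parquet") == ("bucket", "dir/file.parquet")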
26 changes: 14 additions & 12 deletions src/lava/search.rs
@@ -229,12 +229,16 @@ async fn search_uuid_one_file(
) -> Result<Vec<(u64, u64)>, LavaError> {
let mut result: Vec<(u64, u64)> = Vec::new();
let mut start_time = Instant::now();
let mut end_time = Instant::now();

let this_result: Vec<usize> =
FastTrie::query_with_reader(file_size, &mut reader, &query).await?;
result.extend(this_result.iter().map(|x| (file_id, *x as u64)));

// println!(
// "search_uuid_one_file: {}ms",
// start_time.elapsed().as_millis()
// );

Ok(result)
}

@@ -246,6 +250,7 @@ async fn search_generic_async(
) -> Result<Vec<(u64, u64)>, LavaError> {
let mut join_set = JoinSet::new();

let mut start_time = Instant::now();
for file_id in 0..readers.len() {
let reader = readers.remove(0);
let file_size = file_sizes.remove(0);
@@ -276,10 +281,7 @@ async fn search_generic_async(
let res = res.unwrap().unwrap();
result.extend(res);
/*
This is not safe. This is because the index might raise false positives, such that the top K only contains false positives.
We should support doing this if the index is guaranteed not to have false positives.
E.g. SSA index will have false positives with skip_factor > 1
E.g. Trie index will have false positives since values are stored on the intermediate nodes, due to the merge process.
We cannot truncate to k anywhere, not even at the end, because of false positives.
*/
// if result.len() >= k {
// break;
Expand All @@ -288,9 +290,9 @@ async fn search_generic_async(

join_set.shutdown().await;

// keep only k elements in the result
println!("Time stage 1 read: {:?}", start_time.elapsed());

let result: Vec<(u64, u64)> = result.into_iter().collect_vec();
// result.truncate(k);
Ok(result)
}
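The comment above is why the truncation to k stays commented out: an index that can return false positives might fill the first k slots entirely with spurious hits, so the candidate list can only be cut down after the candidates have been verified against the underlying data. A hypothetical sketch of that ordering (not code from this repository):

def top_k_after_verification(candidates, verify, k):
    # candidates: (file_id, row_id) pairs returned by the index, possibly spurious
    # verify: callable that re-checks a candidate against the actual data
    confirmed = [c for c in candidates if verify(c)]  # drop false positives first
    return confirmed[:k]                              # only now is truncation safe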

Expand Down Expand Up @@ -770,11 +772,11 @@ pub async fn search_lava_vector_async(
futures.push(tokio::spawn(async move {
let start_time = Instant::now();
let codes_and_plist = reader_c.read_range(start, end).await.unwrap();
println!(
"Time to read {:?}, {:?}",
Instant::now() - start_time,
codes_and_plist.len()
);
// println!(
// "Time to read {:?}, {:?}",
// Instant::now() - start_time,
// codes_and_plist.len()
// );
(file_id, Array1::<u8>::from_vec(codes_and_plist.to_vec()))
}));
}
