
Commit

added logcloud index
marsupialtail committed Sep 22, 2024
1 parent 4853c44 commit 51c9727
Showing 11 changed files with 936 additions and 579 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -82,7 +82,7 @@ num-traits = "0.2.18"
 ordered-float = "4.2.0"
 reqwest = "0.12.4"
 redis = {version = "0", features = ["aio", "tokio-comp"] }
-
+divsufsort = "2.0.0"
 [profile.release]
 lto = false
 bit-vec = "0.6.3"
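Note: divsufsort is a Rust port of the libdivsufsort suffix-array construction library. It is consumed below in src/lava/build.rs, where sort_in_place replaces the hand-rolled suffix sorting in the substring index builder.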
8 changes: 4 additions & 4 deletions demo.py
@@ -8,12 +8,12 @@
 # print(result)
 
 def substring_test():
-    # internal.index_files_substring(["example_data/a.parquet"], "text", "index0", token_skip_factor = 10)
-    # internal.index_files_substring(["example_data/b.parquet"], "text", "index1", token_skip_factor = 10)
+    # internal.index_files_substring(["example_data/a.parquet"], "text", "index0", token_skip_factor = 1, char_index=True)
+    # internal.index_files_substring(["example_data/b.parquet"], "text", "index1", token_skip_factor = 1, char_index=True)
     # internal.merge_index_substring("merged_index", ["index0", "index1"])
     result = internal.search_index_substring(["index0"],
         "One step you have to remember not to skip is to use Disk Utility to partition the SSD as GUID partition scheme HFS+ before doing the clone.",
-        K = 10, token_viable_limit= 1, sample_factor = 10)
+        K = 10, sample_factor = 1, char_index=True)
     print(result)
 
 # table1 = polars.read_parquet("uuid_data/a.parquet")
@@ -41,4 +41,4 @@ def uuid_test():
# result = rottnest.search_index_uuid(["index0", "index1"], "650243a9024fe6595fa953e309c722c225cb2fae1f70c74364917eb901bcdce1f9a878d22345a8576a201646b6da815ebd6397cfd313447ee3a548259f63825a", K = 10)
# print(result)
# result = rottnest.search_index_uuid(["merged_index"], "32b8fd4d808300b97b2dff451cba4185faee842a1248c84c1ab544632957eb8904dccb5880f0d4a9a7317c3a4490b0222e4deb5047abc1788665a46176009a07", K = 10)
# print(result)
# print(result)
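The demo now exercises the character-level index end to end: the (commented) indexing calls and the live search both pass char_index=True, with sample_factor reduced from 10 to 1 and token_viable_limit dropped from the search call. For orientation, here is a minimal Python sketch of the character-level preprocessing that char_index selects, mirroring the closure in _build_lava_substring_char further down in src/lava/build.rs; the SKIP constant below is a hypothetical stand-in for the real one in src/lava/constants.rs, and ASCII input is assumed (the Rust code casts each char to u8):

    SKIP = " \n\t"  # hypothetical; the real skip set lives in src/lava/constants.rs

    def char_encode(text: str, char_skip_factor: int = 1) -> bytes:
        # Lowercase, then drop skip characters, as the Rust closure does.
        kept = [c for c in text.lower() if c not in SKIP]
        if char_skip_factor > 1:
            # Keep positions with index % char_skip_factor == 1, per the Rust filter.
            kept = [c for i, c in enumerate(kept) if i % char_skip_factor == 1]
        return "".join(kept).encode()

With char_skip_factor = 1 every non-skip character is kept, so queries can match at arbitrary positions.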
8 changes: 4 additions & 4 deletions python/rottnest/internal.py
@@ -27,11 +27,11 @@ def index_files_bm25(file_paths: list[str], column_name: str, name = uuid.uuid4(
     file_data = file_data.replace_schema_metadata({"cache_ranges": json.dumps(cache_ranges)})
     pq.write_table(file_data, f"{name}.meta", write_statistics = False, compression = 'zstd')
 
-def index_files_substring(file_paths: list[str], column_name: str, name = uuid.uuid4().hex, index_mode = "physical", tokenizer_file = None, token_skip_factor = None, remote = None):
+def index_files_substring(file_paths: list[str], column_name: str, name = uuid.uuid4().hex, index_mode = "physical", tokenizer_file = None, token_skip_factor = None, remote = None, char_index = False):
 
     arr, uid, file_data = get_physical_layout(file_paths, column_name, remote = remote) if index_mode == "physical" else get_virtual_layout(file_paths, column_name, "uid", remote = remote)
 
-    cache_ranges = rottnest.build_lava_substring(f"{name}.lava", arr, uid, tokenizer_file, token_skip_factor)
+    cache_ranges = rottnest.build_lava_substring(f"{name}.lava", arr, uid, tokenizer_file, token_skip_factor, char_index)
 
     file_data = file_data.to_arrow()
     file_data = file_data.replace_schema_metadata({"cache_ranges": json.dumps(cache_ranges)})
@@ -266,11 +266,11 @@ def search_index_uuid(indices: List[str], query: str, K: int, columns = []):
     return return_full_result(result, metadata, column_name, columns)
 
 
-def search_index_substring(indices: List[str], query: str, K: int, sample_factor = None, token_viable_limit = 1, columns = []):
+def search_index_substring(indices: List[str], query: str, K: int, sample_factor = None, token_viable_limit = 10, columns = [], char_index = False):
 
     metadata = get_metadata_and_populate_cache(indices)
 
-    index_search_results = rottnest.search_lava_substring([f"{index_name}.lava" for index_name in indices], query, K, "aws", sample_factor = sample_factor, token_viable_limit = token_viable_limit)
+    index_search_results = rottnest.search_lava_substring([f"{index_name}.lava" for index_name in indices], query, K, "aws", sample_factor = sample_factor, token_viable_limit = token_viable_limit, char_index = char_index)
     print(index_search_results)
 
     if len(index_search_results) == 0:
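The new char_index flag threads straight through this module: index_files_substring forwards it to the Rust builder rottnest.build_lava_substring, and search_index_substring forwards it to rottnest.search_lava_substring, so the build-time and query-time settings must agree. Note also that the default token_viable_limit rises from 1 to 10. A minimal end-to-end sketch modeled on the updated demo.py (the paths and import style are assumptions, not part of this diff):

    import rottnest.internal as internal

    # Build a character-level substring index over the "text" column.
    internal.index_files_substring(["example_data/a.parquet"], "text", "index0",
                                   token_skip_factor=1, char_index=True)

    # Search the same index; char_index here must match the build-time setting.
    result = internal.search_index_substring(["index0"], "partition the SSD",
                                             K=10, sample_factor=1, char_index=True)
    print(result)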
4 changes: 2 additions & 2 deletions python/rottnest/utils.py
@@ -68,7 +68,7 @@ def read_parquet_file(file, row_group, row_range):

     return results
 
-def get_physical_layout(file_paths: list[str], column_name: str, type = "str", remote = None):
+def get_physical_layout(file_paths: list, column_name: str, type = "str", remote = None):
 
     assert type in {"str", "binary"}
 
@@ -115,7 +115,7 @@ def get_physical_layout(file_paths: list[str], column_name: str, type = "str", remote = None):

     return pyarrow.concat_arrays(all_arrs), pyarrow.array(all_uids.astype(np.uint64)), polars.concat(metadatas)
 
-def get_virtual_layout(file_paths: list[str], column_name: str, key_column_name: str, type = "str", stride = 500, remote = None):
+def get_virtual_layout(file_paths: list, column_name: str, key_column_name: str, type = "str", stride = 500, remote = None):
 
     fs = get_fs_from_file_path(file_paths[0])
     metadatas = []
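Loosening list[str] to a bare list in these two signatures reads as a compatibility fix: subscripting built-in types in annotations (list[str]) fails at import time on Python versions before 3.9 unless `from __future__ import annotations` is in effect, whereas a bare list always parses. If the element type is worth keeping, typing.List is the backward-compatible spelling; a sketch of that alternative (an option, not what this commit does):

    from typing import List

    # Equivalent to list[str], but valid on Python 3.8 and earlier too.
    def get_physical_layout(file_paths: List[str], column_name: str,
                            type: str = "str", remote=None):
        ...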
107 changes: 52 additions & 55 deletions src/lava/build.rs
@@ -5,19 +5,19 @@ use serde_json;
 use tokenizers::parallelism::MaybeParallelIterator;
 use tokenizers::tokenizer::Tokenizer; // You'll need the `byteorder` crate
 
+use crate::lava::constants::*;
+use crate::lava::error::LavaError;
+use crate::lava::plist::PListChunk;
+use crate::lava::trie::{BinaryTrieNode, FastTrie};
 use bincode;
 use bytes;
+use divsufsort::sort_in_place;
 use std::collections::BTreeMap;
 use std::collections::BTreeSet;
 use std::collections::HashMap;
 use std::collections::HashSet;
+use std::io::Read;
 
-use crate::lava::constants::*;
-use crate::lava::error::LavaError;
-use crate::lava::plist::PListChunk;
-use crate::lava::trie::{BinaryTrieNode, FastTrie};
 use std::fs::File;
-use std::io::Read;
 use std::io::{BufWriter, Seek, SeekFrom, Write};
 use zstd::stream::encode_all;
 
@@ -228,48 +228,20 @@ pub async fn build_lava_bm25(
     Ok(vec![(compressed_term_dict_offset as usize, cache_end)])
 }
 
-#[tokio::main]
-pub async fn build_lava_substring_char(
+pub async fn _build_lava_substring_char(
     output_file_name: String,
-    array: ArrayData,
-    uid: ArrayData,
-    char_skip_factor: Option<u32>,
+    texts: Vec<(u64, String)>,
+    char_skip_factor: u32,
 ) -> Result<Vec<(usize, usize)>, LavaError> {
-    let array = make_array(array);
-    // let uid = make_array(ArrayData::from_pyarrow(uid)?);
-    let uid = make_array(uid);
-
-    let char_skip_factor = char_skip_factor.unwrap_or(1);
-
-    let array: &arrow_array::GenericByteArray<arrow_array::types::GenericStringType<i64>> = array
-        .as_any()
-        .downcast_ref::<LargeStringArray>()
-        .ok_or(LavaError::Parse("Expects string array as first argument".to_string()))?;
-
-    let uid = uid
-        .as_any()
-        .downcast_ref::<UInt64Array>()
-        .ok_or(LavaError::Parse("Expects uint64 array as second argument".to_string()))?;
-
-    if array.len() != uid.len() {
-        return Err(LavaError::Parse("The length of the array and the uid array must be the same".to_string()));
-    }
-
-    let mut texts: Vec<(u64, &str)> = Vec::with_capacity(array.len());
-    for i in 0..array.len() {
-        let text = array.value(i);
-        texts.push((uid.value(i), text));
-    }
-
     // parallelize the string operations
     let named_encodings = texts
-        .into_maybe_par_iter()
+        .into_iter()
        .map(|(uid, text)| {
+            let lower: String = text.chars().flat_map(|c| c.to_lowercase()).collect();
             let result: Vec<u8> = if char_skip_factor == 1 {
-                text.chars().filter(|id| !SKIP.chars().contains(id)).map(|c| c as u8).collect()
+                lower.chars().filter(|id| !SKIP.chars().contains(id)).map(|c| c as u8).collect()
             } else {
-                text.chars()
+                lower
+                    .chars()
                     .filter(|id| !SKIP.chars().contains(id))
                     .enumerate()
                     .filter(|&(index, _)| index % char_skip_factor as usize == 1)
@@ -283,21 +255,9 @@
let uids: Vec<u64> = named_encodings.iter().map(|(uid, _)| uid).flatten().cloned().collect::<Vec<u64>>();
let encodings: Vec<u8> = named_encodings.into_iter().map(|(_, text)| text).flatten().collect::<Vec<u8>>();

let mut suffices: Vec<Vec<u8>> = vec![];
let mut sa: Vec<i32> = (0..encodings.len() as i32).collect();

for i in 10..encodings.len() {
suffices.push(encodings[i - 10..i].to_vec());
}

for i in encodings.len()..encodings.len() + 10 {
let mut suffix = encodings[i - 10..encodings.len()].to_vec();
suffix.append(&mut vec![0; i - encodings.len()]);
suffices.push(suffix);
}

let mut sa: Vec<usize> = (0..suffices.len()).collect();

sa.par_sort_by(|&a, &b| suffices[a].cmp(&suffices[b]));
sort_in_place(&encodings, &mut sa);

let mut idx: Vec<u64> = Vec::with_capacity(encodings.len());
let mut bwt: Vec<u8> = Vec::with_capacity(encodings.len());
@@ -384,6 +344,43 @@
     Ok(vec![(cache_start, cache_end)])
 }
 
+#[tokio::main]
+pub async fn build_lava_substring_char(
+    output_file_name: String,
+    array: ArrayData,
+    uid: ArrayData,
+    char_skip_factor: Option<u32>,
+) -> Result<Vec<(usize, usize)>, LavaError> {
+    let array = make_array(array);
+    // let uid = make_array(ArrayData::from_pyarrow(uid)?);
+    let uid = make_array(uid);
+
+    let char_skip_factor = char_skip_factor.unwrap_or(1);
+
+    let array: &arrow_array::GenericByteArray<arrow_array::types::GenericStringType<i64>> = array
+        .as_any()
+        .downcast_ref::<LargeStringArray>()
+        .ok_or(LavaError::Parse("Expects string array as first argument".to_string()))?;
+
+    let uid = uid
+        .as_any()
+        .downcast_ref::<UInt64Array>()
+        .ok_or(LavaError::Parse("Expects uint64 array as second argument".to_string()))?;
+
+    if array.len() != uid.len() {
+        return Err(LavaError::Parse("The length of the array and the uid array must be the same".to_string()));
+    }
+
+    let mut texts: Vec<(u64, String)> = Vec::with_capacity(array.len());
+    for i in 0..array.len() {
+        let text = array.value(i);
+        texts.push((uid.value(i), text.to_string()));
+    }
+
+    println!("made it to this point");
+    _build_lava_substring_char(output_file_name, texts, char_skip_factor).await
+}
 
 #[tokio::main]
 pub async fn build_lava_substring(
     output_file_name: String,
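The substantive change in build.rs: the old code approximated a suffix array by materializing only the first 10 bytes of each suffix (zero-padded near the end of the text) and sorting indices with par_sort_by, so suffixes that agree in their first 10 bytes ended up in arbitrary relative order. The new code computes the exact suffix array over the full concatenated encoding with divsufsort's sort_in_place, and the loop that follows (the idx/bwt vectors) derives a Burrows-Wheeler transform from it, with idx presumably recording the document uid at each BWT position. A compact sketch of the suffix-array-to-BWT step, in Python for brevity, with a naive sort standing in for divsufsort:

    def suffix_array(s: bytes):
        # Naive O(n^2 log n) construction; divsufsort does this far faster.
        return sorted(range(len(s)), key=lambda i: s[i:])

    def bwt_from_sa(s: bytes, sa):
        # Each output byte is the character immediately preceding the i-th
        # smallest suffix, wrapping around for the suffix starting at 0.
        return bytes(s[(i - 1) % len(s)] for i in sa)

    s = b"banana"
    sa = suffix_array(s)          # [5, 3, 1, 0, 4, 2]
    print(bwt_from_sa(s, sa))     # b'nnbaaa'

Sorting full suffixes rather than 10-byte windows is what makes the resulting BWT exact, which matters once queries are matched character by character against the index.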