Skip to content

Commit

Permalink
search: Use index config schema to get indexed fields for search
Browse files Browse the repository at this point in the history
  • Loading branch information
tontinton committed Jun 1, 2024
1 parent 5a075e0 commit aab2dc8
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 14 deletions.
11 changes: 11 additions & 0 deletions src/index_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,17 @@ pub struct IndexSchema {
time_field: Option<String>,
}

impl IndexSchema {
    /// Returns the names of every mapping that reports itself as indexed.
    ///
    /// Walks `self.mappings`, keeps the entries whose value answers `true` to
    /// `is_indexed()`, and hands back owned copies of their keys. The result
    /// follows whatever order `self.mappings` iterates in.
    pub fn get_indexed_fields(&self) -> Vec<String> {
        let mut names = Vec::new();
        for (name, mapping) in self.mappings.iter() {
            if mapping.is_indexed() {
                // Callers get owned names so they can outlive the schema borrow.
                names.push(name.clone());
            }
        }
        names
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct IndexConfig {
pub name: String,
Expand Down
39 changes: 25 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use unified_index::unified_directory::UnifiedDirectory;

use crate::{
args::{parse_args, SubCommand},
index_config::MappingFieldType,
index_config::FieldType,
merge_directory::MergeDirectory,
opendal_file_handle::OpenDalFileHandle,
unified_index::{file_cache::build_file_cache, writer::UnifiedIndexWriter},
Expand Down Expand Up @@ -263,11 +263,11 @@ async fn run_index(args: IndexArgs, pool: PgPool, config: IndexConfig) -> Result
let mut fields = Vec::<(String, Field, fn(serde_json::Value) -> Result<OwnedValue>)>::new();
for (name, schema) in config.schema.mappings {
match schema.type_ {
MappingFieldType::Text(options) => {
FieldType::Text(options) => {
let field = schema_builder.add_text_field(&name, options);
fields.push((name, field, common_parse));
}
MappingFieldType::Datetime(options) => {
FieldType::Datetime(options) => {
let field = schema_builder.add_date_field(&name, options);
fields.push((name, field, |v| {
let timestamp: i64 = serde_json::from_value(v)?;
Expand Down Expand Up @@ -376,18 +376,27 @@ async fn run_merge(args: MergeArgs, pool: PgPool, config: IndexConfig) -> Result
Ok(())
}

async fn run_search(args: SearchArgs, directories: Vec<UnifiedDirectory>) -> Result<()> {
async fn run_search(args: SearchArgs, pool: PgPool, config: IndexConfig) -> Result<()> {
if args.limit == 0 {
return Ok(());
}

let indexed_field_names = config.schema.get_indexed_fields();

let directories = open_unified_directories(&config.path, &pool)
.await?
.into_iter()
.map(|(_, x)| x)
.collect::<Vec<_>>();

let (tx, mut rx) = channel(args.limit);
let mut tx_handles = Vec::with_capacity(directories.len());

// Should be chunked to never starve the blocking thread pool (tokio's default limit is 512 blocking threads).
for directory in directories {
let tx = tx.clone();
let query = args.query.clone();
let indexed_field_names = indexed_field_names.clone();

// Should use rayon if search ends up being CPU-bound (it currently seems I/O-bound).
tx_handles.push(spawn_blocking(move || -> Result<()> {
Expand All @@ -399,15 +408,23 @@ async fn run_search(args: SearchArgs, directories: Vec<UnifiedDirectory>) -> Res
let schema = index.schema();

let dynamic_field = schema.get_field("_dynamic")?;
let timestamp_field = schema.get_field("timestamp")?;

let indexed_fields = {
let mut fields = indexed_field_names
.iter()
.map(|name| schema.get_field(name))
.collect::<tantivy::Result<Vec<_>>>()?;
fields.push(dynamic_field);
fields
};

let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()?;
let searcher = reader.searcher();

let query_parser = QueryParser::for_index(&index, vec![dynamic_field, timestamp_field]);
let query_parser = QueryParser::for_index(&index, indexed_fields);
let query = query_parser.parse_query(&query)?;
let docs = searcher.search(&query, &TopDocs::with_limit(args.limit))?;

Expand Down Expand Up @@ -474,14 +491,8 @@ async fn async_main(args: Args) -> Result<()> {
run_merge(merge_args, pool, config).await?;
}
SubCommand::Search(search_args) => {
let path = get_index_path(&search_args.name, &pool).await?;
let directories = open_unified_directories(&path, &pool)
.await?
.into_iter()
.map(|(_, x)| x)
.collect::<Vec<_>>();
drop(pool);
run_search(search_args, directories).await?;
let config = get_index_config(&search_args.name, &pool).await?;
run_search(search_args, pool, config).await?;
}
}

Expand Down

0 comments on commit aab2dc8

Please sign in to comment.