From aab2dc8bfdb37cd49b843399fc751fa3ae07fab8 Mon Sep 17 00:00:00 2001 From: Tony Solomonik Date: Sat, 1 Jun 2024 11:42:33 +0300 Subject: [PATCH] search: Use index config schema to get indexed fields for search --- src/index_config.rs | 11 +++++++++++ src/main.rs | 39 +++++++++++++++++++++++++-------------- 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/src/index_config.rs b/src/index_config.rs index 2caba45..1459886 100644 --- a/src/index_config.rs +++ b/src/index_config.rs @@ -169,6 +169,17 @@ pub struct IndexSchema { time_field: Option, } +impl IndexSchema { + pub fn get_indexed_fields(&self) -> Vec { + self.mappings + .iter() + .filter(|(_, v)| v.is_indexed()) + .map(|(k, _)| k) + .cloned() + .collect() + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct IndexConfig { pub name: String, diff --git a/src/main.rs b/src/main.rs index f7e6c9d..5ed0341 100644 --- a/src/main.rs +++ b/src/main.rs @@ -40,7 +40,7 @@ use unified_index::unified_directory::UnifiedDirectory; use crate::{ args::{parse_args, SubCommand}, - index_config::MappingFieldType, + index_config::FieldType, merge_directory::MergeDirectory, opendal_file_handle::OpenDalFileHandle, unified_index::{file_cache::build_file_cache, writer::UnifiedIndexWriter}, @@ -263,11 +263,11 @@ async fn run_index(args: IndexArgs, pool: PgPool, config: IndexConfig) -> Result let mut fields = Vec::<(String, Field, fn(serde_json::Value) -> Result)>::new(); for (name, schema) in config.schema.mappings { match schema.type_ { - MappingFieldType::Text(options) => { + FieldType::Text(options) => { let field = schema_builder.add_text_field(&name, options); fields.push((name, field, common_parse)); } - MappingFieldType::Datetime(options) => { + FieldType::Datetime(options) => { let field = schema_builder.add_date_field(&name, options); fields.push((name, field, |v| { let timestamp: i64 = serde_json::from_value(v)?; @@ -376,11 +376,19 @@ async fn run_merge(args: MergeArgs, pool: PgPool, config: IndexConfig) -> Result Ok(()) } -async fn run_search(args: SearchArgs, directories: Vec) -> Result<()> { +async fn run_search(args: SearchArgs, pool: PgPool, config: IndexConfig) -> Result<()> { if args.limit == 0 { return Ok(()); } + let indexed_field_names = config.schema.get_indexed_fields(); + + let directories = open_unified_directories(&config.path, &pool) + .await? + .into_iter() + .map(|(_, x)| x) + .collect::>(); + let (tx, mut rx) = channel(args.limit); let mut tx_handles = Vec::with_capacity(directories.len()); @@ -388,6 +396,7 @@ async fn run_search(args: SearchArgs, directories: Vec) -> Res for directory in directories { let tx = tx.clone(); let query = args.query.clone(); + let indexed_field_names = indexed_field_names.clone(); // Should use rayon if search ends up being cpu bound (it seems io bound). tx_handles.push(spawn_blocking(move || -> Result<()> { @@ -399,7 +408,15 @@ async fn run_search(args: SearchArgs, directories: Vec) -> Res let schema = index.schema(); let dynamic_field = schema.get_field("_dynamic")?; - let timestamp_field = schema.get_field("timestamp")?; + + let indexed_fields = { + let mut fields = indexed_field_names + .iter() + .map(|name| schema.get_field(name)) + .collect::>>()?; + fields.push(dynamic_field); + fields + }; let reader = index .reader_builder() @@ -407,7 +424,7 @@ async fn run_search(args: SearchArgs, directories: Vec) -> Res .try_into()?; let searcher = reader.searcher(); - let query_parser = QueryParser::for_index(&index, vec![dynamic_field, timestamp_field]); + let query_parser = QueryParser::for_index(&index, indexed_fields); let query = query_parser.parse_query(&query)?; let docs = searcher.search(&query, &TopDocs::with_limit(args.limit))?; @@ -474,14 +491,8 @@ async fn async_main(args: Args) -> Result<()> { run_merge(merge_args, pool, config).await?; } SubCommand::Search(search_args) => { - let path = get_index_path(&search_args.name, &pool).await?; - let directories = open_unified_directories(&path, &pool) - .await? - .into_iter() - .map(|(_, x)| x) - .collect::>(); - drop(pool); - run_search(search_args, directories).await?; + let config = get_index_config(&search_args.name, &pool).await?; + run_search(search_args, pool, config).await?; } }