From 8ed5bc5305cd65d840db19458aafaaec22416aed Mon Sep 17 00:00:00 2001
From: Yohan Wal <59358312+CookiePieWw@users.noreply.github.com>
Date: Tue, 29 Oct 2024 23:46:24 +0800
Subject: [PATCH] refactor: json conversion (#4893)

* refactor: json type update

* test: update test

* fix: convert when needed

* revert: leave sqlness tests unchanged

* fix: fmt

* refactor: just refactor

* Apply suggestions from code review

Co-authored-by: Weny Xu

* refactor: parse jsonb first

* test: add bad cases

* Update src/datatypes/src/vectors/binary.rs

Co-authored-by: Weny Xu

* fix: fmt

* fix: fix clippy/check

---------

Co-authored-by: Weny Xu
---
 Cargo.lock                              |  2 +-
 Cargo.toml                              |  2 +-
 src/datatypes/src/error.rs              | 10 +++-
 src/datatypes/src/lib.rs                |  1 +
 src/datatypes/src/vectors/binary.rs     | 80 +++++++++++++++++++++++++
 src/datatypes/src/vectors/operations.rs |  8 +++
 src/servers/src/postgres/types.rs       |  4 +-
 tests-integration/tests/sql.rs          | 35 +++++++++--
 8 files changed, 132 insertions(+), 10 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 40b5948bcc7d..0d8058f6a9ab 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5524,7 +5524,7 @@ dependencies = [
 [[package]]
 name = "jsonb"
 version = "0.4.1"
-source = "git+https://github.com/datafuselabs/jsonb.git?rev=46ad50fc71cf75afbf98eec455f7892a6387c1fc#46ad50fc71cf75afbf98eec455f7892a6387c1fc"
+source = "git+https://github.com/databendlabs/jsonb.git?rev=46ad50fc71cf75afbf98eec455f7892a6387c1fc#46ad50fc71cf75afbf98eec455f7892a6387c1fc"
 dependencies = [
  "byteorder",
  "fast-float",
diff --git a/Cargo.toml b/Cargo.toml
index 6795e8927f76..f06c83a458ba 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -125,7 +125,7 @@ greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", r
 humantime = "2.1"
 humantime-serde = "1.1"
 itertools = "0.10"
-jsonb = { git = "https://github.com/datafuselabs/jsonb.git", rev = "46ad50fc71cf75afbf98eec455f7892a6387c1fc", default-features = false }
+jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "46ad50fc71cf75afbf98eec455f7892a6387c1fc", default-features = false }
 lazy_static = "1.4"
 meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "a10facb353b41460eeb98578868ebf19c2084fac" }
 mockall = "0.11.4"
diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs
index 5a255dc0a644..aca9b883a952 100644
--- a/src/datatypes/src/error.rs
+++ b/src/datatypes/src/error.rs
@@ -189,6 +189,13 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Invalid JSON text: {}", value))]
+    InvalidJson {
+        value: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Value exceeds the precision {} bound", precision))]
     ValueExceedsPrecision {
         precision: u8,
@@ -222,7 +229,8 @@ impl ErrorExt for Error {
             | DefaultValueType { .. }
             | DuplicateMeta { .. }
             | InvalidTimestampPrecision { .. }
-            | InvalidPrecisionOrScale { .. } => StatusCode::InvalidArguments,
+            | InvalidPrecisionOrScale { .. }
+            | InvalidJson { .. } => StatusCode::InvalidArguments,
 
             ValueExceedsPrecision { .. }
             | CastType { .. }
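Note on the error type: the new InvalidJson variant follows the snafu pattern used throughout this file, where a generated "context selector" builds the variant. A minimal, self-contained sketch of that pattern (illustrative only, not GreptimeDB code; the real variant also carries an implicit location):

    use snafu::Snafu;

    #[derive(Debug, Snafu)]
    enum Error {
        #[snafu(display("Invalid JSON text: {}", value))]
        InvalidJson { value: String },
    }

    fn reject(text: &str) -> Result<(), Error> {
        // `InvalidJsonSnafu` is generated by `#[derive(Snafu)]`;
        // `.fail()` wraps the built variant in `Err`.
        InvalidJsonSnafu { value: text.to_string() }.fail()
    }

    fn main() {
        assert!(matches!(reject("{oops"), Err(Error::InvalidJson { .. })));
    }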
diff --git a/src/datatypes/src/lib.rs b/src/datatypes/src/lib.rs
index 3766cebf8755..3ce78322fe97 100644
--- a/src/datatypes/src/lib.rs
+++ b/src/datatypes/src/lib.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #![feature(let_chains)]
+#![feature(assert_matches)]
 
 pub mod arrow_array;
 pub mod data_type;
diff --git a/src/datatypes/src/vectors/binary.rs b/src/datatypes/src/vectors/binary.rs
index e2074f949c2b..9a690ea4114b 100644
--- a/src/datatypes/src/vectors/binary.rs
+++ b/src/datatypes/src/vectors/binary.rs
@@ -36,6 +36,36 @@ impl BinaryVector {
     pub(crate) fn as_arrow(&self) -> &dyn Array {
         &self.array
     }
+
+    /// Creates a new binary vector of JSONB from a binary vector.
+    /// The binary vector must contain valid JSON strings.
+    pub fn convert_binary_to_json(&self) -> Result<BinaryVector> {
+        let arrow_array = self.to_arrow_array();
+        let mut vector = vec![];
+        for binary in arrow_array
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap()
+            .iter()
+        {
+            let jsonb = if let Some(binary) = binary {
+                match jsonb::from_slice(binary) {
+                    Ok(jsonb) => Some(jsonb.to_vec()),
+                    Err(_) => {
+                        let s = String::from_utf8_lossy(binary);
+                        return error::InvalidJsonSnafu {
+                            value: s.to_string(),
+                        }
+                        .fail();
+                    }
+                }
+            } else {
+                None
+            };
+            vector.push(jsonb);
+        }
+        Ok(BinaryVector::from(vector))
+    }
 }
 
 impl From<BinaryArray> for BinaryVector {
@@ -233,6 +263,8 @@ vectors::impl_try_from_arrow_array_for_vector!(BinaryArray, BinaryVector);
 
 #[cfg(test)]
 mod tests {
+    use std::assert_matches::assert_matches;
+
     use arrow::datatypes::DataType as ArrowDataType;
     use common_base::bytes::Bytes;
     use serde_json;
@@ -383,4 +415,52 @@ mod tests {
         assert_eq!(b"four", vector.get_data(3).unwrap());
         assert_eq!(builder.len(), 4);
     }
+
+    #[test]
+    fn test_binary_json_conversion() {
+        // json strings
+        let json_strings = vec![
+            b"{\"hello\": \"world\"}".to_vec(),
+            b"{\"foo\": 1}".to_vec(),
+            b"123".to_vec(),
+        ];
+        let json_vector = BinaryVector::from(json_strings.clone())
+            .convert_binary_to_json()
+            .unwrap();
+        let jsonbs = json_strings
+            .iter()
+            .map(|v| jsonb::parse_value(v).unwrap().to_vec())
+            .collect::<Vec<_>>();
+        for i in 0..3 {
+            assert_eq!(
+                json_vector.get_ref(i).as_binary().unwrap().unwrap(),
+                jsonbs.get(i).unwrap().as_slice()
+            );
+        }
+
+        // jsonb
+        let json_vector = BinaryVector::from(jsonbs.clone())
+            .convert_binary_to_json()
+            .unwrap();
+        for i in 0..3 {
+            assert_eq!(
+                json_vector.get_ref(i).as_binary().unwrap().unwrap(),
+                jsonbs.get(i).unwrap().as_slice()
+            );
+        }
+
+        // binary with jsonb header (0x80, 0x40, 0x20)
+        let binary_with_jsonb_header: Vec<u8> = [0x80, 0x23, 0x40, 0x22].to_vec();
+        let error = BinaryVector::from(vec![binary_with_jsonb_header])
+            .convert_binary_to_json()
+            .unwrap_err();
+        assert_matches!(error, error::Error::InvalidJson { .. });
+
+        // invalid json string
+        let json_strings = vec![b"{\"hello\": \"world\"".to_vec()];
+        let error = BinaryVector::from(json_strings)
+            .convert_binary_to_json()
+            .unwrap_err();
+        assert_matches!(error, error::Error::InvalidJson { .. });
+    }
 }
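Note on convert_binary_to_json: jsonb::from_slice is tried first ("parse jsonb first"), and the new test shows it accepts both raw JSONB and plain JSON text, so already-encoded values pass through unchanged while text gets re-encoded; anything else becomes an InvalidJson error. A rough standalone sketch of the text path, using only the jsonb calls that appear in this patch (jsonb::parse_value and Value::to_vec), with the error simplified to a String:

    fn text_to_jsonb(text: &[u8]) -> Result<Vec<u8>, String> {
        // Parse JSON text into a jsonb::Value, then encode it in the
        // JSONB binary format that the BinaryVector stores.
        jsonb::parse_value(text)
            .map(|value| value.to_vec())
            .map_err(|_| format!("Invalid JSON text: {}", String::from_utf8_lossy(text)))
    }

    fn main() {
        // Valid JSON text yields a non-empty JSONB buffer...
        assert!(!text_to_jsonb(b"{\"hello\": \"world\"}").unwrap().is_empty());
        // ...while truncated JSON is rejected, mirroring the bad cases in
        // test_binary_json_conversion.
        assert!(text_to_jsonb(b"{\"hello\": \"world\"").is_err());
    }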
diff --git a/src/datatypes/src/vectors/operations.rs b/src/datatypes/src/vectors/operations.rs
index b2de83c6e6f3..caa0730f7298 100644
--- a/src/datatypes/src/vectors/operations.rs
+++ b/src/datatypes/src/vectors/operations.rs
@@ -18,6 +18,8 @@ mod find_unique;
 mod replicate;
 mod take;
 
+use std::sync::Arc;
+
 use common_base::BitVec;
 
 use crate::error::{self, Result};
@@ -89,6 +91,12 @@ macro_rules! impl_scalar_vector_op {
             }
 
             fn cast(&self, to_type: &ConcreteDataType) -> Result<VectorRef> {
+                if to_type == &ConcreteDataType::json_datatype() {
+                    if let Some(vector) = self.as_any().downcast_ref::<BinaryVector>() {
+                        let json_vector = vector.convert_binary_to_json()?;
+                        return Ok(Arc::new(json_vector) as VectorRef);
+                    }
+                }
                 cast::cast_non_constant!(self, to_type)
             }
 
diff --git a/src/servers/src/postgres/types.rs b/src/servers/src/postgres/types.rs
index c268b893d847..a7ce2252b955 100644
--- a/src/servers/src/postgres/types.rs
+++ b/src/servers/src/postgres/types.rs
@@ -961,7 +961,7 @@ pub(super) fn parameters_to_scalar_values(
                 if let Some(server_type) = &server_type {
                     match server_type {
                         ConcreteDataType::Binary(_) => {
-                            ScalarValue::Binary(data.map(|d| jsonb::Value::from(d).to_vec()))
+                            ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
                         }
                         _ => {
                             return Err(invalid_parameter_error(
@@ -971,7 +971,7 @@ pub(super) fn parameters_to_scalar_values(
                         }
                     }
                 } else {
-                    ScalarValue::Binary(data.map(|d| jsonb::Value::from(d).to_vec()))
+                    ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
                 }
             }
             _ => Err(invalid_parameter_error(
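Note on the server-side changes: the cast hook above routes Binary to Json casts through convert_binary_to_json, which lets the Postgres layer stop eagerly encoding JSON parameters as JSONB and simply forward the JSON text. A simplified sketch of the new parameter encoding, assuming d is a serde_json::Value as the surrounding pgwire code suggests:

    fn encode_json_param(d: serde_json::Value) -> Vec<u8> {
        // Before: jsonb::Value::from(d).to_vec(), an eager JSONB encoding.
        // After: keep the parameter as JSON text; convert_binary_to_json
        // accepts either form when the value is later cast to JSON.
        d.to_string().into_bytes()
    }

    fn main() {
        let bytes = encode_json_param(serde_json::json!({"foo": 1}));
        assert_eq!(bytes, b"{\"foo\":1}");
    }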
diff --git a/tests-integration/tests/sql.rs b/tests-integration/tests/sql.rs
index 19acc37ea6c6..af9374a2a826 100644
--- a/tests-integration/tests/sql.rs
+++ b/tests-integration/tests/sql.rs
@@ -145,7 +145,7 @@ pub async fn test_mysql_crud(store_type: StorageType) {
         .unwrap();
 
     sqlx::query(
-        "create table demo(i bigint, ts timestamp time index default current_timestamp, d date default null, dt datetime default null, b blob default null)",
+        "create table demo(i bigint, ts timestamp time index default current_timestamp, d date default null, dt datetime default null, b blob default null, j json default null)",
     )
     .execute(&pool)
     .await
@@ -158,18 +158,30 @@ pub async fn test_mysql_crud(store_type: StorageType) {
         let d = NaiveDate::from_yo_opt(2015, 100).unwrap();
         let hello = format!("hello{i}");
         let bytes = hello.as_bytes();
-        sqlx::query("insert into demo values(?, ?, ?, ?, ?)")
+        let jsons = serde_json::json!({
+            "code": i,
+            "success": true,
+            "payload": {
+                "features": [
+                    "serde",
+                    "json"
+                ],
+                "homepage": null
+            }
+        });
+        sqlx::query("insert into demo values(?, ?, ?, ?, ?, ?)")
             .bind(i)
            .bind(i)
            .bind(d)
            .bind(dt)
            .bind(bytes)
+            .bind(jsons)
            .execute(&pool)
            .await
            .unwrap();
     }
 
-    let rows = sqlx::query("select i, d, dt, b from demo")
+    let rows = sqlx::query("select i, d, dt, b, j from demo")
         .fetch_all(&pool)
         .await
         .unwrap();
@@ -180,6 +192,7 @@ pub async fn test_mysql_crud(store_type: StorageType) {
         let d: NaiveDate = row.get("d");
         let dt: DateTime<Utc> = row.get("dt");
         let bytes: Vec<u8> = row.get("b");
+        let json: serde_json::Value = row.get("j");
         assert_eq!(ret, i as i64);
         let expected_d = NaiveDate::from_yo_opt(2015, 100).unwrap();
         assert_eq!(expected_d, d);
@@ -194,6 +207,18 @@ pub async fn test_mysql_crud(store_type: StorageType) {
             format!("{}", dt.format("%Y-%m-%d %H:%M:%S"))
         );
         assert_eq!(format!("hello{i}"), String::from_utf8_lossy(&bytes));
+        let expected_j = serde_json::json!({
+            "code": i,
+            "success": true,
+            "payload": {
+                "features": [
+                    "serde",
+                    "json"
+                ],
+                "homepage": null
+            }
+        });
+        assert_eq!(json, expected_j);
     }
 
     let rows = sqlx::query("select i from demo where i=?")
@@ -396,7 +421,7 @@ pub async fn test_postgres_crud(store_type: StorageType) {
         let dt = d.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp_millis();
         let bytes = "hello".as_bytes();
         let json = serde_json::json!({
-            "code": 200,
+            "code": i,
             "success": true,
             "payload": {
                 "features": [
@@ -444,7 +469,7 @@ pub async fn test_postgres_crud(store_type: StorageType) {
         assert_eq!("hello".as_bytes(), bytes);
 
         let expected_j = serde_json::json!({
-            "code": 200,
+            "code": i,
             "success": true,
             "payload": {
                 "features": [
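A closing note on the test strategy: both CRUD tests compare serde_json::Value instead of raw JSON strings, which matters because a round trip through JSONB need not preserve key order or whitespace. Value equality is structural, as this small standalone check illustrates:

    fn main() {
        let a: serde_json::Value =
            serde_json::from_str(r#"{"code": 1, "success": true}"#).unwrap();
        let b: serde_json::Value =
            serde_json::from_str(r#"{"success":true,"code":1}"#).unwrap();
        // Maps compare by key/value pairs, not by order or formatting.
        assert_eq!(a, b);
    }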