Skip to content

Commit

Permalink
refactor: support distinct JSON format
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 12, 2024
1 parent 0b6d78a commit 062557d
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 37 deletions.
16 changes: 5 additions & 11 deletions src/api/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
ConcreteDataType::UInt64(_) => ColumnDataType::Uint64,
ConcreteDataType::Float32(_) => ColumnDataType::Float32,
ConcreteDataType::Float64(_) => ColumnDataType::Float64,
ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) => ColumnDataType::Binary,
ConcreteDataType::Binary(_) => ColumnDataType::Binary,
ConcreteDataType::String(_) => ColumnDataType::String,
ConcreteDataType::Date(_) => ColumnDataType::Date,
ConcreteDataType::DateTime(_) => ColumnDataType::Datetime,
Expand All @@ -290,6 +290,7 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
IntervalType::MonthDayNano(_) => ColumnDataType::IntervalMonthDayNano,
},
ConcreteDataType::Decimal128(_) => ColumnDataType::Decimal128,
ConcreteDataType::Json(_) => ColumnDataType::Json,
ConcreteDataType::Vector(_) => ColumnDataType::Vector,
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
Expand All @@ -309,16 +310,9 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
})),
})
}
ColumnDataType::Binary => {
if datatype == ConcreteDataType::json_datatype() {
// Json is the same as binary in proto. The extension marks the binary in proto is actually a json.
Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
})
} else {
None
}
}
ColumnDataType::Json => datatype.as_json().map(|_| ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
ColumnDataType::Vector => {
datatype
.as_vector()
Expand Down
7 changes: 7 additions & 0 deletions src/datatypes/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,13 @@ impl ConcreteDataType {
}
}

/// Returns the inner [`JsonType`] by value when this data type is the
/// `Json` variant, and `None` for every other variant.
pub fn as_json(&self) -> Option<JsonType> {
    if let ConcreteDataType::Json(json_type) = self {
        Some(*json_type)
    } else {
        None
    }
}

pub fn as_vector(&self) -> Option<VectorType> {
match self {
ConcreteDataType::Vector(v) => Some(*v),
Expand Down
4 changes: 3 additions & 1 deletion src/datatypes/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ pub use duration_type::{
pub use interval_type::{
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType,
};
pub use json_type::{JsonType, JSON_TYPE_NAME};
pub use json_type::{
json_type_value_to_string, parse_string_to_json_type_value, JsonType, JSON_TYPE_NAME,
};
pub use list_type::ListType;
pub use null_type::NullType;
pub use primitive_type::{
Expand Down
44 changes: 37 additions & 7 deletions src/datatypes/src/types/json_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,42 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use arrow::datatypes::DataType as ArrowDataType;
use common_base::bytes::Bytes;
use serde::{Deserialize, Serialize};

use crate::data_type::{DataType, DataTypeRef};
use crate::data_type::DataType;
use crate::error::{InvalidJsonSnafu, Result};
use crate::scalars::ScalarVectorBuilder;
use crate::type_id::LogicalTypeId;
use crate::value::Value;
use crate::vectors::{BinaryVectorBuilder, MutableVector};

pub const JSON_TYPE_NAME: &str = "Json";

/// Encoding used for the binary payload of a JSON value.
///
/// `Jsonb` is currently the only variant and is also the default.
#[derive(
    Debug, Default, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize,
)]
pub enum JsonFormat {
    /// Binary JSON as read and written through the `jsonb` crate.
    // `#[default]` replaces the hand-written `impl Default`, which returned
    // this same variant.
    #[default]
    Jsonb,
}

/// JsonType is a data type for JSON data. It is stored as binary data in the jsonb format.
/// It reuses the existing binary value and vector implementations.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct JsonType;
#[derive(
Debug, Default, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize,
)]
pub struct JsonType {
pub format: JsonFormat,
}

impl JsonType {
pub fn arc() -> DataTypeRef {
Arc::new(Self)
pub fn new(format: JsonFormat) -> Self {
Self { format }
}
}

Expand Down Expand Up @@ -65,3 +79,19 @@ impl DataType for JsonType {
}
}
}

/// Renders the binary payload of a JSON value as text.
///
/// `val` holds the encoded bytes and `format` selects how they are decoded.
/// For [`JsonFormat::Jsonb`] the bytes are passed to `jsonb::to_string` and
/// the result is always wrapped in `Ok`.
pub fn json_type_value_to_string(val: &[u8], format: &JsonFormat) -> Result<String> {
    let rendered = match format {
        JsonFormat::Jsonb => jsonb::to_string(val),
    };
    Ok(rendered)
}

/// Parses JSON text into the binary payload of a JSON value.
///
/// `s` is the textual JSON and `format` selects the target encoding.
///
/// # Errors
///
/// For [`JsonFormat::Jsonb`], returns an `InvalidJson` error carrying `s`
/// when `jsonb::parse_value` rejects the input; the underlying parser error
/// is discarded.
pub fn parse_string_to_json_type_value(s: &str, format: &JsonFormat) -> Result<Vec<u8>> {
    match format {
        JsonFormat::Jsonb => match jsonb::parse_value(s.as_bytes()) {
            Ok(parsed) => Ok(parsed.to_vec()),
            Err(_) => Err(InvalidJsonSnafu { value: s }.build()),
        },
    }
}
1 change: 1 addition & 0 deletions src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataTyp
match (column_type, value_type) {
(ct, vt) if ct == vt => true,
(ColumnDataType::Vector, ColumnDataType::Binary) => true,
(ColumnDataType::Json, ColumnDataType::Binary) => true,
_ => false,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/operator/src/req_convert/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ fn prepare_rows(rows: &mut Option<Rows>) -> Result<()> {
column.datatype_extension = Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
});
column.datatype = ColumnDataType::Binary.into();
column.datatype = ColumnDataType::Json.into();
}

for idx in &indexes {
Expand Down
8 changes: 5 additions & 3 deletions src/servers/src/mysql/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::{debug, error};
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::SchemaRef;
use datatypes::types::vector_type_value_to_string;
use datatypes::types::{json_type_value_to_string, vector_type_value_to_string};
use futures::StreamExt;
use opensrv_mysql::{
Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
Expand Down Expand Up @@ -212,8 +212,10 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
Value::Float64(v) => row_writer.write_col(v.0)?,
Value::String(v) => row_writer.write_col(v.as_utf8())?,
Value::Binary(v) => match column.data_type {
ConcreteDataType::Json(_) => {
row_writer.write_col(jsonb::to_string(&v))?;
ConcreteDataType::Json(j) => {
let s = json_type_value_to_string(&v, &j.format)
.context(ConvertSqlValueSnafu)?;
row_writer.write_col(s)?;
}
ConcreteDataType::Vector(d) => {
let s = vector_type_value_to_string(&v, d.dim)
Expand Down
18 changes: 14 additions & 4 deletions src/servers/src/postgres/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use datafusion_expr::LogicalPlan;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::Schema;
use datatypes::types::{vector_type_value_to_string, IntervalType, TimestampType};
use datatypes::types::{
json_type_value_to_string, vector_type_value_to_string, IntervalType, TimestampType,
};
use datatypes::value::ListValue;
use pgwire::api::portal::{Format, Portal};
use pgwire::api::results::{DataRowEncoder, FieldInfo};
Expand Down Expand Up @@ -350,13 +352,17 @@ fn encode_array(
.collect::<PgWireResult<Vec<Option<String>>>>()?;
builder.encode_field(&array)
}
&ConcreteDataType::Json(_) => {
&ConcreteDataType::Json(j) => {
let array = value_list
.items()
.iter()
.map(|v| match v {
Value::Null => Ok(None),
Value::Binary(v) => Ok(Some(jsonb::to_string(v))),
Value::Binary(v) => {
let s = json_type_value_to_string(v, &j.format)
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
Ok(Some(s))
}
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
err_msg: format!("Invalid list item type, find {v:?}, expected json",),
}))),
Expand Down Expand Up @@ -412,7 +418,11 @@ pub(super) fn encode_value(
Value::Float64(v) => builder.encode_field(&v.0),
Value::String(v) => builder.encode_field(&v.as_utf8()),
Value::Binary(v) => match datatype {
ConcreteDataType::Json(_) => builder.encode_field(&jsonb::to_string(v)),
ConcreteDataType::Json(j) => {
let s = json_type_value_to_string(v, &j.format)
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
builder.encode_field(&s)
}
ConcreteDataType::Vector(d) => {
let s = vector_type_value_to_string(v, d.dim)
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
Expand Down
16 changes: 6 additions & 10 deletions src/sql/src/statements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, COMMENT_KEY};
use datatypes::types::{cast, parse_string_to_vector_type_value, TimestampType};
use datatypes::types::{
cast, parse_string_to_json_type_value, parse_string_to_vector_type_value, TimestampType,
};
use datatypes::value::{OrderedF32, OrderedF64, Value};
use snafu::{ensure, OptionExt, ResultExt};
use sqlparser::ast::{ExactNumberInfo, Ident, ObjectName, UnaryOperator};
Expand Down Expand Up @@ -126,15 +128,9 @@ fn parse_string_to_value(
}
}
ConcreteDataType::Binary(_) => Ok(Value::Binary(s.as_bytes().into())),
ConcreteDataType::Json(_) => {
if let Ok(json) = jsonb::parse_value(s.as_bytes()) {
Ok(Value::Binary(json.to_vec().into()))
} else {
ParseSqlValueSnafu {
msg: format!("Failed to parse {s} to Json value"),
}
.fail()
}
ConcreteDataType::Json(j) => {
let v = parse_string_to_json_type_value(&s, &j.format).context(DatatypeSnafu)?;
Ok(Value::Binary(v.into()))
}
ConcreteDataType::Vector(d) => {
let v = parse_string_to_vector_type_value(&s, d.dim).context(DatatypeSnafu)?;
Expand Down

0 comments on commit 062557d

Please sign in to comment.