From 50664d26eef7191cc7d313003df5fe8f6dc8fe42 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Wed, 20 Nov 2024 22:10:38 +0800 Subject: [PATCH 1/7] [SPARK-50370][SQL] Codegen Support for `json_tuple` --- .../json/JsonExpressionEvalUtils.scala | 98 +++++++++- .../expressions/jsonExpressions.scala | 180 +++++++++--------- .../apache/spark/sql/JsonFunctionsSuite.scala | 14 ++ 3 files changed, 199 insertions(+), 93 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala index edc8012eb3da2..c29fe4839510e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala @@ -16,12 +16,13 @@ */ package org.apache.spark.sql.catalyst.expressions.json -import java.io.CharArrayWriter +import java.io.{ByteArrayOutputStream, CharArrayWriter} -import com.fasterxml.jackson.core.JsonFactory +import com.fasterxml.jackson.core._ +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.ExprUtils +import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow, SharedFactory} import org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonGenerator, JacksonParser, JsonInferSchema, JSONOptions} import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, FailureSafeParser, MapData, PermissiveMode} @@ -159,3 +160,94 @@ case class StructsToJsonEvaluator( converter(value) } } + +case class JsonTupleEvaluator( + fieldExpressionsLength: Int, + constantFields: Int) { + + import SharedFactory._ + + // if processing fails this shared value will be returned + @transient private lazy val nullRow: Seq[InternalRow] = + new GenericInternalRow(Array.ofDim[Any](fieldExpressionsLength)) :: Nil + + private def parseRow(parser: JsonParser, fieldNames: Array[String]): Seq[InternalRow] = { + // only objects are supported + if (parser.nextToken() != JsonToken.START_OBJECT) { + return nullRow + } + + val row = Array.ofDim[Any](fieldNames.length) + + // start reading through the token stream, looking for any requested field names + while (parser.nextToken() != JsonToken.END_OBJECT) { + if (parser.getCurrentToken == JsonToken.FIELD_NAME) { + // check to see if this field is desired in the output + val jsonField = parser.currentName + var idx = fieldNames.indexOf(jsonField) + if (idx >= 0) { + // it is, copy the child tree to the correct location in the output row + val output = new ByteArrayOutputStream() + + // write the output directly to UTF8 encoded byte array + if (parser.nextToken() != JsonToken.VALUE_NULL) { + Utils.tryWithResource(jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { + generator => copyCurrentStructure(generator, parser) + } + + val jsonValue = UTF8String.fromBytes(output.toByteArray) + + // SPARK-21804: json_tuple returns null values within repeated columns + // except the first one; so that we need to check the remaining fields. 
+ do { + row(idx) = jsonValue + idx = fieldNames.indexOf(jsonField, idx + 1) + } while (idx >= 0) + } + } + } + + // always skip children, it's cheap enough to do even if copyCurrentStructure was called + parser.skipChildren() + } + new GenericInternalRow(row) :: Nil + } + + private def copyCurrentStructure(generator: JsonGenerator, parser: JsonParser): Unit = { + parser.getCurrentToken match { + // if the user requests a string field it needs to be returned without enclosing + // quotes which is accomplished via JsonGenerator.writeRaw instead of JsonGenerator.write + case JsonToken.VALUE_STRING if parser.hasTextCharacters => + // slight optimization to avoid allocating a String instance, though the characters + // still have to be decoded... Jackson doesn't have a way to access the raw bytes + generator.writeRaw(parser.getTextCharacters, parser.getTextOffset, parser.getTextLength) + + case JsonToken.VALUE_STRING => + // the normal String case, pass it through to the output without enclosing quotes + generator.writeRaw(parser.getText) + + case JsonToken.VALUE_NULL => + // a special case that needs to be handled outside of this method. + // if a requested field is null, the result must be null. the easiest + // way to achieve this is just by ignoring null tokens entirely + throw SparkException.internalError("Do not attempt to copy a null field.") + + case _ => + // handle other types including objects, arrays, booleans and numbers + generator.copyCurrentStructure(parser) + } + } + + final def evaluate(json: UTF8String, fieldNames: Array[String]): Seq[InternalRow] = { + if (json == null) return nullRow + try { + /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having Jackson + detect character encoding which could fail for some malformed strings */ + Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser => + parseRow(parser, fieldNames) + } + } catch { + case _: JsonProcessingException => nullRow + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index ac6c233f7d2ea..598ed7817b2f5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions import java.io._ +import scala.collection.mutable +import scala.reflect.ClassTag import scala.util.parsing.combinator.RegexParsers import com.fasterxml.jackson.core._ @@ -28,9 +30,9 @@ import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, CodegenFallback, ExprCode} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper -import org.apache.spark.sql.catalyst.expressions.json.{JsonExpressionEvalUtils, JsonExpressionUtils, JsonToStructsEvaluator, StructsToJsonEvaluator} +import org.apache.spark.sql.catalyst.expressions.json.{JsonExpressionEvalUtils, JsonExpressionUtils, JsonToStructsEvaluator, JsonTupleEvaluator, StructsToJsonEvaluator} import 
org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke} import org.apache.spark.sql.catalyst.json._ import org.apache.spark.sql.catalyst.trees.TreePattern.{JSON_TO_STRUCT, RUNTIME_REPLACEABLE, TreePattern} @@ -106,7 +108,7 @@ private[this] object JsonPathParser extends RegexParsers { } } -private[this] object SharedFactory { +private[expressions] object SharedFactory { val jsonFactory = new JsonFactoryBuilder() // The two options below enabled for Hive compatibility .enable(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS) @@ -446,25 +448,13 @@ class GetJsonObjectEvaluator(cachedPath: UTF8String) { // scalastyle:on line.size.limit line.contains.tab case class JsonTuple(children: Seq[Expression]) extends Generator - with CodegenFallback with QueryErrorsBase { - import SharedFactory._ - - override def nullable: Boolean = { - // a row is always returned - false - } - - // if processing fails this shared value will be returned - @transient private lazy val nullRow: Seq[InternalRow] = - new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) :: Nil - // the json body is the first child @transient private lazy val jsonExpr: Expression = children.head // the fields to query are the remaining children - @transient private lazy val fieldExpressions: Seq[Expression] = children.tail + @transient lazy val fieldExpressions: Seq[Expression] = children.tail // eagerly evaluate any foldable the field names @transient private lazy val foldableFieldNames: IndexedSeq[Option[String]] = { @@ -477,6 +467,11 @@ case class JsonTuple(children: Seq[Expression]) // and count the number of foldable fields, we'll use this later to optimize evaluation @transient private lazy val constantFields: Int = foldableFieldNames.count(_ != null) + override def nullable: Boolean = { + // a row is always returned + false + } + override def elementSchema: StructType = StructType(fieldExpressions.zipWithIndex.map { case (_, idx) => StructField(s"c$idx", children.head.dataType, nullable = true) }) @@ -499,29 +494,12 @@ case class JsonTuple(children: Seq[Expression]) } } + @transient + private lazy val evaluator: JsonTupleEvaluator = JsonTupleEvaluator( + fieldExpressions.length, constantFields) + override def eval(input: InternalRow): IterableOnce[InternalRow] = { val json = jsonExpr.eval(input).asInstanceOf[UTF8String] - if (json == null) { - return nullRow - } - - try { - /* We know the bytes are UTF-8 encoded. 
Pass a Reader to avoid having Jackson - detect character encoding which could fail for some malformed strings */ - Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser => - parseRow(parser, input) - } - } catch { - case _: JsonProcessingException => - nullRow - } - } - - private def parseRow(parser: JsonParser, input: InternalRow): Seq[InternalRow] = { - // only objects are supported - if (parser.nextToken() != JsonToken.START_OBJECT) { - return nullRow - } // evaluate the field names as String rather than UTF8String to // optimize lookups from the json token, which is also a String @@ -544,66 +522,88 @@ case class JsonTuple(children: Seq[Expression]) } } - val row = Array.ofDim[Any](fieldNames.length) - - // start reading through the token stream, looking for any requested field names - while (parser.nextToken() != JsonToken.END_OBJECT) { - if (parser.getCurrentToken == JsonToken.FIELD_NAME) { - // check to see if this field is desired in the output - val jsonField = parser.currentName - var idx = fieldNames.indexOf(jsonField) - if (idx >= 0) { - // it is, copy the child tree to the correct location in the output row - val output = new ByteArrayOutputStream() - - // write the output directly to UTF8 encoded byte array - if (parser.nextToken() != JsonToken.VALUE_NULL) { - Utils.tryWithResource(jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { - generator => copyCurrentStructure(generator, parser) - } - - val jsonValue = UTF8String.fromBytes(output.toByteArray) + evaluator.evaluate(json, fieldNames.toArray) + } - // SPARK-21804: json_tuple returns null values within repeated columns - // except the first one; so that we need to check the remaining fields. - do { - row(idx) = jsonValue - idx = fieldNames.indexOf(jsonField, idx + 1) - } while (idx >= 0) + private def genFieldNamesCode(ctx: CodegenContext, fieldNamesTerm: String): String = { + // evaluate the field names as String rather than UTF8String to + // optimize lookups from the json token, which is also a String + val (fieldNamesEval, setFieldNames) = if (constantFields == fieldExpressions.length) { + // typically the user will provide the field names as foldable expressions + // so we can use the cached copy + val s = foldableFieldNames.zipWithIndex.map { + case (v, i) => + if (v.isDefined) { + s"$fieldNamesTerm[$i] = \"${v.get}\";" + } else { + s"$fieldNamesTerm[$i] = null;" } - } } - - // always skip children, it's cheap enough to do even if copyCurrentStructure was called - parser.skipChildren() + (Seq.empty[ExprCode], s) + } else if (constantFields == 0) { + // none are foldable so all field names need to be evaluated from the input row + val f = fieldExpressions.map(_.genCode(ctx)) + val s = f.zipWithIndex.map { + case (exprCode, i) => + s""" + |if (${exprCode.isNull}) { + | $fieldNamesTerm[$i] = null; + |} else { + | $fieldNamesTerm[$i] = ${exprCode.value}.toString(); + |} + |""".stripMargin + } + (f, s) + } else { + // if there is a mix of constant and non-constant expressions + // prefer the cached copy when available + val codes = foldableFieldNames.zip(fieldExpressions).zipWithIndex.map { + case ((null, expr: Expression), i) => + val f = expr.genCode(ctx) + val s = + s""" + |if (${f.isNull}) { + | $fieldNamesTerm[$i] = null; + |} else { + | $fieldNamesTerm[$i] = ${f.value}.toString(); + |} + |""".stripMargin + (Some(f), s) + case ((v: Option[String], _), i) => + val s = if (v.isDefined) { + s"$fieldNamesTerm[$i] = \"${v.get}\";" + } else { + s"$fieldNamesTerm[$i] = null;" + } + (None, s) 
+ } + (codes.filter(c => c._1.isDefined).map(c => c._1.get), codes.map(c => c._2)) } - new GenericInternalRow(row) :: Nil + s""" + |String[] $fieldNamesTerm = new String[${fieldExpressions.length}]; + |${fieldNamesEval.map(_.code).mkString("\n")} + |${setFieldNames.mkString("\n")} + |""".stripMargin } - private def copyCurrentStructure(generator: JsonGenerator, parser: JsonParser): Unit = { - parser.getCurrentToken match { - // if the user requests a string field it needs to be returned without enclosing - // quotes which is accomplished via JsonGenerator.writeRaw instead of JsonGenerator.write - case JsonToken.VALUE_STRING if parser.hasTextCharacters => - // slight optimization to avoid allocating a String instance, though the characters - // still have to be decoded... Jackson doesn't have a way to access the raw bytes - generator.writeRaw(parser.getTextCharacters, parser.getTextOffset, parser.getTextLength) - - case JsonToken.VALUE_STRING => - // the normal String case, pass it through to the output without enclosing quotes - generator.writeRaw(parser.getText) - - case JsonToken.VALUE_NULL => - // a special case that needs to be handled outside of this method. - // if a requested field is null, the result must be null. the easiest - // way to achieve this is just by ignoring null tokens entirely - throw SparkException.internalError("Do not attempt to copy a null field.") - - case _ => - // handle other types including objects, arrays, booleans and numbers - generator.copyCurrentStructure(parser) - } + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val refEvaluator = ctx.addReferenceObj("evaluator", evaluator) + val jsonEval = jsonExpr.genCode(ctx) + val fieldNamesTerm = ctx.freshName("fieldNames") + val fieldNamesCode = genFieldNamesCode(ctx, fieldNamesTerm) + val resultTerm = ctx.freshName("result") + val classTagClz = classOf[ClassTag[InternalRow]].getName + val wrapperClz = classOf[mutable.ArraySeq[_]].getName + ev.copy(code = + code""" + |${jsonEval.code} + |$fieldNamesCode + |InternalRow[] $resultTerm = (InternalRow[]) $refEvaluator.evaluate(${jsonEval.value}, + | $fieldNamesTerm).toArray($classTagClz$$.MODULE$$.apply(InternalRow.class)); + |boolean ${ev.isNull} = $resultTerm == null; + |$wrapperClz ${ev.value} = $wrapperClz$$.MODULE$$.make($resultTerm); + |""".stripMargin) } override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): JsonTuple = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 84408d8e2495d..d65939836d769 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -1456,4 +1456,18 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { assert(plan.isInstanceOf[WholeStageCodegenExec]) checkAnswer(df, Row(null)) } + + test("function json_tuple codegen - foldable optimize") { + val df = Seq(("""{"a":1, "b":2}""", "a", "b")).toDF("json", "c1", "c2") + df.createOrReplaceTempView("t") + + val df1 = sql("SELECT json_tuple(json, c1, c2) from t") + checkAnswer(df1, Row("1", "2")) + + val df2 = sql("SELECT json_tuple(json, 'a', c2) from t") + checkAnswer(df2, Row("1", "2")) + + val df3 = sql("SELECT json_tuple(json, 'a', 'b') from t") + checkAnswer(df3, Row("1", "2")) + } } From 2626c8d01b342219b4362ed28dbb5b2d4cd988be Mon Sep 17 00:00:00 2001 From: panbingkun Date: Thu, 21 Nov 2024 
19:50:24 +0800 Subject: [PATCH 2/7] update --- .../json/JsonExpressionEvalUtils.scala | 14 +++++++--- .../expressions/jsonExpressions.scala | 27 +++++++++---------- .../expressions/JsonExpressionsSuite.scala | 2 +- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala index c29fe4839510e..71aba9b003567 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala @@ -161,15 +161,21 @@ case class StructsToJsonEvaluator( } } -case class JsonTupleEvaluator( - fieldExpressionsLength: Int, - constantFields: Int) { +case class JsonTupleEvaluator(foldableFieldNames: IndexedSeq[Option[String]]) { import SharedFactory._ // if processing fails this shared value will be returned @transient private lazy val nullRow: Seq[InternalRow] = - new GenericInternalRow(Array.ofDim[Any](fieldExpressionsLength)) :: Nil + new GenericInternalRow(Array.ofDim[Any](foldableFieldNames.length)) :: Nil + + // and count the number of foldable fields, we'll use this later to optimize evaluation + @transient lazy val constantFields: Int = foldableFieldNames.count(_ != null) + + // expose for codegen + def foldableFieldName(index: Int): Option[String] = { + foldableFieldNames(index) + } private def parseRow(parser: JsonParser, fieldNames: Array[String]): Seq[InternalRow] = { // only objects are supported diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 598ed7817b2f5..9a4bdc028e369 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -464,9 +464,6 @@ case class JsonTuple(children: Seq[Expression]) }.toIndexedSeq } - // and count the number of foldable fields, we'll use this later to optimize evaluation - @transient private lazy val constantFields: Int = foldableFieldNames.count(_ != null) - override def nullable: Boolean = { // a row is always returned false @@ -495,19 +492,18 @@ case class JsonTuple(children: Seq[Expression]) } @transient - private lazy val evaluator: JsonTupleEvaluator = JsonTupleEvaluator( - fieldExpressions.length, constantFields) + private lazy val evaluator: JsonTupleEvaluator = JsonTupleEvaluator(foldableFieldNames) override def eval(input: InternalRow): IterableOnce[InternalRow] = { val json = jsonExpr.eval(input).asInstanceOf[UTF8String] // evaluate the field names as String rather than UTF8String to // optimize lookups from the json token, which is also a String - val fieldNames = if (constantFields == fieldExpressions.length) { + val fieldNames = if (evaluator.constantFields == fieldExpressions.length) { // typically the user will provide the field names as foldable expressions // so we can use the cached copy foldableFieldNames.map(_.orNull) - } else if (constantFields == 0) { + } else if (evaluator.constantFields == 0) { // none are foldable so all field names need to be evaluated from the input row fieldExpressions.map { expr => Option(expr.eval(input)).map(_.asInstanceOf[UTF8String].toString).orNull @@ -525,22 
+521,25 @@ case class JsonTuple(children: Seq[Expression]) evaluator.evaluate(json, fieldNames.toArray) } - private def genFieldNamesCode(ctx: CodegenContext, fieldNamesTerm: String): String = { + private def genFieldNamesCode( + ctx: CodegenContext, + refEvaluator: String, + fieldNamesTerm: String): String = { // evaluate the field names as String rather than UTF8String to // optimize lookups from the json token, which is also a String - val (fieldNamesEval, setFieldNames) = if (constantFields == fieldExpressions.length) { + val (fieldNamesEval, setFieldNames) = if (evaluator.constantFields == fieldExpressions.length) { // typically the user will provide the field names as foldable expressions // so we can use the cached copy val s = foldableFieldNames.zipWithIndex.map { case (v, i) => - if (v.isDefined) { - s"$fieldNamesTerm[$i] = \"${v.get}\";" + if (v != null && v.isDefined) { + s"$fieldNamesTerm[$i] = (String) $refEvaluator.foldableFieldName($i).get();" } else { s"$fieldNamesTerm[$i] = null;" } } (Seq.empty[ExprCode], s) - } else if (constantFields == 0) { + } else if (evaluator.constantFields == 0) { // none are foldable so all field names need to be evaluated from the input row val f = fieldExpressions.map(_.genCode(ctx)) val s = f.zipWithIndex.map { @@ -571,7 +570,7 @@ case class JsonTuple(children: Seq[Expression]) (Some(f), s) case ((v: Option[String], _), i) => val s = if (v.isDefined) { - s"$fieldNamesTerm[$i] = \"${v.get}\";" + s"$fieldNamesTerm[$i] = (String) $refEvaluator.foldableFieldName($i).get();" } else { s"$fieldNamesTerm[$i] = null;" } @@ -591,7 +590,7 @@ case class JsonTuple(children: Seq[Expression]) val refEvaluator = ctx.addReferenceObj("evaluator", evaluator) val jsonEval = jsonExpr.genCode(ctx) val fieldNamesTerm = ctx.freshName("fieldNames") - val fieldNamesCode = genFieldNamesCode(ctx, fieldNamesTerm) + val fieldNamesCode = genFieldNamesCode(ctx, refEvaluator, fieldNamesTerm) val resultTerm = ctx.freshName("result") val classTagClz = classOf[ClassTag[InternalRow]].getName val wrapperClz = classOf[mutable.ArraySeq[_]].getName diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index 3a58cb92cecf2..fdd2281fd4892 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -272,7 +272,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with assert(jt.eval(null).iterator.to(Seq).head === expected) } - test("json_tuple escaping") { + ignore("json_tuple escaping") { GenerateUnsafeProjection.generate( JsonTuple(Literal("\"quote") :: Literal("\"quote") :: Nil) :: Nil) } From c740d9bc0e26fe740385f1c0c7d96c155c35c81b Mon Sep 17 00:00:00 2001 From: panbingkun Date: Fri, 22 Nov 2024 17:10:29 +0800 Subject: [PATCH 3/7] update --- .../json/JsonExpressionEvalUtils.scala | 20 +++------ .../expressions/jsonExpressions.scala | 45 ++++++++++++------- 2 files changed, 34 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala index 71aba9b003567..50476fd15edd3 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala @@ -161,27 +161,17 @@ case class StructsToJsonEvaluator( } } -case class JsonTupleEvaluator(foldableFieldNames: IndexedSeq[Option[String]]) { +case class JsonTupleEvaluator(fieldsLength: Int) { import SharedFactory._ // if processing fails this shared value will be returned @transient private lazy val nullRow: Seq[InternalRow] = - new GenericInternalRow(Array.ofDim[Any](foldableFieldNames.length)) :: Nil + new GenericInternalRow(Array.ofDim[Any](fieldsLength)) :: Nil - // and count the number of foldable fields, we'll use this later to optimize evaluation - @transient lazy val constantFields: Int = foldableFieldNames.count(_ != null) - - // expose for codegen - def foldableFieldName(index: Int): Option[String] = { - foldableFieldNames(index) - } - - private def parseRow(parser: JsonParser, fieldNames: Array[String]): Seq[InternalRow] = { + private def parseRow(parser: JsonParser, fieldNames: Seq[String]): Seq[InternalRow] = { // only objects are supported - if (parser.nextToken() != JsonToken.START_OBJECT) { - return nullRow - } + if (parser.nextToken() != JsonToken.START_OBJECT) return nullRow val row = Array.ofDim[Any](fieldNames.length) @@ -244,7 +234,7 @@ case class JsonTupleEvaluator(foldableFieldNames: IndexedSeq[Option[String]]) { } } - final def evaluate(json: UTF8String, fieldNames: Array[String]): Seq[InternalRow] = { + final def evaluate(json: UTF8String, fieldNames: Seq[String]): Seq[InternalRow] = { if (json == null) return nullRow try { /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having Jackson diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 9a4bdc028e369..bd1dd7e123ee9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -454,7 +454,7 @@ case class JsonTuple(children: Seq[Expression]) @transient private lazy val jsonExpr: Expression = children.head // the fields to query are the remaining children - @transient lazy val fieldExpressions: Seq[Expression] = children.tail + @transient private lazy val fieldExpressions: Seq[Expression] = children.tail // eagerly evaluate any foldable the field names @transient private lazy val foldableFieldNames: IndexedSeq[Option[String]] = { @@ -464,6 +464,9 @@ case class JsonTuple(children: Seq[Expression]) }.toIndexedSeq } + // and count the number of foldable fields, we'll use this later to optimize evaluation + @transient private lazy val constantFields: Int = foldableFieldNames.count(_ != null) + override def nullable: Boolean = { // a row is always returned false @@ -492,18 +495,18 @@ case class JsonTuple(children: Seq[Expression]) } @transient - private lazy val evaluator: JsonTupleEvaluator = JsonTupleEvaluator(foldableFieldNames) + private lazy val evaluator: JsonTupleEvaluator = JsonTupleEvaluator(fieldExpressions.length) override def eval(input: InternalRow): IterableOnce[InternalRow] = { val json = jsonExpr.eval(input).asInstanceOf[UTF8String] // evaluate the field names as String rather than UTF8String to // optimize lookups from the json token, which is also a String - val fieldNames = if 
(evaluator.constantFields == fieldExpressions.length) { + val fieldNames = if (constantFields == fieldExpressions.length) { // typically the user will provide the field names as foldable expressions // so we can use the cached copy foldableFieldNames.map(_.orNull) - } else if (evaluator.constantFields == 0) { + } else if (constantFields == 0) { // none are foldable so all field names need to be evaluated from the input row fieldExpressions.map { expr => Option(expr.eval(input)).map(_.asInstanceOf[UTF8String].toString).orNull @@ -518,28 +521,33 @@ case class JsonTuple(children: Seq[Expression]) } } - evaluator.evaluate(json, fieldNames.toArray) + evaluator.evaluate(json, fieldNames) } private def genFieldNamesCode( ctx: CodegenContext, - refEvaluator: String, + refFoldableFieldNames: String, fieldNamesTerm: String): String = { + + def genFoldableFieldNameCode(refIndexedSeq: String, i: Int): String = { + s"(String)((scala.Option)$refIndexedSeq.apply($i)).get();" + } + // evaluate the field names as String rather than UTF8String to // optimize lookups from the json token, which is also a String - val (fieldNamesEval, setFieldNames) = if (evaluator.constantFields == fieldExpressions.length) { + val (fieldNamesEval, setFieldNames) = if (constantFields == fieldExpressions.length) { // typically the user will provide the field names as foldable expressions // so we can use the cached copy val s = foldableFieldNames.zipWithIndex.map { case (v, i) => if (v != null && v.isDefined) { - s"$fieldNamesTerm[$i] = (String) $refEvaluator.foldableFieldName($i).get();" + s"$fieldNamesTerm[$i] = ${genFoldableFieldNameCode(refFoldableFieldNames, i)};" } else { s"$fieldNamesTerm[$i] = null;" } } (Seq.empty[ExprCode], s) - } else if (evaluator.constantFields == 0) { + } else if (constantFields == 0) { // none are foldable so all field names need to be evaluated from the input row val f = fieldExpressions.map(_.genCode(ctx)) val s = f.zipWithIndex.map { @@ -570,7 +578,7 @@ case class JsonTuple(children: Seq[Expression]) (Some(f), s) case ((v: Option[String], _), i) => val s = if (v.isDefined) { - s"$fieldNamesTerm[$i] = (String) $refEvaluator.foldableFieldName($i).get();" + s"$fieldNamesTerm[$i] = ${genFoldableFieldNameCode(refFoldableFieldNames, i)};" } else { s"$fieldNamesTerm[$i] = null;" } @@ -588,20 +596,25 @@ case class JsonTuple(children: Seq[Expression]) override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val refEvaluator = ctx.addReferenceObj("evaluator", evaluator) + val refFoldableFieldNames = ctx.addReferenceObj("foldableFieldNames", foldableFieldNames) val jsonEval = jsonExpr.genCode(ctx) val fieldNamesTerm = ctx.freshName("fieldNames") - val fieldNamesCode = genFieldNamesCode(ctx, refEvaluator, fieldNamesTerm) + val fieldNamesCode = genFieldNamesCode(ctx, refFoldableFieldNames, fieldNamesTerm) val resultTerm = ctx.freshName("result") val classTagClz = classOf[ClassTag[InternalRow]].getName - val wrapperClz = classOf[mutable.ArraySeq[_]].getName + val immutableArraySeqClz = classOf[scala.collection.immutable.ArraySeq[_]].getName + val mutableArraySeqClz = classOf[mutable.ArraySeq[_]].getName ev.copy(code = code""" |${jsonEval.code} |$fieldNamesCode - |InternalRow[] $resultTerm = (InternalRow[]) $refEvaluator.evaluate(${jsonEval.value}, - | $fieldNamesTerm).toArray($classTagClz$$.MODULE$$.apply(InternalRow.class)); - |boolean ${ev.isNull} = $resultTerm == null; - |$wrapperClz ${ev.value} = $wrapperClz$$.MODULE$$.make($resultTerm); + |InternalRow[] $resultTerm = 
(InternalRow[]) $refEvaluator.evaluate( + | ${jsonEval.value}, + | new $immutableArraySeqClz.ofRef($fieldNamesTerm) + |).toArray($classTagClz$$.MODULE$$.apply(InternalRow.class)); + |boolean ${ev.isNull} = false; + |$mutableArraySeqClz ${ev.value} = + | $mutableArraySeqClz$$.MODULE$$.make($resultTerm); |""".stripMargin) } From bf9c2872e48fc835ccdaf793cb16e8ee5f312fd5 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Fri, 22 Nov 2024 19:46:30 +0800 Subject: [PATCH 4/7] improve --- .../catalyst/expressions/jsonExpressions.scala | 17 +++++------------ .../expressions/JsonExpressionsSuite.scala | 7 ++++--- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index bd1dd7e123ee9..31b7e05a4aaac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -19,8 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import java.io._ -import scala.collection.mutable -import scala.reflect.ClassTag +import scala.collection.immutable.ArraySeq import scala.util.parsing.combinator.RegexParsers import com.fasterxml.jackson.core._ @@ -597,24 +596,18 @@ case class JsonTuple(children: Seq[Expression]) override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val refEvaluator = ctx.addReferenceObj("evaluator", evaluator) val refFoldableFieldNames = ctx.addReferenceObj("foldableFieldNames", foldableFieldNames) + val wrapperClass = classOf[Seq[_]].getName val jsonEval = jsonExpr.genCode(ctx) val fieldNamesTerm = ctx.freshName("fieldNames") val fieldNamesCode = genFieldNamesCode(ctx, refFoldableFieldNames, fieldNamesTerm) - val resultTerm = ctx.freshName("result") - val classTagClz = classOf[ClassTag[InternalRow]].getName - val immutableArraySeqClz = classOf[scala.collection.immutable.ArraySeq[_]].getName - val mutableArraySeqClz = classOf[mutable.ArraySeq[_]].getName + val fieldNamesClz = classOf[ArraySeq[_]].getName ev.copy(code = code""" |${jsonEval.code} |$fieldNamesCode - |InternalRow[] $resultTerm = (InternalRow[]) $refEvaluator.evaluate( - | ${jsonEval.value}, - | new $immutableArraySeqClz.ofRef($fieldNamesTerm) - |).toArray($classTagClz$$.MODULE$$.apply(InternalRow.class)); |boolean ${ev.isNull} = false; - |$mutableArraySeqClz ${ev.value} = - | $mutableArraySeqClz$$.MODULE$$.make($resultTerm); + |$wrapperClass ${ev.value} = $refEvaluator.evaluate( + | ${jsonEval.value}, new $fieldNamesClz.ofRef($fieldNamesTerm)); |""".stripMargin) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index fdd2281fd4892..bb32a6d7adf26 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -272,9 +272,10 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with assert(jt.eval(null).iterator.to(Seq).head === expected) } - ignore("json_tuple escaping") { - GenerateUnsafeProjection.generate( - JsonTuple(Literal("\"quote") :: Literal("\"quote") :: Nil) :: Nil) + test("json_tuple escaping") { + checkJsonTuple( + 
JsonTuple(Literal("\"quote") :: Literal("\"quote") :: Nil), + InternalRow.fromSeq(Seq(null).map(UTF8String.fromString))) } test("json_tuple - hive key 1") { From 0a3f6bfa0c76376b11350b1e2815d83cb7829d09 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Fri, 22 Nov 2024 19:55:23 +0800 Subject: [PATCH 5/7] update UT --- .../apache/spark/sql/JsonFunctionsSuite.scala | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index d65939836d769..909a0db6473d4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -1457,17 +1457,22 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { checkAnswer(df, Row(null)) } - test("function json_tuple codegen - foldable optimize") { - val df = Seq(("""{"a":1, "b":2}""", "a", "b")).toDF("json", "c1", "c2") - df.createOrReplaceTempView("t") - - val df1 = sql("SELECT json_tuple(json, c1, c2) from t") - checkAnswer(df1, Row("1", "2")) - - val df2 = sql("SELECT json_tuple(json, 'a', c2) from t") - checkAnswer(df2, Row("1", "2")) - - val df3 = sql("SELECT json_tuple(json, 'a', 'b') from t") - checkAnswer(df3, Row("1", "2")) + test("function json_tuple codegen - field name foldable optimize") { + withTempView("t") { + val df = Seq(("""{"a":1, "b":2}""", "a", "b")).toDF("json", "c1", "c2") + df.createOrReplaceTempView("t") + + // all field names are non-foldable + val df1 = sql("SELECT json_tuple(json, c1, c2) from t") + checkAnswer(df1, Row("1", "2")) + + // some foldable, some non-foldable + val df2 = sql("SELECT json_tuple(json, 'a', c2) from t") + checkAnswer(df2, Row("1", "2")) + + // all field names are foldable + val df3 = sql("SELECT json_tuple(json, 'a', 'b') from t") + checkAnswer(df3, Row("1", "2")) + } } } From 9201cb8ea0cd33953cda29ed92ecd51183cbd1e5 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Mon, 25 Nov 2024 15:05:45 +0800 Subject: [PATCH 6/7] update --- .../expressions/jsonExpressions.scala | 23 ++++++++----------- .../apache/spark/sql/JsonFunctionsSuite.scala | 20 ++++++++-------- 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 31b7e05a4aaac..15a609d43d369 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -449,6 +449,11 @@ case class JsonTuple(children: Seq[Expression]) extends Generator with QueryErrorsBase { + override def nullable: Boolean = { + // a row is always returned + false + } + // the json body is the first child @transient private lazy val jsonExpr: Expression = children.head @@ -466,11 +471,6 @@ case class JsonTuple(children: Seq[Expression]) // and count the number of foldable fields, we'll use this later to optimize evaluation @transient private lazy val constantFields: Int = foldableFieldNames.count(_ != null) - override def nullable: Boolean = { - // a row is always returned - false - } - override def elementSchema: StructType = StructType(fieldExpressions.zipWithIndex.map { case (_, idx) => StructField(s"c$idx", children.head.dataType, nullable = true) }) @@ -528,19 +528,16 @@ 
case class JsonTuple(children: Seq[Expression]) refFoldableFieldNames: String, fieldNamesTerm: String): String = { - def genFoldableFieldNameCode(refIndexedSeq: String, i: Int): String = { + def getFoldableFieldName(refIndexedSeq: String, i: Int): String = { s"(String)((scala.Option)$refIndexedSeq.apply($i)).get();" } - // evaluate the field names as String rather than UTF8String to - // optimize lookups from the json token, which is also a String val (fieldNamesEval, setFieldNames) = if (constantFields == fieldExpressions.length) { - // typically the user will provide the field names as foldable expressions - // so we can use the cached copy + // all field names are foldable, so we can use the cached copy val s = foldableFieldNames.zipWithIndex.map { case (v, i) => if (v != null && v.isDefined) { - s"$fieldNamesTerm[$i] = ${genFoldableFieldNameCode(refFoldableFieldNames, i)};" + s"$fieldNamesTerm[$i] = ${getFoldableFieldName(refFoldableFieldNames, i)};" } else { s"$fieldNamesTerm[$i] = null;" } @@ -561,7 +558,7 @@ case class JsonTuple(children: Seq[Expression]) } (f, s) } else { - // if there is a mix of constant and non-constant expressions + // if there is a mix of constant and non-constant field name, // prefer the cached copy when available val codes = foldableFieldNames.zip(fieldExpressions).zipWithIndex.map { case ((null, expr: Expression), i) => @@ -577,7 +574,7 @@ case class JsonTuple(children: Seq[Expression]) (Some(f), s) case ((v: Option[String], _), i) => val s = if (v.isDefined) { - s"$fieldNamesTerm[$i] = ${genFoldableFieldNameCode(refFoldableFieldNames, i)};" + s"$fieldNamesTerm[$i] = ${getFoldableFieldName(refFoldableFieldNames, i)};" } else { s"$fieldNamesTerm[$i] = null;" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 909a0db6473d4..a4e4a00d454bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -1459,20 +1459,20 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { test("function json_tuple codegen - field name foldable optimize") { withTempView("t") { - val df = Seq(("""{"a":1, "b":2}""", "a", "b")).toDF("json", "c1", "c2") + val df = Seq(("""{"a":1, "b":2, "c":3}""", "a", "b", "c")).toDF("json", "c1", "c2", "c3") df.createOrReplaceTempView("t") - // all field names are non-foldable - val df1 = sql("SELECT json_tuple(json, c1, c2) from t") - checkAnswer(df1, Row("1", "2")) + // all field names are foldable + val df1 = sql("SELECT json_tuple(json, 'a', 'b', 'c') from t") + checkAnswer(df1, Row("1", "2", "3")) - // some foldable, some non-foldable - val df2 = sql("SELECT json_tuple(json, 'a', c2) from t") - checkAnswer(df2, Row("1", "2")) + // the field names some foldable, some non-foldable + val df2 = sql("SELECT json_tuple(json, 'a', c2, 'c') from t") + checkAnswer(df2, Row("1", "2", "3")) - // all field names are foldable - val df3 = sql("SELECT json_tuple(json, 'a', 'b') from t") - checkAnswer(df3, Row("1", "2")) + // all field names are non-foldable + val df3 = sql("SELECT json_tuple(json, c1, c2, c3) from t") + checkAnswer(df3, Row("1", "2", "3")) } } } From 3302fbc5d4f99ff5d45f362505f3144b793912ca Mon Sep 17 00:00:00 2001 From: panbingkun Date: Tue, 26 Nov 2024 09:35:31 +0800 Subject: [PATCH 7/7] update --- .../json/JsonExpressionEvalUtils.scala | 37 ++++++++++--------- .../expressions/jsonExpressions.scala | 32 
++++++++-------- .../apache/spark/sql/JsonFunctionsSuite.scala | 6 +-- 3 files changed, 39 insertions(+), 36 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala index 50476fd15edd3..a1cd4b77c8d96 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala @@ -161,31 +161,34 @@ case class StructsToJsonEvaluator( } } +/** + * The expression `JsonTuple` will utilize it to support codegen. + */ case class JsonTupleEvaluator(fieldsLength: Int) { import SharedFactory._ - // if processing fails this shared value will be returned + // If processing fails this shared value will be returned. @transient private lazy val nullRow: Seq[InternalRow] = new GenericInternalRow(Array.ofDim[Any](fieldsLength)) :: Nil private def parseRow(parser: JsonParser, fieldNames: Seq[String]): Seq[InternalRow] = { - // only objects are supported + // Only objects are supported. if (parser.nextToken() != JsonToken.START_OBJECT) return nullRow val row = Array.ofDim[Any](fieldNames.length) - // start reading through the token stream, looking for any requested field names + // Start reading through the token stream, looking for any requested field names. while (parser.nextToken() != JsonToken.END_OBJECT) { if (parser.getCurrentToken == JsonToken.FIELD_NAME) { - // check to see if this field is desired in the output + // Check to see if this field is desired in the output. val jsonField = parser.currentName var idx = fieldNames.indexOf(jsonField) if (idx >= 0) { - // it is, copy the child tree to the correct location in the output row + // It is, copy the child tree to the correct location in the output row. val output = new ByteArrayOutputStream() - // write the output directly to UTF8 encoded byte array + // Write the output directly to UTF8 encoded byte array. if (parser.nextToken() != JsonToken.VALUE_NULL) { Utils.tryWithResource(jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { generator => copyCurrentStructure(generator, parser) @@ -203,7 +206,7 @@ case class JsonTupleEvaluator(fieldsLength: Int) { } } - // always skip children, it's cheap enough to do even if copyCurrentStructure was called + // Always skip children, it's cheap enough to do even if copyCurrentStructure was called. parser.skipChildren() } new GenericInternalRow(row) :: Nil @@ -211,25 +214,25 @@ case class JsonTupleEvaluator(fieldsLength: Int) { private def copyCurrentStructure(generator: JsonGenerator, parser: JsonParser): Unit = { parser.getCurrentToken match { - // if the user requests a string field it needs to be returned without enclosing - // quotes which is accomplished via JsonGenerator.writeRaw instead of JsonGenerator.write + // If the user requests a string field it needs to be returned without enclosing + // quotes which is accomplished via JsonGenerator.writeRaw instead of JsonGenerator.write. case JsonToken.VALUE_STRING if parser.hasTextCharacters => - // slight optimization to avoid allocating a String instance, though the characters - // still have to be decoded... Jackson doesn't have a way to access the raw bytes + // Slight optimization to avoid allocating a String instance, though the characters + // still have to be decoded... 
Jackson doesn't have a way to access the raw bytes. generator.writeRaw(parser.getTextCharacters, parser.getTextOffset, parser.getTextLength) case JsonToken.VALUE_STRING => - // the normal String case, pass it through to the output without enclosing quotes + // The normal String case, pass it through to the output without enclosing quotes. generator.writeRaw(parser.getText) case JsonToken.VALUE_NULL => - // a special case that needs to be handled outside of this method. - // if a requested field is null, the result must be null. the easiest - // way to achieve this is just by ignoring null tokens entirely + // A special case that needs to be handled outside of this method. + // If a requested field is null, the result must be null. The easiest + // way to achieve this is just by ignoring null tokens entirely. throw SparkException.internalError("Do not attempt to copy a null field.") case _ => - // handle other types including objects, arrays, booleans and numbers + // Handle other types including objects, arrays, booleans and numbers. generator.copyCurrentStructure(parser) } } @@ -238,7 +241,7 @@ case class JsonTupleEvaluator(fieldsLength: Int) { if (json == null) return nullRow try { /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having Jackson - detect character encoding which could fail for some malformed strings */ + detect character encoding which could fail for some malformed strings. */ Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser => parseRow(parser, fieldNames) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 15a609d43d369..de322d075fd91 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -450,17 +450,17 @@ case class JsonTuple(children: Seq[Expression]) with QueryErrorsBase { override def nullable: Boolean = { - // a row is always returned + // A row is always returned. false } - // the json body is the first child + // The json body is the first child. @transient private lazy val jsonExpr: Expression = children.head - // the fields to query are the remaining children + // The fields to query are the remaining children. @transient private lazy val fieldExpressions: Seq[Expression] = children.tail - // eagerly evaluate any foldable the field names + // Eagerly evaluate any foldable the field names. @transient private lazy val foldableFieldNames: IndexedSeq[Option[String]] = { fieldExpressions.map { case expr if expr.foldable => Option(expr.eval()).map(_.asInstanceOf[UTF8String].toString) @@ -468,7 +468,7 @@ case class JsonTuple(children: Seq[Expression]) }.toIndexedSeq } - // and count the number of foldable fields, we'll use this later to optimize evaluation + // And count the number of foldable fields, we'll use this later to optimize evaluation. 
@transient private lazy val constantFields: Int = foldableFieldNames.count(_ != null) override def elementSchema: StructType = StructType(fieldExpressions.zipWithIndex.map { @@ -499,20 +499,20 @@ case class JsonTuple(children: Seq[Expression]) override def eval(input: InternalRow): IterableOnce[InternalRow] = { val json = jsonExpr.eval(input).asInstanceOf[UTF8String] - // evaluate the field names as String rather than UTF8String to - // optimize lookups from the json token, which is also a String + // Evaluate the field names as String rather than UTF8String to + // optimize lookups from the json token, which is also a String. val fieldNames = if (constantFields == fieldExpressions.length) { - // typically the user will provide the field names as foldable expressions - // so we can use the cached copy + // Typically the user will provide the field names as foldable expressions + // so we can use the cached copy. foldableFieldNames.map(_.orNull) } else if (constantFields == 0) { - // none are foldable so all field names need to be evaluated from the input row + // None are foldable so all field names need to be evaluated from the input row. fieldExpressions.map { expr => Option(expr.eval(input)).map(_.asInstanceOf[UTF8String].toString).orNull } } else { - // if there is a mix of constant and non-constant expressions - // prefer the cached copy when available + // If there is a mix of constant and non-constant expressions + // prefer the cached copy when available. foldableFieldNames.zip(fieldExpressions).map { case (null, expr) => Option(expr.eval(input)).map(_.asInstanceOf[UTF8String].toString).orNull @@ -533,7 +533,7 @@ case class JsonTuple(children: Seq[Expression]) } val (fieldNamesEval, setFieldNames) = if (constantFields == fieldExpressions.length) { - // all field names are foldable, so we can use the cached copy + // All field names are foldable, so we can use the cached copy. val s = foldableFieldNames.zipWithIndex.map { case (v, i) => if (v != null && v.isDefined) { @@ -544,7 +544,7 @@ case class JsonTuple(children: Seq[Expression]) } (Seq.empty[ExprCode], s) } else if (constantFields == 0) { - // none are foldable so all field names need to be evaluated from the input row + // None are foldable so all field names need to be evaluated from the input row. val f = fieldExpressions.map(_.genCode(ctx)) val s = f.zipWithIndex.map { case (exprCode, i) => @@ -558,8 +558,8 @@ case class JsonTuple(children: Seq[Expression]) } (f, s) } else { - // if there is a mix of constant and non-constant field name, - // prefer the cached copy when available + // If there is a mix of constant and non-constant field name, + // prefer the cached copy when available. val codes = foldableFieldNames.zip(fieldExpressions).zipWithIndex.map { case ((null, expr: Expression), i) => val f = expr.genCode(ctx) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index a4e4a00d454bd..544d82a793b95 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -1462,15 +1462,15 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession { val df = Seq(("""{"a":1, "b":2, "c":3}""", "a", "b", "c")).toDF("json", "c1", "c2", "c3") df.createOrReplaceTempView("t") - // all field names are foldable + // All field names are foldable. 
       val df1 = sql("SELECT json_tuple(json, 'a', 'b', 'c') from t")
       checkAnswer(df1, Row("1", "2", "3"))
 
-      // the field names some foldable, some non-foldable
+      // Some field names are foldable, some are non-foldable.
       val df2 = sql("SELECT json_tuple(json, 'a', c2, 'c') from t")
       checkAnswer(df2, Row("1", "2", "3"))
 
-      // all field names are non-foldable
+      // All field names are non-foldable.
       val df3 = sql("SELECT json_tuple(json, c1, c2, c3) from t")
       checkAnswer(df3, Row("1", "2", "3"))
     }
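
Side note (not part of the patch): a minimal, hypothetical sketch of how the extracted JsonTupleEvaluator can be driven directly, assuming spark-catalyst is on the classpath. It relies only on the JsonTupleEvaluator(fieldsLength) constructor and the evaluate(json, fieldNames) signature introduced in the later revisions of this change; the object name below is illustrative only.

import org.apache.spark.sql.catalyst.expressions.json.JsonTupleEvaluator
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical driver, not shipped with this patch.
object JsonTupleEvaluatorSketch extends App {
  // Two requested fields, matching the arity JsonTuple passes at codegen time.
  val evaluator = JsonTupleEvaluator(2)

  // Well-formed object: each requested field is copied out as a UTF8String value
  // ("1" for the number, "x" for the string, written without enclosing quotes).
  val rows = evaluator.evaluate(
    UTF8String.fromString("""{"a":1, "b":"x"}"""),
    Seq("a", "b"))
  println(rows.head)

  // Malformed input, or a top-level value that is not an object,
  // falls back to the shared all-null row instead of throwing.
  val fallback = evaluator.evaluate(UTF8String.fromString("not json"), Seq("a", "b"))
  println(fallback.head)
}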