[SPARK-50258][SQL] Fix output column order changed issue after AQE optimization

### What changes were proposed in this pull request?

The root cause of this issue is that the planner turns `Limit` + `Sort` into `TakeOrderedAndProjectExec`, which adds an additional `Project` that does not exist in the logical plan. We shouldn't use this additional `Project` to optimize out other `Project`s; otherwise, when AQE turns the physical plan back into a logical plan, we lose the `Project` and may mess up the output column order.

This PR makes `RemoveRedundantProjects` require column ordering below a `TakeOrderedAndProjectExec` when AQE is enabled and its `projectList` is the same as its child's output, so a `Project` that only reorders columns is no longer treated as redundant.
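
For illustration, here is a minimal end-to-end reproduction in the spirit of the new unit test (a sketch only: the session setup, table name, and schema are simplified stand-ins, not part of this patch):

```scala
import org.apache.spark.sql.SparkSession

// Sketch of a reproduction: Limit + Sort over a window query is planned as
// TakeOrderedAndProjectExec. Before this fix, its implicit projectList could be
// used to drop the Project that restores the user-requested column order, so
// the re-optimized AQE plan could emit columns in a different order.
object Spark50258Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-50258-repro")
      .master("local[*]")
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()

    spark.range(10)
      .selectExpr("id AS course", "id % 3 AS year", "id * 100 AS earnings")
      .write.mode("overwrite").saveAsTable("t")

    val df = spark.sql(
      """SELECT year, course, earnings,
        |       SUM(earnings) OVER (ORDER BY year, course) AS balance
        |FROM t ORDER BY year, course
        |LIMIT 100""".stripMargin)
    df.collect()

    // With the fix, the final plan's output order matches the analyzed plan's.
    println(df.queryExecution.executedPlan.output.map(_.name))
    spark.stop()
  }
}
```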

### Why are the changes needed?

Fixes a potential data-correctness issue and avoids a Spark driver crash such as:
```
# more hs_err_pid193136.log
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007f9d14841bc0, pid=193136, tid=223205
#
# JRE version: OpenJDK Runtime Environment Zulu17.36+18-SA (17.0.4.1+1) (build 17.0.4.1+1-LTS)
# Java VM: OpenJDK 64-Bit Server VM Zulu17.36+18-SA (17.0.4.1+1-LTS, mixed mode, sharing, tiered, compressed class ptrs, g1 gc, linux-amd64)
# Problematic frame:
# v  ~StubRoutines::jint_disjoint_arraycopy_avx3
#
# Core dump will be written. Default location: /apache/spark-release/3.5.0-20241105/spark/core.193136
...
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.
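
For reference, the new case can be run on its own with something like the following from a Spark source checkout (a standard sbt test invocation; adjust to your environment):

```
build/sbt "sql/testOnly org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite -- -z SPARK-50258"
```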

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #48789 from wangyum/SPARK-50258.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
wangyum authored and cloud-fan committed Nov 20, 2024
1 parent 81a56df commit 6ee53da
Showing 2 changed files with 29 additions and 2 deletions.
**sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala**

```diff
@@ -58,7 +58,13 @@ object RemoveRedundantProjects extends Rule[SparkPlan] {
         p.mapChildren(removeProject(_, false))
       }
     case op: TakeOrderedAndProjectExec =>
-      op.mapChildren(removeProject(_, false))
+      // The planner turns Limit + Sort into TakeOrderedAndProjectExec which adds an additional
+      // Project that does not exist in the logical plan. We shouldn't use this additional Project
+      // to optimize out other Projects, otherwise when AQE turns physical plan back to
+      // logical plan, we lose the Project and may mess up the output column order. So column
+      // ordering is required if AQE is enabled and projectList is the same as child output.
+      val requireColOrdering = conf.adaptiveExecutionEnabled && op.projectList == op.child.output
+      op.mapChildren(removeProject(_, requireColOrdering))
     case a: BaseAggregateExec =>
       // BaseAggregateExec require specific column ordering when mode is Final or PartialMerge.
      // See comments in BaseAggregateExec inputAttributes method.
```
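
To see why the new `requireColOrdering` flag matters, here is a self-contained toy model of the rule's contract (invented names; not Spark's actual implementation): when ordering is required, a `Project` over the same columns in a different order is not redundant.

```scala
// Toy model of redundant-Project removal (not Spark's real classes or logic).
object RemoveRedundantProjectsToy {
  sealed trait Plan { def output: Seq[String] }
  case class Scan(output: Seq[String]) extends Plan
  case class Project(output: Seq[String], child: Plan) extends Plan

  def removeProject(plan: Plan, requireOrdering: Boolean): Plan = plan match {
    case p @ Project(cols, child) =>
      val redundant =
        if (requireOrdering) cols == child.output   // must match exactly, in order
        else cols.toSet == child.output.toSet       // same set of columns is enough
      if (redundant) removeProject(child, requireOrdering) else p
    case other => other
  }

  def main(args: Array[String]): Unit = {
    // A Project that only reorders the scan's columns:
    val reordering = Project(Seq("year", "course"), Scan(Seq("course", "year")))
    println(removeProject(reordering, requireOrdering = false)) // Scan(...): order lost
    println(removeProject(reordering, requireOrdering = true))  // Project kept
  }
}
```

This mirrors the guard above: with AQE enabled and `projectList == op.child.output`, the rule asks children to preserve exact column order rather than merely the same set of columns.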
**sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala**

```diff
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, EmptyRelationExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnaryExecNode, UnionExec}
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
 import org.apache.spark.sql.execution.columnar.{InMemoryTableScanExec, InMemoryTableScanLike}
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
@@ -43,6 +43,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ENSURE_RE
 import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, ShuffledJoin, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SparkListenerSQLExecutionStart}
+import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
@@ -3086,6 +3087,26 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-50258: Fix output column order changed issue after AQE optimization") {
+    withTable("t") {
+      sql("SELECT course, year, earnings FROM courseSales").write.saveAsTable("t")
+      val df = sql(
+        """
+          |SELECT year, course, earnings, SUM(earnings) OVER (ORDER BY year, course) AS balance
+          |FROM t ORDER BY year, course
+          |LIMIT 100
+          |""".stripMargin)
+      df.collect()
+
+      val plan = df.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+      assert(plan.inputPlan.isInstanceOf[TakeOrderedAndProjectExec])
+      assert(plan.finalPhysicalPlan.isInstanceOf[WindowExec])
+      plan.inputPlan.output.zip(plan.finalPhysicalPlan.output).foreach { case (o1, o2) =>
+        assert(o1.semanticEquals(o2), "Different output column order after AQE optimization")
+      }
+    }
+  }
 }
 
 /**
```
