[SPARK-49808][SQL] Fix a deadlock in subquery execution due to lazy vals
### What changes were proposed in this pull request?
Fix a deadlock in subquery execution due to lazy vals

### Why are the changes needed?
We observed a deadlock between `QueryPlan.canonicalized` and `QueryPlan.references`:
```
24/09/04 04:46:54 ERROR DeadlockDetector: Found 2 new deadlock thread(s):
"ScalaTest-run-running-SubquerySuite" prio=5 Id=1 BLOCKED on org.apache.spark.sql.execution.aggregate.HashAggregateExec@87abc7f owned by "subquery-5" Id=112
	at app//org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:684)
	-  blocked on org.apache.spark.sql.execution.aggregate.HashAggregateExec@87abc7f
	at app//org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:684)
	at app//org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$2(QueryPlan.scala:716)
	at app//org.apache.spark.sql.catalyst.plans.QueryPlan$$Lambda$4058/0x00007f740f3d0cb0.apply(Unknown Source)
	at app//org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1314)
	at app//org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1313)
	at app//org.apache.spark.sql.execution.WholeStageCodegenExec.mapChildren(WholeStageCodegenExec.scala:639)
	at app//org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:716)
	...

"subquery-5" daemon prio=5 Id=112 BLOCKED on org.apache.spark.sql.execution.WholeStageCodegenExec@132a3243 owned by "ScalaTest-run-running-SubquerySuite" Id=1
	at app//org.apache.spark.sql.catalyst.plans.QueryPlan.references$lzycompute(QueryPlan.scala:101)
	-  blocked on org.apache.spark.sql.execution.WholeStageCodegenExec@132a3243
	at app//org.apache.spark.sql.catalyst.plans.QueryPlan.references(QueryPlan.scala:101)
	at app//org.apache.spark.sql.execution.CodegenSupport.usedInputs(WholeStageCodegenExec.scala:325)
	at app//org.apache.spark.sql.execution.CodegenSupport.usedInputs$(WholeStageCodegenExec.scala:325)
	at app//org.apache.spark.sql.execution.WholeStageCodegenExec.usedInputs(WholeStageCodegenExec.scala:639)
	at app//org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:187)
	at app//org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:157)
	at app//org.apache.spark.sql.execution.aggregate.HashAggregateExec.consume(HashAggregateExec.scala:53)
```

The main thread, in `TakeOrderedAndProject.doExecute`, is computing `outputOrdering`. It traverses the tree top-down, acquiring the lazy-val lock for `QueryPlan.canonicalized` on each node along the path. In this deadlock it has already obtained the lock on `WholeStageCodegenExec` and is waiting for the lock on `HashAggregateExec`.

Concurrently, a subquery execution thread is performing code generation. It traverses the tree bottom-up via `def consume`, which checks `WholeStageCodegenExec.usedInputs` and reads the lazy val `QueryPlan.references`, so it acquires the lazy-val lock for `QueryPlan.references` along the path. In this deadlock it has already obtained the lock on `HashAggregateExec` and is waiting for the lock on `WholeStageCodegenExec`.

This is possible because Scala initializes a `lazy val` under `this.synchronized`, i.e. it locks the instance that contains the val. When two threads initialize lazy vals on two different instances and acquire those instance monitors in opposite order, they can deadlock.
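
For illustration only, here is a minimal, standalone sketch of that pattern (not Spark code; `Node`, `canonicalized`, and `references` here are invented stand-ins): each lazy-val initializer runs while holding the monitor of its enclosing instance, and two threads that cross instances in opposite order block on each other forever.

```scala
// Minimal sketch of the lazy-val lock-ordering problem (not Spark code).
class Node(name: String) {
  var other: Node = _

  // Initializing this lazy val holds `this`'s monitor, then touches `other`.
  lazy val canonicalized: String = {
    Thread.sleep(100)              // widen the race window
    s"$name(${other.references})"  // needs `other`'s monitor
  }

  // Same shape in the opposite direction.
  lazy val references: String = {
    Thread.sleep(100)
    s"$name(${other.canonicalized})"
  }
}

object LazyValDeadlockSketch extends App {
  val a = new Node("a")
  val b = new Node("b")
  a.other = b
  b.other = a

  // Thread 1 locks `a` (canonicalized) and then waits for `b`;
  // thread 2 locks `b` (references) and then waits for `a`.
  val t1 = new Thread(() => { val _ = a.canonicalized })
  val t2 = new Thread(() => { val _ = b.references })
  t1.start(); t2.start()
  t1.join(); t2.join()             // typically hangs: classic lock-ordering deadlock
}
```

In the trace above, the two instances are the `WholeStageCodegenExec` and `HashAggregateExec` nodes, and the crossing accesses come from `canonicalized` (top-down) and `references` (bottom-up).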

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
Manually checked with `com.databricks.spark.sql.SubquerySuite`.

Before this fix we hit this issue multiple times in `SubquerySuite`; after the fix we did not hit it again across multiple runs.

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #48279 from zhengruifeng/fix_deadlock.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

Commit d7abddc (parent 9b739d4), committed Sep 27, 2024; 1 changed file with 5 additions and 2 deletions.

The change in `QueryPlan.scala`:

```diff
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.trees.TreePatternBits
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.LazyTry
 import org.apache.spark.util.collection.BitSet
 
 /**
@@ -94,9 +95,11 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
    * All Attributes that appear in expressions from this operator. Note that this set does not
    * include attributes that are implicitly referenced by being passed through to the output tuple.
    */
+  def references: AttributeSet = lazyReferences.get
+
   @transient
-  lazy val references: AttributeSet = {
-    AttributeSet.fromAttributeSets(expressions.map(_.references)) -- producedAttributes
+  private val lazyReferences = LazyTry {
+    AttributeSet(expressions) -- producedAttributes
   }
 
   /**
```
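
As a rough sketch of why this removes the deadlock (illustration only; `LazyHolder` below is an invented stand-in, not the real `org.apache.spark.util.LazyTry`): the lazy computation now synchronizes on a small per-field holder object rather than on the plan node itself, so evaluating `references` no longer takes the plan node's monitor and cannot form a lock cycle with `canonicalized`.

```scala
// Invented stand-in for the idea behind org.apache.spark.util.LazyTry
// (illustration only, not Spark's actual implementation).
final class LazyHolder[T](compute: => T) {
  // The lazy val synchronizes on this holder instance,
  // not on the query plan node that owns the field.
  private lazy val value: T = compute
  def get: T = value
}

object LazyHolder {
  def apply[T](compute: => T): LazyHolder[T] = new LazyHolder(compute)
}

// Usage mirroring the diff above (sketch):
//   private val lazyReferences = LazyHolder {
//     AttributeSet(expressions) -- producedAttributes
//   }
//   def references: AttributeSet = lazyReferences.get
```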
