[SPARK-46913][SS] Add support for processing/event time based timers with transformWithState operator #45051
…tely and also for each col family in RocksDB state store provider
@HeartSaVioR - PTAL, thx!
Still reviewing, posting initial set of comments.
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala
sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala
...re/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
Thanks for making these changes.
...re/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
LGTM. One minor comment, thanks.
...re/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
LGTM.
Went through implementation part, will continue with tests.
sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala
...re/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
Done reviewing for the first round.
...test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
+1 pending CI
Thanks! Merging to master.
…with transformWithState operator

### What changes were proposed in this pull request?
Add support for processing/event time based timers with `transformWithState` operator

### Why are the changes needed?
Changes are required to add event-driven timer based support for stateful streaming applications based on arbitrary state API with the `transformWithState` operator

As part of this change - we introduce a bunch of functions that users can use within the `StatefulProcessor` logic. Using the `StatefulProcessorHandle`, users can do the following:
- register timer at a given timestamp
- delete timer at a given timestamp
- list timers

Note that all the above operations are tied to the implicit grouping key.

In terms of the implementation, we make use of additional column families to support the operations mentioned above. For registered timers, we maintain a primary index (as a col family) that keeps the mapping between the grouping key and expiry timestamp. This col family is used to add and delete timers with direct access to the key and also for listing registered timers for a given grouping key using `prefix scan`. We also maintain a secondary index that inverts the ordering of the timestamp and grouping key. We will incorporate the use of the range scan encoder for this col family in a separate PR.

Few additional constraints:
- only registered timers are tracked and occupy storage (locally and remotely)
- col families starting with `_` are reserved and cannot be used as state variables
- timers are checkpointed as before
- users have to provide a `timeoutMode` to the operator. Currently, they can choose to not register timeouts or register timeouts that are processing-time based or event-time based. However, this mode has to be declared upfront within the operator arguments.

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
Added unit tests as well as pseudo-integration tests

StatefulProcessorHandleSuite
```
13:58:42.463 WARN org.apache.spark.sql.execution.streaming.state.StatefulProcessorHandleSuite:
===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.StatefulProcessorHandleSuite, threads: rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true) =====
[info] Run completed in 4 seconds, 559 milliseconds.
[info] Total number of tests run: 8
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

TransformWithStateSuite
```
13:48:41.858 WARN org.apache.spark.sql.streaming.TransformWithStateSuite:
===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.streaming.TransformWithStateSuite, threads: QueryStageCreator-0 (daemon=true), state-store-maintenance-thread-0 (daemon=true), ForkJoinPool.commonPool-worker-4 (daemon=true), state-store-maintenance-thread-1 (daemon=true), QueryStageCreator-1 (daemon=true), rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), QueryStageCreator-2 (daemon=true), QueryStageCreator-3 (daemon=true), state-store-maintenance-task (daemon=true), ForkJoinPool.com...
[info] Run completed in 1 minute, 32 seconds.
[info] Total number of tests run: 20
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 20, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#45051 from anishshri-db/task/SPARK-46913.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
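
For readers following the index description, here is a minimal in-memory sketch in plain Scala of the two timer indices. It is not the RocksDB-backed implementation from this PR: `TimerIndexSketch`, its method signatures, and the `expiredTimers` helper are hypothetical names, and using the secondary index to find expired timers in timestamp order is an assumption about why the ordering is inverted. Sorted sets stand in for column families, and `iteratorFrom` plus `takeWhile` stands in for a prefix scan.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of the two timer indices; NOT Spark's implementation.
final class TimerIndexSketch {
  // Primary index: (groupingKey, expiryTimestampMs), ordered by key first,
  // so all timers of one grouping key are contiguous.
  private val keyToTs = mutable.TreeSet.empty[(String, Long)]

  // Secondary index: (expiryTimestampMs, groupingKey), ordered by timestamp first.
  private val tsToKey = mutable.TreeSet.empty[(Long, String)]

  // Register a timer for the given grouping key in both indices.
  def registerTimer(key: String, expiryTimestampMs: Long): Unit = {
    keyToTs += ((key, expiryTimestampMs))
    tsToKey += ((expiryTimestampMs, key))
  }

  // Delete a timer from both indices; a no-op if it was never registered.
  def deleteTimer(key: String, expiryTimestampMs: Long): Unit = {
    keyToTs -= ((key, expiryTimestampMs))
    tsToKey -= ((expiryTimestampMs, key))
  }

  // Emulates a prefix scan over the primary index for one grouping key.
  def listTimers(key: String): List[Long] =
    keyToTs.iteratorFrom((key, Long.MinValue)).takeWhile(_._1 == key).map(_._2).toList

  // Assumed use of the secondary index: timers with expiry <= nowMs, oldest first.
  def expiredTimers(nowMs: Long): List[(Long, String)] =
    tsToKey.iterator.takeWhile(_._1 <= nowMs).toList
}
```

Keeping both orderings means a per-key listing and a time-ordered scan each read one contiguous range, at the cost of writing every timer twice.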
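
Two of the constraints listed in the description (reserved `_` column family names, and a timeout mode declared upfront) can be illustrated with a small standalone Scala sketch. Every name here (`TimeoutModeSketch`, `ConstraintSketch`, `validateStateVarName`, `canRegisterTimer`) is hypothetical and does not come from the PR, and rejecting timer registration when no timeouts were declared is an assumption about how the mode is enforced.

```scala
// Hypothetical names throughout; a sketch of the stated constraints, NOT Spark's code.
sealed trait TimeoutModeSketch
case object NoTimeouts extends TimeoutModeSketch
case object ProcessingTime extends TimeoutModeSketch
case object EventTime extends TimeoutModeSketch

object ConstraintSketch {
  // Column families starting with "_" are reserved, so such names are
  // rejected as state variable names.
  def validateStateVarName(name: String): Either[String, String] =
    if (name.startsWith("_")) Left(s"'$name' is reserved: names starting with '_' are not allowed")
    else Right(name)

  // Assumption: timers may only be registered when the operator was created
  // with a processing-time or event-time mode.
  def canRegisterTimer(mode: TimeoutModeSketch): Boolean = mode != NoTimeouts
}
```

For example, `ConstraintSketch.validateStateVarName("_timers")` returns a `Left`, while `ConstraintSketch.canRegisterTimer(EventTime)` is `true`.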