diff --git a/RFC-0004-support-distributed-procedure.md b/RFC-0004-support-distributed-procedure.md
new file mode 100644
index 00000000..cf477a70
--- /dev/null
+++ b/RFC-0004-support-distributed-procedure.md
@@ -0,0 +1,387 @@

## **Expand Procedure Architecture to Support Distributed Execution**

## Proposers

* Dong Wang

## Related Issues

* https://github.com/prestodb/presto/issues/20425

## Summary

Propose a design to expand the current procedure architecture in Presto, supporting the definition, registration, and invocation of procedures that need to be executed in a distributed way.

In addition, to serve as an example and to clarify the functional boundaries among the different architecture levels, this document also describes the design for supporting distributed procedures in the Iceberg connector, together with a specific distributed procedure: `rewrite_data_files`.

## Background

### Problem

Currently, Presto only supports procedures that run on the coordinator. These procedures are mainly used for metadata management, so they do not need to be executed distributively; that is, no worker nodes participate in their execution.

In practice, however, some functionality cannot be completed on the coordinator alone. Such functionality usually involves operations on a table's data, for example rewriting table data, merging small data files, sorting table data, or repartitioning table data. These features may not have a corresponding standard SQL syntax and are often tied to specific connectors.

Therefore, compared to extending the SQL syntax, implementing them as procedures is a better fit.

### Other Computing Engines

#### Spark

Spark supports calling procedures that are executed distributively. For example, Spark provides the following procedures for Iceberg:

`rewrite_data_files` compacts data files in parallel; it combines small files into larger files to reduce metadata overhead and runtime file open cost.

```sql
CALL catalog_name.system.rewrite_data_files('db.sample');
CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"');
```

`rewrite_position_delete_files` serves two purposes:
* Minor Compaction: Compact small position delete files into larger ones.
* Remove Dangling Deletes: Filter out position delete records that refer to data files that are no longer live.

```sql
CALL catalog_name.system.rewrite_position_delete_files('db.sample');
CALL catalog_name.system.rewrite_position_delete_files(table => 'db.sample', options => map('rewrite-all', 'true'));
```

Reference:
https://iceberg.apache.org/docs/nightly/spark-procedures/#rewrite_data_files
https://iceberg.apache.org/docs/nightly/spark-procedures/#rewrite_position_delete_files

#### BigQuery

BigQuery's system procedure `BQ.REFRESH_MATERIALIZED_VIEW` is used to refresh a materialized view. It involves scanning the data of the source table, computing the result, and rewriting it to the target materialized view, so it needs to be executed distributively.

```sql
CALL BQ.REFRESH_MATERIALIZED_VIEW(view_name);
```

Reference: https://cloud.google.com/bigquery/docs/reference/system-procedures#bqrefresh_materialized_view

#### Trino

Trino uses its own extended SQL grammar, `ALTER TABLE EXECUTE`, to support these scenarios.
For example, its `optimize` command rewrites the content of the specified table so that its data is merged into fewer but larger files:

```sql
ALTER TABLE test_table EXECUTE optimize;
ALTER TABLE test_table EXECUTE optimize(file_size_threshold => '128MB');
ALTER TABLE test_partitioned_table EXECUTE optimize WHERE partition_key = 1;
```

Reference: https://trino.io/docs/current/connector/iceberg.html#alter-table-execute

## Goals

* Expand the ability of procedures so that they can leverage worker nodes to manipulate table data rather than only metadata.
* Expand the current procedure architecture to support the definition, registration, invocation, and execution of distributed procedures.
* Provide a clear architecture hierarchy and well-defined SPI interfaces, enabling various connectors to easily support distributed procedures.
* Support distributed procedures in the Iceberg connector, and implement a specific distributed procedure: `rewrite_data_files`.

## Proposed Implementation

### Overview

→ A distributed procedure operates on the actual data of a target table, for example rewriting or merging data. We need to refactor and expand the `Procedure`/`ProcedureRegistry` architecture so that we can:
* Define a distributed procedure and register it into the procedure registry
* Acquire and use the procedure registry in the `presto-analyzer` module and in connector modules

→ Define a new query type, `CALL_DISTRIBUTED_PROCEDURE`, and assign it to `call distributed procedure` statements in the preparer stage, so that such statements fall into the path of `SqlQueryExecutionFactory.createQueryExecution(...)`. The generated `SqlQueryExecution` object can then utilize the existing distributed querying, writing, and final committing mechanisms to execute distributed procedures.

→ Add a new plan node type, `CallDistributedProcedureNode`. During the analysis and planning stage, construct a logical plan with the following shape for a `call distributed procedure` statement:

```text
TableScanNode -> FilterNode -> CallDistributedProcedureNode -> TableFinishNode -> OutputNode
```

Here, `CallDistributedProcedureNode` holds a `CallDistributedProcedureTarget`, a subclass of `WriterTarget` used to maintain the procedure information and the related call parameters.

→ The optimization, segmentation, group execution tagging, and local planning of `CallDistributedProcedureNode` are similar to those of `TableWriterNode`. It is ultimately locally planned as a `TableWriterOperator` (which holds a different subclass of `ExecutionWriterTarget`). When a `PageSink` is created to perform the data writing, the corresponding `ConnectorPageSink` is generated based on the specific subclass and property values of the `ExecutionWriterTarget` actually held.

→ Similar to statements such as `create table as`/`insert`/`delete`/`refresh materialized view` that involve distributed processing, two related SPI methods are defined for the `call distributed procedure` statement in the metadata and connector metadata interfaces. `beginCallDistributedProcedure` performs the preparation work before distributed scheduling starts, and `finishCallDistributedProcedure` performs the transaction commitment after distributed writing completes.
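For illustration, a distributed procedure is invoked with an ordinary `CALL` statement. The statement below is hypothetical; it assumes the Iceberg `rewrite_data_files` procedure proposed later in this document, registered in a catalog named `iceberg`:

```sql
-- Hypothetical invocation; the parameter names follow the signature proposed later in this RFC
CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'id > 100');
```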
→ In a specific connector's (such as Iceberg's) implementation of the `ConnectorMetadata` methods `beginCallDistributedProcedure` and `finishCallDistributedProcedure`, besides accomplishing the common logic (such as starting a transaction, building a transaction context, and committing a transaction), the connector also resolves the specified distributed procedure and calls its relevant method to execute the procedure-specific logic.

### Expand procedure architecture to support distributed execution

#### 1. Refactor Procedure/ProcedureRegistry

Add a new type `DistributedProcedure`, a subclass of `Procedure` with some special requirements: the declared parameters must include `schema` and `table_name`, which are used to specify the target table. There are no strict restrictions on the position and order of these two parameters in the parameter list; by convention they are placed in the first two positions.

`DistributedProcedure` does not use the `methodHandle` property of its parent class; instead, it defines two additional functional interfaces: `BeginCallDistributedProcedure` and `FinishCallDistributedProcedure`. These two interfaces are invoked from the related methods in a connector's `ConnectorMetadata` implementation, and a distributed procedure customizes its actual behavior during distributed execution by implementing them:

```java
@FunctionalInterface
public interface BeginCallDistributedProcedure
{
    ConnectorDistributedProcedureHandle begin(ConnectorSession session, ConnectorTransactionContext transactionContext, ConnectorTableLayoutHandle tableLayoutHandle, Object[] arguments);
}

@FunctionalInterface
public interface FinishCallDistributedProcedure
{
    void finish(ConnectorTransactionContext transactionContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments);
}
```

Because `DistributedProcedure` is a subclass of `Procedure`, the current implementation of `ProcedureRegistry` directly supports registration, removal, and maintenance of `DistributedProcedure`. In addition, two methods are added to `ProcedureRegistry` to check for and resolve a `DistributedProcedure`:

```java
public DistributedProcedure resolveDistributed(ConnectorId connectorId, SchemaTableName name);

public boolean isDistributedProcedure(ConnectorId connectorId, SchemaTableName name);
```

Extract an interface `IProcedureRegistry` from `ProcedureRegistry` and declare it in the presto-spi module, so that it can be recognized and used in both the presto-analyzer module and connector modules.

```java
public interface IProcedureRegistry
{
    void addProcedures(ConnectorId connectorId, Collection<Procedure> procedures);

    void removeProcedures(ConnectorId connectorId);

    Procedure resolve(ConnectorId connectorId, SchemaTableName name);

    DistributedProcedure resolveDistributed(ConnectorId connectorId, SchemaTableName name);

    boolean isDistributedProcedure(ConnectorId connectorId, SchemaTableName name);
}
```

#### 2. Set the new query type during the preparation stage

Add a new value to `QueryType` called `CALL_DISTRIBUTED_PROCEDURE(10)` for the `call distributed procedure` statement; it is distinct from the `DATA_DEFINITION` query type originally assigned to the `call procedure` statement.

In the preparer stage, when a `CALL` statement is encountered, the injected procedure registry is used to determine whether the called procedure is a `DistributedProcedure`; if so, the query type is set to `CALL_DISTRIBUTED_PROCEDURE`, as illustrated in the sketch below.
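The following is a minimal sketch of that classification step (the method name and its placement are illustrative, not the actual Presto code); it assumes the `IProcedureRegistry` interface and the new query type proposed above, and a procedure name already resolved to a `QualifiedObjectName`:

```java
// Illustrative sketch only, not actual Presto code: classifies a CALL statement during preparation,
// assuming the IProcedureRegistry interface and the CALL_DISTRIBUTED_PROCEDURE query type from this RFC.
private static Optional<QueryType> classifyCallStatement(
        IProcedureRegistry procedureRegistry,
        ConnectorId connectorId,
        QualifiedObjectName procedureName)
{
    SchemaTableName name = new SchemaTableName(procedureName.getSchemaName(), procedureName.getObjectName());
    if (procedureRegistry.isDistributedProcedure(connectorId, name)) {
        // Distributed procedure: routed through SqlQueryExecutionFactory so that worker nodes participate
        return Optional.of(QueryType.CALL_DISTRIBUTED_PROCEDURE);
    }
    // Ordinary procedure call: remains a coordinator-only task
    return Optional.of(QueryType.DATA_DEFINITION);
}
```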
In this way, `LocalDispatchQueryFactory` can choose `SqlQueryExecutionFactory` to generate the query execution for the `call distributed procedure` statement. This allows the existing distributed data scan, write, and unified commit mechanisms to be used during actual execution.

#### 3. Processing logic during the analysis and planning stage

Add a new plan node type `CallDistributedProcedureNode` for the `call distributed procedure` statement, which is defined as follows:

```java
public class CallDistributedProcedureNode
        extends InternalPlanNode
{
    private final PlanNode source;
    private final Optional<CallDistributedProcedureTarget> target;
    private final VariableReferenceExpression rowCountVariable;
    private final VariableReferenceExpression fragmentVariable;
    private final VariableReferenceExpression tableCommitContextVariable;
    private final List<VariableReferenceExpression> columns;
    private final List<String> columnNames;
    private final Set<VariableReferenceExpression> notNullColumnVariables;
    private final Optional<PartitioningScheme> partitioningScheme;
    private final List<VariableReferenceExpression> outputs;

    ......
}
```

At present, it can be regarded as a simplified version of `TableWriterNode`. It holds a `CallDistributedProcedureTarget`, a subclass of `WriterTarget` used to maintain information about the procedure and the call parameters:

```java
public static class CallDistributedProcedureTarget
        extends WriterTarget
{
    private final QualifiedObjectName procedureName;
    private final Object[] procedureArguments;
    private final Optional<TableHandle> sourceHandle;
    private final SchemaTableName schemaTableName;

    ......
}
```

During the analysis and planning phase, the `call distributed procedure` statement is planned into a logical plan with the following shape:

```text
TableScanNode -> FilterNode -> CallDistributedProcedureNode -> TableFinishNode -> OutputNode
```

Here, `TableScanNode -> FilterNode` defines the data to be processed, based on the target table determined by the `schema` and `table_name` parameters together with any filter conditions that may be present.

The `CallDistributedProcedureNode -> TableFinishNode` structure is similar to the `TableWriterNode -> TableFinishNode` structure; it is used to perform the distributed execution and the final unified commit of the actual data operations.

Note: to support the `call distributed procedure` statement, a new plan node `CallDistributedProcedureNode` is used instead of directly reusing `TableWriterNode`. Although the two are very similar throughout the whole process, they may diverge in the future, for example:
* The `call distributed procedure` statement may return not only the number of updated rows but also additional content (such as the number of affected files, following the example of Spark's `rewrite_data_files` procedure for Iceberg)
* Each specific distributed procedure may be allowed to define its own result columns in the future

#### 4. Processing logic during optimization, segmentation, and local planning

The optimization, segmentation, group execution tagging, and local planning of `CallDistributedProcedureNode` are similar to those of `TableWriterNode`. Because the current definition of `CallDistributedProcedureNode` omits some `TableWriterNode` fields that are not considered necessary for distributed procedures, its handling in the various visitors can be regarded as a simplified version of the handling of `TableWriterNode`.
It is ultimately planned into a `TableWriterOperator`, which holds an `ExecuteProcedureHandle` used to maintain information about the called procedure and the target table:

```java
public static class ExecuteProcedureHandle
        extends ExecutionWriterTarget
{
    private final DistributedProcedureHandle handle;
    private final SchemaTableName schemaTableName;
    private final QualifiedObjectName procedureName;

    ......
}

public final class DistributedProcedureHandle
{
    private final ConnectorId connectorId;
    private final ConnectorTransactionHandle transactionHandle;
    private final ConnectorDistributedProcedureHandle connectorHandle;

    ......
}
```

`ConnectorDistributedProcedureHandle` is an empty interface to be implemented by the individual connectors. It represents a handle related to the execution of a distributed procedure that the Presto engine itself does not interpret or use; it is passed to the specific connector through the connector metadata interfaces for interpretation and use.

Finally, a new optimizer, `RewriteWriterTarget`, is added and placed after all the other optimization rules. The reason is that after the whole plan has been optimized, some filter conditions may have been pushed down into `TableScanNode`; at that point, the target table's `TableHandle` held in the `CallDistributedProcedureTarget` built during the logical planning phase may have become inaccurate (it does not include the pushed-down filter conditions). Therefore, the `CallDistributedProcedureTarget` held in `TableFinishNode` and `CallDistributedProcedureNode` must be updated based on the `TableHandle` held in the underlying `TableScanNode` after the entire optimization is completed.

#### 5. Definition of the metadata interfaces

Similar to statements such as `create table as`/`insert`/`delete`/`refresh materialized view` that involve distributed processing, two related SPI methods are defined for the `call distributed procedure` statement in the metadata and connector metadata interfaces:

```java
public interface Metadata
{
    ......

    /**
     * Begin call distributed procedure
     */
    DistributedProcedureHandle beginCallDistributedProcedure(Session session, QualifiedObjectName procedureName, TableHandle tableHandle, Object[] arguments);

    /**
     * Finish call distributed procedure
     */
    void finishCallDistributedProcedure(Session session, DistributedProcedureHandle procedureHandle, QualifiedObjectName procedureName, Collection<Slice> fragments);

    ......
}

public interface ConnectorMetadata
{
    ......

    /**
     * Begin call distributed procedure
     */
    default ConnectorDistributedProcedureHandle beginCallDistributedProcedure(
            ConnectorSession session,
            QualifiedObjectName procedureName,
            ConnectorTableLayoutHandle tableLayoutHandle,
            Object[] arguments)
    {
        throw new PrestoException(NOT_SUPPORTED, "This connector does not support distributed procedure");
    }

    /**
     * Finish call distributed procedure
     *
     * @param fragments all fragments returned by {@link com.facebook.presto.spi.UpdatablePageSource#finish()}
     */
    default void finishCallDistributedProcedure(ConnectorSession session, ConnectorDistributedProcedureHandle procedureHandle, QualifiedObjectName procedureName, Collection<Slice> fragments)
    {
        throw new PrestoException(NOT_SUPPORTED, "This connector does not support distributed procedure");
    }

    ......
}
```

`beginCallDistributedProcedure` performs the preparation work before distributed scheduling starts and generates the execution information that needs to be passed to the worker nodes. `finishCallDistributedProcedure` performs the transaction commitment after distributed writing completes.

In the implementation of the `beginCallDistributedProcedure` and `finishCallDistributedProcedure` methods of `ConnectorMetadata` for a specific connector (such as Iceberg), besides accomplishing the common logic (such as starting a transaction, building a transaction context, and committing a transaction), the connector also resolves the specified distributed procedure and calls its relevant method to execute the procedure-specific logic.

### Iceberg connector support for distributed procedures

In Iceberg, we often need to record the original data files that were scanned and rewritten during the actual execution of a table data operation (including delete files that have been fully applied), and, in the final commit, combine them with the data files newly generated by the rewrite to make the corresponding changes and transaction commits at the metadata level.

Therefore, we extend the concept of a transaction context: the connector split source planned during the execution of `BeginCallDistributedProcedure` is kept in the current transaction context, and the data files and delete files actually scanned and processed during data file planning are also recorded there. The definition of the extended Iceberg transaction context is as follows:

```java
public class IcebergTransactionContext
        implements ConnectorTransactionContext
{
    final Optional<Table> table;
    final Transaction transaction;
    final Set<DataFile> scannedDataFiles = new HashSet<>();
    final Set<DeleteFile> fullyAppliedDeleteFiles = new HashSet<>();
    Optional<ConnectorSplitSource> connectorSplitSource = Optional.empty();

    ......
}
```

We also extend `IcebergSplitManager` so that it first checks whether a planned `ConnectorSplitSource` is set in the current transaction context; if so, that split source is used directly. This allows us to provide a custom connector split source and implement additional logic inside it, such as recording the actually scanned data files and delete files as split information is obtained batch by batch during scheduling. The scanned data files and delete files are also kept in the current transaction context for later use in the unified commit.

Define the `IcebergDistributedProcedureHandle` class, an implementation of the `ConnectorDistributedProcedureHandle` interface described above. It carries the information related to the actual writing of a specific data split:

```java
public class IcebergDistributedProcedureHandle
        implements ConnectorDistributedProcedureHandle
{
    private final String schemaName;
    private final IcebergTableName tableName;
    private final String schemaAsJson;
    private final String partitionSpecAsJson;
    private final List<IcebergColumnHandle> inputColumns;
    private final String outputPath;
    private final FileFormat fileFormat;
    private final Map<String, String> storageProperties;

    ......
}
```

The implementation logic of Iceberg for `ConnectorMetadata.beginCallDistributedProcedure(...)` is as follows:
* First, obtain a reference to the Iceberg table based on the parameters
* Next, open a new Iceberg transaction through the table reference and encapsulate it in an `IcebergTransactionContext` transaction context
* Resolve the procedure specified in the parameters through the injected procedure registry, and verify that it is of type `DistributedProcedure`
* Finally, call `begin(...)` on the `BeginCallDistributedProcedure` handle held by the procedure, passing in the transaction context together with the other relevant parameters, and return the resulting `ConnectorDistributedProcedureHandle`

The implementation logic of Iceberg for `ConnectorMetadata.finishCallDistributedProcedure(...)` is as follows:
* First, resolve the procedure specified in the parameters through the injected procedure registry, and verify that it is of type `DistributedProcedure`
* Then, call `finish(...)` on the `FinishCallDistributedProcedure` handle held by the procedure, passing in the transaction context together with the other relevant parameters
* Finally, commit the Iceberg transaction held in the current transaction context and then destroy the transaction context

### Iceberg connector's implementation for `rewrite_data_files`

First, a dedicated `CallDistributedProcedureSplitSource` is defined to support generating a split source with additional custom logic. It receives a consumer of `FileScanTask` and is used in `rewrite_data_files`'s implementation of the `BeginCallDistributedProcedure` interface method. As split information is obtained batch by batch during actual scheduling, the consumer passed in by `rewrite_data_files` is executed, recording the actually scanned data files and delete files in the current transaction context for later use in the commit.

Then a `DistributedProcedure` instance is built with the parameter list described below and the namespace `system.rewrite_data_files`, and it is registered into the `ProcedureRegistry` in the usual way.

```text
(schema: varchar, table_name: varchar, filter: varchar, options: map (varchar, varchar))
```

The implementation logic for `BeginCallDistributedProcedure` in `rewrite_data_files` is:
* Obtain `Table icebergTable` and `IcebergTableHandle tableHandle` from the current transaction context
* Build Iceberg's table scan object based on the `snapshotId` and the valid filter condition `validPredicate` recorded in the `icebergTableLayoutHandle` parameter
* Build a consumer of `FileScanTask` that records the data files and delete files of each scan task in the corresponding properties of the current transaction context
* Build a `CallDistributedProcedureSplitSource` object based on the table scan and the consumer obtained above, and set it into the current transaction context
* Finally, construct an `IcebergDistributedProcedureHandle` object based on the relevant information of `icebergTable` and `tableHandle`, and return it

The implementation logic for `FinishCallDistributedProcedure` in `rewrite_data_files` is:
* Obtain `Table icebergTable` from the current transaction context
* Deserialize the `fragments` parameter to obtain the commit information of each task (`commitTasks`) passed from the previous stage
* From `commitTasks`, obtain the set of new data files (`newFiles`) generated by the actual distributed writing
* Retrieve the set of scanned data files (`scannedDataFiles`) and the set of fully applied delete files (`fullyAppliedDeleteFiles`) from the current transaction context
* Use the `RewriteFiles` API of the current Iceberg transaction to replace the scanned data files and fully applied delete files with the newly generated data files at the metadata level
* Commit this single action only, not the whole transaction (the whole transaction is committed in a unified way in the related method of the Iceberg metadata)

## Adoption Plan

- As this is an extended feature, it should not have any impact on existing users.
- Connector maintainers can decide to implement the newly defined SPI methods to support distributed procedures, or leave them unimplemented.
- We will document the procedure architecture expansion and the related SPI methods, so that connector maintainers can more easily understand and support this expansion.

## Test Plan

We will add test cases for each stage involved in this procedure architecture expansion, including the creation and registration of distributed procedures, the preparation, analysis, logical planning, and optimization of `call distributed procedure` statements, and finally tests for a specific Iceberg distributed procedure: `rewrite_data_files`.
\ No newline at end of file