Skip to content

Commit

Permalink
[SPARK-45523][PYTHON] Return useful error message if UDTF returns Non…
Browse files Browse the repository at this point in the history
…e for any non-nullable column

### What changes were proposed in this pull request?

This PR updates Python UDTF evaluation to return a useful error message if UDTF returns None for any non-nullable column.

This implementation also checks recursively for None values in subfields of array/struct/map columns as well.

For example:

```
from pyspark.sql.functions import AnalyzeResult
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructType

class Tvf:
    staticmethod
    def analyze(*args):
        return AnalyzeResult(
            schema=StructType()
                .add("result", ArrayType(IntegerType(), containsNull=False), True)
            )

    def eval(self, *args):
        yield [1, 2, 3, 4],

    def terminate(self):
        yield [1, 2, None, 3],
```

```
SELECT * FROM Tvf(TABLE(VALUES (0), (1)))

> org.apache.spark.api.python.PythonException
[UDTF_EXEC_ERROR] User defined table function encountered an error in the 'eval' or
'terminate' method: Column 0 within a returned row had a value of None, either directly or
within array/struct/map subfields, but the corresponding column type was declared as non
nullable; please update the UDTF to return a non-None value at this location or otherwise
declare the column type as nullable.
```

### Why are the changes needed?

Previously this case returned a null pointer exception.

### Does this PR introduce _any_ user-facing change?

Yes, see above.

### How was this patch tested?

This PR adds new test coverage.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43356 from dtenedor/improve-errors-null-checks.

Authored-by: Daniel Tenedorio <daniel.tenedorio@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
  • Loading branch information
dtenedor authored and ueshin committed Oct 20, 2023
1 parent 0a45a86 commit 227cd8b
Show file tree
Hide file tree
Showing 6 changed files with 626 additions and 10 deletions.
72 changes: 72 additions & 0 deletions python/pyspark/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,13 @@
)
from pyspark.sql.pandas.types import to_arrow_type
from pyspark.sql.types import (
ArrayType,
BinaryType,
DataType,
MapType,
Row,
StringType,
StructField,
StructType,
_create_row,
_parse_datatype_json_string,
Expand Down Expand Up @@ -841,6 +845,71 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
"the query again."
)

# This determines which result columns have nullable types.
def check_nullable_column(i: int, data_type: DataType, nullable: bool) -> None:
if not nullable:
nullable_columns.add(i)
elif isinstance(data_type, ArrayType):
check_nullable_column(i, data_type.elementType, data_type.containsNull)
elif isinstance(data_type, StructType):
for subfield in data_type.fields:
check_nullable_column(i, subfield.dataType, subfield.nullable)
elif isinstance(data_type, MapType):
check_nullable_column(i, data_type.valueType, data_type.valueContainsNull)

nullable_columns: set[int] = set()
for i, field in enumerate(return_type.fields):
check_nullable_column(i, field.dataType, field.nullable)

# Compares each UDTF output row against the output schema for this particular UDTF call,
# raising an error if the two are incompatible.
def check_output_row_against_schema(row: Any, expected_schema: StructType) -> None:
for result_column_index in nullable_columns:

def check_for_none_in_non_nullable_column(
value: Any, data_type: DataType, nullable: bool
) -> None:
if value is None and not nullable:
raise PySparkRuntimeError(
error_class="UDTF_EXEC_ERROR",
message_parameters={
"method_name": "eval' or 'terminate",
"error": f"Column {result_column_index} within a returned row had a "
+ "value of None, either directly or within array/struct/map "
+ "subfields, but the corresponding column type was declared as "
+ "non-nullable; please update the UDTF to return a non-None value at "
+ "this location or otherwise declare the column type as nullable.",
},
)
elif (
isinstance(data_type, ArrayType)
and isinstance(value, list)
and not data_type.containsNull
):
for sub_value in value:
check_for_none_in_non_nullable_column(
sub_value, data_type.elementType, data_type.containsNull
)
elif isinstance(data_type, StructType) and isinstance(value, Row):
for i in range(len(value)):
check_for_none_in_non_nullable_column(
value[i], data_type[i].dataType, data_type[i].nullable
)
elif isinstance(data_type, MapType) and isinstance(value, dict):
for map_key, map_value in value.items():
check_for_none_in_non_nullable_column(
map_key, data_type.keyType, nullable=False
)
check_for_none_in_non_nullable_column(
map_value, data_type.valueType, data_type.valueContainsNull
)

field: StructField = expected_schema[result_column_index]
if row is not None:
check_for_none_in_non_nullable_column(
list(row)[result_column_index], field.dataType, field.nullable
)

if eval_type == PythonEvalType.SQL_ARROW_TABLE_UDF:

def wrap_arrow_udtf(f, return_type):
Expand Down Expand Up @@ -879,6 +948,8 @@ def verify_result(result):
verify_pandas_result(
result, return_type, assign_cols_by_name=False, truncate_return_schema=False
)
for result_tuple in result.itertuples():
check_output_row_against_schema(list(result_tuple), return_type)
return result

# Wrap the exception thrown from the UDTF in a PySparkRuntimeError.
Expand Down Expand Up @@ -973,6 +1044,7 @@ def verify_and_convert_result(result):
},
)

check_output_row_against_schema(result, return_type)
return toInternal(result)

# Evaluate the function and return a tuple back to the executor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,66 @@ org.apache.spark.sql.AnalysisException
}


-- !query
SELECT * FROM InvalidEvalReturnsNoneToNonNullableColumnScalarType(TABLE(t2))
-- !query analysis
[Analyzer test output redacted due to nondeterminism]


-- !query
SELECT * FROM InvalidEvalReturnsNoneToNonNullableColumnArrayType(TABLE(t2))
-- !query analysis
[Analyzer test output redacted due to nondeterminism]


-- !query
SELECT * FROM InvalidEvalReturnsNoneToNonNullableColumnArrayElementType(TABLE(t2))
-- !query analysis
[Analyzer test output redacted due to nondeterminism]


-- !query
SELECT * FROM InvalidEvalReturnsNoneToNonNullableColumnStructType(TABLE(t2))
-- !query analysis
[Analyzer test output redacted due to nondeterminism]


-- !query
SELECT * FROM InvalidEvalReturnsNoneToNonNullableColumnMapType(TABLE(t2))
-- !query analysis
[Analyzer test output redacted due to nondeterminism]


-- !query
SELECT * FROM InvalidTerminateReturnsNoneToNonNullableColumnScalarType(TABLE(t2))
-- !query analysis
[Analyzer test output redacted due to nondeterminism]


-- !query
SELECT * FROM InvalidTerminateReturnsNoneToNonNullableColumnArrayType(TABLE(t2))
-- !query analysis
[Analyzer test output redacted due to nondeterminism]


-- !query
SELECT * FROM InvalidTerminateReturnsNoneToNonNullableColumnArrayElementType(TABLE(t2))
-- !query analysis
[Analyzer test output redacted due to nondeterminism]


-- !query
SELECT * FROM InvalidTerminateReturnsNoneToNonNullableColumnStructType(TABLE(t2))
-- !query analysis
[Analyzer test output redacted due to nondeterminism]


-- !query
SELECT * FROM InvalidTerminateReturnsNoneToNonNullableColumnMapType(TABLE(t2))
-- !query analysis
[Analyzer test output redacted due to nondeterminism]


-- !query
DROP VIEW t1
-- !query analysis
Expand Down
12 changes: 12 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,18 @@ SELECT * FROM
VALUES (0), (1) AS t(col)
JOIN LATERAL
UDTFInvalidOrderByWithoutPartitionBy(TABLE(t2) PARTITION BY partition_col);
-- The following UDTF calls should fail because the UDTF's 'eval' or 'terminate' method returns None
-- to a non-nullable column, either directly or within an array/struct/map subfield.
SELECT * FROM InvalidEvalReturnsNoneToNonNullableColumnScalarType(TABLE(t2));
SELECT * FROM InvalidEvalReturnsNoneToNonNullableColumnArrayType(TABLE(t2));
SELECT * FROM InvalidEvalReturnsNoneToNonNullableColumnArrayElementType(TABLE(t2));
SELECT * FROM InvalidEvalReturnsNoneToNonNullableColumnStructType(TABLE(t2));
SELECT * FROM InvalidEvalReturnsNoneToNonNullableColumnMapType(TABLE(t2));
SELECT * FROM InvalidTerminateReturnsNoneToNonNullableColumnScalarType(TABLE(t2));
SELECT * FROM InvalidTerminateReturnsNoneToNonNullableColumnArrayType(TABLE(t2));
SELECT * FROM InvalidTerminateReturnsNoneToNonNullableColumnArrayElementType(TABLE(t2));
SELECT * FROM InvalidTerminateReturnsNoneToNonNullableColumnStructType(TABLE(t2));
SELECT * FROM InvalidTerminateReturnsNoneToNonNullableColumnMapType(TABLE(t2));

-- cleanup
DROP VIEW t1;
Expand Down
90 changes: 90 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,96 @@ org.apache.spark.sql.AnalysisException
}


-- !query
SELECT * FROM InvalidEvalReturnsNoneToNonNullableColumnScalarType(TABLE(t2))
-- !query schema
struct<>
-- !query output
org.apache.spark.api.python.PythonException
pyspark.errors.exceptions.base.PySparkRuntimeError: [UDTF_EXEC_ERROR] User defined table function encountered an error in the 'eval' or 'terminate' method: Column 0 within a returned row had a value of None, either directly or within array/struct/map subfields, but the corresponding column type was declared as non-nullable; please update the UDTF to return a non-None value at this location or otherwise declare the column type as nullable.


-- !query
SELECT * FROM InvalidEvalReturnsNoneToNonNullableColumnArrayType(TABLE(t2))
-- !query schema
struct<>
-- !query output
org.apache.spark.api.python.PythonException
pyspark.errors.exceptions.base.PySparkRuntimeError: [UDTF_EXEC_ERROR] User defined table function encountered an error in the 'eval' or 'terminate' method: Column 0 within a returned row had a value of None, either directly or within array/struct/map subfields, but the corresponding column type was declared as non-nullable; please update the UDTF to return a non-None value at this location or otherwise declare the column type as nullable.


-- !query
SELECT * FROM InvalidEvalReturnsNoneToNonNullableColumnArrayElementType(TABLE(t2))
-- !query schema
struct<>
-- !query output
org.apache.spark.api.python.PythonException
pyspark.errors.exceptions.base.PySparkRuntimeError: [UDTF_EXEC_ERROR] User defined table function encountered an error in the 'eval' or 'terminate' method: Column 0 within a returned row had a value of None, either directly or within array/struct/map subfields, but the corresponding column type was declared as non-nullable; please update the UDTF to return a non-None value at this location or otherwise declare the column type as nullable.


-- !query
SELECT * FROM InvalidEvalReturnsNoneToNonNullableColumnStructType(TABLE(t2))
-- !query schema
struct<>
-- !query output
org.apache.spark.api.python.PythonException
pyspark.errors.exceptions.base.PySparkRuntimeError: [UDTF_EXEC_ERROR] User defined table function encountered an error in the 'eval' or 'terminate' method: Column 0 within a returned row had a value of None, either directly or within array/struct/map subfields, but the corresponding column type was declared as non-nullable; please update the UDTF to return a non-None value at this location or otherwise declare the column type as nullable.


-- !query
SELECT * FROM InvalidEvalReturnsNoneToNonNullableColumnMapType(TABLE(t2))
-- !query schema
struct<>
-- !query output
org.apache.spark.api.python.PythonException
pyspark.errors.exceptions.base.PySparkRuntimeError: [UDTF_EXEC_ERROR] User defined table function encountered an error in the 'eval' or 'terminate' method: Column 0 within a returned row had a value of None, either directly or within array/struct/map subfields, but the corresponding column type was declared as non-nullable; please update the UDTF to return a non-None value at this location or otherwise declare the column type as nullable.


-- !query
SELECT * FROM InvalidTerminateReturnsNoneToNonNullableColumnScalarType(TABLE(t2))
-- !query schema
struct<>
-- !query output
org.apache.spark.api.python.PythonException
pyspark.errors.exceptions.base.PySparkRuntimeError: [UDTF_EXEC_ERROR] User defined table function encountered an error in the 'eval' or 'terminate' method: Column 0 within a returned row had a value of None, either directly or within array/struct/map subfields, but the corresponding column type was declared as non-nullable; please update the UDTF to return a non-None value at this location or otherwise declare the column type as nullable.


-- !query
SELECT * FROM InvalidTerminateReturnsNoneToNonNullableColumnArrayType(TABLE(t2))
-- !query schema
struct<>
-- !query output
org.apache.spark.api.python.PythonException
pyspark.errors.exceptions.base.PySparkRuntimeError: [UDTF_EXEC_ERROR] User defined table function encountered an error in the 'eval' or 'terminate' method: Column 0 within a returned row had a value of None, either directly or within array/struct/map subfields, but the corresponding column type was declared as non-nullable; please update the UDTF to return a non-None value at this location or otherwise declare the column type as nullable.


-- !query
SELECT * FROM InvalidTerminateReturnsNoneToNonNullableColumnArrayElementType(TABLE(t2))
-- !query schema
struct<>
-- !query output
org.apache.spark.api.python.PythonException
pyspark.errors.exceptions.base.PySparkRuntimeError: [UDTF_EXEC_ERROR] User defined table function encountered an error in the 'eval' or 'terminate' method: Column 0 within a returned row had a value of None, either directly or within array/struct/map subfields, but the corresponding column type was declared as non-nullable; please update the UDTF to return a non-None value at this location or otherwise declare the column type as nullable.


-- !query
SELECT * FROM InvalidTerminateReturnsNoneToNonNullableColumnStructType(TABLE(t2))
-- !query schema
struct<>
-- !query output
org.apache.spark.api.python.PythonException
pyspark.errors.exceptions.base.PySparkRuntimeError: [UDTF_EXEC_ERROR] User defined table function encountered an error in the 'eval' or 'terminate' method: Column 0 within a returned row had a value of None, either directly or within array/struct/map subfields, but the corresponding column type was declared as non-nullable; please update the UDTF to return a non-None value at this location or otherwise declare the column type as nullable.


-- !query
SELECT * FROM InvalidTerminateReturnsNoneToNonNullableColumnMapType(TABLE(t2))
-- !query schema
struct<>
-- !query output
org.apache.spark.api.python.PythonException
pyspark.errors.exceptions.base.PySparkRuntimeError: [UDTF_EXEC_ERROR] User defined table function encountered an error in the 'eval' or 'terminate' method: Column 0 within a returned row had a value of None, either directly or within array/struct/map subfields, but the corresponding column type was declared as non-nullable; please update the UDTF to return a non-None value at this location or otherwise declare the column type as nullable.


-- !query
DROP VIEW t1
-- !query schema
Expand Down
Loading

0 comments on commit 227cd8b

Please sign in to comment.