[SPARK-45999][PS] Use dedicated PandasProduct in cumprod
### What changes were proposed in this pull request?
Use the dedicated `PandasProduct` expression in `cumprod`.

### Why are the changes needed?
To make `cumprod` consistent with `prod`, which already uses this expression.

### Does this PR introduce _any_ user-facing change?
No.
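
For illustration, a minimal sketch (not part of the original description; it assumes a local pandas-on-Spark session): the observable results of `cumprod` stay the same, since only the internal expression changes.

```python
import pyspark.pandas as ps

# Running product over a Series: identical output before and after this
# change; only the expression used under the hood differs.
psser = ps.Series([1.0, 2.0, 3.0, 4.0])
print(psser.cumprod().to_list())  # [1.0, 2.0, 6.0, 24.0]
```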

### How was this patch tested?
CI.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43901 from zhengruifeng/ps_cumprod.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
zhengruifeng committed Nov 20, 2023
1 parent 1270798 commit 398bff7
Showing 2 changed files with 7 additions and 29 deletions.
python/pyspark/pandas/generic.py (2 changes: 0 additions, 2 deletions)
@@ -331,8 +331,6 @@ def cumsum(self: FrameLike, skipna: bool = True) -> FrameLike:
         return self._apply_series_op(lambda psser: psser._cumsum(skipna), should_resolve=True)
 
     # TODO: add 'axis' parameter
-    # TODO: use pandas_udf to support negative values and other options later
-    # other window except unbounded ones is supported as of Spark 3.0.
     def cumprod(self: FrameLike, skipna: bool = True) -> FrameLike:
         """
         Return cumulative product over a DataFrame or Series axis.
python/pyspark/pandas/series.py (34 changes: 7 additions, 27 deletions)
@@ -62,7 +62,6 @@
     DoubleType,
     FloatType,
     IntegerType,
-    IntegralType,
     LongType,
     NumericType,
     Row,
@@ -6973,36 +6972,17 @@ def _cumsum(self, skipna: bool, part_cols: Sequence["ColumnOrName"] = ()) -> "Series":
         return psser._cum(F.sum, skipna, part_cols)
 
     def _cumprod(self, skipna: bool, part_cols: Sequence["ColumnOrName"] = ()) -> "Series":
-        if isinstance(self.spark.data_type, BooleanType):
-            scol = self._cum(
-                lambda scol: F.min(F.coalesce(scol, F.lit(True))), skipna, part_cols
-            ).spark.column.cast(LongType())
-        elif isinstance(self.spark.data_type, NumericType):
-            num_zeros = self._cum(
-                lambda scol: F.sum(F.when(scol == 0, 1).otherwise(0)), skipna, part_cols
-            ).spark.column
-            num_negatives = self._cum(
-                lambda scol: F.sum(F.when(scol < 0, 1).otherwise(0)), skipna, part_cols
-            ).spark.column
-            sign = F.when(num_negatives % 2 == 0, 1).otherwise(-1)
-
-            abs_prod = F.exp(
-                self._cum(lambda scol: F.sum(F.log(F.abs(scol))), skipna, part_cols).spark.column
-            )
-
-            scol = F.when(num_zeros > 0, 0).otherwise(sign * abs_prod)
-
-            if isinstance(self.spark.data_type, IntegralType):
-                scol = F.round(scol).cast(LongType())
-        else:
+        psser = self
+        if isinstance(psser.spark.data_type, BooleanType):
+            psser = psser.spark.transform(lambda scol: scol.cast(LongType()))
+        elif not isinstance(psser.spark.data_type, NumericType):
             raise TypeError(
                 "Could not convert {} ({}) to numeric".format(
-                    spark_type_to_pandas_dtype(self.spark.data_type),
-                    self.spark.data_type.simpleString(),
+                    spark_type_to_pandas_dtype(psser.spark.data_type),
+                    psser.spark.data_type.simpleString(),
                 )
             )
-
-        return self._with_new_scol(scol)
+        return psser._cum(lambda c: SF.product(c, skipna), skipna, part_cols)
 
     # ----------------------------------------------------------------------
     # Accessor Methods
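
For context, a standalone sketch (not from the commit itself) of the identity the deleted branch relied on: it emulated a cumulative product through logarithms, handling zeros and the sign parity of negative values separately.

```python
import math

# Illustration only (plain Python, not Spark code) of the removed approach:
# prod(xs) == sign * exp(sum(log(abs(x)))), with any zero forcing the result to 0.
def prod_via_logs(xs):
    if any(x == 0 for x in xs):
        return 0
    sign = -1 if sum(1 for x in xs if x < 0) % 2 else 1
    return sign * math.exp(sum(math.log(abs(x)) for x in xs))

assert round(prod_via_logs([1, -2, 3, -4])) == 24  # even number of negatives
assert round(prod_via_logs([1, -2, 3])) == -6      # odd number of negatives
assert prod_via_logs([5, 0, 7]) == 0               # any zero dominates
```

The replacement routes `_cumprod` through `SF.product`, the dedicated `PandasProduct` aggregate that `prod` already uses, evaluated per cumulative window by `_cum`, so the sign/zero bookkeeping and the `exp`/`log` round-trip are no longer needed.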
