Enable UMAP to properly handle sparse data (#772)
* Fix UMAP sparse data

Signed-off-by: Rishi Chandra <rishic@nvidia.com>

* Add TBDs

* Address comments - round 1

* Address comments pt 2

* Address comments pt 3

* CSR format implementation and tests

* update type check

* Fix benchmark script

* assert_umap_model func, parametrize metric

* Return dataframe from _fit as separate rows

* Broadcast at transform, fix CSR chunking

* Update NNZ warning, simplify chunking process

* type checking

* Add CSR chunk warning.

* Update warning msg

---------

Signed-off-by: Rishi Chandra <rishic@nvidia.com>
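For orientation, a minimal usage sketch of what this change enables: fitting spark-rapids-ml UMAP on a DataFrame whose features column holds Spark SparseVectors. The setOutputCol/fit/transform pattern mirrors the benchmark diff below; the single-column setter, import path, constructor parameters, and toy data are illustrative assumptions rather than code from this commit.

```python
# Hypothetical sketch: UMAP on sparse vector input (names/params are assumptions).
import random

from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

from spark_rapids_ml.umap import UMAP  # assumed import path for the estimator

spark = SparkSession.builder.getOrCreate()

# 100 toy rows of high-dimensional sparse features (e.g., TF-IDF-like output),
# stored as Spark SparseVectors in a single "features" column.
random.seed(0)
rows = [
    (Vectors.sparse(1000, sorted(random.sample(range(1000), 5)), [1.0] * 5),)
    for _ in range(100)
]
df = spark.createDataFrame(rows, ["features"])

umap = (
    UMAP(n_neighbors=15, n_components=2)  # illustrative parameters
    .setFeaturesCol("features")
    .setOutputCol("embedding")
)
model = umap.fit(df)
model.transform(df).select("embedding").show(5, truncate=False)
```

With this change, sparse rows like these are handled in CSR form through fit and transform, per the CSR implementation and chunking fixes listed in the commit message.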
rishic3 authored Nov 9, 2024
1 parent 2d814c4 commit d10e9f0
Showing 5 changed files with 509 additions and 268 deletions.
11 changes: 0 additions & 11 deletions python/benchmark/benchmark/bench_kmeans.py
@@ -192,17 +192,6 @@ def gpu_cache_df(df: DataFrame) -> DataFrame:

cluster_centers = gpu_model.cluster_centers_

# temporary patch for DB with spark-rapids plugin
# this part is not timed so overhead is not critical, but should be reverted
# once https://github.com/NVIDIA/spark-rapids/issues/10770 is fixed
db_version = os.environ.get("DATABRICKS_RUNTIME_VERSION")
if db_version:
dim = len(cluster_centers[0])
# inject unsupported expr (slice) that is essentially a noop
df_for_scoring = df_for_scoring.select(
F.slice(feature_col, 1, dim).alias(feature_col), output_col
)

if num_cpus > 0:
from pyspark.ml.clustering import KMeans as SparkKMeans

63 changes: 43 additions & 20 deletions python/benchmark/benchmark/bench_umap.py
@@ -19,10 +19,10 @@

import numpy as np
from pandas import DataFrame as PandasDataFrame
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.functions import array_to_vector, vector_to_array
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, sum
from pyspark.sql.functions import array, col, sum

from benchmark.base import BenchmarkBase
from benchmark.utils import inspect_default_params_from_func, with_benchmark
Expand Down Expand Up @@ -105,7 +105,7 @@ def score(

pdf: PandasDataFrame = transformed_df.toPandas()
embedding = np.array(pdf[transformed_col].to_list())
input = np.array(pdf[data_col].to_list())
input = np.array(pdf[data_col].to_list()).astype(np.float32)
score = trustworthiness(input, embedding, n_neighbors=15)

return score
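For context on the change to score() above (casting the input features to float32), here is a self-contained sketch of the trustworthiness metric the benchmark reports. It uses scikit-learn's implementation with a PCA embedding as a stand-in; the benchmark's own trustworthiness import is outside this hunk, so treat the exact provider as an assumption.

```python
# Standalone sketch of trustworthiness scoring (embedding method is a stand-in).
import numpy as np
from sklearn.decomposition import PCA
from sklearn.manifold import trustworthiness

rng = np.random.default_rng(0)
X = rng.random((500, 20)).astype(np.float32)  # float32, matching the cast above

# Any low-dimensional embedding works for illustration; PCA keeps this CPU-only.
embedding = PCA(n_components=2).fit_transform(X)

# trustworthiness is in [0, 1]; values near 1 mean local neighborhoods of X
# are preserved in the embedding.
score = trustworthiness(X, embedding, n_neighbors=15)
print(f"trustworthiness: {score:.3f}")
```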
@@ -162,39 +162,45 @@ def gpu_cache_df(df: DataFrame) -> DataFrame:
else:
gpu_estimator = gpu_estimator.setFeaturesCols(input_cols)

output_col = "embedding"
gpu_estimator = gpu_estimator.setOutputCol(output_col)

gpu_model, fit_time = with_benchmark(
"gpu fit", lambda: gpu_estimator.fit(train_df)
)

def transform(model: UMAPModel, df: DataFrame) -> DataFrame:
transformed_df = model.transform(df)
transformed_df.count()
return transformed_df

transformed_df, transform_time = with_benchmark(
"gpu transform", lambda: transform(gpu_model, train_df)
output_col = "embedding"
transformed_df = gpu_model.setOutputCol(output_col).transform(train_df)
_, transform_time = with_benchmark(
"gpu transform", lambda: transformed_df.foreach(lambda _: None)
)

total_time = round(time.time() - func_start_time, 2)
print(f"gpu total took: {total_time} sec")
data_col = "features"

df_for_scoring = transformed_df
feature_col = first_col
if not is_single_col:
feature_col = "features_array"
df_for_scoring = transformed_df.select(
array(*input_cols).alias("features_array"), output_col
)
elif is_vector_col:
df_for_scoring = transformed_df.select(
vector_to_array(col(feature_col)).alias(feature_col), output_col
)

if num_cpus > 0:
from pyspark.ml.feature import PCA as SparkPCA

assert num_gpus <= 0

if is_array_col:
vector_df = train_df.select(
array_to_vector(train_df[first_col]).alias(first_col)
)
elif not is_vector_col:
vector_assembler = VectorAssembler(outputCol="features").setInputCols(
vector_assembler = VectorAssembler(outputCol=first_col).setInputCols(
input_cols
)
vector_df = vector_assembler.transform(train_df).drop(*input_cols)
first_col = "features"
else:
vector_df = train_df

@@ -209,11 +215,10 @@ def cpu_cache_df(df: DataFrame) -> DataFrame:
"prepare dataset", lambda: cpu_cache_df(vector_df)
)

output_col = "pca_features"

params = self.class_params
print(f"Passing {params} to SparkPCA")

output_col = "pca_features"
cpu_pca = SparkPCA(**params).setInputCol(first_col).setOutputCol(output_col)

cpu_model, fit_time = with_benchmark(
@@ -233,9 +238,27 @@ def cpu_transform(df: DataFrame) -> None:

total_time = round(time.time() - func_start_time, 2)
print(f"cpu total took: {total_time} sec")
data_col = first_col

score = self.score(transformed_df, data_col, output_col)
# spark ml does not remove the mean in the transformed features, so do that here
# needed for scoring
standard_scaler = (
StandardScaler()
.setWithStd(False)
.setWithMean(True)
.setInputCol(output_col)
.setOutputCol(output_col + "_mean_removed")
)

scaler_model = standard_scaler.fit(transformed_df)
transformed_df = scaler_model.transform(transformed_df).drop(output_col)

feature_col = first_col
output_col = output_col + "_mean_removed"
df_for_scoring = transformed_df.select(
vector_to_array(col(output_col)).alias(output_col), feature_col
)

score = self.score(df_for_scoring, feature_col, output_col)
print(f"trustworthiness score: {score}")

report_dict = {
4 changes: 3 additions & 1 deletion python/src/spark_rapids_ml/core.py
@@ -751,7 +751,9 @@ def _train_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame:
concated_nnz = sum(triplet[0].nnz for triplet in inputs) # type: ignore
if concated_nnz > np.iinfo(np.int32).max:
logger.warn(
"the number of non-zero values of a partition is larger than the int32 index dtype of cupyx csr_matrix"
f"The number of non-zero values of a partition exceeds the int32 index dtype. \
cupyx csr_matrix currently does not promote the dtype to int64 when concatenated; \
keeping as scipy csr_matrix to avoid overflow."
)
else:
inputs = [
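To make the updated warning concrete, here is a small sketch of the guard this hunk implements: sum the nnz of the per-partition CSR chunks and stay on scipy when the combined total would overflow the int32 indices used by cupyx csr_matrix. The chunk construction and variable names are synthetic stand-ins; only the nnz check itself mirrors the diff.

```python
# Illustrative sketch of the int32 nnz guard (chunks are synthetic stand-ins).
import numpy as np
import scipy.sparse as sp

chunks = [
    sp.random(10_000, 1_000, density=0.01, format="csr", dtype=np.float32)
    for _ in range(4)
]

concated_nnz = sum(c.nnz for c in chunks)

if concated_nnz > np.iinfo(np.int32).max:
    # cupyx csr_matrix does not promote its index dtype to int64 when
    # concatenated, so a combined nnz above int32 max would overflow;
    # keep the data as scipy csr_matrix instead.
    concated = sp.vstack(chunks, format="csr")
else:
    # Small enough for 32-bit indices; the real code converts each chunk to
    # cupyx.scipy.sparse.csr_matrix here before concatenating on the GPU
    # (omitted in this CPU-only sketch).
    concated = sp.vstack(chunks, format="csr")

print(concated.shape, concated.nnz)
```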