Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[auto-merge] branch-24.10 to branch-24.12 [skip ci] [bot] #776

Merged
merged 1 commit into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 77 additions & 33 deletions python/src/spark_rapids_ml/knn.py
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,12 @@ class ApproximateNearestNeighbors(
k: int (default = 5)
the default number of approximate nearest neighbors to retrieve for each query.

If fewer than k neighbors are found for a query (for example, due to a small nprobe value):
(1)In ivfflat and ivfpq:
(a) If no item vector is probed, the indices are filled with long_max (9,223,372,036,854,775,807) and distances are set to infinity.
(b) If at least one item vector is probed, the indices are filled with the top-1 neighbor's ID, and distances are filled with infinity.
(2) cagra does not have this problem, as at least itopk_size (where itopk_size ≥ k) items are always probed.

algorithm: str (default = 'ivfflat')
the algorithm parameter to be passed into cuML. It currently must be 'ivfflat', 'ivfpq' or 'cagra'. Other algorithms are expected to be supported later.

Expand Down Expand Up @@ -1329,6 +1335,30 @@ def _cal_cuvs_ivf_flat_params_and_check(

return (ivfflat_index_params, ivfflat_search_params)

@classmethod
def _cal_cuvs_ivf_pq_params_and_check(
cls, algoParams: Optional[Dict[str, Any]], metric: str, topk: int
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
pq_index_params: Dict[str, Any] = {"metric": metric}
pq_search_params: Dict[str, Any] = {}

if algoParams is not None:
for p in algoParams:
if p in {"n_probes", "nprobe"}:
pq_search_params["n_probes"] = algoParams[p]
elif p in {"lut_dtype", "internal_distance_dtype"}:
pq_search_params[p] = algoParams[p]
elif p in {"n_lists", "nlist"}:
pq_index_params["n_lists"] = algoParams[p]
elif p in {"M", "pq_dim"}:
pq_index_params["pq_dim"] = algoParams[p]
elif p in {"n_bits", "pq_bits"}:
pq_index_params["pq_bits"] = algoParams[p]
else:
pq_index_params[p] = algoParams[p]

return (pq_index_params, pq_search_params)

def kneighbors(
self, query_df: DataFrame, sort_knn_df_by_query_id: bool = True
) -> Tuple[DataFrame, DataFrame, DataFrame]:
Expand Down Expand Up @@ -1416,12 +1446,17 @@ def _get_cuml_transform_func(
"cosine",
}

if cuml_alg_params["algorithm"] != "ivfpq":
check_fn = (
self._cal_cagra_params_and_check
if cuml_alg_params["algorithm"] == "cagra"
else self._cal_cuvs_ivf_flat_params_and_check
)
if (
cuml_alg_params["algorithm"] != "brute"
): # brute links to CPUNearestNeighborsModel of benchmark.bench_nearest_neighbors
if cuml_alg_params["algorithm"] == "cagra":
check_fn = self._cal_cagra_params_and_check
elif cuml_alg_params["algorithm"] in {"ivf_flat", "ivfflat"}:
check_fn = self._cal_cuvs_ivf_flat_params_and_check
else:
assert cuml_alg_params["algorithm"] in {"ivf_pq", "ivfpq"}
check_fn = self._cal_cuvs_ivf_pq_params_and_check

index_params, search_params = check_fn(
algoParams=self.cuml_params["algo_params"],
metric=self.cuml_params["metric"],
Expand All @@ -1431,19 +1466,9 @@ def _get_cuml_transform_func(
def _construct_sgnn() -> CumlT:

if cuml_alg_params["algorithm"] in {"ivf_pq", "ivfpq"}:
from cuml.neighbors import NearestNeighbors as SGNN
from cuvs.neighbors import ivf_pq

# Currently 'usePrecomputedTables' is required by cuml cython API, though the value is ignored in C++.
if (
cuml_alg_params["algorithm"] == "ivfpq"
and cuml_alg_params["algo_params"]
):
if "usePrecomputedTables" not in cuml_alg_params["algo_params"]:
cuml_alg_params["algo_params"]["usePrecomputedTables"] = False

nn_object = SGNN(output_type="cupy", **cuml_alg_params)

return nn_object
return ivf_pq
elif cuml_alg_params["algorithm"] in {"ivfflat" or "ivf_flat"}:
from cuvs.neighbors import ivf_flat

Expand All @@ -1470,7 +1495,7 @@ def _transform_internal(
nn_object: CumlT, df: Union[pd.DataFrame, np.ndarray]
) -> pd.DataFrame:

item_row_number = df[row_number_col].to_numpy()
item_row_number = df[row_number_col].to_numpy(dtype=np.int64)
item = df.drop(row_number_col, axis=1) # type: ignore
if input_col is not None:
assert len(item.columns) == 1
Expand Down Expand Up @@ -1498,12 +1523,9 @@ def _transform_internal(

start_time = time.time()

from cuml.neighbors import NearestNeighbors as cumlSGNN
from cuvs.neighbors import cagra, ivf_flat

if not inspect.ismodule(
nn_object
): # ivfpq and derived class (e.g. benchmark.bench_nearest_neighbors.CPUNearestNeighborsModel)
): # derived class (e.g. benchmark.bench_nearest_neighbors.CPUNearestNeighborsModel)
nn_object.fit(item)
else: # cuvs ivf_flat or cagra
build_params = nn_object.IndexParams(**index_params)
Expand Down Expand Up @@ -1544,35 +1566,57 @@ def _transform_internal(

if not inspect.ismodule(
nn_object
): # ivfpq and derived class (e.g. benchmark.bench_nearest_neighbors.CPUNearestNeighborsModel)
): # derived class (e.g. benchmark.bench_nearest_neighbors.CPUNearestNeighborsModel)
distances, indices = nn_object.kneighbors(bcast_qfeatures.value)
else: # cuvs ivf_flat cagra
else: # cuvs ivf_flat cagra ivf_pq
gpu_qfeatures = cp.array(
bcast_qfeatures.value, order="C", dtype="float32"
)

assert cuml_alg_params["n_neighbors"] <= len(
item
), "k is larger than the number of item vectors on a GPU. Please increase the dataset size or use less GPUs"

distances, indices = nn_object.search(
nn_object.SearchParams(**search_params),
index_obj,
gpu_qfeatures,
cuml_alg_params["n_neighbors"],
)

if cuml_alg_params["algorithm"] in {"ivf_pq", "ivfpq"}:
from cuvs.neighbors import refine

distances, indices = refine(
dataset=item,
queries=gpu_qfeatures,
candidates=indices,
k=cuml_alg_params["n_neighbors"],
metric=cuml_alg_params["metric"],
)

distances = cp.asarray(distances)
indices = cp.asarray(indices)

# Note cuML kneighbors applys an extra square root on the l2 distances.
# Here applies square to obtain the actual l2 distances.
if isinstance(nn_object, cumlSGNN):
if (
cuml_alg_params["metric"] == "euclidean"
or cuml_alg_params["metric"] == "l2"
):
distances = distances * distances
# in case refine API reset inf distances to 0.
if cuml_alg_params["algorithm"] in {"ivf_pq", "ivfpq"}:
distances[indices >= len(item)] = float("inf")

# for the case top-1 nn got filled into indices
top1_ind = indices[:, 0]
rest_indices = indices[:, 1:]
rest_distances = distances[:, 1:]
rest_distances[rest_indices == top1_ind[:, cp.newaxis]] = float(
"inf"
)

if isinstance(distances, cp.ndarray):
distances = distances.get()

# in case a query did not probe any items, indices are filled with int64 max and distances are filled with inf
item_row_number = np.append(item_row_number, np.iinfo("int64").max)
if isinstance(indices, cp.ndarray):
indices[indices >= len(item)] = len(item)
indices = indices.get()

indices_global = item_row_number[indices]
Expand Down
2 changes: 2 additions & 0 deletions python/src/spark_rapids_ml/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ def dtype_to_pyspark_type(dtype: Union[np.dtype, str]) -> str:
return "double"
elif dtype == np.int32:
return "int"
elif dtype == np.int64:
return "long"
elif dtype == np.int16:
return "short"
elif dtype == np.int64:
Expand Down
Loading
Loading