Commit
[dask] Sort by QID and auto partitioning.
- Implement automatic local sort.
- Implement partitioning by query ID.
- Add documentation for distributed ranking.
trivialfis committed Nov 22, 2024
1 parent e988b7c commit 46b5d23
Showing 12 changed files with 640 additions and 38 deletions.
201 changes: 201 additions & 0 deletions demo/dask/dask_learning_to_rank.py
@@ -0,0 +1,201 @@
"""
Learning to rank with the Dask Interface
========================================
.. versionadded:: 3.0.0
This is a demonstration of using XGBoost for learning to rank tasks using the
MSLR_10k_letor dataset. For more information about the dataset, please visit its
`description page <https://www.microsoft.com/en-us/research/project/mslr/>`_.
See :ref:`ltr-dist` for a general description for distributed learning to rank and
:ref:`ltr-dask` for Dask-specific features.
"""

from __future__ import annotations

import argparse
import os
from contextlib import contextmanager
from typing import Generator

import dask
import numpy as np
from dask import array as da
from dask import dataframe as dd
from distributed import Client, LocalCluster, wait
from sklearn.datasets import load_svmlight_file
from sklearn.metrics import root_mean_squared_error

from xgboost import dask as dxgb


def load_mslr_10k(
device: str, data_path: str, cache_path: str
) -> tuple[dd.DataFrame, dd.DataFrame, dd.DataFrame]:
"""Load the MSLR10k dataset from data_path and save parquet files in the cache_path."""
root_path = os.path.expanduser(args.data)
cache_path = os.path.expanduser(args.cache)

# Use only the Fold1 for demo:
# Train, Valid, Test
# {S1,S2,S3}, S4, S5
fold = 1

if not os.path.exists(cache_path):
os.mkdir(cache_path)
fold_path = os.path.join(root_path, f"Fold{fold}")
train_path = os.path.join(fold_path, "train.txt")
valid_path = os.path.join(fold_path, "vali.txt")
test_path = os.path.join(fold_path, "test.txt")

X_train, y_train, qid_train = load_svmlight_file(
train_path, query_id=True, dtype=np.float32
)
columns = [f"f{i}" for i in range(X_train.shape[1])]
X_train = dd.from_array(X_train.toarray(), columns=columns)
y_train = y_train.astype(np.int32)
qid_train = qid_train.astype(np.int32)

X_train["y"] = dd.from_array(y_train)
X_train["qid"] = dd.from_array(qid_train)
X_train.to_parquet(os.path.join(cache_path, "train"), engine="pyarrow")

X_valid, y_valid, qid_valid = load_svmlight_file(
valid_path, query_id=True, dtype=np.float32
)
X_valid = dd.from_array(X_valid.toarray(), columns=columns)
y_valid = y_valid.astype(np.int32)
qid_valid = qid_valid.astype(np.int32)

X_valid["y"] = dd.from_array(y_valid)
X_valid["qid"] = dd.from_array(qid_valid)
X_valid.to_parquet(os.path.join(cache_path, "valid"), engine="pyarrow")

X_test, y_test, qid_test = load_svmlight_file(
test_path, query_id=True, dtype=np.float32
)

X_test = dd.from_array(X_test.toarray(), columns=columns)
y_test = y_test.astype(np.int32)
qid_test = qid_test.astype(np.int32)

X_test["y"] = dd.from_array(y_test)
X_test["qid"] = dd.from_array(qid_test)
X_test.to_parquet(os.path.join(cache_path, "test"), engine="pyarrow")

df_train = dd.read_parquet(
os.path.join(cache_path, "train"), calculate_divisions=True
)
df_valid = dd.read_parquet(
os.path.join(cache_path, "valid"), calculate_divisions=True
)
df_test = dd.read_parquet(
os.path.join(cache_path, "test"), calculate_divisions=True
)

return df_train, df_valid, df_test


def ranking_demo(client: Client, args: argparse.Namespace) -> None:
"""Learning to rank with data sorted locally."""
df_tr, df_va, _ = load_mslr_10k(args.device, args.data, args.cache)

X_train: dd.DataFrame = df_tr[df_tr.columns.difference(["y", "qid"])]
y_train = df_tr[["y", "qid"]]
Xy_train = dxgb.DaskQuantileDMatrix(client, X_train, y_train.y, qid=y_train.qid)

X_valid: dd.DataFrame = df_va[df_va.columns.difference(["y", "qid"])]
y_valid = df_va[["y", "qid"]]
Xy_valid = dxgb.DaskQuantileDMatrix(
client, X_valid, y_valid.y, qid=y_valid.qid, ref=Xy_train
)
# Upon training, you will see a performance warning about sorting data based on
# query groups.
dxgb.train(
client,
{"objective": "rank:ndcg", "device": args.device},
Xy_train,
evals=[(Xy_train, "Train"), (Xy_valid, "Valid")],
)


def ranking_wo_split_demo(client: Client, args: argparse.Namespace) -> None:
"""Learning to rank with data partitioned according to query groups."""
df_tr, df_va, df_te = load_mslr_10k(args.device, args.data, args.cache)

X_tr = df_tr[df_tr.columns.difference(["y", "qid"])]
X_va = df_va[df_va.columns.difference(["y", "qid"])]

# `allow_group_split=False` makes sure data is partitioned according to the query
# groups.
ltr = dxgb.DaskXGBRanker(allow_group_split=False)
ltr.client = client
ltr = ltr.fit(
X_tr,
df_tr.y,
qid=df_tr.qid,
eval_set=[(X_tr, df_tr.y), (X_va, df_va.y)],
eval_qid=[df_tr.qid, df_va.qid],
)

df_te = df_te.persist()
wait([df_te])

X_te = df_te[df_te.columns.difference(["y", "qid"])]
predt = ltr.predict(X_te).compute()
y = client.compute(df_te.y).result()

# There's no group-based evaluation metric readily available outside of XGBoost;
# RMSE is used here only for demonstration.
print("RMSE:", root_mean_squared_error(y, predt))


@contextmanager
def gen_client(device: str) -> Generator[Client, None, None]:
match device:
case "cuda":
from dask_cuda import LocalCUDACluster

with LocalCUDACluster() as cluster:
with Client(cluster) as client:
with dask.config.set(
{"array.backend": "cupy", "dataframe.backend": "cudf"}
):
yield client
case "cpu":
with LocalCluster() as cluster:
with Client(cluster) as client:
yield client


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Demonstration of learning to rank using XGBoost."
)
parser.add_argument(
"--data",
type=str,
help="Root directory of the MSLR-WEB10K data.",
required=True,
)
parser.add_argument(
"--cache",
type=str,
help="Directory for caching processed data.",
required=True,
)
parser.add_argument("--device", choices=["cpu", "cuda"], default="cpu")
parser.add_argument(
"--no-split",
action="store_true",
help="Flag to indicate query groups should not be split.",
)
args = parser.parse_args()

with gen_client(args.device) as client:
if args.no_split:
ranking_wo_split_demo(client, args)
else:
ranking_demo(client, args)
4 changes: 2 additions & 2 deletions demo/guide-python/learning_to_rank.py
@@ -12,8 +12,8 @@
train on relevance degree, and the second part simulates click data and enables the
position debiasing training.
For an overview of learning to rank in XGBoost, please see
:doc:`Learning to Rank </tutorials/learning_to_rank>`.
For an overview of learning to rank in XGBoost, please see :doc:`Learning to Rank
</tutorials/learning_to_rank>`.
"""

from __future__ import annotations
60 changes: 51 additions & 9 deletions doc/tutorials/dask.rst
@@ -355,15 +355,18 @@ Working with asyncio

.. versionadded:: 1.2.0

XGBoost's dask interface supports the new ``asyncio`` in Python and can be integrated into
asynchronous workflows. For using dask with asynchronous operations, please refer to
`this dask example <https://examples.dask.org/applications/async-await.html>`_ and document in
`distributed <https://distributed.dask.org/en/latest/asynchronous.html>`_. To use XGBoost's
dask interface asynchronously, the ``client`` which is passed as an argument for training and
prediction must be operating in asynchronous mode by specifying ``asynchronous=True`` when the
``client`` is created (example below). All functions (including ``DaskDMatrix``) provided
by the functional interface will then return coroutines which can then be awaited to retrieve
their result.
XGBoost's Dask interface supports Python's :py:mod:`asyncio` and can be integrated into
asynchronous workflows. For using Dask with asynchronous operations, please refer to `this
Dask example <https://examples.dask.org/applications/async-await.html>`_ and the
documentation for `distributed
<https://distributed.dask.org/en/latest/asynchronous.html>`_. To use XGBoost's Dask
interface asynchronously, the ``client`` that is passed as an argument for training and
prediction must be operating in asynchronous mode, which is done by specifying
``asynchronous=True`` when the ``client`` is created (example below). All functions
(including ``DaskDMatrix``) provided by the functional interface will then return
coroutines, which can be awaited to retrieve their result. Please note that XGBoost is a
compute-bound application, where parallelism matters more than concurrency; the support
for :py:mod:`asyncio` is about compatibility rather than performance gain.
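
As a minimal illustrative sketch (not the library's own example, which follows below in
the functional interface section), assuming a local cluster and synthetic random data:

.. code-block:: python

    import asyncio

    from dask import array as da
    from distributed import Client, LocalCluster

    from xgboost import dask as dxgb


    async def main() -> None:
        async with LocalCluster(asynchronous=True) as cluster:
            async with Client(cluster, asynchronous=True) as client:
                # Synthetic placeholder data; substitute your own dask collections.
                X = da.random.random((1000, 10), chunks=(100, 10))
                y = da.random.random(1000, chunks=100)
                # With an asynchronous client, functions from the functional
                # interface return coroutines and must be awaited.
                Xy = await dxgb.DaskDMatrix(client, X, y)
                output = await dxgb.train(client, {"tree_method": "hist"}, Xy)
                predt = await dxgb.predict(client, output, Xy)
                print(await client.compute(predt))


    asyncio.run(main())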

Functional interface:

@@ -526,6 +529,45 @@
See https://github.com/coiled/dask-xgboost-nyctaxi for a set of examples of using XGBoost
with dask and optuna.


.. _ltr-dask:

****************
Learning to Rank
****************

.. versionadded:: 3.0.0

.. note::

Position debiasing is not yet supported.

For performance reasons, the Dask interface provides two operation modes for learning to
rank. The difference between them is whether a distributed global sort is needed. Please
see :ref:`ltr-dist` for how ranking works with distributed training in general; below we
discuss the Dask-specific features.

First, if you use the :py:class:`~xgboost.dask.DaskQuantileDMatrix` interface or the
:py:class:`~xgboost.dask.DaskXGBRanker` with ``allow_group_split`` set to ``True``,
XGBoost tries to sort and group the samples on each worker based on the query ID. This
mode skips the global sort and sorts only worker-local data, and hence requires no
inter-worker data shuffle. Please note that even a worker-local sort is costly,
particularly in terms of memory usage, as there's no spilling when
:py:meth:`~pandas.DataFrame.sort_values` is used. XGBoost first checks whether the QID is
already sorted before actually performing the sort. Choose this mode if the query groups
are relatively consecutive, meaning most of the samples within a query group are close to
each other and are likely to reside on the same worker. Don't use it if you have performed
a random shuffle on your data.
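
A condensed sketch of this mode, adapted from the demo added in this commit (``client`` is
assumed to be an existing Dask client, and ``df_train`` a dask dataframe with feature
columns plus ``y`` and ``qid`` columns):

.. code-block:: python

    from xgboost import dask as dxgb

    # Features are all columns except the label `y` and the query ID `qid`.
    X = df_train[df_train.columns.difference(["y", "qid"])]
    # Worker-local sorting by `qid` happens automatically when needed.
    Xy = dxgb.DaskQuantileDMatrix(client, X, df_train.y, qid=df_train.qid)
    output = dxgb.train(client, {"objective": "rank:ndcg"}, Xy)
    booster = output["booster"]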

If the input data is randomly distributed, there's no way to guarantee that most of the
data within the same group ends up on the same worker. For large query groups this might
not be an issue, but for small query groups it's possible that each worker gets only one
or two samples from every group, which can lead to disastrous performance. In that case,
we can partition the data according to query groups, which is the default behavior of the
:py:class:`~xgboost.dask.DaskXGBRanker` unless ``allow_group_split`` is set to
``True``. This mode performs a sort and a groupby on the entire dataset, in addition to an
encoding operation for the query group IDs, which can lead to slow performance. See
:ref:`sphx_glr_python_dask-examples_dask_learning_to_rank.py` for a worked example.
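
For quick reference, a condensed sketch of this mode (again assuming an existing
``client`` and a dask dataframe ``df_train`` carrying ``y`` and ``qid`` columns):

.. code-block:: python

    from xgboost import dask as dxgb

    X = df_train[df_train.columns.difference(["y", "qid"])]
    # `allow_group_split=False` repartitions the data so that no query group is
    # split across workers.
    ltr = dxgb.DaskXGBRanker(allow_group_split=False)
    ltr.client = client
    ltr = ltr.fit(X, df_train.y, qid=df_train.qid)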

.. _tracker-ip:

***************
18 changes: 17 additions & 1 deletion doc/tutorials/learning_to_rank.rst
@@ -165,10 +165,26 @@ On the other hand, if you have comparatively small amount of training data:

For any method chosen, you can modify ``lambdarank_num_pair_per_sample`` to control the amount of pairs generated.

.. _ltr-dist:

********************
Distributed Training
********************
XGBoost implements distributed learning-to-rank with integration of multiple frameworks including Dask, Spark, and PySpark. The interface is similar to the single-node counterpart. Please refer to document of the respective XGBoost interface for details. Scattering a query group onto multiple workers is theoretically sound but can affect the model accuracy. For most of the use cases, the small discrepancy is not an issue, as the amount of training data is usually large when distributed training is used. As a result, users don't need to partition the data based on query groups. As long as each data partition is correctly sorted by query IDs, XGBoost can aggregate sample gradients accordingly.

XGBoost implements distributed learning-to-rank with integration of multiple frameworks,
including :doc:`Dask </tutorials/dask>`, :doc:`Spark </jvm/xgboost4j_spark_tutorial>`, and
:doc:`PySpark </tutorials/spark_estimator>`. The interface is similar to the single-node
counterpart. Please refer to the documentation of the respective XGBoost interface for
details.

.. warning::

Position-debiasing is not yet supported for existing distributed interfaces.

XGBoost works with collective operations, which means data is scattered to multiple
workers. We can divide the data partitions by query group and ensure no query group is
split among workers. However, this requires a costly sort and groupby operation, and it
might only be necessary for selected use cases. Splitting and scattering a query group
across multiple workers is theoretically sound but can affect the model's accuracy. If
only a small number of groups sit at worker boundaries, the small discrepancy is not an
issue, as the amount of training data is usually large when distributed training is used.

For a longer explanation, assume the pairwise ranking method is used: we calculate the
gradient based on relevance degree by constructing pairs within a query group. If a single
query group is split among workers and we use worker-local data for gradient calculation,
then we are simply sampling pairs from a smaller group on each worker to calculate the
gradient and the evaluation metric. The comparison within each pair doesn't change just
because a group is split into sub-groups; what changes is the number of total and
effective pairs and normalizers like `IDCG`. One can generate more pairs from a large
group than from two smaller subgroups. As a result, the obtained gradient is still valid
from a theoretical standpoint but might not be optimal. As long as each data partition
within a worker is correctly sorted by query IDs, XGBoost can aggregate sample gradients
accordingly. Both the (Py)Spark interface and the Dask interface can sort the data
according to query ID; please see the respective tutorials for more information.

However, it's possible that a distributed framework shuffles the data during map-reduce
and splits every query group across multiple workers. In that case, the performance would
be disastrous. As a result, whether a sorted groupby is needed depends on the data and the
framework.
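
As an illustrative sketch with the Dask interface (assuming ``df`` is a dask dataframe
with feature columns, a label column ``y``, and a query ID column ``qid``; as noted above,
the respective interfaces can also perform this sort for you), a worker-local sort by
query ID before constructing the ``DMatrix`` might look like:

.. code-block:: python

    from xgboost import dask as dxgb

    # Sort each partition locally by query ID; no inter-worker shuffle is involved.
    df = df.map_partitions(lambda part: part.sort_values("qid"))

    X = df[df.columns.difference(["y", "qid"])]
    Xy = dxgb.DaskQuantileDMatrix(client, X, df.y, qid=df.qid)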

*******************
Reproducible Result
