5.3. Distributed Machine Learning

If the volume of training data is large, Dask-ML provides distributed machine learning capabilities for training on big data across a cluster. Currently, Dask offers two styles of distributed machine learning APIs:

  • scikit-learn style

  • XGBoost and LightGBM decision tree style

scikit-learn API

Building on the distributed computing capabilities of Dask Array, Dask DataFrame, and Dask Delayed, Dask-ML implements distributed versions of machine learning algorithms in the style of scikit-learn. For example, dask_ml.linear_model provides linear regression (LinearRegression) and logistic regression (LogisticRegression), and dask_ml.cluster provides KMeans. Dask-ML strives to keep the usage of these algorithms consistent with scikit-learn.
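
To illustrate this API parity, here is a minimal sketch (not part of the original example) that clusters a synthetic Dask Array with dask_ml.cluster.KMeans; the sample count and chunk size are arbitrary illustrative choices.

import dask_ml.datasets
import dask_ml.cluster

# Generate a blob-shaped dataset as a Dask Array; `chunks` sets the partition size.
X, _ = dask_ml.datasets.make_blobs(
    n_samples=1_000_000,
    centers=3,
    random_state=42,
    chunks=100_000,
)

# The estimator mirrors scikit-learn: construct, fit, then inspect the fitted attributes.
km = dask_ml.cluster.KMeans(n_clusters=3, random_state=42)
km.fit(X)
print(km.cluster_centers_)        # cluster centers come back as a small NumPy array
print(km.labels_[:10].compute())  # labels_ is a Dask Array; compute() materializes it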

We use the linear models from dask_ml.linear_model on a Dask cluster of 2 compute nodes. Each compute node has 90 GiB of memory, and we randomly generate a 37 GiB dataset, which is then split into a training set and a test set.

%config InlineBackend.figure_format = 'svg'
import time

import seaborn as sns
import pandas as pd

from dask.distributed import Client, LocalCluster
import dask_ml.datasets
import sklearn.linear_model
import dask_ml.linear_model
from dask_ml.model_selection import train_test_split
# Connect to the Dask scheduler that is already running on the cluster
client = Client("10.0.0.3:8786")
client

Client: Client-ad77e682-0ae4-11ef-8730-000012e4fe80
Connection method: Direct
Scheduler: tcp://10.0.0.3:8786    Dashboard: http://10.0.0.3:43549/status
Workers: 2 (tcp://10.0.0.2:46501, tcp://10.0.0.3:39997)
Total threads: 128    Total memory: 180.00 GiB (90.00 GiB per worker)
X, y = dask_ml.datasets.make_classification(n_samples=10_000_000, 
        n_features=500, 
        random_state=42,
        chunks=10_000_000 // 100
)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
X
            Array                        Chunk
Bytes       37.25 GiB                    381.47 MiB
Shape       (10000000, 500)              (100000, 500)
Dask graph  100 chunks in 1 graph layer
Data type   float64 numpy.ndarray

Call the scikit-learn style fit() method:

lr = dask_ml.linear_model.LogisticRegression(solver="lbfgs").fit(X_train, y_train)

The trained model can be used for prediction (predict()), as well as for calculating accuracy (score()).

y_predicted = lr.predict(X_test)
y_predicted[:5].compute()
array([ True, False,  True,  True,  True])
lr.score(X_test, y_test).compute()
0.668674

If the same amount of data were used to train a model with scikit-learn on a single machine, it would fail with an out-of-memory error.

Although Dask-ML's distributed training API closely mirrors scikit-learn's, and Dask-ML can use multiple cores or even an entire cluster where scikit-learn is largely confined to a single machine, that does not mean Dask-ML should be chosen in every scenario; it is not always the best option in terms of performance or cost-effectiveness. The situation is similar to the relationship between Dask DataFrame and pandas: if the dataset fits into the memory of a single machine, the performance and compatibility of native pandas, NumPy, and scikit-learn are generally optimal.

The following code benchmarks training on data of different sizes. When the data volume is small and everything runs on a single machine with multiple cores, Dask-ML is not faster than scikit-learn. There are several reasons for this, including:

  • Many machine learning algorithms are iterative. scikit-learn implements iterative algorithms with native Python for loops, and Dask-ML keeps the same approach; in Dask, however, every pass through such a loop appends more tasks to the task graph, so the graph becomes bloated and execution efficiency suffers (see the sketch after this list).

  • The distributed implementation requires distributing and collecting data across different processes, which adds a lot of extra data synchronization and communication overhead compared to a single machine and single process.
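
To make the first point concrete, the sketch below (purely illustrative, not part of the benchmark) performs a gradient-descent-style update in a Python for loop over Dask Arrays and prints how many tasks have accumulated in the task graph after each iteration; the array sizes and iteration count are arbitrary.

import dask.array as da

# A small random dataset and an initial weight vector, both as lazy Dask Arrays.
X = da.random.random((10_000, 50), chunks=(1_000, 50))
w = da.zeros(50, chunks=50)

# Each iteration is lazy: it only appends more tasks to w's task graph,
# which is why iterative solvers quickly produce bloated graphs.
for i in range(5):
    w = w - 0.01 * (X.T @ (X @ w))
    print(f"iteration {i}: {len(w.__dask_graph__())} tasks in the graph")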

You can also run this benchmark yourself, adjusting the data sizes to the memory you have available.

# Start a local cluster on the current machine for the comparison
client = Client(LocalCluster())
client

Client: Client-b4f64c31-0ae4-11ef-8730-000012e4fe80
Connection method: Cluster object    Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

num_sample = [500_000, 1_000_000, 1_500_000]
num_feature = 1_000
timings = []

for n in num_sample:
    X, y = dask_ml.datasets.make_classification(n_samples=n, 
        n_features=num_feature, 
        random_state=42,
        chunks=n // 10
    )
    t1 = time.time()
    # scikit-learn: the Dask Arrays are pulled into memory as NumPy arrays before training
    sklearn.linear_model.LogisticRegression(solver="lbfgs").fit(X, y)
    timings.append(('scikit-learn', n, time.time() - t1))
    t1 = time.time()
    # Dask-ML: training operates directly on the distributed Dask Arrays
    dask_ml.linear_model.LogisticRegression(solver="lbfgs").fit(X, y)
    timings.append(('dask-ml', n, time.time() - t1))

df = pd.DataFrame(timings, columns=['method', '# of samples', 'time'])
sns.barplot(data=df, x='# of samples', y='time', hue='method')
Figure: training time of scikit-learn versus Dask-ML logistic regression for different numbers of samples.

For logistic regression, Dask-ML does not offer significant advantages over scikit-learn when running on a single machine with multiple cores. Moreover, many traditional machine learning algorithms do not demand huge amounts of training data, and their performance does not improve much as more training data is added. The relationship between training data volume and model performance can be visualized with learning curves; for algorithms such as Naive Bayes, the gains from additional training data are quite limited. If a machine learning algorithm cannot be trained in a distributed manner, or if the cost of distributed training is high, it may be worth sampling the training data down to a size that fits into the memory of a single machine and using a single-machine framework like scikit-learn.
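
As a rough illustration of the learning-curve point (a sketch on synthetic data, not part of the original benchmark), the code below uses scikit-learn's learning_curve with a Gaussian Naive Bayes classifier; the dataset shape and training-size grid are arbitrary choices.

import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import learning_curve
from sklearn.naive_bayes import GaussianNB

# A modest synthetic classification problem that fits comfortably in memory.
X, y = make_classification(n_samples=20_000, n_features=20, random_state=42)

# Cross-validated scores at increasing training-set sizes.
train_sizes, train_scores, test_scores = learning_curve(
    GaussianNB(), X, y,
    train_sizes=np.linspace(0.1, 1.0, 5),
    cv=5,
)

# For Naive Bayes the validation score typically plateaus early,
# i.e. adding more data yields little additional accuracy.
for size, score in zip(train_sizes, test_scores.mean(axis=1)):
    print(f"{size:>6d} samples -> mean CV accuracy {score:.3f}")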

In summary, if your training data exceeds the memory of a single machine, weigh several factors before reaching for distributed training: whether the algorithm actually benefits from more data, whether it can be trained efficiently in a distributed manner, and whether sampling the data down to a single machine would suffice.

XGBoost and LightGBM

XGBoost and LightGBM are two decision tree implementations that are inherently friendly to distributed training, and both have been integrated with Dask. Below, we use an example to illustrate how to perform distributed training with Dask and XGBoost; the usage for LightGBM is similar.

In XGBoost, training a model can be done using either the train() method or the scikit-learn-style fit() method. Both approaches support Dask distributed training.

The code below compares the performance of single-machine XGBoost and Dask distributed training. When using Dask, users need to change xgboost.DMatrix to xgboost.dask.DaskDMatrix, which converts distributed Dask Arrays or Dask DataFrames into the data format required by XGBoost, replace xgboost.train() with xgboost.dask.train(), and pass in the Dask client (client).

import xgboost as xgb

num_sample = [100_000, 500_000]
num_feature = 1_000
xgb_timings = []

for n in num_sample:
    X, y = dask_ml.datasets.make_classification(n_samples=n, 
        n_features=num_feature, 
        random_state=42,
        chunks=n // 10
    )
    # Single-machine XGBoost: build a DMatrix from the data and train locally
    dtrain = xgb.DMatrix(X, y)
    t1 = time.time()
    xgb.train(
        {"tree_method": "hist", "objective": "binary:hinge"},
        dtrain,
        num_boost_round=4,
        evals=[(dtrain, "train")],
        verbose_eval=False,
    )
    xgb_timings.append(('xgboost', n, time.time() - t1))
    # Dask XGBoost: wrap the Dask collections in a DaskDMatrix and train through the client
    dtrain_dask = xgb.dask.DaskDMatrix(client, X, y)
    t1 = time.time()
    xgb.dask.train(
        client,
        {"tree_method": "hist", "objective": "binary:hinge"},
        dtrain_dask,
        num_boost_round=4,
        evals=[(dtrain_dask, "train")],
        verbose_eval=False,
    )
    xgb_timings.append(('dask-ml', n, time.time() - t1))

df = pd.DataFrame(xgb_timings, columns=['method', '# of samples', 'time'])
sns.barplot(data=df, x='# of samples', y='time', hue='method')
(During xgb.dask.train(), each of the 8 Dask workers is assigned an XGBoost rank, e.g. "task [xgboost.dask-0]:tcp://127.0.0.1:44219 got new rank 0".)

Figure: training time of single-machine XGBoost versus Dask-based distributed XGBoost for different numbers of samples.

If using XGBoost’s scikit-learn-style API, you need to change xgboost.XGBClassifier to xgboost.dask.DaskXGBClassifier or xgboost.XGBRegressor to xgboost.dask.DaskXGBRegressor.
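
The LightGBM integration follows the same pattern: the scikit-learn-style estimators have Dask counterparts such as lightgbm.DaskLGBMClassifier and lightgbm.DaskLGBMRegressor. The following is a minimal sketch (not part of the original benchmark) that assumes a Dask client has already been created as above; the data sizes and parameters are arbitrary.

import lightgbm as lgb
import dask_ml.datasets

# A synthetic Dask-backed classification dataset.
X, y = dask_ml.datasets.make_classification(
    n_samples=100_000,
    n_features=100,
    random_state=42,
    chunks=10_000,
)

# DaskLGBMClassifier mirrors lightgbm.LGBMClassifier but trains on Dask collections,
# coordinating the workers through the active Dask client.
clf = lgb.DaskLGBMClassifier(n_estimators=50)
clf.fit(X, y)

# predict() returns a lazy Dask Array; compute() materializes the first few predictions.
y_pred = clf.predict(X)
print(y_pred[:5].compute())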

Distributed GPU Training

Dask can also manage multiple GPUs, and XGBoost can perform multi-GPU training on top of Dask. We need to install Dask-CUDA to launch a multi-GPU Dask cluster; Dask then distributes the XGBoost training job across the GPU devices.

from dask_cuda import LocalCUDACluster
import xgboost as xgb
# Launch a Dask cluster with one worker per available GPU
client = Client(LocalCUDACluster())
client

Client: Client-7c3ce804-0aef-11ef-98d2-000012e4fe80
Connection method: Cluster object    Cluster type: dask_cuda.LocalCUDACluster
Dashboard: http://127.0.0.1:44607/status

# DaskXGBClassifier is the Dask counterpart of the scikit-learn-style XGBClassifier
clf = xgb.dask.DaskXGBClassifier(verbosity=1)
# Use the histogram tree method on the GPU
clf.set_params(tree_method="hist", device="cuda")
clf.client = client
X, y = dask_ml.datasets.make_classification(n_samples=100_000, 
        n_features=1_000, 
        random_state=42,
        chunks=100_000 // 100
)
clf.fit(X, y, eval_set=[(X, y)], verbose=False)
prediction = clf.predict(X)
(XGBoost assigns a rank to each of the 4 GPU workers, e.g. "task [xgboost.dask-0]:tcp://127.0.0.1:45305 got new rank 0".)