5.3. Distributed Machine Learning
If the volume of training data is large, Dask-ML provides distributed machine learning capabilities that allow training on big data across a cluster. Dask currently offers two styles of distributed machine learning API:

- scikit-learn style
- XGBoost and LightGBM decision tree style
scikit-learn API
Leveraging the distributed computing capabilities of Dask Array, Dask DataFrame, and Dask Delayed, Dask-ML implements distributed versions of many machine learning algorithms in the style of scikit-learn. For example, `dask_ml.linear_model` provides linear regression (`LinearRegression`) and logistic regression (`LogisticRegression`), and `dask_ml.cluster` provides `KMeans`. Dask-ML strives to keep the usage of these machine learning algorithms consistent with scikit-learn.
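As a quick illustration of this consistency, here is a minimal sketch (using small, randomly generated data rather than the dataset below) showing that a Dask-ML estimator is constructed, fitted, and inspected just like its scikit-learn counterpart:

```python
import dask.array as da
import dask_ml.cluster

# A small random Dask Array, only to illustrate the API
X = da.random.random((10_000, 10), chunks=(1_000, 10))

km = dask_ml.cluster.KMeans(n_clusters=3)
km.fit(X)                          # the familiar construct-then-fit pattern
print(km.labels_[:10].compute())   # labels_ is a lazy Dask array
```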
Below, we use the linear models from `dask_ml.linear_model` on a Dask cluster with two computing nodes. Each node has 90 GiB of memory, and we randomly generate a 37 GiB dataset, which is then split into a training set and a test set.
%config InlineBackend.figure_format = 'svg'
import time
import seaborn as sns
import pandas as pd
from dask.distributed import Client, LocalCluster
import dask_ml.datasets
import sklearn.linear_model
import dask_ml.linear_model
from dask_ml.model_selection import train_test_split
client = Client("10.0.0.3:8786")
client
Client-ad77e682-0ae4-11ef-8730-000012e4fe80, connected directly to scheduler tcp://10.0.0.3:8786 (dashboard: http://10.0.0.3:43549/status). The cluster has 2 workers, tcp://10.0.0.2:46501 and tcp://10.0.0.3:39997, each with 64 threads and 90.00 GiB of memory, for a total of 128 threads and 180.00 GiB.
# Randomly generate a ~37 GiB classification dataset, partitioned into 100 chunks
X, y = dask_ml.datasets.make_classification(n_samples=10_000_000,
                                            n_features=500,
                                            random_state=42,
                                            chunks=10_000_000 // 100
)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
X
Call the scikit-learn-style `fit()` method:
lr = dask_ml.linear_model.LogisticRegression(solver="lbfgs").fit(X_train, y_train)
The trained model can be used for prediction (`predict()`), as well as for calculating accuracy (`score()`).
y_predicted = lr.predict(X_test)
y_predicted[:5].compute()
array([ True, False, True, True, True])
lr.score(X_test, y_test).compute()
0.668674
Training a model on data of this size with scikit-learn on a single machine would fail with an out-of-memory error.
Although Dask-ML's distributed training API is extremely similar to scikit-learn's, the fact that scikit-learn is confined to a single machine while Dask-ML can use multiple cores or even a cluster does not mean that Dask-ML should be chosen in every scenario; it is not always the better option in terms of performance or cost-effectiveness. The situation is analogous to Dask DataFrame versus pandas: if the dataset fits in the memory of a single machine, the performance and compatibility of native pandas, NumPy, and scikit-learn are usually best.
The following code benchmarks training data of different sizes. When the data volume is small and everything runs on a single multi-core machine, Dask-ML is not faster than scikit-learn. There are many reasons for this, including the two below (a sketch after this list illustrates the first one):

- Many machine learning algorithms are iterative. scikit-learn implements these iterations with native Python `for` loops, and Dask-ML adopts the same approach; but every pass through a `for` loop adds tasks to Dask's Task Graph, which can make the graph quite bloated and inefficient to execute.
- A distributed implementation has to distribute and collect data across different processes, which adds a lot of data synchronization and communication overhead compared to a single machine and a single process.
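To get a feel for how a Python-level loop inflates the Task Graph, here is a small, hypothetical sketch (independent of the benchmark below) that counts the tasks in a Dask Array's graph before and after a loop of array operations:

```python
import dask.array as da

x = da.random.random((1_000, 1_000), chunks=(250, 250))
print(len(x.__dask_graph__()))   # tasks needed just to materialize the data

for _ in range(10):              # an "iterative algorithm" written as a Python loop
    x = x + x.mean()             # each iteration appends more tasks to the graph

print(len(x.__dask_graph__()))   # the graph has grown with every iteration
```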
You can rerun this comparison yourself, adjusting the data sizes to the memory you have available.
client = Client(LocalCluster())
client
Client-b4f64c31-0ae4-11ef-8730-000012e4fe80, connected to a distributed.LocalCluster (dashboard: http://127.0.0.1:8787/status) with 8 worker processes, each with 8 threads and 11.25 GiB of memory, for a total of 64 threads and 90.00 GiB.
num_sample = [500_000, 1_000_000, 1_500_000]
num_feature = 1_000
timings = []

for n in num_sample:
    X, y = dask_ml.datasets.make_classification(n_samples=n,
                                                n_features=num_feature,
                                                random_state=42,
                                                chunks=n // 10
    )
    t1 = time.time()
    # scikit-learn: the Dask arrays are pulled into local memory and trained in one process
    sklearn.linear_model.LogisticRegression(solver="lbfgs").fit(X, y)
    timings.append(('scikit-learn', n, time.time() - t1))
    t1 = time.time()
    # Dask-ML: training operates on the distributed Dask arrays
    dask_ml.linear_model.LogisticRegression(solver="lbfgs").fit(X, y)
    timings.append(('dask-ml', n, time.time() - t1))

df = pd.DataFrame(timings, columns=['method', '# of samples', 'time'])
sns.barplot(data=df, x='# of samples', y='time', hue='method')
(Bar chart comparing the training time of scikit-learn and Dask-ML at each sample size.)
For logistic regression, Dask-ML offers no significant advantage over scikit-learn when running on a single multi-core machine. Moreover, many traditional machine learning algorithms do not demand huge training sets, and their performance does not improve significantly as more training data is added. The relationship between training-data volume and model performance can be visualized with learning curves (see the sketch below); for algorithms such as Naive Bayes, the gains from additional training data are quite limited. If an algorithm cannot be trained in a distributed manner, or if the cost of distributed training is high, consider sampling the training data down to a size that fits in a single machine's memory and using a single-machine framework such as scikit-learn.
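As an aside, a learning curve of this kind can be computed with scikit-learn; the following is a minimal sketch on a small synthetic dataset (the dataset and parameters are illustrative only):

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.naive_bayes import GaussianNB
from sklearn.model_selection import learning_curve

# Small synthetic dataset, just to show the shape of a learning curve
X_small, y_small = make_classification(n_samples=50_000, n_features=20, random_state=42)

sizes, train_scores, val_scores = learning_curve(
    GaussianNB(), X_small, y_small,
    train_sizes=np.linspace(0.1, 1.0, 5),
    cv=3,
)
# For Naive Bayes the validation score typically flattens early:
# additional training data buys little extra accuracy.
print(sizes)
print(val_scores.mean(axis=1))
```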
In summary, when the training data exceeds the memory of a single machine, weigh several factors before committing to distributed training: whether the algorithm actually benefits from more data, whether a distributed implementation is available, and how much extra communication and scheduling overhead it introduces.
XGBoost and LightGBM
XGBoost and LightGBM are two decision tree implementations that are inherently friendly to distributed training, and both are integrated with Dask. Below we use XGBoost as an example of distributed training with Dask; LightGBM works in much the same way.
In XGBoost, training a model can be done using either the `train()` method or the scikit-learn-style `fit()` method. Both approaches support Dask distributed training.
The code below compares the performance of single-machine XGBoost and Dask distributed training. When using Dask, users need to change `xgboost.DMatrix` to `xgboost.dask.DaskDMatrix`, which converts distributed Dask Arrays or Dask DataFrames into the data format required by XGBoost. Users also need to replace `xgboost.train()` with `xgboost.dask.train()` and pass in the Dask cluster client `client`.
import xgboost as xgb

num_sample = [100_000, 500_000]
num_feature = 1_000
xgb_timings = []

for n in num_sample:
    X, y = dask_ml.datasets.make_classification(n_samples=n,
                                                n_features=num_feature,
                                                random_state=42,
                                                chunks=n // 10
    )
    # Single-machine XGBoost: DMatrix pulls the Dask arrays into local memory
    dtrain = xgb.DMatrix(X, y)
    t1 = time.time()
    xgb.train(
        {"tree_method": "hist", "objective": "binary:hinge"},
        dtrain,
        num_boost_round=4,
        evals=[(dtrain, "train")],
        verbose_eval=False,
    )
    xgb_timings.append(('xgboost', n, time.time() - t1))
    # Distributed XGBoost: DaskDMatrix keeps the data distributed across the workers
    dtrain_dask = xgb.dask.DaskDMatrix(client, X, y)
    t1 = time.time()
    xgb.dask.train(
        client,
        {"tree_method": "hist", "objective": "binary:hinge"},
        dtrain_dask,
        num_boost_round=4,
        evals=[(dtrain_dask, "train")],
        verbose_eval=False,
    )
    xgb_timings.append(('dask-xgboost', n, time.time() - t1))

df = pd.DataFrame(xgb_timings, columns=['method', '# of samples', 'time'])
sns.barplot(data=df, x='# of samples', y='time', hue='method')
[22:13:43] task [xgboost.dask-0]:tcp://127.0.0.1:44219 got new rank 0
[22:13:43] task [xgboost.dask-1]:tcp://127.0.0.1:41549 got new rank 1
[22:13:43] task [xgboost.dask-2]:tcp://127.0.0.1:42877 got new rank 2
[22:13:43] task [xgboost.dask-3]:tcp://127.0.0.1:34321 got new rank 3
[22:13:43] task [xgboost.dask-4]:tcp://127.0.0.1:36039 got new rank 4
[22:13:43] task [xgboost.dask-5]:tcp://127.0.0.1:35057 got new rank 5
[22:13:43] task [xgboost.dask-6]:tcp://127.0.0.1:36811 got new rank 6
[22:13:43] task [xgboost.dask-7]:tcp://127.0.0.1:42081 got new rank 7
[22:16:27] task [xgboost.dask-0]:tcp://127.0.0.1:44219 got new rank 0
[22:16:27] task [xgboost.dask-1]:tcp://127.0.0.1:41549 got new rank 1
[22:16:27] task [xgboost.dask-2]:tcp://127.0.0.1:42877 got new rank 2
[22:16:27] task [xgboost.dask-3]:tcp://127.0.0.1:34321 got new rank 3
[22:16:27] task [xgboost.dask-4]:tcp://127.0.0.1:36039 got new rank 4
[22:16:27] task [xgboost.dask-5]:tcp://127.0.0.1:35057 got new rank 5
[22:16:28] task [xgboost.dask-6]:tcp://127.0.0.1:36811 got new rank 6
[22:16:28] task [xgboost.dask-7]:tcp://127.0.0.1:42081 got new rank 7
(Bar chart comparing the training time of single-machine XGBoost and Dask-distributed XGBoost at each sample size.)
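The benchmark above discards the trained models. In practice, `xgboost.dask.train()` returns a dict containing the trained booster, which can be used for distributed prediction with `xgboost.dask.predict()`. A minimal sketch, assuming the `client` and the `dtrain_dask` from the last loop iteration are still available:

```python
output = xgb.dask.train(
    client,
    {"tree_method": "hist", "objective": "binary:hinge"},
    dtrain_dask,
    num_boost_round=4,
)
booster = output["booster"]                            # the trained model
pred = xgb.dask.predict(client, booster, dtrain_dask)  # a lazy Dask array of predictions
print(pred[:5].compute())
```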
If using XGBoost's scikit-learn-style API, you need to change `xgboost.XGBClassifier` to `xgboost.dask.DaskXGBClassifier`, or `xgboost.XGBRegressor` to `xgboost.dask.DaskXGBRegressor`.
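A minimal sketch of that scikit-learn-style usage, assuming the `client`, `X`, and `y` from the benchmark above are still available (the hyperparameters are arbitrary):

```python
clf = xgb.dask.DaskXGBClassifier(n_estimators=4, tree_method="hist")
clf.client = client               # attach the Dask cluster client
clf.fit(X, y)                     # training is distributed across the cluster
proba = clf.predict_proba(X)      # returns a lazy Dask array
print(proba[:5].compute())
```

LightGBM's Dask integration follows the same pattern, with estimators such as `lightgbm.DaskLGBMClassifier`.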
Distributed GPU Training
Dask can manage multiple GPUs, and XGBoost can perform multi-GPU training on top of Dask. To launch a multi-GPU Dask cluster we need to install Dask-CUDA; Dask then distributes the XGBoost training across the GPU devices.
from dask_cuda import LocalCUDACluster
import xgboost as xgb
# Start a local cluster with one worker per GPU
client = Client(LocalCUDACluster())
client
Client-7c3ce804-0aef-11ef-98d2-000012e4fe80, connected to a dask_cuda.LocalCUDACluster (dashboard: http://127.0.0.1:44607/status) with 4 workers (one per GPU), each with 1 thread and 22.50 GiB of memory, for a total of 4 threads and 90.00 GiB.
clf = xgb.dask.DaskXGBClassifier(verbosity=1)
clf.set_params(tree_method="hist", device="cuda")   # train with the histogram method on the GPUs
clf.client = client                                  # attach the Dask-CUDA cluster client
X, y = dask_ml.datasets.make_classification(n_samples=100_000,
                                            n_features=1_000,
                                            random_state=42,
                                            chunks=100_000 // 100
)
clf.fit(X, y, eval_set=[(X, y)], verbose=False)
prediction = clf.predict(X)
[23:01:19] task [xgboost.dask-0]:tcp://127.0.0.1:45305 got new rank 0
[23:01:19] task [xgboost.dask-1]:tcp://127.0.0.1:38835 got new rank 1
[23:01:19] task [xgboost.dask-2]:tcp://127.0.0.1:46315 got new rank 2
[23:01:20] task [xgboost.dask-3]:tcp://127.0.0.1:38331 got new rank 3
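`prediction` above is a lazy Dask array; a short example of materializing a few values and a rough training accuracy (continuing from the cell above):

```python
print(prediction[:5].compute())            # first few predicted labels
print((prediction == y).mean().compute())  # rough accuracy on the training data
```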