5.2. Hyperparameter Tuning

We can use Dask for hyperparameter tuning, mainly in two ways:

  • Based on the joblib backend of scikit-learn, distribute multiple hyperparameter tuning tasks to the Dask cluster.

  • Use the hyperparameter tuning API provided by Dask-ML.

Both of these methods are suitable for scenarios where the training data can fit into the memory of a single machine.

scikit-learn joblib

scikit-learn provides a rich, easy-to-use interface for model training and hyperparameter tuning, but most of its algorithms run only on a single machine. scikit-learn relies on joblib for parallelism, which by default spreads tasks across the cores of a single machine. Hyperparameter tuning methods such as random search and grid search have no dependencies between their individual trials, so they are straightforward to run in parallel.

Example: Flights Prediction with scikit-learn

Below is a classification example based on scikit-learn that uses grid search to tune the hyperparameters.

import os

import sys
sys.path.append("..")
from utils import nyc_flights

import numpy as np
import pandas as pd

folder_path = nyc_flights()
file_path = os.path.join(folder_path, "nyc-flights", "1991.csv")
input_cols = [
    "Year",
    "Month",
    "DayofMonth",
    "DayOfWeek",
    "CRSDepTime",
    "CRSArrTime",
    "UniqueCarrier",
    "FlightNum",
    "ActualElapsedTime",
    "Origin",
    "Dest",
    "Distance",
    "Diverted",
    "ArrDelay",
]

df = pd.read_csv(file_path, usecols=input_cols)
df = df.dropna()

# Delay or not
df["ArrDelayBinary"] = 1.0 * (df["ArrDelay"] > 10)

df = df[df.columns.difference(["ArrDelay"])]

# convert Dest/Origin/UniqueCarrier fields into category types
for col in df.select_dtypes(["object"]).columns:
    df[col] = df[col].astype("category").cat.codes.astype(np.int32)

for col in df.columns:
    df[col] = df[col].astype(np.float32)

from sklearn.linear_model import SGDClassifier

from sklearn.model_selection import GridSearchCV as SkGridSearchCV
from sklearn.model_selection import train_test_split as sk_train_test_split

_y_label = "ArrDelayBinary"
X_train, X_test, y_train, y_test = sk_train_test_split(
    df.loc[:, df.columns != _y_label], 
    df[_y_label], 
    test_size=0.25,
    shuffle=False,
)

model = SGDClassifier(penalty='elasticnet', max_iter=1_000, warm_start=True, loss='log_loss')
params = {'alpha': np.logspace(-4, 1, num=81)}

sk_grid_search = SkGridSearchCV(model, params)
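
To get a sense of how much work this grid search generates, the number of model fits can be counted directly. The sketch below assumes the default 5-fold cross-validation that GridSearchCV applies when `cv` is not specified; the variable names are illustrative.

```python
import numpy as np
from sklearn.model_selection import ParameterGrid

# Same search space as in the example above.
params = {"alpha": np.logspace(-4, 1, num=81)}

n_candidates = len(ParameterGrid(params))  # one candidate per alpha value
cv_folds = 5                               # GridSearchCV default when cv is not given
n_fits = n_candidates * cv_folds           # independent fit-and-score tasks

print(n_candidates, n_fits)  # 81 405
```

These 405 fits (plus one final refit on the full training set) are the tasks that can be spread across workers.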

To scale the grid search out to a Dask cluster, wrap the fit() call in a with joblib.parallel_config('dask'): block.

import joblib
from dask.distributed import Client, LocalCluster

# change to your Dask Scheduler IP address
client = Client("10.0.0.3:8786")
with joblib.parallel_config('dask'):
    sk_grid_search.fit(X_train, y_train)

Use the score() method to check the accuracy of the model:

sk_grid_search.score(X_test, y_test)
0.7775224665166276

Dask-ML API

The previous subsection showed a scikit-learn example in which the only change needed was to wrap the fit() call in joblib.parallel_config('dask'); the computation tasks were then distributed to the Dask cluster.

Dask-ML also implements hyperparameter tuning APIs of its own. In addition to counterparts of scikit-learn's GridSearchCV and RandomizedSearchCV, it offers the Successive Halving and Hyperband algorithms through SuccessiveHalvingSearchCV and HyperbandSearchCV.

Example: Flights Prediction with Dask-ML

Below is an example of Hyperband hyperparameter tuning based on Dask-ML.

The hyperparameter tuning algorithms in Dask-ML expect partitioned inputs, that is, Dask DataFrames or Dask Arrays. The data loading and preprocessing code therefore has to be ported from pandas/NumPy to Dask DataFrame/Dask Array.

Note that SuccessiveHalvingSearchCV and HyperbandSearchCV require the model to support partial_fit() and score(). In scikit-learn, a call to partial_fit() performs one incremental training step of an iterative algorithm (for example, one round of stochastic gradient descent over the data it is given). Instead of granting every candidate the budget for a full training run up front, Successive Halving and Hyperband start each candidate with a small budget. With that budget, Dask performs a limited number of iterations (by calling partial_fit() a limited number of times), evaluates each candidate (by calling score() on the validation set), and stops the candidates that perform poorly.

import dask.dataframe as dd

input_cols = [
    "Year",
    "Month",
    "DayofMonth",
    "DayOfWeek",
    "CRSDepTime",
    "CRSArrTime",
    "UniqueCarrier",
    "FlightNum",
    "ActualElapsedTime",
    "Origin",
    "Dest",
    "Distance",
    "Diverted",
    "ArrDelay",
]

ddf = dd.read_csv(file_path, usecols=input_cols)

# Delay or not
ddf["ArrDelayBinary"] = 1.0 * (ddf["ArrDelay"] > 10)

ddf = ddf[ddf.columns.difference(["ArrDelay"])]
ddf = ddf.dropna()
ddf = ddf.repartition(npartitions=8)

Additionally, Dask handles categorical variables slightly differently from pandas or scikit-learn, and we need to:

  • Convert the feature to the category type, for example, by using the categorize() method of Dask DataFrame, or the Dask-ML Categorizer preprocessor.

  • Perform one-hot encoding: The DummyEncoder in Dask-ML is used for one-hot encoding of categorical features and serves as the Dask alternative to scikit-learn’s OneHotEncoder.

from dask_ml.preprocessing import DummyEncoder

dummy = DummyEncoder()
ddf = ddf.categorize(columns=["Dest", "Origin", "UniqueCarrier"])
dummified_ddf = dummy.fit_transform(ddf)
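
The effect of the two steps (categorize, then one-hot encode) can be seen on a tiny in-memory table. The sketch below uses pandas' `get_dummies` as a single-machine analogue of the Dask-ML transformation; the four-row table and its values are made up for illustration.

```python
import pandas as pd

# Tiny stand-in for the flights table (values are illustrative only).
df = pd.DataFrame(
    {
        "Origin": ["JFK", "LGA", "EWR", "JFK"],
        "Distance": [2475.0, 733.0, 200.0, 1089.0],
    }
)

# Step 1: convert the column to the category dtype so that the full set of
# categories is known before encoding.
df["Origin"] = df["Origin"].astype("category")

# Step 2: expand each category into its own indicator column.
encoded = pd.get_dummies(df, columns=["Origin"])

print(encoded.columns.tolist())
# ['Distance', 'Origin_EWR', 'Origin_JFK', 'Origin_LGA']
```

Knowing the categories in advance matters more in a partitioned setting, since every partition should produce the same set of indicator columns even if some category values do not occur in it.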

Then use the train_test_split method from Dask-ML to split the dataset into training and test sets:

from dask_ml.model_selection import train_test_split as dsk_train_test_split

_y_label = "ArrDelayBinary"
X_train, X_test, y_train, y_test = dsk_train_test_split(
    dummified_ddf.loc[:, dummified_ddf.columns != _y_label], 
    dummified_ddf[_y_label], 
    test_size=0.25,
    shuffle=False,
)

The model and the search space are defined in the same way as in scikit-learn. Then call HyperbandSearchCV from Dask-ML to perform the hyperparameter tuning.

from dask_ml.model_selection import HyperbandSearchCV

# client = Client(LocalCluster())
model = SGDClassifier(penalty='elasticnet', max_iter=1_000, warm_start=True, loss='log_loss')
params = {'alpha': np.logspace(-4, 1, num=30)}

dsk_hyperband = HyperbandSearchCV(model, params, max_iter=243)
dsk_hyperband.fit(X_train, y_train, classes=[0.0, 1.0])
/fs/fast/u20200002/envs/dispy/lib/python3.11/site-packages/sklearn/model_selection/_search.py:318: UserWarning: The total space of parameters 30 is smaller than n_iter=81. Running 30 iterations. For exhaustive searches, use GridSearchCV.
  warnings.warn(
/fs/fast/u20200002/envs/dispy/lib/python3.11/site-packages/sklearn/model_selection/_search.py:318: UserWarning: The total space of parameters 30 is smaller than n_iter=34. Running 30 iterations. For exhaustive searches, use GridSearchCV.
  warnings.warn(
HyperbandSearchCV(estimator=SGDClassifier(loss='log_loss', penalty='elasticnet',
                                          warm_start=True),
                  max_iter=243,
                  parameters={'alpha': array([1.00000000e-04, 1.48735211e-04, 2.21221629e-04, 3.29034456e-04,
       4.89390092e-04, 7.27895384e-04, 1.08263673e-03, 1.61026203e-03,
       2.39502662e-03, 3.56224789e-03, 5.29831691e-03, 7.88046282e-03,
       1.17210230e-02, 1.74332882e-02, 2.59294380e-02, 3.85662042e-02,
       5.73615251e-02, 8.53167852e-02, 1.26896100e-01, 1.88739182e-01,
       2.80721620e-01, 4.17531894e-01, 6.21016942e-01, 9.23670857e-01,
       1.37382380e+00, 2.04335972e+00, 3.03919538e+00, 4.52035366e+00,
       6.72335754e+00, 1.00000000e+01])})
dsk_hyperband.score(X_test, y_test)
0.8241373877422404

This book also covers hyperparameter tuning with Ray, which offers a richer set of tuning features than Dask.