5.2. Hyperparameter Tuning#
We can use Dask for hyperparameter tuning in two main ways:
- Distribute multiple hyperparameter tuning tasks across a Dask cluster via scikit-learn's joblib backend.
- Use the hyperparameter tuning APIs provided by Dask-ML.
Both methods are suitable for scenarios where the training data fits into the memory of a single machine.
scikit-learn joblib#
scikit-learn provides a rich and easy-to-use interface for model training and hyperparameter tuning, but most of its algorithms run on a single machine only. By default, scikit-learn uses joblib to parallelize tasks across the cores of that machine. Hyperparameter tuning workloads such as random search and grid search consist of independent trials with no dependencies between them, which makes them straightforward to run in parallel.
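As a quick illustration (synthetic data and a hypothetical search space, for demonstration only), local parallelism is controlled through the n_jobs parameter:
from sklearn.datasets import make_classification
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV

# Synthetic data purely for illustration
X, y = make_classification(n_samples=200, random_state=0)

# n_jobs=-1 asks joblib to use all cores of the local machine;
# the candidate fits are independent, so they run in parallel
search = GridSearchCV(SGDClassifier(), {"alpha": [1e-4, 1e-3, 1e-2]}, n_jobs=-1)
search.fit(X, y)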
Example: Flights Prediction with scikit-learn#
Below is a machine learning classification example based on scikit-learn, in which we use grid search to tune the hyperparameters.
import os
import sys
sys.path.append("..")
from utils import nyc_flights
import numpy as np
import pandas as pd
folder_path = nyc_flights()
file_path = os.path.join(folder_path, "nyc-flights", "1991.csv")
input_cols = [
"Year",
"Month",
"DayofMonth",
"DayOfWeek",
"CRSDepTime",
"CRSArrTime",
"UniqueCarrier",
"FlightNum",
"ActualElapsedTime",
"Origin",
"Dest",
"Distance",
"Diverted",
"ArrDelay",
]
df = pd.read_csv(file_path, usecols=input_cols)
df = df.dropna()
# Delay or not
df["ArrDelayBinary"] = 1.0 * (df["ArrDelay"] > 10)
df = df[df.columns.difference(["ArrDelay"])]
# convert Dest/Origin/UniqueCarrier fields into category types
for col in df.select_dtypes(["object"]).columns:
    df[col] = df[col].astype("category").cat.codes.astype(np.int32)

for col in df.columns:
    df[col] = df[col].astype(np.float32)
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV as SkGridSearchCV
from sklearn.model_selection import train_test_split as sk_train_test_split
_y_label = "ArrDelayBinary"
X_train, X_test, y_train, y_test = sk_train_test_split(
df.loc[:, df.columns != _y_label],
df[_y_label],
test_size=0.25,
shuffle=False,
)
model = SGDClassifier(penalty='elasticnet', max_iter=1_000, warm_start=True, loss='log_loss')
params = {'alpha': np.logspace(-4, 1, num=81)}
sk_grid_search = SkGridSearchCV(model, params)
When performing the hyperparameter search, simply wrap the training code in a with joblib.parallel_config('dask'): block to scale the grid search tasks to the Dask cluster.
import joblib
from dask.distributed import Client, LocalCluster
# change to your Dask Scheduler IP address
client = Client("10.0.0.3:8786")
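If a remote Dask cluster is not available, a local cluster can stand in for testing; a minimal sketch using the classes already imported above:
# Alternative for single-machine testing: start a local cluster
cluster = LocalCluster()
client = Client(cluster)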
with joblib.parallel_config('dask'):
    sk_grid_search.fit(X_train, y_train)
Use the score() method to check the accuracy of the model:
sk_grid_search.score(X_test, y_test)
0.7775224665166276
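The fitted search object also exposes the winning configuration via scikit-learn's standard attributes, for example:
# Best alpha found by the grid search and its cross-validation score
print(sk_grid_search.best_params_)
print(sk_grid_search.best_score_)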
Dask-ML API#
In the previous subsection, we showed a scikit-learn example in which the only required change was adding joblib.parallel_config('dask'), after which the computation tasks were distributed to the Dask cluster.
Dask-ML also implements hyperparameter tuning APIs of its own. In addition to GridSearchCV, RandomizedSearchCV, and other counterparts of scikit-learn's searchers, it offers adaptive algorithms such as Successive Halving and Hyperband, implemented as SuccessiveHalvingSearchCV and HyperbandSearchCV.
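For example, here is a minimal sketch of swapping in Dask-ML's GridSearchCV for the scikit-learn searcher used earlier, reusing the model, params, and training data defined above:
from dask_ml.model_selection import GridSearchCV as DaskGridSearchCV

# The search is scheduled directly on the Dask cluster,
# without going through the joblib backend
dask_grid_search = DaskGridSearchCV(model, params)
dask_grid_search.fit(X_train, y_train)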
Example: Flights Prediction with Dask-ML#
Below is an example of Hyperband hyperparameter tuning based on Dask-ML.
The hyperparameter tuning algorithms in Dask-ML require partitioned inputs, i.e., a Dask DataFrame or Dask Array. The data loading and preprocessing code therefore needs to be adapted to Dask: the pandas/NumPy code is replaced with Dask DataFrame/Dask Array code.
Note that the SuccessiveHalvingSearchCV and HyperbandSearchCV algorithms provided by Dask-ML require the model to support partial_fit() and score(). partial_fit() performs a single iteration (for example, one step of gradient descent) of an iterative scikit-learn algorithm. Successive Halving and Hyperband initially allocate only a limited amount of computational resources rather than the full training budget: within that budget, Dask performs a limited number of iterations (by calling partial_fit() a limited number of times), evaluates performance (by calling score() on the validation set), and stops the poorly performing experiments early.
import dask.dataframe as dd
input_cols = [
"Year",
"Month",
"DayofMonth",
"DayOfWeek",
"CRSDepTime",
"CRSArrTime",
"UniqueCarrier",
"FlightNum",
"ActualElapsedTime",
"Origin",
"Dest",
"Distance",
"Diverted",
"ArrDelay",
]
ddf = dd.read_csv(file_path, usecols=input_cols)
# Delay or not
ddf["ArrDelayBinary"] = 1.0 * (ddf["ArrDelay"] > 10)
ddf = ddf[ddf.columns.difference(["ArrDelay"])]
ddf = ddf.dropna()
ddf = ddf.repartition(npartitions=8)
Additionally, Dask handles categorical variables slightly differently from pandas and scikit-learn, so we need to:
1. Convert the features to the category type, for example with the categorize() method of Dask DataFrame or the Dask-ML Categorizer preprocessor.
2. Perform one-hot encoding: the DummyEncoder in Dask-ML one-hot encodes categorical features and serves as the Dask alternative to scikit-learn's OneHotEncoder.
from dask_ml.preprocessing import DummyEncoder
dummy = DummyEncoder()
ddf = ddf.categorize(columns=["Dest", "Origin", "UniqueCarrier"])
dummified_ddf = dummy.fit_transform(ddf)
Then use the train_test_split method from Dask-ML to split the dataset into training and testing sets:
from dask_ml.model_selection import train_test_split as dsk_train_test_split
_y_label = "ArrDelayBinary"
X_train, X_test, y_train, y_test = dsk_train_test_split(
dummified_ddf.loc[:, dummified_ddf.columns != _y_label],
dummified_ddf[_y_label],
test_size=0.25,
shuffle=False,
)
Defining the model and the search space works just as in scikit-learn; we then call HyperbandSearchCV from Dask-ML to perform the hyperparameter tuning.
from dask_ml.model_selection import HyperbandSearchCV
# client = Client(LocalCluster())
model = SGDClassifier(penalty='elasticnet', max_iter=1_000, warm_start=True, loss='log_loss')
params = {'alpha': np.logspace(-4, 1, num=30)}
dsk_hyperband = HyperbandSearchCV(model, params, max_iter=243)
dsk_hyperband.fit(X_train, y_train, classes=[0.0, 1.0])
HyperbandSearchCV(estimator=SGDClassifier(loss='log_loss', penalty='elasticnet', warm_start=True), max_iter=243, parameters={'alpha': array([1.00000000e-04, 1.48735211e-04, 2.21221629e-04, 3.29034456e-04, 4.89390092e-04, 7.27895384e-04, 1.08263673e-03, 1.61026203e-03, 2.39502662e-03, 3.56224789e-03, 5.29831691e-03, 7.88046282e-03, 1.17210230e-02, 1.74332882e-02, 2.59294380e-02, 3.85662042e-02, 5.73615251e-02, 8.53167852e-02, 1.26896100e-01, 1.88739182e-01, 2.80721620e-01, 4.17531894e-01, 6.21016942e-01, 9.23670857e-01, 1.37382380e+00, 2.04335972e+00, 3.03919538e+00, 4.52035366e+00, 6.72335754e+00, 1.00000000e+01])})
dsk_hyperband.score(X_test, y_test)
0.8241373877422404
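As with scikit-learn's searchers, the fitted object exposes the selected configuration, for example:
# Best alpha found by Hyperband and its validation score
print(dsk_hyperband.best_params_)
print(dsk_hyperband.best_score_)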
This book will also introduce hyperparameter tuning with Ray; compared to Dask, Ray offers a richer set of features for this task.