3.3. Scaling Dask to a Cluster#
The computations we’ve discussed so far are performed on a single machine. dask.distributed
allows us to extend Dask computations to multiple computing nodes.
Dask Cluster#
As depicted in Fig. 3.3, a Dask cluster includes a scheduler and multiple workers. Users submit computational tasks to the scheduler from a client; the scheduler analyzes the tasks, generates a Task Graph, and distributes the tasks across the workers. Each worker is responsible for a small portion of the tasks, and workers communicate with one another, for example to aggregate computation results.
Together, the scheduler and the workers constitute a Dask cluster.
LocalCluster#
By default (without any additional settings), Dask initiates a LocalCluster and connects to it using the Client.
from dask.distributed import LocalCluster, Client
cluster = LocalCluster()
cluster
LocalCluster(008c0d37)
Dashboard: http://127.0.0.1:8787/status
Status: running | Using processes: True
Workers: 4 | Total threads: 8 | Total memory: 16.00 GiB
(each worker: 2 threads, 4.00 GiB memory, supervised by a Nanny)
Dask detects the local resources, in this case 4 CPU cores and 16 GB of memory, and creates a LocalCluster accordingly. This LocalCluster consists of 4 workers, each corresponding to one CPU core.
Dask also provides a dashboard link for monitoring detailed information about the cluster and its jobs through a web page.
By connecting to this LocalCluster using the Client, all Dask computations are submitted to it:
client = Client(cluster)
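To confirm that work is actually routed through this cluster, you can run a small computation. The following is a minimal sketch using a Dask Array; the array shape and chunk sizes are arbitrary example values:

import dask.array as da

# With a Client created, .compute() runs on the LocalCluster by default
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
print(x.mean().compute())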
Starting a Dask Cluster from the Command Line#
When you have more computing nodes, you can start the Dask Scheduler and Dask Workers on different nodes from the command line. For example, to start the Dask Scheduler on a node with the IP address 192.0.0.1, enter the following command in the terminal:
# Current node is 192.0.0.1
$ dask scheduler
Dask prints a log that includes the IP address and port of the Dask Scheduler, in this case tcp://192.0.0.1:8786:
Scheduler at: tcp://192.0.0.1:8786
dashboard at: ...
On the other computing nodes, start Dask Workers and connect them to the previously started Scheduler by appending the Scheduler's IP address and port to the dask worker command. For example, to start a Dask Worker on 192.0.0.2, enter:
# Current node is 192.0.0.2
$ dask worker tcp://192.0.0.1:8786
The log indicates that Dask has started the worker and connected it to the scheduler:
Start worker at: tcp://192.0.0.2:40483
Registered to: tcp://192.0.0.1:8786
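Once the Scheduler and Workers are running, computations can be submitted from any machine that can reach the Scheduler by passing its address to the Client. A minimal sketch:

from dask.distributed import Client

# Connect to the manually started Scheduler by its address
client = Client("tcp://192.0.0.1:8786")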
By default, the Dask Scheduler uses port 8786 for connections. If a different port is needed, specify it with the --port option when starting the Dask Scheduler:
$ dask scheduler --port 8000
In addition to --port, the Dask Scheduler has many other options that users can set according to their needs.
Python Environment and Dependency Management#
When using Dask on a cluster, it is important to ensure that all computing nodes have the required Python packages installed. distributed.diagnostics.plugin.PipInstall can be used to install dependencies on each worker at runtime. PipInstall's packages parameter takes a list of package names and installs them much like pip install does. This approach is best suited to the prototyping stage, when the required packages are not yet finalized.
from dask.distributed import PipInstall
plugin = PipInstall(packages=["scikit-learn"], pip_options=["--upgrade"])
client.register_plugin(plugin)
Once the required dependencies are fixed, it is best to manage the Python environment and packages on the cluster in one of the following ways:
Install the same versions of the software on all computing nodes. For example, use conda or pip to install Python and the packages into the same directory on each Dask Worker.
Mount a shared file system: install Python and the other software environments on a shared file system so that all Dask Workers see the same directory and content. Common shared file systems include the Network File System (NFS).
Use containers: package Python and the dependencies into containers, distribute them across the cluster, and start the same container on each computing node.
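Whichever approach you choose, it is helpful to verify that the client, scheduler, and workers actually see consistent package versions. One way, sketched below and reusing the client created earlier, is Client.get_versions():

# With check=True, a version mismatch raises an error instead of
# only being reported in the returned dictionary
versions = client.get_versions(check=True)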
SSH, Kubernetes, High-Performance Computing Clusters#
Dask clusters essentially consist of a Dask Scheduler and a set of Dask Workers, so different deployment scenarios are broadly similar. Dask provides wrapper libraries for the common deployment scenarios, so there is no need to log in to each node individually and start the Dask Scheduler and Dask Workers one by one. The following introduces three deployment types: SSH, Kubernetes, and High-Performance Computing (HPC) clusters.
SSH#
As long as the IP address or hostname of each computing node is known, you can start a Dask cluster from Python code or the command line. dask.distributed.SSHCluster wraps the asyncssh package: it logs in to each computing node via SSH and then starts the Dask Scheduler or a Dask Worker on that node.
Note
If you are operating SSHCluster from your personal computer (i.e., your personal computer acts as the client, and the computing nodes form the cluster), you should set up passwordless SSH login between your personal computer and each computing node. Specifically, the authorized_keys file on each computing node should contain the public key of your personal computer.
You can start such a Dask cluster with Python code:
from dask.distributed import Client, SSHCluster

cluster = SSHCluster(
    hosts=["localhost", "node1", "node2"],
    connect_options={
        "username": "xxx",
        "password": "yyy",
    }
)
client = Client(cluster)
The hosts parameter is a list of hostnames; the first node in the list starts the Dask Scheduler, while the remaining nodes start Dask Workers. The connect_options parameter holds key-value options for the SSH login, such as the username, password, and port.
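If you need finer control, SSHCluster also accepts worker_options and scheduler_options dictionaries that are forwarded to the Workers and the Scheduler. The sketch below shows one possible configuration; the specific values (threads per worker, dashboard address) are only illustrative:

from dask.distributed import Client, SSHCluster

cluster = SSHCluster(
    hosts=["localhost", "node1", "node2"],
    connect_options={"username": "xxx", "password": "yyy"},
    worker_options={"nthreads": 2},                     # example: threads per worker
    scheduler_options={"dashboard_address": ":8797"},   # example: dashboard port
)
client = Client(cluster)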
Kubernetes#
Kubernetes has become the de facto standard for cloud resource management, and the easiest way to install software on Kubernetes is to use Helm. The Dask community develops dask-kubernetes, which provides two classes: KubeCluster and HelmCluster.
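As a rough illustration, the sketch below assumes the operator-based KubeCluster from a recent dask-kubernetes release; the cluster name, container image, and worker count are placeholders to adapt to your own Kubernetes setup:

from dask.distributed import Client
from dask_kubernetes.operator import KubeCluster

# Placeholders: replace name, image, and n_workers with your own values
cluster = KubeCluster(name="dask-demo",
                      image="ghcr.io/dask/dask:latest",
                      n_workers=3)
client = Client(cluster)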
High-Performance Computing (HPC) Clusters#
High-Performance Computing (HPC) clusters typically use scheduling software such as Slurm. For HPC clusters, the Dask community develops dask-jobqueue, which is compatible with schedulers including Slurm.
For example, the following uses SLURMCluster to request 10 nodes for running Dask Workers in the Slurm cpu queue, with each node providing 32 cores and 128 GB of memory; the scale() method requests these 10 nodes from Slurm.
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(cores=32,
                       memory="128GB",
                       queue="cpu")
cluster.scale(n=10)
client = Client(cluster)
Compared with cloud resources, HPC clusters are equipped not only with Ethernet but also with high-bandwidth, low-latency RDMA networks, such as InfiniBand. RDMA networks can accelerate many tasks with demanding network requirements. To take full advantage of an RDMA network, pass the NIC name to the SLURMCluster via the interface parameter. To find the NIC name, use the ifconfig command; InfiniBand cards usually appear as ib0, while Ethernet is usually shown as eth0, though different clusters may use different naming conventions. If interface is not set, Dask defaults to Ethernet.
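For instance, a SLURMCluster that binds Dask's communication to an InfiniBand card might look like the sketch below; whether ib0 is the right name depends on your cluster:

from dask_jobqueue import SLURMCluster

# interface="ib0" is an example; check your NIC names with ifconfig first
cluster = SLURMCluster(cores=32,
                       memory="128GB",
                       queue="cpu",
                       interface="ib0")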
Auto-Scaling#
As previously mentioned, Kubernetes and HPC clusters support auto-scaling (also called adaptive scaling). Kubernetes and Slurm are resource management systems: they manage a large pool of computing resources from which different applications request what they need. A Dask cluster built on Kubernetes or Slurm therefore occupies only a subset of the resources managed by Kubernetes or Slurm, and Dask's auto-scaling can automatically grow or shrink that share. Auto-scaling addresses the following scenarios:
The current job demands high computing resources, requiring more resources to meet computational needs.
The current job’s requested computing resources are idle and can be used by other users, especially when users are engaging in interactive data visualization rather than large-scale computations.
Both KubeCluster and SLURMCluster provide an adapt() method. The following example dynamically scales between 0 and 10 workers. Auto-scaling decides how many Dask Workers to run based on the load on the Dask Scheduler: Dask collects information such as the used and available memory on each Dask Worker and adjusts the amount of computing resources adaptively.
from dask_kubernetes import KubeCluster
cluster = KubeCluster()
cluster.adapt(minimum=0, maximum=10)
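The same pattern applies to an HPC cluster. A minimal sketch with SLURMCluster, reusing the resource settings from the earlier example:

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(cores=32, memory="128GB", queue="cpu")
# Keep between 0 and 10 Dask Workers alive, depending on scheduler load
cluster.adapt(minimum=0, maximum=10)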
Dask Nanny#
When a Dask cluster is launched, a monitoring service called Dask Nanny is started alongside the Dask Scheduler and Dask Workers. As its name suggests, Dask Nanny monitors the CPU and memory usage of the Dask Workers to keep them from exceeding their resource limits, and it restarts a Dask Worker if it crashes. When a Dask Worker is restarted by the Nanny, the computation tasks on that worker are re-executed; the other Dask Workers hold on to their data and wait for it to recover, which can place a significant burden on them. If Dask Workers are restarting frequently, you should consider adjusting the data size of each partition using rechunk() or repartition().
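As a rough sketch of what such an adjustment can look like (the input path and target sizes are placeholders, not recommendations):

import dask.array as da
import dask.dataframe as dd

# Dask Array: split overly large chunks into smaller ones
x = da.random.random((100_000, 10_000), chunks=(50_000, 10_000))
x = x.rechunk((5_000, 1_000))  # smaller chunks -> less memory per task

# Dask DataFrame: cap the size of each partition (hypothetical input path)
ddf = dd.read_parquet("data/*.parquet")
ddf = ddf.repartition(partition_size="100MB")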