4.4. Shuffle#

In a distributed system, moving data between workers is referred to as a shuffle; in other words, a shuffle moves data from one partition to another. Some shuffles are explicit, such as repartition(), whose name already indicates that it exchanges data between partitions; others are implicit, such as sort(), merge(), or groupby(), all of which perform a shuffle behind the scenes. Shuffle has always been one of the hard problems in distributed computing: operations like sort(), merge(), or groupby() are straightforward to implement on a single machine with pandas, but far from easy in a distributed setting.

Shuffle Implementation Mechanism#

As introduced in Section 3.5, Dask is built primarily on the Task Graph, a directed acyclic graph in which a directed edge means that a downstream partition depends on an upstream partition; any data movement adds a directed edge to the Task Graph. Many shuffles move a large amount of data, and in some scenarios every partition sends data to every other partition, which produces a huge number of edges. A Task-Graph-based shuffle therefore makes the Task Graph very large; an overly large Task Graph overloads the Dask Scheduler, and the computation becomes extremely slow. As shown on the left side of Fig. 4.5, the tasks strategy follows this mechanism, establishing directed edges between upstream and downstream partitions. If an intermediate layer is needed (usually because the upstream data is too large and must be split into multiple intermediate partitions), it further increases the complexity of the Task Graph.

To address the problem of an overly large Task Graph, Dask designed a peer-to-peer (p2p) shuffle mechanism. As shown on the right side of Fig. 4.5, p2p introduces a virtual Barrier node into the Task Graph. The Barrier is not a real task, and it significantly reduces the complexity of the Task Graph.

../_images/shuffle-tasks-p2p.svg

Fig. 4.5 Dask shuffle: tasks vs. p2p#
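
To get a feel for how fast the Task-Graph-based (tasks) shuffle grows, the following sketch, which is ours and purely illustrative (the exact task counts depend on your Dask version and the number of partitions), shuffles a small timeseries DataFrame with the tasks strategy and counts the tasks in the resulting graph:

import dask

# count the tasks produced by the tasks-based shuffle for different
# partition counts; the graph grows much faster than the number of partitions
for n in (10, 50, 100):
    small = dask.datasets.timeseries(
        start="2024-01-01",
        end="2024-07-01",
        dtypes={"x": float, "y": float},
        freq="1h",
    ).repartition(npartitions=n)
    with dask.config.set({"dataframe.shuffle.method": "tasks"}):
        graph = small.shuffle(on="x").__dask_graph__()
    print(f"{n} input partitions -> {len(graph)} tasks in the graph")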

At present, Dask provides two types of shuffle implementation strategies: single-machine and distributed.

  • Single-machine. If the data size exceeds memory capacity, intermediate data can be written to disk. This strategy is the default for single-machine scenarios.

  • Distributed. As shown in Fig. 4.5, the distributed scenario offers two shuffle strategies: tasks and p2p. The tasks strategy is based on the Task Graph and frequently runs into the overly large Task Graph problem described above, so it is inefficient in many scenarios. The p2p strategy is a peer-to-peer shuffle implementation that greatly reduces the complexity of the Task Graph and markedly improves performance. Dask gives priority to the p2p strategy.

Setting dask.config.set({"dataframe.shuffle.method": "p2p"}) applies the p2p shuffle method to all computations in the current Python session. It is also possible to specify the shuffle strategy for a particular operator, for example, ddf.merge(shuffle_method="p2p").
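
As a minimal sketch of the two ways to choose the shuffle method (the dataset and column names below are made up for illustration; p2p requires a distributed scheduler and pyarrow, so we start a LocalCluster here):

import dask
from dask.distributed import Client, LocalCluster

# p2p needs a distributed scheduler; a LocalCluster is enough for this sketch
client = Client(LocalCluster())

left = dask.datasets.timeseries(
    start="2024-01-01", end="2024-02-01",
    dtypes={"id": int, "x": float}, freq="1h",
)
right = dask.datasets.timeseries(
    start="2024-01-01", end="2024-02-01",
    dtypes={"id": int, "y": float}, freq="1h",
)

# global: every shuffle inside this context uses p2p
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
    joined = left.merge(right, on="id").compute()

# per operator: only this merge uses p2p
joined = left.merge(right, on="id", shuffle_method="p2p").compute()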

To compare the performance of the two distributed shuffle mechanisms, we use a two-node Dask cluster for the following test. Readers can also use a single-machine LocalCluster, increase the data volume, and observe the performance of the two shuffle mechanisms.

import dask
from dask.distributed import Client, LocalCluster

dask.config.set({'dataframe.query-planning': False})

# change `10.0.0.3:8786` to your Scheduler address
# if you don't have a Dask cluster, use LocalCluster instead
# client = Client(LocalCluster())
client = Client("10.0.0.3:8786")

ddf = dask.datasets.timeseries(
        start="2024-01-01",
        end="2024-07-01",
        dtypes={"x": float, "y": float},
        freq="1 h",
    )
%%time
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
    shuffled = ddf.shuffle(on="x")
    shuffled.compute()
CPU times: user 138 ms, sys: 19 ms, total: 157 ms
Wall time: 5.58 s
%%time
with dask.config.set({"dataframe.shuffle.method": "tasks"}):
    shuffled = ddf.shuffle(on="x")
    shuffled.compute()
CPU times: user 136 ms, sys: 8.48 ms, total: 144 ms
Wall time: 15.8 s

Data Re-partition#

Dask DataFrame offers three methods for redistributing data: set_index(), repartition(), and shuffle(). All three may trigger a global shuffle that moves data into new partitions.

Table 4.3 Three methods of data re-partition in Dask#

| Method | Description | Change Index | Change # of Partitions |
| --- | --- | --- | --- |
| DataFrame.set_index() | Set the index (row labels), usually to accelerate subsequent computations. | Yes | Yes |
| DataFrame.repartition() | Re-partition, usually used in data skewing scenarios. | No | Yes |
| DataFrame.shuffle() | Rearrange the DataFrame into new partitions. | No | Yes |

In Section 4.2, we mentioned that set_index() sets a specific field as the index column. When subsequent computations rely heavily on this field, set_index() can significantly speed them up. repartition() is mainly used to address data skew: some partitions end up with too much data, and these oversized partitions may lead to out-of-memory errors.
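
The following sketch (illustrative only; the column names are arbitrary) shows the three methods side by side:

import dask

ddf = dask.datasets.timeseries(
    start="2024-01-01", end="2024-07-01",
    dtypes={"id": int, "x": float}, freq="1h",
)

# set_index: sort the data by `id` and make it the index, so that later
# operations keyed on `id` (joins, groupbys, loc) can avoid a full shuffle
indexed = ddf.set_index("id")

# repartition: change only the number of partitions, e.g. to even out
# skewed partitions; the index is left unchanged
evened = ddf.repartition(npartitions=10)

# shuffle: redistribute rows so that rows with equal values of `x` end up
# in the same partition; the index is left unchanged
shuffled = ddf.shuffle(on="x")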

Case Analysis: groupby#

We will use groupby(by=key).sum() as an example to understand the shuffle process behind it. Fig. 4.6 illustrates the computation, which mainly consists of three stages: Split, Apply (an aggregation such as sum), and Combine.

../_images/groupby.svg

Fig. 4.6 DataFrame groupby#

If data is distributed across different partitions, stages involving shuffle include:

  • Split or grouping: the data is split into groups according to the field(s) specified by by. Rows with the same grouping value must be gathered together, which involves a large amount of shuffling.

  • Intra-group aggregation: aggregation within a group involves relatively little shuffling.

  • Inter-group aggregation: aggregation across groups also involves relatively little shuffling.

Based on the amount of shuffling involved, we can draw the following conclusions (a short sketch after this list illustrates the three patterns):

  • The performance of groupby(by=indexed_columns).agg() and groupby(by=indexed_columns).apply(user_def_fn) is the best. Here indexed_columns means that the by columns are the columns previously set by set_index(), and agg refers to the built-in aggregation methods such as sum and mean provided by Dask DataFrame. Since indexed_columns are already sorted, it is quick to group and distribute data based on them.

  • The data exchange volume for groupby(by=non_indexed_columns).agg() is somewhat larger, but the agg methods have been optimized by the Dask team, so the cost is still acceptable.

  • The cost of groupby(by=non_indexed_columns).apply(user_def_fn) is the highest. It both exchanges all of the data and runs a user-defined function, which is less efficient than the built-in methods provided by Dask.
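
A rough sketch of the three patterns discussed above (illustrative only; the user-defined function is arbitrary, and actual costs depend on your data and cluster):

import dask

ddf = dask.datasets.timeseries(
    start="2024-01-01", end="2024-07-01",
    dtypes={"id": int, "x": float}, freq="1h",
)

# 1. groupby on an indexed column with a built-in aggregation: cheapest,
#    because the data is already sorted and partitioned by `id`
indexed = ddf.set_index("id")
best = indexed.groupby("id").x.sum().compute()

# 2. groupby on a non-indexed column with a built-in aggregation: more data
#    exchange, but the built-in agg still uses an optimized tree-style reduction
ok = ddf.groupby("id").x.sum().compute()

# 3. groupby on a non-indexed column with a user-defined function: the most
#    expensive, since all rows of a group must be moved to one partition
worst = ddf.groupby("id").x.apply(
    lambda s: s.max() - s.min(), meta=("x", "float64")
).compute()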