4.4. Shuffle#
In a distributed system, moving data between workers is referred to as shuffle; in other words, a shuffle moves data from one partition to another. Some shuffles are explicit, such as repartition(), whose name makes clear that it exchanges data between partitions; others are implicit, such as sort(), merge(), or groupby(), all of which shuffle data behind the scenes. Shuffle has long been one of the hard problems in distributed computing: operations like sort(), merge(), or groupby() are relatively simple to implement on a single machine with pandas, but much harder to implement in a distributed setting.
Shuffle Implementation Mechanism#
Section 3.5 introduced that Dask is built on the Task Graph, a directed acyclic graph in which a directed edge denotes that a downstream partition depends on an upstream partition; any data movement generates a directed edge in the Task Graph. Many shuffles involve a large amount of data movement, and in some scenarios all of the data is scattered, producing a huge number of edges. Such a Task-Graph-based shuffle makes the Task Graph very large, an overly large Task Graph overloads the Dask Scheduler, and the computation becomes extremely slow. As shown on the left side of Fig. 4.5, the tasks strategy relies on this Task Graph mechanism, establishing directed edges between upstream and downstream partitions. If there is an intermediate layer (usually because the upstream data is too large and has to be further divided into multiple intermediate partitions), the intermediate layer further increases the complexity of the Task Graph.
To solve the problem of an overly large Task Graph, Dask designed a peer-to-peer (p2p) shuffle mechanism. As shown on the right side of Fig. 4.5, p2p introduces a virtual barrier node into the Task Graph. The barrier is not a real Task, and it significantly reduces the complexity of the Task Graph.
At present, Dask provides two types of shuffle implementation strategies: single-machine and distributed.
Single-machine. If the data size exceeds memory capacity, intermediate data can be written to disk. This strategy is the default for single-machine scenarios.
Distributed. As shown in Fig. 4.5, the distributed scenario offers two shuffle strategies: tasks and p2p. The tasks strategy is based on the Task Graph and often runs into the overly large Task Graph problem described above, making it inefficient in many scenarios. The p2p strategy is a peer-to-peer shuffle implementation that significantly reduces the complexity of the Task Graph and markedly improves performance. Dask gives priority to the p2p strategy.
Setting dask.config.set({"dataframe.shuffle.method": "p2p"}) applies the p2p shuffle method to all computations in the current Python session. It is also possible to specify the shuffle strategy for a particular operator, for example, ddf.merge(shuffle_method="p2p").
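As a minimal sketch of these two ways of choosing a strategy (the DataFrames a and b and the join key id are hypothetical, and a local cluster is created only so that p2p, which needs the distributed scheduler, can run):

import dask
import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client, LocalCluster

# Start a local cluster; the p2p shuffle requires the distributed scheduler.
client = Client(LocalCluster())

# Two small, hypothetical DataFrames just to demonstrate the API.
a = dd.from_pandas(pd.DataFrame({"id": [1, 2, 3], "x": [1.0, 2.0, 3.0]}), npartitions=2)
b = dd.from_pandas(pd.DataFrame({"id": [2, 3, 4], "y": [4.0, 5.0, 6.0]}), npartitions=2)

# Option 1: choose the shuffle strategy globally for this session.
dask.config.set({"dataframe.shuffle.method": "p2p"})

# Option 2: choose it per operator; the keyword argument takes precedence.
joined = a.merge(b, on="id", shuffle_method="p2p")

print(joined.compute())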
To compare the performance of the two distributed shuffle mechanisms, we use a two-node Dask cluster here for testing. Readers can also use a single-machine LocalCluster, increase the data volume, and observe the performance of the two shuffle mechanisms.
import dask
from dask.distributed import Client, LocalCluster
dask.config.set({'dataframe.query-planning': False})
# change `10.0.0.3:8786` to your Scheduler address
# if you don't have a Dask cluster, use LocalCluster
# client = Client(LocalCluster())
client = Client("10.0.0.3:8786")
ddf = dask.datasets.timeseries(
start="2024-01-01",
end="2024-07-01",
dtypes={"x": float, "y": float},
freq="1 h",
)
%%time
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
shuffled = ddf.shuffle(on="x")
shuffled.compute()
CPU times: user 138 ms, sys: 19 ms, total: 157 ms
Wall time: 5.58 s
%%time
with dask.config.set({"dataframe.shuffle.method": "tasks"}):
shuffled = ddf.shuffle(on="x")
shuffled.compute()
CPU times: user 136 ms, sys: 8.48 ms, total: 144 ms
Wall time: 15.8 s
Data Re-partition#
Dask DataFrame offers three methods for data re-distribution: set_index(), repartition(), and shuffle(). All three methods may globally shuffle and re-partition data across partitions.
| Method | Description | Change Index | Change # of Partitions |
|---|---|---|---|
| set_index() | Set the index (row labels), usually to accelerate subsequent computations. | Yes | Yes |
| repartition() | Re-partition, usually used in data skewing scenarios. | No | Yes |
| shuffle() | Rearrange the DataFrame into new partitions. | No | Yes |
In Section 4.2, we mentioned that set_index() sets a specific field as the index column. When subsequent computations rely heavily on this field, set_index() can significantly speed them up. repartition() is primarily used to address data skew, where some partitions hold an excessive amount of data and these large partitions may lead to insufficient memory.
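As a minimal sketch of the three methods (the column names id and x, the partition counts, and the target partition size are hypothetical):

import pandas as pd
import dask.dataframe as dd

# A small, hypothetical DataFrame split into many partitions.
pdf = pd.DataFrame({"id": range(1000), "x": range(1000)})
ddf = dd.from_pandas(pdf, npartitions=10)

# set_index(): sort by `id` and use it as the index to speed up later computations on `id`.
indexed = ddf.set_index("id")

# repartition(): even out partition sizes, e.g. when some partitions are too large.
evened = ddf.repartition(npartitions=4)          # by number of partitions
sized = ddf.repartition(partition_size="10MB")   # or by target partition size

# shuffle(): rearrange rows so that equal values of `id` land in the same partition.
shuffled = ddf.shuffle(on="id")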
Case Analysis: groupby#
We will use groupby(by=key).sum() as an example to understand the shuffle process behind it. Fig. 4.6 illustrates the computation process, which mainly consists of three stages: Split, Apply Aggregation (such as sum), and Combine.
If the data is distributed across different partitions, the stages that involve shuffle include:
Split or Grouping: data is split or grouped according to the field specified by by. Rows with identical group keys are gathered together, which involves a significant amount of shuffling.
Intra-group aggregation: the aggregation within each group involves relatively little shuffling.
Inter-group aggregation: the aggregation across groups also involves relatively little shuffling.
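Below is a minimal, hedged sketch of the groupby(by=key).sum() pattern analyzed here; the DataFrame, column names, and partition count are hypothetical:

import pandas as pd
import dask.dataframe as dd

# Hypothetical data spread across several partitions.
pdf = pd.DataFrame({"key": ["a", "b", "a", "c", "b", "a"],
                    "value": [1, 2, 3, 4, 5, 6]})
ddf = dd.from_pandas(pdf, npartitions=3)

# Rows with the same `key` may live in different partitions. Dask first sums
# within each partition (intra-group aggregation), then shuffles and combines
# the partial results across partitions (inter-group aggregation).
result = ddf.groupby("key")["value"].sum()
print(result.compute())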
Based on the amount of shuffling involved, it is not difficult to draw the following conclusions:
groupby(by=indexed_columns).agg() and groupby(by=indexed_columns).apply(user_def_fn) perform best. indexed_columns means that the by columns are the columns set by set_index(); agg refers to the official aggregation methods such as sum, mean, etc., provided by Dask DataFrame. Since indexed_columns are already sorted, grouping and distributing data based on them is fast.
The data exchange volume for groupby(by=non_indexed_columns).agg() is somewhat larger, but the agg methods provided by Dask have been optimized by the Dask team.
groupby(by=non_indexed_columns).apply(user_def_fn) is the most expensive. It both exchanges all of the data and executes a user-defined function, which is less efficient than the official methods provided by Dask.
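To make the three cases concrete, here is a hedged sketch; the data, column names, and the user-defined function are hypothetical, and the actual costs depend on data size and cluster layout:

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"key": ["a", "b", "c", "a", "b", "c"] * 100,
                    "value": range(600)})
ddf = dd.from_pandas(pdf, npartitions=4)

# Cheapest: group by an indexed (already sorted) column with a built-in aggregation.
indexed = ddf.set_index("key")
fast = indexed.groupby("key")["value"].sum()

# More data exchange: group by a non-indexed column, still with a built-in aggregation.
medium = ddf.groupby("key")["value"].mean()

# Most expensive: group by a non-indexed column with a user-defined function;
# all rows of each group must be gathered before the function can run.
slow = ddf.groupby("key")["value"].apply(lambda s: s.max() - s.min(),
                                         meta=("value", "int64"))

print(fast.compute(), medium.compute(), slow.compute(), sep="\n")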