4.3. map_partitions
Apart from the operations that require communication between workers, discussed in Section 4.4, there is a simpler form of parallelism known as “embarrassingly parallel”. The term refers to computations that need no communication across workers. For instance, adding one to a field can be done by each worker on its own data, with no communication overhead. In Dask DataFrame, such embarrassingly parallel operations are carried out with map_partitions(func). Its argument, func, is a function that is executed on each partition. Because each partition is a pandas DataFrame, any pandas operation can be used inside func. As shown in Fig. 4.4, map_partitions(func) applies a transformation to each of the underlying pandas DataFrames.
Case Study: New York Taxi Data
We use the New York taxi dataset for a simple data preprocessing task: computing the duration of each trip. In the original dataset, tpep_pickup_datetime and tpep_dropoff_datetime are the times at which passengers were picked up and dropped off, respectively. Subtracting the pickup time tpep_pickup_datetime from the drop-off time tpep_dropoff_datetime gives the trip duration. The calculation works row by row and needs no communication between workers, which makes it a typical embarrassingly parallel workload.
import sys
sys.path.append("..")
# Helper from the accompanying utils module; returns the dataset path
from utils import nyc_taxi

import dask
# Use the legacy (non query-planning) DataFrame implementation;
# this is set before dask.dataframe is imported
dask.config.set({'dataframe.query-planning': False})
import dask.dataframe as dd
import pandas as pd
from dask.distributed import LocalCluster, Client

# Start a local cluster and connect a client to it
cluster = LocalCluster()
client = Client(cluster)

dataset_path = nyc_taxi()
ddf = dd.read_parquet(dataset_path)

def transform(df):
    # `df` is a single partition, i.e. a plain pandas DataFrame
    df["trip_duration"] = (df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]).dt.seconds
    # Move `trip_duration` toward the front
    dur_column = df.pop('trip_duration')
    df.insert(1, dur_column.name, dur_column)
    return df

ddf = ddf.map_partitions(transform)
ddf.compute()
ddf.head(5)
| | VendorID | trip_duration | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | Airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1253 | 2023-06-01 00:08:48 | 2023-06-01 00:29:41 | 1.0 | 3.40 | 1.0 | N | 140 | 238 | 1 | 21.9 | 3.50 | 0.5 | 6.70 | 0.0 | 1.0 | 33.60 | 2.5 | 0.00 |
| 1 | 1 | 614 | 2023-06-01 00:15:04 | 2023-06-01 00:25:18 | 0.0 | 3.40 | 1.0 | N | 50 | 151 | 1 | 15.6 | 3.50 | 0.5 | 3.00 | 0.0 | 1.0 | 23.60 | 2.5 | 0.00 |
| 2 | 1 | 1123 | 2023-06-01 00:48:24 | 2023-06-01 01:07:07 | 1.0 | 10.20 | 1.0 | N | 138 | 97 | 1 | 40.8 | 7.75 | 0.5 | 10.00 | 0.0 | 1.0 | 60.05 | 0.0 | 1.75 |
| 3 | 2 | 1406 | 2023-06-01 00:54:03 | 2023-06-01 01:17:29 | 3.0 | 9.83 | 1.0 | N | 100 | 244 | 1 | 39.4 | 1.00 | 0.5 | 8.88 | 0.0 | 1.0 | 53.28 | 2.5 | 0.00 |
| 4 | 2 | 514 | 2023-06-01 00:18:44 | 2023-06-01 00:27:18 | 1.0 | 1.17 | 1.0 | N | 137 | 234 | 1 | 9.3 | 1.00 | 0.5 | 0.72 | 0.0 | 1.0 | 15.02 | 2.5 | 0.00 |
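One detail of the transform is worth checking on a small example. On a pandas timedelta Series, `.dt.seconds` returns only the seconds component (always between 0 and 86399), while `.dt.total_seconds()` returns the full duration as a float. The two agree for ordinary trips but differ for durations of a day or more and for negative durations. The sketch below uses made-up timestamps and the hypothetical column names `pickup` and `dropoff`; only the first row mirrors a value from the output above.

```python
import pandas as pd

# Hypothetical timestamps chosen to expose the edge cases.
df = pd.DataFrame({
    "pickup": pd.to_datetime([
        "2023-06-01 00:08:48",   # ordinary trip
        "2023-06-01 23:55:00",   # trip lasting more than one day
        "2023-06-01 12:00:10",   # drop-off recorded before pickup
    ]),
    "dropoff": pd.to_datetime([
        "2023-06-01 00:29:41",
        "2023-06-03 00:05:00",
        "2023-06-01 12:00:00",
    ]),
})

delta = df["dropoff"] - df["pickup"]

# Seconds component only: the days part of the timedelta is dropped.
seconds_component = delta.dt.seconds
# Full duration in seconds, including days and sign.
total_seconds = delta.dt.total_seconds()

print(seconds_component.tolist())  # [1253, 600, 86390]
print(total_seconds.tolist())      # [1253.0, 87000.0, -10.0]
```

Whether this matters depends on the data. If the dataset may contain multi-day or inconsistent records, `.dt.total_seconds()` is the safer choice, at the cost of a float result.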
Some Dask DataFrame APIs are themselves embarrassingly parallel and are implemented using map_partitions().
As mentioned in Section 4.2, a Dask DataFrame is partitioned along a specific column, the index column. If the function passed to map_partitions() modifies that index, the recorded divisions may no longer describe the data, so it is necessary to call clear_divisions() or to call set_index() again.
ddf.clear_divisions()
| | VendorID | trip_duration | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | Airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| npartitions=1 | | | | | | | | | | | | | | | | | | | | |
| | int32 | int32 | datetime64[us] | datetime64[us] | int64 | float64 | int64 | string | int32 | int32 | int64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
client.shutdown()