
4.3. map_partitions

Aside from tasks that require communication between workers, discussed in Section 4.4, there is the simplest form of parallelism, known as "Embarrassingly Parallel": computations that require no communication across workers at all. For instance, adding one to every value of a field can be done by each worker independently, without any inter-worker coordination. In Dask DataFrame, such Embarrassingly Parallel operations are carried out with map_partitions(func). Its argument func is a function that is executed on the pandas DataFrame backing each partition, so anything the pandas API offers can be used inside it. As shown in Fig. 4.4, map_partitions(func) applies a transformation to each underlying pandas DataFrame.

Fig. 4.4 map_partitions()
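The mechanics are easiest to see on a toy example. The sketch below (the data and column names are made up for illustration) shows that func receives one pandas DataFrame per partition and must return a pandas DataFrame:

import pandas as pd
import dask.dataframe as dd

# A small DataFrame split into 2 partitions (illustrative data)
pdf = pd.DataFrame({"x": range(8)})
ddf = dd.from_pandas(pdf, npartitions=2)

# Called once per partition: receives and returns a pandas DataFrame
def add_one(df):
    df["x_plus_one"] = df["x"] + 1
    return df

print(ddf.map_partitions(add_one).compute())

Each of the two calls to add_one runs independently of the other, which is exactly why no inter-worker communication is needed.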

Case Study: New York Taxi Data

We use the New York taxi dataset for a simple preprocessing task: calculating the duration of each trip. In the original dataset, tpep_pickup_datetime and tpep_dropoff_datetime record when passengers are picked up and dropped off, respectively; we subtract the pickup time tpep_pickup_datetime from the drop-off time tpep_dropoff_datetime. This calculation incurs no communication across workers, making it a typical Embarrassingly Parallel workload.

import sys
sys.path.append("..")
from utils import nyc_taxi

import pandas as pd
import dask
dask.config.set({'dataframe.query-planning': False})
import dask.dataframe as dd
from dask.distributed import LocalCluster, Client

# Start a local cluster and read the dataset
cluster = LocalCluster()
client = Client(cluster)
dataset_path = nyc_taxi()
ddf = dd.read_parquet(dataset_path)
def transform(df):
    # Trip duration in seconds
    df["trip_duration"] = (df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]).dt.seconds
    # Move `trip_duration` to the front, right after the first column
    dur_column = df.pop('trip_duration')
    df.insert(1, dur_column.name, dur_column)
    return df

# Apply transform to each partition; execution stays lazy until computed
ddf = ddf.map_partitions(transform)
ddf.compute()
ddf.head(5)
| | VendorID | trip_duration | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | Airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1253 | 2023-06-01 00:08:48 | 2023-06-01 00:29:41 | 1.0 | 3.40 | 1.0 | N | 140 | 238 | 1 | 21.9 | 3.50 | 0.5 | 6.70 | 0.0 | 1.0 | 33.60 | 2.5 | 0.00 |
| 1 | 1 | 614 | 2023-06-01 00:15:04 | 2023-06-01 00:25:18 | 0.0 | 3.40 | 1.0 | N | 50 | 151 | 1 | 15.6 | 3.50 | 0.5 | 3.00 | 0.0 | 1.0 | 23.60 | 2.5 | 0.00 |
| 2 | 1 | 1123 | 2023-06-01 00:48:24 | 2023-06-01 01:07:07 | 1.0 | 10.20 | 1.0 | N | 138 | 97 | 1 | 40.8 | 7.75 | 0.5 | 10.00 | 0.0 | 1.0 | 60.05 | 0.0 | 1.75 |
| 3 | 2 | 1406 | 2023-06-01 00:54:03 | 2023-06-01 01:17:29 | 3.0 | 9.83 | 1.0 | N | 100 | 244 | 1 | 39.4 | 1.00 | 0.5 | 8.88 | 0.0 | 1.0 | 53.28 | 2.5 | 0.00 |
| 4 | 2 | 514 | 2023-06-01 00:18:44 | 2023-06-01 00:27:18 | 1.0 | 1.17 | 1.0 | N | 137 | 234 | 1 | 9.3 | 1.00 | 0.5 | 0.72 | 0.0 | 1.0 | 15.02 | 2.5 | 0.00 |

Several of Dask DataFrame's own APIs are Embarrassingly Parallel, and they are implemented internally with map_partitions().
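For instance (a sketch continuing with the taxi DataFrame from above; the tip_ratio column name is made up for illustration), element-wise column arithmetic runs on each partition independently, with no data moving between workers:

# Element-wise arithmetic is applied partition by partition, so no
# communication between workers is required
ddf["tip_ratio"] = ddf["tip_amount"] / ddf["total_amount"]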

As mentioned in Section 4.2, Dask DataFrame is partitioned along a specific column (the index column). If the function passed to map_partitions() modifies that index column, the recorded divisions become invalid, and you must either call clear_divisions() to discard them or call set_index() to rebuild them.

ddf.clear_divisions()

Dask DataFrame Structure:

| | VendorID | trip_duration | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | Airport_fee |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| npartitions=1 | | | | | | | | | | | | | | | | | | | | |
| | int32 | int32 | datetime64[us] | datetime64[us] | int64 | float64 | int64 | string | int32 | int32 | int64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |

Dask Name: transform, 2 graph layers
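Alternatively (a sketch; unlike clear_divisions(), set_index() sorts the data and therefore does move it between workers), the divisions can be rebuilt by indexing on a suitable column:

# Rebuild divisions by re-indexing; this triggers a shuffle across
# workers, so it is more expensive than clear_divisions()
ddf = ddf.set_index("tpep_pickup_datetime")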
client.shutdown()