{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "(sec-dask-map_partitions)=\n", "# `map_partitions`\n", "\n", "Aside from operations that require inter-worker communication, discussed in {numref}`sec-dask-dataframe-shuffle`, there is also the simplest form of parallelism, known as \"Embarrassingly Parallel\". The term refers to computations that require no communication between workers. For instance, adding one to a field can be done by each worker independently, without any communication overhead. In Dask DataFrame, such Embarrassingly Parallel operations are performed with `map_partitions(func)`: its argument `func` is applied to each partition (a pandas DataFrame), so any pandas operation can be used inside it. As shown in {numref}`fig-dask-map-partitions`, `map_partitions(func)` applies the transformation to every underlying pandas DataFrame.\n", "\n", "```{figure} ../img/ch-dask-dataframe/map-partitions.svg\n", "---\n", "width: 600px\n", "name: fig-dask-map-partitions\n", "---\n", "map_partitions()\n", "```\n", "\n", "## Case Study: New York Taxi Data\n", "\n", "We use the New York taxi dataset for a simple preprocessing task: computing the duration of each trip. In the original dataset, `tpep_pickup_datetime` and `tpep_dropoff_datetime` are the times at which passengers are picked up and dropped off, respectively. We subtract the pickup time `tpep_pickup_datetime` from the drop-off time `tpep_dropoff_datetime`. This calculation incurs no communication between workers, making it a typical Embarrassingly Parallel workload."
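, "\n", "As a minimal sketch of the idea (the DataFrame and column names below are made up for illustration and are not part of the taxi dataset), `map_partitions()` hands each partition to the function as a plain pandas DataFrame:\n", "\n", "```python\n", "import pandas as pd\n", "import dask.dataframe as dd\n", "\n", "pdf = pd.DataFrame({\"x\": [1, 2, 3, 4]})\n", "ddf = dd.from_pandas(pdf, npartitions=2)\n", "\n", "# Runs on each partition independently; no cross-worker communication\n", "def add_one(df):\n", "    df[\"x_plus_one\"] = df[\"x\"] + 1\n", "    return df\n", "\n", "result = ddf.map_partitions(add_one).compute()\n", "```\n", "\n", "Each partition is transformed on its own, and `compute()` concatenates the resulting pandas DataFrames."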
] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "import sys\n", "sys.path.append(\"..\")\n", "from utils import nyc_taxi\n", "\n", "import pandas as pd\n", "import dask\n", "dask.config.set({'dataframe.query-planning': False})\n", "import dask.dataframe as dd\n", "from dask.distributed import LocalCluster, Client\n", "\n", "cluster = LocalCluster()\n", "client = Client(cluster)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "dataset_path = nyc_taxi()\n", "ddf = dd.read_parquet(dataset_path)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
VendorIDtrip_durationtpep_pickup_datetimetpep_dropoff_datetimepassenger_counttrip_distanceRatecodeIDstore_and_fwd_flagPULocationIDDOLocationIDpayment_typefare_amountextramta_taxtip_amounttolls_amountimprovement_surchargetotal_amountcongestion_surchargeAirport_fee
0112532023-06-01 00:08:482023-06-01 00:29:411.03.401.0N140238121.93.500.56.700.01.033.602.50.00
116142023-06-01 00:15:042023-06-01 00:25:180.03.401.0N50151115.63.500.53.000.01.023.602.50.00
2111232023-06-01 00:48:242023-06-01 01:07:071.010.201.0N13897140.87.750.510.000.01.060.050.01.75
3214062023-06-01 00:54:032023-06-01 01:17:293.09.831.0N100244139.41.000.58.880.01.053.282.50.00
425142023-06-01 00:18:442023-06-01 00:27:181.01.171.0N13723419.31.000.50.720.01.015.022.50.00
\n", "
" ], "text/plain": [ " VendorID trip_duration tpep_pickup_datetime tpep_dropoff_datetime \\\n", "0 1 1253 2023-06-01 00:08:48 2023-06-01 00:29:41 \n", "1 1 614 2023-06-01 00:15:04 2023-06-01 00:25:18 \n", "2 1 1123 2023-06-01 00:48:24 2023-06-01 01:07:07 \n", "3 2 1406 2023-06-01 00:54:03 2023-06-01 01:17:29 \n", "4 2 514 2023-06-01 00:18:44 2023-06-01 00:27:18 \n", "\n", " passenger_count trip_distance RatecodeID store_and_fwd_flag \\\n", "0 1.0 3.40 1.0 N \n", "1 0.0 3.40 1.0 N \n", "2 1.0 10.20 1.0 N \n", "3 3.0 9.83 1.0 N \n", "4 1.0 1.17 1.0 N \n", "\n", " PULocationID DOLocationID payment_type fare_amount extra mta_tax \\\n", "0 140 238 1 21.9 3.50 0.5 \n", "1 50 151 1 15.6 3.50 0.5 \n", "2 138 97 1 40.8 7.75 0.5 \n", "3 100 244 1 39.4 1.00 0.5 \n", "4 137 234 1 9.3 1.00 0.5 \n", "\n", " tip_amount tolls_amount improvement_surcharge total_amount \\\n", "0 6.70 0.0 1.0 33.60 \n", "1 3.00 0.0 1.0 23.60 \n", "2 10.00 0.0 1.0 60.05 \n", "3 8.88 0.0 1.0 53.28 \n", "4 0.72 0.0 1.0 15.02 \n", "\n", " congestion_surcharge Airport_fee \n", "0 2.5 0.00 \n", "1 2.5 0.00 \n", "2 0.0 1.75 \n", "3 2.5 0.00 \n", "4 2.5 0.00 " ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def transform(df):\n", "    # .dt.seconds is the within-day seconds component; trips here are far shorter than one day\n", "    df[\"trip_duration\"] = (df[\"tpep_dropoff_datetime\"] - df[\"tpep_pickup_datetime\"]).dt.seconds\n", "    # Move `trip_duration` to the front\n", "    dur_column = df.pop('trip_duration')\n", "    df.insert(1, dur_column.name, dur_column)\n", "    return df\n", "\n", "ddf = ddf.map_partitions(transform)\n", "ddf.head(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Some Dask DataFrame APIs are themselves Embarrassingly Parallel, and they are implemented with `map_partitions()`.\n", "\n", "As mentioned in {numref}`sec-dask-dataframe-indexing`, Dask DataFrame is partitioned on a specific column (the index column); if `map_partitions()` modifies this index column, you must call `clear_divisions()` or run `set_index()` 
again." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
VendorIDtrip_durationtpep_pickup_datetimetpep_dropoff_datetimepassenger_counttrip_distanceRatecodeIDstore_and_fwd_flagPULocationIDDOLocationIDpayment_typefare_amountextramta_taxtip_amounttolls_amountimprovement_surchargetotal_amountcongestion_surchargeAirport_fee
npartitions=1
int32int32datetime64[us]datetime64[us]int64float64int64stringint32int32int64float64float64float64float64float64float64float64float64float64
............................................................
\n", "
\n", "
Dask Name: transform, 2 graph layers
" ], "text/plain": [ "Dask DataFrame Structure:\n", " VendorID trip_duration tpep_pickup_datetime tpep_dropoff_datetime passenger_count trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount congestion_surcharge Airport_fee\n", "npartitions=1 \n", " int32 int32 datetime64[us] datetime64[us] int64 float64 int64 string int32 int32 int64 float64 float64 float64 float64 float64 float64 float64 float64 float64\n", " ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n", "Dask Name: transform, 2 graph layers" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ddf.clear_divisions()" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "client.shutdown()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "dispy", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.7" } }, "nbformat": 4, "nbformat_minor": 2 }