{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "(sec-dask-dataframe-indexing)=\n", "# Indexing" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "tags": [ "hide-cell" ] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.\n", "Perhaps you already have a cluster running?\n", "Hosting the HTTP server on port 51899 instead\n", " warnings.warn(\n" ] } ], "source": [ "%config InlineBackend.figure_format = 'svg'\n", "import os\n", "import sys\n", "sys.path.append(\"..\")\n", "from utils import nyc_flights\n", "\n", "import dask\n", "dask.config.set({'dataframe.query-planning': False})\n", "import dask.dataframe as dd\n", "import pandas as pd\n", "from dask.distributed import LocalCluster, Client\n", "\n", "cluster = LocalCluster()\n", "client = Client(cluster)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As depicted in Figure {numref}`fig-pandas-dataframe-model`, the pandas DataFrame primarily processes two-dimensional tables with column labels and row labels. Row labels are often overlooked by users but actually play a crucial role: indexing. Most of the row labels in a pandas DataFrame are ordered indices, such as incrementing from 0. This ordered indexing ensures that the data within the pandas DataFrame is sequential.\n", "\n", "```{figure} ../img/ch-dask-dataframe/dataframe-model.svg\n", "---\n", "width: 200px\n", "name: fig-pandas-dataframe-model\n", "---\n", "pandas DataFrame Data Model\n", "```\n", "\n", "When creating a pandas DataFrame, an index column is automatically generated on the far left. As can be seen from the following example, the index column does not have a column name and cannot be considered a \"field\"; it is an additional column." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
ABCD
0fooone110
1barone220
2baztwo330
3quxthree440
\n", "
" ], "text/plain": [ " A B C D\n", "0 foo one 1 10\n", "1 bar one 2 20\n", "2 baz two 3 30\n", "3 qux three 4 40" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = pd.DataFrame({\n", " 'A': ['foo', 'bar', 'baz', 'qux'],\n", " 'B': ['one', 'one', 'two', 'three'],\n", " 'C': [1, 2, 3, 4],\n", " 'D': [10, 20, 30, 40]\n", "})\n", "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can also specify a field as the index column:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
BCD
A
fooone110
barone220
baztwo330
quxthree440
\n", "
" ], "text/plain": [ " B C D\n", "A \n", "foo one 1 10\n", "bar one 2 20\n", "baz two 3 30\n", "qux three 4 40" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = df.set_index('A')\n", "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Or reset it:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
ABCD
0fooone110
1barone220
2baztwo330
3quxthree440
\n", "
" ], "text/plain": [ " A B C D\n", "0 foo one 1 10\n", "1 bar one 2 20\n", "2 baz two 3 30\n", "3 qux three 4 40" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = df.reset_index()\n", "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Ordered Row Indexing\n", "\n", "A Dask DataFrame is composed of multiple pandas DataFrames, but maintaining the row labels and row order across the entire Dask DataFrame globally presents a significant challenge. Dask DataFrame does not deliberately preserve global orderliness, which also prevents it from supporting all functionalities of the pandas DataFrame.\n", "\n", "As shown in Figure {numref}`fig-dask-dataframe-divisions`, the Dask DataFrame has `divisions` when it is divided.\n", "\n", "```{figure} ../img/ch-dask-dataframe/divisions.svg\n", "---\n", "width: 400px\n", "name: fig-dask-dataframe-divisions\n", "---\n", "`divisions` in Dask DataFrame\n", "```\n", "\n", "Taking the `dask.datasets.timeseries` example provided by Dask, it generates a time series using timestamps as row labels. The boundaries of each partition are recorded and stored in `.divisions`. The length of `divisions`, denoted as `len(divisions)`, is equal to `npartitions + 1`." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "df.npartitions: 1826\n", "df.divisions: 1827\n" ] } ], "source": [ "ts_df = dask.datasets.timeseries(\"2018-01-01\", \"2023-01-01\")\n", "print(f\"df.npartitions: {ts_df.npartitions}\")\n", "print(f\"df.divisions: {len(ts_df.divisions)}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dask DataFrame does not record the number of rows in each partition; therefore, it does not support operations based on global row indexing, such as `iloc`." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "NotImplementedError, 'DataFrame.iloc' only supports selecting columns. It must be used like 'df.iloc[:, column_indexer]'.\n" ] } ], "source": [ "try:\n", " ts_df.iloc[3].compute()\n", "except Exception as e:\n", " print(f\"{type(e).__name__}, {e}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "However, it can support selecting certain columns using column labels; or the `:` wildcard to select all rows:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idx
timestamp
2018-01-01 00:00:00992-0.711756
2018-01-01 00:00:011018-0.838596
2018-01-01 00:00:021000-0.735968
2018-01-01 00:00:0310040.904384
2018-01-01 00:00:0410210.025423
.........
2022-12-31 23:59:5510200.961542
2022-12-31 23:59:56963-0.663948
2022-12-31 23:59:5710100.510401
2022-12-31 23:59:58964-0.882126
2022-12-31 23:59:591020-0.532950
\n", "

157766400 rows × 2 columns

\n", "
" ], "text/plain": [ " id x\n", "timestamp \n", "2018-01-01 00:00:00 992 -0.711756\n", "2018-01-01 00:00:01 1018 -0.838596\n", "2018-01-01 00:00:02 1000 -0.735968\n", "2018-01-01 00:00:03 1004 0.904384\n", "2018-01-01 00:00:04 1021 0.025423\n", "... ... ...\n", "2022-12-31 23:59:55 1020 0.961542\n", "2022-12-31 23:59:56 963 -0.663948\n", "2022-12-31 23:59:57 1010 0.510401\n", "2022-12-31 23:59:58 964 -0.882126\n", "2022-12-31 23:59:59 1020 -0.532950\n", "\n", "[157766400 rows x 2 columns]" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ts_df.iloc[:, [1, 2]].compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For CSV files, Dask DataFrame does not automatically generate `divisions`." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:640: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n", " head = reader(BytesIO(b_sample), nrows=sample_rows, **head_kwargs)\n" ] }, { "data": { "text/plain": [ "(None, None, None, None, None, None, None)" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "folder_path = nyc_flights()\n", "file_path = os.path.join(folder_path, \"nyc-flights\", \"*.csv\")\n", "flights_ddf = dd.read_csv(file_path,\n", " parse_dates={'Date': [0, 1, 2]},\n", " dtype={'TailNum': object,\n", " 'CRSElapsedTime': float,\n", " 'Cancelled': bool})\n", "flights_ddf.divisions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dask DataFrame does not record the number of data entries in each partition, Dask DataFrame cannot effectively support certain operations, such as the percentile operation `median()`, because these operations require: (1) sorting the data; (2) locating specific rows." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "NotImplementedError, Dask doesn't implement an exact median in all cases as this is hard to do in parallel. See the `median_approximate` method instead, which uses an approximate algorithm.\n" ] } ], "source": [ "try:\n", " flights_ddf['DepDelay'].median()\n", "except Exception as e:\n", " print(f\"{type(e).__name__}, {e}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set the Index Column\n", "\n", "### `set_index()`\n", "\n", "In Dask DataFrame, we can manually set a column as the index column using the `set_index()` method. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Set the Index Column\n", "\n", "### `set_index()`\n", "\n", "In Dask DataFrame, we can manually set a column as the index column using the `set_index()` method. This operation not only makes a specific field the index column but also sorts the global data on this field; because the sort crosses partition boundaries, rows are redistributed between partitions, which incurs a high cost.\n", "\n", "The following example illustrates the changes brought about by `set_index()`:" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " col1 col2\n", "0 01 a\n", "1 05 b\n", "2 02 c\n", " col1 col2\n", "3 03 d\n", "4 04 e\n" ] } ], "source": [ "def print_partitions(ddf):\n", " for i in range(ddf.npartitions):\n", " print(ddf.partitions[i].compute())\n", "\n", "df = pd.DataFrame(\n", " {\"col1\": [\"01\", \"05\", \"02\", \"03\", \"04\"], \"col2\": [\"a\", \"b\", \"c\", \"d\", \"e\"]}\n", ")\n", "ddf = dd.from_pandas(df, npartitions=2)\n", "print_partitions(ddf)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2024-04-23 16:05:06,483 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 008ee90768895dabe7a3e94389222068 initialized by task ('shuffle-transfer-008ee90768895dabe7a3e94389222068', 0) executed on worker tcp://127.0.0.1:51911\n", "2024-04-23 16:05:06,505 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 008ee90768895dabe7a3e94389222068 deactivated due to stimulus 'task-finished-1713859506.50483'\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ " col2\n", "col1 \n", "01 a\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "2024-04-23 16:05:06,545 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 01fddf4f11082a43a6075f7888029dd3 initialized by task ('shuffle-transfer-01fddf4f11082a43a6075f7888029dd3', 1) executed on worker tcp://127.0.0.1:51912\n", "2024-04-23 16:05:06,604 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 01fddf4f11082a43a6075f7888029dd3 deactivated due to stimulus 'task-finished-1713859506.6028118'\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ " col2\n", "col1 \n", "02 c\n", "03 d\n", "04 e\n", "05 b\n" ] } ], "source": [ "ddf2 = ddf.set_index(\"col1\")\n", "print_partitions(ddf2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This example sets `col1` as the index column, causing the data in the two partitions to be rearranged. If this is done on a large volume of data, the cost of the global sort and data redistribution is extremely high, so the operation should be avoided where possible. On the other hand, `set_index()` has its advantages: it can speed up downstream computations. Data redistribution, also known as Shuffle, together with its computational process and cost, is discussed in section {numref}`sec-dask-dataframe-shuffle`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Returning to the time series data, which uses timestamps as its index column, we try `set_index()` in two ways: the first does not set `divisions`, while the second does.\n", "\n", "The first approach, without `divisions`, takes a long time because Dask DataFrame first computes the distribution of the data across all partitions and then rearranges all partitions according to it. The number of partitions also changes."
] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2024-04-23 16:05:16,522 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 initialized by task ('shuffle-transfer-d162433f4ca23d129354be4d414ea589', 999) executed on worker tcp://127.0.0.1:51914\n", "2024-04-23 16:05:27,101 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 deactivated due to stimulus 'task-finished-1713859527.100699'\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "before set_index npartitions: 1826\n", "after set_index npartitions: 165\n", "CPU times: user 6.63 s, sys: 3.65 s, total: 10.3 s\n", "Wall time: 20.6 s\n" ] } ], "source": [ "%%time\n", "ts_df1 = ts_df.set_index(\"id\")\n", "nu = ts_df1.loc[[1001]].name.nunique().compute()\n", "print(f\"before set_index npartitions: {ts_df.npartitions}\")\n", "print(f\"after set_index npartitions: {ts_df1.npartitions}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The second approach pre-obtains the `divisions` and then uses these `divisions` to `set_index()`. The `set_index()` with a defined `division` is faster." ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "dask_computed_divisions = ts_df.set_index(\"id\").divisions\n", "unique_divisions = list(dict.fromkeys(list(dask_computed_divisions)))" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2024-04-23 16:05:38,056 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 initialized by task ('shuffle-transfer-d162433f4ca23d129354be4d414ea589', 999) executed on worker tcp://127.0.0.1:51914\n", "2024-04-23 16:05:49,629 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 deactivated due to stimulus 'task-finished-1713859549.629161'\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 3.24 s, sys: 1.7 s, total: 4.94 s\n", "Wall time: 11.9 s\n" ] } ], "source": [ "%%time\n", "ts_df2 = ts_df.set_index(\"id\", divisions=unique_divisions)\n", "nuids = ts_df2.loc[[1001]].name.nunique().compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If the index column is not set and a direct query is performed on the `id` column, it is found to be faster." ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 1.88 s, sys: 1.09 s, total: 2.97 s\n", "Wall time: 8.38 s\n" ] } ], "source": [ "%%time\n", "nu = ts_df.loc[ts_df[\"id\"] == 1001].name.nunique().compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Therefore, `set_index()` in Dask DataFrame should be used with caution. If there are many following operations after `set_index()`, it may be worth considering using `set_index()`:\n", "\n", "* Filtering on the index column using `loc`\n", "* Merging two Dask DataFrames on the index column (`merge()` method)\n", "* Performing group aggregations on the index column (`groupby()` method)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### `reset_index()`\n", "\n", "In pandas, the `groupby` function defaults to `as_index=True`, which means that the grouping field becomes the index column after the `groupby()` operation. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "### `reset_index()`\n", "\n", "In pandas, the `groupby` function defaults to `as_index=True`, which means that after the `groupby()` operation the grouping field becomes the index column. The index column is not a \"formal\" field within the DataFrame; if, after the group aggregation, only one \"formal\" field remains (not counting the index column), the result of the aggregation is a `Series`. For example, in the following pandas example the `Origin` column is the grouping field: without `as_index=False`, `groupby(\"Origin\")[\"DepDelay\"].mean()` would produce a `Series`, whereas `as_index=False` keeps `Origin` as a regular column and yields a `DataFrame`." ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/var/folders/4n/v40br47s46ggrjm9bdm64lwh0000gn/T/ipykernel_76150/639704942.py:3: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n", " pdf = pd.read_csv(file_path,\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
OriginAvgDepDelay
2LGA5.726304
0EWR6.916220
1JFK9.311532
\n", "
" ], "text/plain": [ " Origin AvgDepDelay\n", "2 LGA 5.726304\n", "0 EWR 6.916220\n", "1 JFK 9.311532" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# pandas\n", "file_path = os.path.join(folder_path, \"1991.csv\")\n", "pdf = pd.read_csv(file_path,\n", " parse_dates={'Date': [0, 1, 2]},\n", " dtype={'TailNum': object,\n", " 'CRSElapsedTime': float,\n", " 'Cancelled': bool})\n", "uncancelled_pdf = pdf[pdf[\"Cancelled\"] == False]\n", "avg_pdf = uncancelled_pdf.groupby(\"Origin\", as_index=False)[\"DepDelay\"].mean()\n", "avg_pdf.columns = [\"Origin\", \"AvgDepDelay\"]\n", "avg_pdf.sort_values(\"AvgDepDelay\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Alternatively, using `reset_index()` can cancel the index column, and the grouping field will then become a formal field in the `DataFrame`." ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
OriginAvgDepDelay
2LGA5.726304
0EWR6.916220
1JFK9.311532
\n", "
" ], "text/plain": [ " Origin AvgDepDelay\n", "2 LGA 5.726304\n", "0 EWR 6.916220\n", "1 JFK 9.311532" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "avg_pdf = uncancelled_pdf.groupby(\"Origin\")[\"DepDelay\"].mean().reset_index()\n", "avg_pdf.columns = [\"Origin\", \"AvgDepDelay\"]\n", "avg_pdf.sort_values(\"AvgDepDelay\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `groupby()` function in Dask DataFrame does not support the `as_index` parameter. Dask DataFrame can only cancel the index column by using `reset_index()`." ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:195: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n", " df = reader(bio, **kwargs)\n", "/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:195: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n", " df = reader(bio, **kwargs)\n", "/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:195: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n", " df = reader(bio, **kwargs)\n", "/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:195: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n", " df = reader(bio, **kwargs)\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
OriginAvgDepDelay
2LGA6.944939
0EWR9.997188
1JFK10.766914
\n", "
" ], "text/plain": [ " Origin AvgDepDelay\n", "2 LGA 6.944939\n", "0 EWR 9.997188\n", "1 JFK 10.766914" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "uncancelled_ddf = flights_ddf[flights_ddf[\"Cancelled\"] == False]\n", "avg_ddf = uncancelled_ddf.groupby(\"Origin\")[\"DepDelay\"].mean().reset_index()\n", "avg_ddf.columns = [\"Origin\", \"AvgDepDelay\"]\n", "avg_ddf = avg_ddf.compute()\n", "# pandas 只使用了一年数据,因此结果不一样\n", "avg_ddf.sort_values(\"AvgDepDelay\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client.shutdown()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "dispy", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.7" } }, "nbformat": 4, "nbformat_minor": 2 }