{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"(sec-dask-dataframe-indexing)=\n",
"# Indexing"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"tags": [
"hide-cell"
]
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.\n",
"Perhaps you already have a cluster running?\n",
"Hosting the HTTP server on port 51899 instead\n",
" warnings.warn(\n"
]
}
],
"source": [
"%config InlineBackend.figure_format = 'svg'\n",
"import os\n",
"import sys\n",
"sys.path.append(\"..\")\n",
"from utils import nyc_flights\n",
"\n",
"import dask\n",
"dask.config.set({'dataframe.query-planning': False})\n",
"import dask.dataframe as dd\n",
"import pandas as pd\n",
"from dask.distributed import LocalCluster, Client\n",
"\n",
"cluster = LocalCluster()\n",
"client = Client(cluster)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As depicted in Figure {numref}`fig-pandas-dataframe-model`, the pandas DataFrame primarily processes two-dimensional tables with column labels and row labels. Row labels are often overlooked by users but actually play a crucial role: indexing. Most of the row labels in a pandas DataFrame are ordered indices, such as incrementing from 0. This ordered indexing ensures that the data within the pandas DataFrame is sequential.\n",
"\n",
"```{figure} ../img/ch-dask-dataframe/dataframe-model.svg\n",
"---\n",
"width: 200px\n",
"name: fig-pandas-dataframe-model\n",
"---\n",
"pandas DataFrame Data Model\n",
"```\n",
"\n",
"When creating a pandas DataFrame, an index column is automatically generated on the far left. As can be seen from the following example, the index column does not have a column name and cannot be considered a \"field\"; it is an additional column."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"
\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" A | \n",
" B | \n",
" C | \n",
" D | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" foo | \n",
" one | \n",
" 1 | \n",
" 10 | \n",
"
\n",
" \n",
" 1 | \n",
" bar | \n",
" one | \n",
" 2 | \n",
" 20 | \n",
"
\n",
" \n",
" 2 | \n",
" baz | \n",
" two | \n",
" 3 | \n",
" 30 | \n",
"
\n",
" \n",
" 3 | \n",
" qux | \n",
" three | \n",
" 4 | \n",
" 40 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" A B C D\n",
"0 foo one 1 10\n",
"1 bar one 2 20\n",
"2 baz two 3 30\n",
"3 qux three 4 40"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = pd.DataFrame({\n",
" 'A': ['foo', 'bar', 'baz', 'qux'],\n",
" 'B': ['one', 'one', 'two', 'three'],\n",
" 'C': [1, 2, 3, 4],\n",
" 'D': [10, 20, 30, 40]\n",
"})\n",
"df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can also specify a field as the index column:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" B | \n",
" C | \n",
" D | \n",
"
\n",
" \n",
" A | \n",
" | \n",
" | \n",
" | \n",
"
\n",
" \n",
" \n",
" \n",
" foo | \n",
" one | \n",
" 1 | \n",
" 10 | \n",
"
\n",
" \n",
" bar | \n",
" one | \n",
" 2 | \n",
" 20 | \n",
"
\n",
" \n",
" baz | \n",
" two | \n",
" 3 | \n",
" 30 | \n",
"
\n",
" \n",
" qux | \n",
" three | \n",
" 4 | \n",
" 40 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" B C D\n",
"A \n",
"foo one 1 10\n",
"bar one 2 20\n",
"baz two 3 30\n",
"qux three 4 40"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = df.set_index('A')\n",
"df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Or reset it:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" A | \n",
" B | \n",
" C | \n",
" D | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" foo | \n",
" one | \n",
" 1 | \n",
" 10 | \n",
"
\n",
" \n",
" 1 | \n",
" bar | \n",
" one | \n",
" 2 | \n",
" 20 | \n",
"
\n",
" \n",
" 2 | \n",
" baz | \n",
" two | \n",
" 3 | \n",
" 30 | \n",
"
\n",
" \n",
" 3 | \n",
" qux | \n",
" three | \n",
" 4 | \n",
" 40 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" A B C D\n",
"0 foo one 1 10\n",
"1 bar one 2 20\n",
"2 baz two 3 30\n",
"3 qux three 4 40"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = df.reset_index()\n",
"df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Ordered Row Indexing\n",
"\n",
"A Dask DataFrame is composed of multiple pandas DataFrames, but maintaining the row labels and row order across the entire Dask DataFrame globally presents a significant challenge. Dask DataFrame does not deliberately preserve global orderliness, which also prevents it from supporting all functionalities of the pandas DataFrame.\n",
"\n",
"As shown in Figure {numref}`fig-dask-dataframe-divisions`, the Dask DataFrame has `divisions` when it is divided.\n",
"\n",
"```{figure} ../img/ch-dask-dataframe/divisions.svg\n",
"---\n",
"width: 400px\n",
"name: fig-dask-dataframe-divisions\n",
"---\n",
"`divisions` in Dask DataFrame\n",
"```\n",
"\n",
"Taking the `dask.datasets.timeseries` example provided by Dask, it generates a time series using timestamps as row labels. The boundaries of each partition are recorded and stored in `.divisions`. The length of `divisions`, denoted as `len(divisions)`, is equal to `npartitions + 1`."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"df.npartitions: 1826\n",
"df.divisions: 1827\n"
]
}
],
"source": [
"ts_df = dask.datasets.timeseries(\"2018-01-01\", \"2023-01-01\")\n",
"print(f\"df.npartitions: {ts_df.npartitions}\")\n",
"print(f\"df.divisions: {len(ts_df.divisions)}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Dask DataFrame does not record the number of rows in each partition; therefore, it does not support operations based on global row indexing, such as `iloc`."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"NotImplementedError, 'DataFrame.iloc' only supports selecting columns. It must be used like 'df.iloc[:, column_indexer]'.\n"
]
}
],
"source": [
"try:\n",
" ts_df.iloc[3].compute()\n",
"except Exception as e:\n",
" print(f\"{type(e).__name__}, {e}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"However, it can support selecting certain columns using column labels; or the `:` wildcard to select all rows:"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" id | \n",
" x | \n",
"
\n",
" \n",
" timestamp | \n",
" | \n",
" | \n",
"
\n",
" \n",
" \n",
" \n",
" 2018-01-01 00:00:00 | \n",
" 992 | \n",
" -0.711756 | \n",
"
\n",
" \n",
" 2018-01-01 00:00:01 | \n",
" 1018 | \n",
" -0.838596 | \n",
"
\n",
" \n",
" 2018-01-01 00:00:02 | \n",
" 1000 | \n",
" -0.735968 | \n",
"
\n",
" \n",
" 2018-01-01 00:00:03 | \n",
" 1004 | \n",
" 0.904384 | \n",
"
\n",
" \n",
" 2018-01-01 00:00:04 | \n",
" 1021 | \n",
" 0.025423 | \n",
"
\n",
" \n",
" ... | \n",
" ... | \n",
" ... | \n",
"
\n",
" \n",
" 2022-12-31 23:59:55 | \n",
" 1020 | \n",
" 0.961542 | \n",
"
\n",
" \n",
" 2022-12-31 23:59:56 | \n",
" 963 | \n",
" -0.663948 | \n",
"
\n",
" \n",
" 2022-12-31 23:59:57 | \n",
" 1010 | \n",
" 0.510401 | \n",
"
\n",
" \n",
" 2022-12-31 23:59:58 | \n",
" 964 | \n",
" -0.882126 | \n",
"
\n",
" \n",
" 2022-12-31 23:59:59 | \n",
" 1020 | \n",
" -0.532950 | \n",
"
\n",
" \n",
"
\n",
"
157766400 rows × 2 columns
\n",
"
"
],
"text/plain": [
" id x\n",
"timestamp \n",
"2018-01-01 00:00:00 992 -0.711756\n",
"2018-01-01 00:00:01 1018 -0.838596\n",
"2018-01-01 00:00:02 1000 -0.735968\n",
"2018-01-01 00:00:03 1004 0.904384\n",
"2018-01-01 00:00:04 1021 0.025423\n",
"... ... ...\n",
"2022-12-31 23:59:55 1020 0.961542\n",
"2022-12-31 23:59:56 963 -0.663948\n",
"2022-12-31 23:59:57 1010 0.510401\n",
"2022-12-31 23:59:58 964 -0.882126\n",
"2022-12-31 23:59:59 1020 -0.532950\n",
"\n",
"[157766400 rows x 2 columns]"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ts_df.iloc[:, [1, 2]].compute()"
]
},
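{
"cell_type": "markdown",
"metadata": {},
"source": [
"In contrast, label-based selection with `loc` works, because Dask can use the known `divisions` to route a lookup to the relevant partitions instead of scanning all of them. A minimal sketch on the time series above (the date literal is just an example within its range):\n",
"\n",
"```python\n",
"# only the partition(s) covering this day are read,\n",
"# thanks to the known timestamp divisions\n",
"ts_df.loc[\"2020-06-01\"].compute()\n",
"```"
]
},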
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For CSV files, Dask DataFrame does not automatically generate `divisions`."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:640: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n",
" head = reader(BytesIO(b_sample), nrows=sample_rows, **head_kwargs)\n"
]
},
{
"data": {
"text/plain": [
"(None, None, None, None, None, None, None)"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"folder_path = nyc_flights()\n",
"file_path = os.path.join(folder_path, \"nyc-flights\", \"*.csv\")\n",
"flights_ddf = dd.read_csv(file_path,\n",
" parse_dates={'Date': [0, 1, 2]},\n",
" dtype={'TailNum': object,\n",
" 'CRSElapsedTime': float,\n",
" 'Cancelled': bool})\n",
"flights_ddf.divisions"
]
},
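{
"cell_type": "markdown",
"metadata": {},
"source": [
"Whether the partition boundaries are known can also be checked via the `known_divisions` attribute, a sketch assuming the CSV-backed DataFrame above:\n",
"\n",
"```python\n",
"# all divisions are None, so Dask does not know the partition boundaries\n",
"print(flights_ddf.known_divisions)  # expected: False\n",
"```"
]
},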
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Dask DataFrame does not record the number of data entries in each partition, Dask DataFrame cannot effectively support certain operations, such as the percentile operation `median()`, because these operations require: (1) sorting the data; (2) locating specific rows."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"NotImplementedError, Dask doesn't implement an exact median in all cases as this is hard to do in parallel. See the `median_approximate` method instead, which uses an approximate algorithm.\n"
]
}
],
"source": [
"try:\n",
" flights_ddf['DepDelay'].median()\n",
"except Exception as e:\n",
" print(f\"{type(e).__name__}, {e}\")"
]
},
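{
"cell_type": "markdown",
"metadata": {},
"source": [
"As the error message suggests, Dask offers `median_approximate()`, which estimates the percentile with an approximate algorithm instead of a global sort. A short sketch:\n",
"\n",
"```python\n",
"# approximate median; avoids sorting and positional row lookups\n",
"flights_ddf[\"DepDelay\"].median_approximate().compute()\n",
"```"
]
},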
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set the Index Column\n",
"\n",
"### `set_index()`\n",
"\n",
"In Dask DataFrame, we can manually set a column as the index column using the `set_index()` method. This operation not only sets a specific field as the index column but also sorts the global data based on this field, which disrupts the original sorting of data within each partition, thus incurring a high cost.\n",
"\n",
"The following example illustrates the changes brought about by `set_index()`:"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" col1 col2\n",
"0 01 a\n",
"1 05 b\n",
"2 02 c\n",
" col1 col2\n",
"3 03 d\n",
"4 04 e\n"
]
}
],
"source": [
"def print_partitions(ddf):\n",
" for i in range(ddf.npartitions):\n",
" print(ddf.partitions[i].compute())\n",
"\n",
"df = pd.DataFrame(\n",
" {\"col1\": [\"01\", \"05\", \"02\", \"03\", \"04\"], \"col2\": [\"a\", \"b\", \"c\", \"d\", \"e\"]}\n",
")\n",
"ddf = dd.from_pandas(df, npartitions=2)\n",
"print_partitions(ddf)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-04-23 16:05:06,483 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 008ee90768895dabe7a3e94389222068 initialized by task ('shuffle-transfer-008ee90768895dabe7a3e94389222068', 0) executed on worker tcp://127.0.0.1:51911\n",
"2024-04-23 16:05:06,505 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 008ee90768895dabe7a3e94389222068 deactivated due to stimulus 'task-finished-1713859506.50483'\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
" col2\n",
"col1 \n",
"01 a\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-04-23 16:05:06,545 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 01fddf4f11082a43a6075f7888029dd3 initialized by task ('shuffle-transfer-01fddf4f11082a43a6075f7888029dd3', 1) executed on worker tcp://127.0.0.1:51912\n",
"2024-04-23 16:05:06,604 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 01fddf4f11082a43a6075f7888029dd3 deactivated due to stimulus 'task-finished-1713859506.6028118'\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
" col2\n",
"col1 \n",
"02 c\n",
"03 d\n",
"04 e\n",
"05 b\n"
]
}
],
"source": [
"ddf2 = ddf.set_index(\"col1\")\n",
"print_partitions(ddf2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This example sets `col1` as the index column, causing the data in the two partitions to be rearranged. If this is done in a scenario with a large volume of data, the cost of global data sorting and redistribution is extremely high. Therefore, this operation should be avoided if possible. The `set_index()` method also has its advantages; it can speed up downstream computations. Data redistribution, also known as Shuffle, will be discussed in the computational process and cost in section {numref}`sec-dask-dataframe-shuffle`."
]
},
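{
"cell_type": "markdown",
"metadata": {},
"source": [
"If the data is already globally sorted on the new index column, the shuffle can be skipped entirely by passing `sorted=True` to `set_index()`. A minimal sketch under that assumption:\n",
"\n",
"```python\n",
"# col1 is already in global order across partitions,\n",
"# so sorted=True lets Dask set the index without a shuffle\n",
"sorted_df = pd.DataFrame(\n",
"    {\"col1\": [\"01\", \"02\", \"03\", \"04\", \"05\"], \"col2\": [\"a\", \"b\", \"c\", \"d\", \"e\"]}\n",
")\n",
"sorted_ddf = dd.from_pandas(sorted_df, npartitions=2)\n",
"sorted_ddf = sorted_ddf.set_index(\"col1\", sorted=True)\n",
"```"
]
},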
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Regarding the time series data, which uses timestamps as the index column, we use two approaches to `set_index()`. The first approach does not set `divisions`, while the second does.\n",
"\n",
"The first approach, without setting `divisions`, takes a long time because Dask DataFrame calculates the distribution of data across all partitions and rearranges all partitions based on this distribution. The number of partitions also changes."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-04-23 16:05:16,522 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 initialized by task ('shuffle-transfer-d162433f4ca23d129354be4d414ea589', 999) executed on worker tcp://127.0.0.1:51914\n",
"2024-04-23 16:05:27,101 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 deactivated due to stimulus 'task-finished-1713859527.100699'\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"before set_index npartitions: 1826\n",
"after set_index npartitions: 165\n",
"CPU times: user 6.63 s, sys: 3.65 s, total: 10.3 s\n",
"Wall time: 20.6 s\n"
]
}
],
"source": [
"%%time\n",
"ts_df1 = ts_df.set_index(\"id\")\n",
"nu = ts_df1.loc[[1001]].name.nunique().compute()\n",
"print(f\"before set_index npartitions: {ts_df.npartitions}\")\n",
"print(f\"after set_index npartitions: {ts_df1.npartitions}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The second approach pre-obtains the `divisions` and then uses these `divisions` to `set_index()`. The `set_index()` with a defined `division` is faster."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"dask_computed_divisions = ts_df.set_index(\"id\").divisions\n",
"unique_divisions = list(dict.fromkeys(list(dask_computed_divisions)))"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-04-23 16:05:38,056 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 initialized by task ('shuffle-transfer-d162433f4ca23d129354be4d414ea589', 999) executed on worker tcp://127.0.0.1:51914\n",
"2024-04-23 16:05:49,629 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 deactivated due to stimulus 'task-finished-1713859549.629161'\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 3.24 s, sys: 1.7 s, total: 4.94 s\n",
"Wall time: 11.9 s\n"
]
}
],
"source": [
"%%time\n",
"ts_df2 = ts_df.set_index(\"id\", divisions=unique_divisions)\n",
"nuids = ts_df2.loc[[1001]].name.nunique().compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If the index column is not set and a direct query is performed on the `id` column, it is found to be faster."
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.88 s, sys: 1.09 s, total: 2.97 s\n",
"Wall time: 8.38 s\n"
]
}
],
"source": [
"%%time\n",
"nu = ts_df.loc[ts_df[\"id\"] == 1001].name.nunique().compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Therefore, `set_index()` in Dask DataFrame should be used with caution. If there are many following operations after `set_index()`, it may be worth considering using `set_index()`:\n",
"\n",
"* Filtering on the index column using `loc`\n",
"* Merging two Dask DataFrames on the index column (`merge()` method)\n",
"* Performing group aggregations on the index column (`groupby()` method)"
]
},
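{
"cell_type": "markdown",
"metadata": {},
"source": [
"For instance, a merge on the index column can proceed partition by partition when both sides share the same `divisions`, avoiding a shuffle. A minimal sketch reusing `ts_df2` from above (`left`, `right`, and `joined` are hypothetical names for illustration):\n",
"\n",
"```python\n",
"# both sides are indexed on id with identical divisions,\n",
"# so the join aligns partitions instead of shuffling\n",
"left = ts_df2[[\"x\"]]\n",
"right = ts_df2[[\"y\"]]\n",
"joined = left.merge(right, left_index=True, right_index=True)\n",
"```"
]
},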
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### `reset_index()`\n",
"\n",
"In pandas, the `groupby` function defaults to `as_index=True`, which means that the grouping field becomes the index column after the `groupby()` operation. The index column is not a \"formal\" field within the DataFrame. If, after the group aggregation, there is only one \"formal\" field (not considering the index column), the result of the group aggregation becomes a `Series`. For example, in the following pandas example, the `Origin` column is the grouping field. If `as_index=False` is not set, `groupby(\"Origin\", as_index=False)[\"DepDelay\"].mean()` will produce a `Series`."
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/var/folders/4n/v40br47s46ggrjm9bdm64lwh0000gn/T/ipykernel_76150/639704942.py:3: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n",
" pdf = pd.read_csv(file_path,\n"
]
},
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" Origin | \n",
" AvgDepDelay | \n",
"
\n",
" \n",
" \n",
" \n",
" 2 | \n",
" LGA | \n",
" 5.726304 | \n",
"
\n",
" \n",
" 0 | \n",
" EWR | \n",
" 6.916220 | \n",
"
\n",
" \n",
" 1 | \n",
" JFK | \n",
" 9.311532 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" Origin AvgDepDelay\n",
"2 LGA 5.726304\n",
"0 EWR 6.916220\n",
"1 JFK 9.311532"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# pandas\n",
"file_path = os.path.join(folder_path, \"1991.csv\")\n",
"pdf = pd.read_csv(file_path,\n",
" parse_dates={'Date': [0, 1, 2]},\n",
" dtype={'TailNum': object,\n",
" 'CRSElapsedTime': float,\n",
" 'Cancelled': bool})\n",
"uncancelled_pdf = pdf[pdf[\"Cancelled\"] == False]\n",
"avg_pdf = uncancelled_pdf.groupby(\"Origin\", as_index=False)[\"DepDelay\"].mean()\n",
"avg_pdf.columns = [\"Origin\", \"AvgDepDelay\"]\n",
"avg_pdf.sort_values(\"AvgDepDelay\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Alternatively, using `reset_index()` can cancel the index column, and the grouping field will then become a formal field in the `DataFrame`."
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" Origin | \n",
" AvgDepDelay | \n",
"
\n",
" \n",
" \n",
" \n",
" 2 | \n",
" LGA | \n",
" 5.726304 | \n",
"
\n",
" \n",
" 0 | \n",
" EWR | \n",
" 6.916220 | \n",
"
\n",
" \n",
" 1 | \n",
" JFK | \n",
" 9.311532 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" Origin AvgDepDelay\n",
"2 LGA 5.726304\n",
"0 EWR 6.916220\n",
"1 JFK 9.311532"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"avg_pdf = uncancelled_pdf.groupby(\"Origin\")[\"DepDelay\"].mean().reset_index()\n",
"avg_pdf.columns = [\"Origin\", \"AvgDepDelay\"]\n",
"avg_pdf.sort_values(\"AvgDepDelay\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `groupby()` function in Dask DataFrame does not support the `as_index` parameter. Dask DataFrame can only cancel the index column by using `reset_index()`."
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:195: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n",
" df = reader(bio, **kwargs)\n",
"/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:195: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n",
" df = reader(bio, **kwargs)\n",
"/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:195: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n",
" df = reader(bio, **kwargs)\n",
"/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:195: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n",
" df = reader(bio, **kwargs)\n"
]
},
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" Origin | \n",
" AvgDepDelay | \n",
"
\n",
" \n",
" \n",
" \n",
" 2 | \n",
" LGA | \n",
" 6.944939 | \n",
"
\n",
" \n",
" 0 | \n",
" EWR | \n",
" 9.997188 | \n",
"
\n",
" \n",
" 1 | \n",
" JFK | \n",
" 10.766914 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" Origin AvgDepDelay\n",
"2 LGA 6.944939\n",
"0 EWR 9.997188\n",
"1 JFK 10.766914"
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"uncancelled_ddf = flights_ddf[flights_ddf[\"Cancelled\"] == False]\n",
"avg_ddf = uncancelled_ddf.groupby(\"Origin\")[\"DepDelay\"].mean().reset_index()\n",
"avg_ddf.columns = [\"Origin\", \"AvgDepDelay\"]\n",
"avg_ddf = avg_ddf.compute()\n",
"# pandas 只使用了一年数据,因此结果不一样\n",
"avg_ddf.sort_values(\"AvgDepDelay\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client.shutdown()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "dispy",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}