4.2. Indexing#

%config InlineBackend.figure_format = 'svg'
import os
import sys
sys.path.append("..")
from utils import nyc_flights

import dask
dask.config.set({'dataframe.query-planning': False})
import dask.dataframe as dd
import pandas as pd
from dask.distributed import LocalCluster, Client

cluster = LocalCluster()
client = Client(cluster)

As depicted in Fig. 4.2, a pandas DataFrame is a two-dimensional table with column labels and row labels. Row labels are often overlooked by users but actually play a crucial role: indexing. Most row labels in a pandas DataFrame are ordered indices, for example incrementing from 0; this ordered index keeps the data in the pandas DataFrame sequential.

Fig. 4.2 pandas DataFrame Data Model#

When creating a pandas DataFrame, an index column is automatically generated on the far left. As can be seen from the following example, the index column does not have a column name and cannot be considered a “field”; it is an additional column.

df = pd.DataFrame({
   'A': ['foo', 'bar', 'baz', 'qux'],
   'B': ['one', 'one', 'two', 'three'],
   'C': [1, 2, 3, 4],
   'D': [10, 20, 30, 40]
})
df
     A      B  C   D
0  foo    one  1  10
1  bar    one  2  20
2  baz    two  3  30
3  qux  three  4  40
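
The auto-generated row labels form a RangeIndex, and they support label-based access. A small sketch using the df defined above:

print(df.index)  # RangeIndex(start=0, stop=4, step=1)
df.loc[2]        # select the row whose label is 2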

You can also specify a field as the index column:

df = df.set_index('A')
df
         B  C   D
A
foo    one  1  10
bar    one  2  20
baz    two  3  30
qux  three  4  40

Or reset it:

df = df.reset_index()
df
     A      B  C   D
0  foo    one  1  10
1  bar    one  2  20
2  baz    two  3  30
3  qux  three  4  40

Ordered Row Indexing#

A Dask DataFrame is composed of multiple pandas DataFrames, but maintaining row labels and row order globally across the whole Dask DataFrame is a significant challenge. Dask DataFrame does not deliberately preserve global order, which is why it cannot support all of the functionality of a pandas DataFrame.

As shown in Fig. 4.3, when a Dask DataFrame is partitioned, the boundaries between partitions are recorded as its divisions.

Fig. 4.3 divisions in Dask DataFrame#

Take the dask.datasets.timeseries example provided by Dask: it generates a time series that uses timestamps as row labels. The boundary of each partition is recorded in .divisions, and the length of divisions, len(divisions), equals npartitions + 1.

ts_df = dask.datasets.timeseries("2018-01-01", "2023-01-01")
print(f"df.npartitions: {ts_df.npartitions}")
print(f"df.divisions: {len(ts_df.divisions)}")
df.npartitions: 1826
df.divisions: 1827
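
Because the divisions are known, Dask can map a row label to the partition that holds it. As a minimal sketch using the timestamp index above, selecting a short date range only touches the partitions covering those dates, rather than scanning all 1,826:

# divisions let Dask prune every partition outside the requested range;
# this is lazy and only involves the partitions covering these dates
one_day = ts_df.loc["2021-06-01":"2021-06-02"]
one_day.npartitions  # far fewer than ts_df.npartitions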

Dask DataFrame does not record the number of rows in each partition; therefore, it does not support operations based on global row indexing, such as iloc.

try:
    ts_df.iloc[3].compute()
except Exception as e:
    print(f"{type(e).__name__}, {e}")
NotImplementedError, 'DataFrame.iloc' only supports selecting columns. It must be used like 'df.iloc[:, column_indexer]'.

However, iloc can still be used to select columns by position, with the : wildcard selecting all rows:

ts_df.iloc[:, [1, 2]].compute()
                       id         x
timestamp
2018-01-01 00:00:00   992 -0.711756
2018-01-01 00:00:01  1018 -0.838596
2018-01-01 00:00:02  1000 -0.735968
2018-01-01 00:00:03  1004  0.904384
2018-01-01 00:00:04  1021  0.025423
...                   ...       ...
2022-12-31 23:59:55  1020  0.961542
2022-12-31 23:59:56   963 -0.663948
2022-12-31 23:59:57  1010  0.510401
2022-12-31 23:59:58   964 -0.882126
2022-12-31 23:59:59  1020 -0.532950

157766400 rows × 2 columns

For CSV files, Dask DataFrame does not automatically generate divisions.

folder_path = nyc_flights()
file_path = os.path.join(folder_path, "nyc-flights", "*.csv")
flights_ddf = dd.read_csv(file_path,
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': object,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})
flights_ddf.divisions
(None, None, None, None, None, None, None)
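
All of the divisions are None, meaning the partition boundaries are unknown. This can also be checked with the known_divisions attribute; a quick sketch:

# False here, since reading the CSVs produced no divisions
flights_ddf.known_divisions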

Because Dask DataFrame does not record the number of rows in each partition, it cannot efficiently support certain operations, such as the percentile operation median(), which requires (1) sorting the data and (2) locating specific rows.

try:
    flights_ddf['DepDelay'].median()
except Exception as e:
    print(f"{type(e).__name__}, {e}")
NotImplementedError, Dask doesn't implement an exact median in all cases as this is hard to do in parallel. See the `median_approximate` method instead, which uses an approximate algorithm.
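
As the error message suggests, the approximate median_approximate() method can be used instead. A short sketch:

# approximate median, computed in parallel with an approximate quantile algorithm
flights_ddf['DepDelay'].median_approximate().compute()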

Set the Index Column#

set_index()#

In Dask DataFrame, we can manually set a column as the index column with the set_index() method. This operation not only makes a specific field the index column but also sorts the global data by that field, which disrupts the existing ordering of the data within each partition and therefore incurs a high cost.

The following example illustrates the changes brought about by set_index():

def print_partitions(ddf):
    for i in range(ddf.npartitions):
        print(ddf.partitions[i].compute())

df = pd.DataFrame(
    {"col1": ["01", "05", "02", "03", "04"], "col2": ["a", "b", "c", "d", "e"]}
)
ddf = dd.from_pandas(df, npartitions=2)
print_partitions(ddf)
  col1 col2
0   01    a
1   05    b
2   02    c
  col1 col2
3   03    d
4   04    e
ddf2 = ddf.set_index("col1")
print_partitions(ddf2)
     col2
col1     
01      a
     col2
col1     
02      c
03      d
04      e
05      b

This example sets col1 as the index column, causing the data in the two partitions to be rearranged. Performed on a large volume of data, global sorting and redistribution of this kind is extremely expensive, so the operation should be avoided where possible. set_index() also has its advantages: it can speed up downstream computations. Data redistribution, also known as shuffle, and its computational cost will be discussed in Section 4.4.

For the time series data, which uses timestamps as the index column, we try set_index() in two ways: the first does not supply divisions, while the second does.

The first approach, without providing divisions, takes a long time because Dask DataFrame must compute the distribution of the data across all partitions and rearrange every partition accordingly. The number of partitions also changes.

%%time
ts_df1 = ts_df.set_index("id")
nu = ts_df1.loc[[1001]].name.nunique().compute()
print(f"before set_index npartitions: {ts_df.npartitions}")
print(f"after set_index npartitions: {ts_df1.npartitions}")
before set_index npartitions: 1826
after set_index npartitions: 165
CPU times: user 6.63 s, sys: 3.65 s, total: 10.3 s
Wall time: 20.6 s

The second approach obtains the divisions in advance and then passes them to set_index(); set_index() with known divisions is faster.

dask_computed_divisions = ts_df.set_index("id").divisions
unique_divisions = list(dict.fromkeys(list(dask_computed_divisions)))
%%time
ts_df2 = ts_df.set_index("id", divisions=unique_divisions)
nuids = ts_df2.loc[[1001]].name.nunique().compute()
CPU times: user 3.24 s, sys: 1.7 s, total: 4.94 s
Wall time: 11.9 s

If no index column is set at all and the id column is queried directly, it turns out to be faster still.

%%time
nu = ts_df.loc[ts_df["id"] == 1001].name.nunique().compute()
CPU times: user 1.88 s, sys: 1.09 s, total: 2.97 s
Wall time: 8.38 s

Therefore, set_index() in Dask DataFrame should be used with caution. It is worth considering only if many subsequent operations can exploit the sorted index, for example (a short sketch follows this list):

  • Filtering on the index column using loc

  • Merging two Dask DataFrames on the index column (merge() method)

  • Performing group aggregations on the index column (groupby() method)
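
Below is a minimal sketch of these patterns, reusing ts_df2 (indexed on id) from above. The calls are standard Dask DataFrame API, but the actual speedup depends on the data and the divisions:

# loc on the index uses divisions to read only the relevant partitions
subset = ts_df2.loc[1001]

# joining on the index avoids a full shuffle when divisions are aligned
joined = ts_df2.join(ts_df2[["x"]], rsuffix="_right")

# grouping on the index column (here referenced by its name, "id")
# keeps most of the aggregation within partitions
grouped = ts_df2.groupby("id")["x"].mean()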

reset_index()#

In pandas, the groupby function defaults to as_index=True, which means the grouping field becomes the index column after the groupby() operation. The index column is not a "formal" field within the DataFrame. If, after the group aggregation, there is only one "formal" field left (not counting the index column), the result of the group aggregation is a Series. For example, in the following pandas example, the Origin column is the grouping field: without as_index=False, groupby("Origin")["DepDelay"].mean() would produce a Series, whereas groupby("Origin", as_index=False)["DepDelay"].mean() returns a DataFrame.

# pandas
file_path = os.path.join(folder_path, "1991.csv")
pdf = pd.read_csv(file_path,
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': object,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})
uncancelled_pdf = pdf[pdf["Cancelled"] == False]
avg_pdf = uncancelled_pdf.groupby("Origin", as_index=False)["DepDelay"].mean()
avg_pdf.columns = ["Origin", "AvgDepDelay"]
avg_pdf.sort_values("AvgDepDelay")
  Origin  AvgDepDelay
2    LGA     5.726304
0    EWR     6.916220
1    JFK     9.311532

Alternatively, reset_index() removes the index column, and the grouping field becomes a formal field of the DataFrame again.

avg_pdf = uncancelled_pdf.groupby("Origin")["DepDelay"].mean().reset_index()
avg_pdf.columns = ["Origin", "AvgDepDelay"]
avg_pdf.sort_values("AvgDepDelay")
  Origin  AvgDepDelay
2    LGA     5.726304
0    EWR     6.916220
1    JFK     9.311532

The groupby() function in Dask DataFrame does not support the as_index parameter; in Dask DataFrame the index column can only be removed with reset_index().

uncancelled_ddf = flights_ddf[flights_ddf["Cancelled"] == False]
avg_ddf = uncancelled_ddf.groupby("Origin")["DepDelay"].mean().reset_index()
avg_ddf.columns = ["Origin", "AvgDepDelay"]
avg_ddf = avg_ddf.compute()
# the pandas example above used only one year of data, so the results differ
avg_ddf.sort_values("AvgDepDelay")
  Origin  AvgDepDelay
2    LGA     6.944939
0    EWR     9.997188
1    JFK    10.766914
client.shutdown()