8.5. Modin#
Ray Data's data processing capabilities are relatively basic: it covers common transformations but lacks many of the more complex operations that pandas users rely on. Modin is a framework that scales pandas by distributing large datasets across multiple cores and across a cluster. Initially, Modin used Ray as its underlying distributed execution engine and was sometimes referred to as "pandas on Ray". It has since added the Dask and unidist execution engines, with unidist being a distributed execution engine developed by the Modin team.
When installing Modin, users can choose the execution engine that fits their needs, for example pip install "modin[ray]" or pip install "modin[dask]". Modin defaults to Ray as its execution engine.
API Compatibility#
Dask DataFrame differs from pandas DataFrame in several ways, and many pandas workflows cannot be quickly migrated to it. Modin, by contrast, places a strong emphasis on compatibility with pandas: most pandas workflows can be migrated to Modin simply by replacing the import with import modin.pandas as pd.
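As a minimal sketch of such a migration, the toy workflow below (with made-up column names) runs unchanged whether the import points at pandas or at modin.pandas:
# Replace `import pandas as pd` with the Modin import; the rest of the code stays the same
import modin.pandas as pd
toy_df = pd.DataFrame({"city": ["NYC", "NYC", "BOS"], "delay": [5.0, 12.0, 3.0]})
print(toy_df.groupby("city")["delay"].mean())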
Dask DataFrame splits large datasets by rows along the index and does not keep track of how much data each partition holds. In contrast, Modin partitions data along both rows and columns and retains the row and column labels. Modin supports iloc() for row indexing; because it tracks the amount of data in each block, it supports operations such as median() and quantile(); and it also supports row and column transformations such as pivot() and transpose(). For more on Modin's design, refer to its two papers [Petersohn et al., 2020] [Petersohn et al., 2021].
import os
import sys
sys.path.append("..")
from utils import nyc_flights
# Download the flight dataset and build a wildcard path matching all of its CSV files
folder_path = nyc_flights()
file_path = os.path.join(folder_path, "nyc-flights", "*.csv")
Downloading nyc-flights.zip
Note
The APIs of Modin aim to be consistent with those of pandas. For example, the read_csv() function in pandas reads only a single file and does not support wildcards such as *.csv.
Modin extends read_csv() with a read_csv_glob() method, which can read files matching wildcards such as *.csv and is suitable for processing large-scale datasets.
These newly added APIs are in the modin.experimental.pandas module.
import modin.experimental.pandas as pd
# Read all CSV files matching the wildcard; columns 0, 1 and 2 are combined into a single Date column
df = pd.read_csv_glob(file_path, parse_dates={'Date': [0, 1, 2]})
df.iloc[3]
2025-01-07 11:11:17,335 INFO worker.py:1749 -- Started a local Ray instance.
FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.
UserWarning: `read_*` implementation has mismatches with pandas:
Data types of partitions are different! Please refer to the troubleshooting section of the Modin documentation to fix this issue.
Date 1991-01-11 00:00:00
DayOfWeek 5
DepTime 1303.0
CRSDepTime 1215
ArrTime 1439.0
CRSArrTime 1336
UniqueCarrier US
FlightNum 121
TailNum NaN
ActualElapsedTime 96.0
CRSElapsedTime 81
AirTime NaN
ArrDelay 63.0
DepDelay 48.0
Origin EWR
Dest PIT
Distance 319.0
TaxiIn NaN
TaxiOut NaN
Cancelled 0
Diverted 0
Name: 3, dtype: object
df['ArrDelay'].median()
0.0
If certain APIs have not yet been implemented in Modin, Modin falls back to pandas to ensure compatibility. This design has an obvious drawback, however: converting a Modin DataFrame to a pandas DataFrame incurs extra overhead, and if the DataFrame was distributed across multiple nodes, the conversion moves all of its data onto a single machine, which may exceed that machine's memory capacity.
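When that happens, Modin emits a warning and defaults to the pandas implementation for that call. You can also trigger the conversion explicitly; a minimal sketch, assuming modin.utils.to_pandas is available in your Modin version:
import modin.pandas as pd
import modin.utils
mdf = pd.DataFrame({"x": list(range(10))})
# Gather all partitions into a single in-memory pandas DataFrame;
# for data distributed across nodes this pulls everything onto one machine.
pdf = modin.utils.to_pandas(mdf)
print(type(pdf))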
Eager Execution#
Computation in Modin executes eagerly, just as in pandas. Users do not need to call a .compute() method, as they would with Dask DataFrame, to trigger the computation, and Modin does not rely on the kind of data type inference that Dask DataFrame does. For the flight data introduced in Section 4.1, the tail() method of a Dask DataFrame may throw an exception, whereas Modin provides the same semantics as pandas.
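As a rough comparison, the sketch below contrasts the two models on a toy DataFrame (it assumes dask.dataframe is installed; the last line reuses the Modin DataFrame df loaded above):
import pandas
import dask.dataframe as dd
pdf = pandas.DataFrame({"delay": [5.0, 12.0, 3.0, 8.0]})
# Dask DataFrame is lazy: the mean is only materialized when .compute() is called
ddf = dd.from_pandas(pdf, npartitions=2)
print(ddf["delay"].mean().compute())
# Modin is eager: the same kind of expression returns its result immediately, as in pandas
print(df["ArrDelay"].mean())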
df.tail(3)
 | Date | DayOfWeek | DepTime | CRSDepTime | ArrTime | CRSArrTime | UniqueCarrier | FlightNum | TailNum | ActualElapsedTime | ... | AirTime | ArrDelay | DepDelay | Origin | Dest | Distance | TaxiIn | TaxiOut | Cancelled | Diverted
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
1555982 | 1994-12-27 | 2 | 1721.0 | 1715 | 1930.0 | 1945 | DL | 149 | NaN | 129.0 | ... | NaN | -15.0 | 6.0 | JFK | ATL | 760.0 | NaN | NaN | 0 | 0 |
1555983 | 1994-12-28 | 3 | 1715.0 | 1715 | 1934.0 | 1945 | DL | 149 | NaN | 139.0 | ... | NaN | -11.0 | 0.0 | JFK | ATL | 760.0 | NaN | NaN | 0 | 0 |
1555984 | 1994-12-29 | 4 | 1715.0 | 1715 | 1941.0 | 1945 | DL | 149 | NaN | 146.0 | ... | NaN | -4.0 | 0.0 | JFK | ATL | 760.0 | NaN | NaN | 0 | 0 |
3 rows x 21 columns
Execution Engines#
Modin supports Ray, Dask, and unidist as distributed execution engines; these can make use of all the cores on a single machine and can also run on a cluster. Taking Ray as an example, users can submit jobs to a Ray cluster: after initializing the Ray runtime in the code with ray.init(address="auto"), the job will run on the Ray cluster.
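For instance, a script submitted to an existing Ray cluster might look like the sketch below (the Parquet path is hypothetical; address="auto" attaches to the running cluster instead of starting a local Ray instance):
import ray
ray.init(address="auto")   # connect to the existing Ray cluster
import modin.pandas as pd
df = pd.read_parquet("yellow_tripdata_2023-01.parquet")   # hypothetical input file
print(df["total_amount"].sum())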
By default, Modin uses Ray as the execution backend. The backend can also be set through the MODIN_ENGINE environment variable. On the command line you can use export MODIN_ENGINE=dask; or, in a Jupyter Notebook:
import modin.config as modin_cfg
modin_cfg.Engine.put("ray")
unidist is an execution backend implemented by the Modin team, and it supports MPI. To use unidist with MPI, besides setting MODIN_ENGINE, you also need to set UNIDIST_BACKEND:
export MODIN_ENGINE=unidist
export UNIDIST_BACKEND=mpi
Case Study: New York Taxi Data#
We will use Modin to conduct data analysis on NYC taxi data.
%matplotlib inline
import matplotlib_inline
matplotlib_inline.backend_inline.set_matplotlib_formats('svg')
import matplotlib.pyplot as plt
from utils import nyc_taxi
# Download the New York taxi dataset (six monthly Parquet files)
taxi_path = nyc_taxi()
Downloading yellow_tripdata_2023-01.parquet
Downloading yellow_tripdata_2023-02.parquet
Downloading yellow_tripdata_2023-03.parquet
Downloading yellow_tripdata_2023-04.parquet
Downloading yellow_tripdata_2023-05.parquet
Downloading yellow_tripdata_2023-06.parquet
We first read the data. We could use the read_parquet_glob() method with the * wildcard to read multiple Parquet files directly; here we will just use the read_parquet() method instead.
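For reference, reading all six monthly files in one call with the wildcard API might look like the following sketch (not executed here; it reuses the modin.experimental.pandas import and the taxi_path from above):
# Read every monthly Parquet file matching the wildcard into a single Modin DataFrame
df_all = pd.read_parquet_glob(os.path.join(taxi_path, "yellow_tripdata_2023-*.parquet"))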
df = pd.read_parquet(os.path.join(taxi_path, "yellow_tripdata_2023-01.parquet"))
df.head()
FutureWarning: Passing 'use_legacy_dataset' is deprecated as of pyarrow 15.0.0 and will be removed in a future version.
 | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
0 | 2 | 2023-01-01 00:32:10 | 2023-01-01 00:40:36 | 1.0 | 0.97 | 1.0 | N | 161 | 141 | 2 | 9.3 | 1.00 | 0.5 | 0.00 | 0.0 | 1.0 | 14.30 | 2.5 | 0.00 |
1 | 2 | 2023-01-01 00:55:08 | 2023-01-01 01:01:27 | 1.0 | 1.10 | 1.0 | N | 43 | 237 | 1 | 7.9 | 1.00 | 0.5 | 4.00 | 0.0 | 1.0 | 16.90 | 2.5 | 0.00 |
2 | 2 | 2023-01-01 00:25:04 | 2023-01-01 00:37:49 | 1.0 | 2.51 | 1.0 | N | 48 | 238 | 1 | 14.9 | 1.00 | 0.5 | 15.00 | 0.0 | 1.0 | 34.90 | 2.5 | 0.00 |
3 | 1 | 2023-01-01 00:03:48 | 2023-01-01 00:13:25 | 0.0 | 1.90 | 1.0 | N | 138 | 7 | 1 | 12.1 | 7.25 | 0.5 | 0.00 | 0.0 | 1.0 | 20.85 | 0.0 | 1.25 |
4 | 2 | 2023-01-01 00:10:29 | 2023-01-01 00:21:19 | 1.0 | 1.43 | 1.0 | N | 107 | 79 | 1 | 11.4 | 1.00 | 0.5 | 3.28 | 0.0 | 1.0 | 19.68 | 2.5 | 0.00 |
Next, we preprocess the data.
df = df.dropna(subset=['total_amount', 'RatecodeID'])
# Convert data types
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
# Calculate the trip duration
df['trip_duration'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.total_seconds() / 60
Next, we filter the data and use the groupby() method.
# Select trips with non-zero passengers
non_zero_passenger_df = df[df['passenger_count'] > 0]
average_trip_duration = non_zero_passenger_df['trip_duration'].mean()
print("Average trip duration (with non-zero passengers):")
print(average_trip_duration)
# Groupby payment method and calculate the distribution of the total cost
total_amount_by_payment = non_zero_passenger_df.groupby('payment_type')['total_amount'].sum()
print("Total cost grouped by payment method:")
print(total_amount_by_payment)
# Sort the data grouped by payment method
sorted_total_by_payment = total_amount_by_payment.sort_values(ascending=False)
print("\nPayment methods sorted in descending order by total cost:")
print(sorted_total_by_payment)
# Group by the number of passengers and calculate the average trip distance and total cost
avg_distance_amount_by_passenger = df.groupby('passenger_count').agg({
'trip_distance': 'mean',
'total_amount': 'mean'
})
print("\nAverage trip distance and total cost grouped by the number of passengers:")
print(avg_distance_amount_by_passenger)
Average trip duration (with non-zero passengers):
15.68557239550762
Total cost grouped by payment method:
payment_type
1 67232359.58
2 12062049.01
3 165402.32
4 79019.28
Name: total_amount, dtype: float64
Payment methods sorted in descending order by total cost:
payment_type
1 67232359.58
2 12062049.01
3 165402.32
4 79019.28
Name: total_amount, dtype: float64
Average trip distance and total cost grouped by the number of passengers:
trip_distance total_amount
passenger_count
0.0 2.761904 24.162124
1.0 3.338169 26.443472
2.0 3.931051 29.313282
3.0 3.664393 28.475420
4.0 3.812581 29.611602
5.0 3.282478 26.588261
6.0 3.250963 26.558484
7.0 4.238333 85.111667
8.0 4.270769 99.336923
9.0 0.000000 92.250000
We then use the apply() function on the data and create a pivot table with the pivot_table() function.
# calculate the fare with tax
df['fare_with_tax'] = df.apply(lambda row: row['fare_amount'] + row['mta_tax'], axis=1)
print("Calculate the fare including tax")
print(df[['fare_amount', 'mta_tax', 'fare_with_tax']].head())
# Pivot table
pivot_table = df.pivot_table(values='total_amount', index='DOLocationID', columns='payment_type', aggfunc='mean')
print("\nPivot table (total cost by drop-off location and payment method):")
print(pivot_table.head())
Calculate the fare including tax
fare_amount mta_tax fare_with_tax
0 9.3 0.5 9.8
1 7.9 0.5 8.4
2 14.9 0.5 15.4
3 12.1 0.5 12.6
4 11.4 0.5 11.9
Pivot table (total cost by drop-off location and payment method):
payment_type 1 2 3 4
DOLocationID
1 130.041868 108.956124 73.196774 1.598767
2 58.446250 31.411538 NaN 0.000000
3 62.407438 65.358209 1.500000 12.360000
4 27.395179 20.115848 12.228704 2.635455
5 103.328113 135.500000 NaN NaN
UserWarning: `pivot_table` implementation has mismatches with pandas:
Order of columns could be different from pandas.
Finally, we visualize the results.
# Draw a bar chart of the average trip distance and total cost grouped by the number of passengers.
fig, ax1 = plt.subplots(figsize=(10, 6))
avg_distance_amount_by_passenger['trip_distance'].plot(kind='bar', color='blue', ax=ax1, position=1, width=0.4)
ax1.set_ylabel('Average Trip Distance (miles)', color='blue')
ax2 = ax1.twinx()
avg_distance_amount_by_passenger['total_amount'].plot(kind='bar', color='red', ax=ax2, position=0, width=0.4)
ax2.set_ylabel('Average Total Amount ($)', color='red')
plt.title('Average Trip Distance and Total Amount by Passenger Count')
ax1.set_xlabel('Passenger Count')
plt.grid(True)
plt.show()