8.3. Data Transformation
The core of data processing is a series of transformations. This section covers how to transform individual rows or batches of data, and how to perform grouping with groupby().
Transformation
map() and map_batches()
Ray Data provides two types of data transformation operations, as shown in Fig. 8.4. These two transformation operations are typical examples of embarrassingly parallel computation, with no data shuffle overhead.
For each individual row, you can use Dataset.map() and Dataset.flat_map(). These APIs transform one row at a time, similar to other big data frameworks such as Spark or Flink. With map(), one input row yields one output row; with flat_map(), one input row can yield zero or more output rows.
To package multiple rows into a batch and perform batch-wise transformations, you can use Dataset.map_batches(). One input batch yields one output batch.
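The three contracts can be illustrated without a cluster. The following is a minimal plain-Python sketch of the semantics only, not Ray's implementation: the helpers local_map, local_flat_map, and local_map_batches are hypothetical names introduced for illustration, and a batch is simplified to a list of rows, whereas Ray Data passes batches as NumPy, pandas, or Arrow structures.

```python
from typing import Any, Callable, Dict, Iterable, List

Row = Dict[str, Any]

# Hypothetical local helpers that mimic the row-wise and batch-wise contracts.
# They are NOT Ray APIs.
def local_map(rows: List[Row], fn: Callable[[Row], Row]) -> List[Row]:
    # One input row produces exactly one output row.
    return [fn(row) for row in rows]

def local_flat_map(rows: List[Row], fn: Callable[[Row], Iterable[Row]]) -> List[Row]:
    # One input row produces zero or more output rows.
    return [out for row in rows for out in fn(row)]

def local_map_batches(
    rows: List[Row],
    fn: Callable[[List[Row]], List[Row]],
    batch_size: int,
) -> List[Row]:
    # Rows are grouped into batches; fn sees a whole batch at a time.
    result: List[Row] = []
    for start in range(0, len(rows), batch_size):
        result.extend(fn(rows[start:start + batch_size]))
    return result

def center_batch(batch: List[Row]) -> List[Row]:
    # A batch-wise function can use statistics of the whole batch.
    mean = sum(r["x"] for r in batch) / len(batch)
    return [{"x": r["x"] - mean} for r in batch]

rows = [{"x": 1}, {"x": 2}, {"x": 3}]

doubled = local_map(rows, lambda r: {"x": r["x"] * 2})
repeated = local_flat_map(rows, lambda r: [{"x": r["x"]}] * r["x"])
centered = local_map_batches(rows, center_batch, batch_size=2)

print(doubled)        # [{'x': 2}, {'x': 4}, {'x': 6}]
print(len(repeated))  # 6
print(centered)       # [{'x': -0.5}, {'x': 0.5}, {'x': 0.0}]
```

Note that center_batch gives a result that depends on how rows are batched, which is one reason batch-wise functions should be written with the batch size in mind.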
Example: NYC Taxi
We will use the New York City taxi dataset to demonstrate how to use these two types of transformation operations.
import os
import sys
import warnings
from typing import Any, Dict

sys.path.append("..")
from datasets import nyc_taxi

import numpy as np
import pandas as pd
import torch
import ray

warnings.simplefilter(action='ignore', category=FutureWarning)

# Restart Ray if a previous session is still running.
# ray.is_initialized is a function, so it must be called.
if ray.is_initialized():
    ray.shutdown()
ray.init()
2024-02-15 19:10:27,440 INFO worker.py:1724 -- Started a local Ray instance.
Read the data into the Dataset class and first examine the original data format, where tpep_pickup_datetime and tpep_dropoff_datetime represent the passenger pickup and drop-off times, including both date and time.
dataset_path = nyc_taxi()
dataset = ray.data.read_parquet(dataset_path)
dataset.take(1)
2024-02-15 19:10:28,931 INFO set_read_parallelism.py:115 -- Using autodetected parallelism=200 for stage ReadParquet to satisfy DataContext.get_current().min_parallelism=200.
2024-02-15 19:10:28,933 INFO set_read_parallelism.py:122 -- To satisfy the requested parallelism of 200, each read task output is split into 50 smaller blocks.
2024-02-15 19:10:28,933 INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=1]
2024-02-15 19:10:28,933 INFO streaming_executor.py:113 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-02-15 19:10:28,934 INFO streaming_executor.py:115 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
(ReadParquet->SplitBlocks(50) pid=5836) /Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/ray/data/_internal/arrow_block.py:148: FutureWarning: promote has been superseded by mode='default'.
(ReadParquet->SplitBlocks(50) pid=5836) return transform_pyarrow.concat(tables)
[{'VendorID': 1,
'tpep_pickup_datetime': datetime.datetime(2023, 6, 1, 0, 8, 48),
'tpep_dropoff_datetime': datetime.datetime(2023, 6, 1, 0, 29, 41),
'passenger_count': 1,
'trip_distance': 3.4,
'RatecodeID': 1,
'store_and_fwd_flag': 'N',
'PULocationID': 140,
'DOLocationID': 238,
'payment_type': 1,
'fare_amount': 21.9,
'extra': 3.5,
'mta_tax': 0.5,
'tip_amount': 6.7,
'tolls_amount': 0.0,
'improvement_surcharge': 1.0,
'total_amount': 33.6,
'congestion_surcharge': 2.5,
'Airport_fee': 0.0}]
Note
Transformations in Ray Data are lazy: they are not executed when they are defined, but only when a data-consuming operation such as show(), take(), iter_rows(), or write_parquet() is encountered.
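The idea of lazy execution can be sketched in a few lines of plain Python. The class LazyRows below is a hypothetical stand-in written for illustration, not a Ray class, and it ignores blocks and distribution entirely: map() only records the function, and take() is the point where the recorded steps actually run.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]

class LazyRows:
    """Hypothetical stand-in for a lazily evaluated dataset (not a Ray class)."""

    def __init__(self, rows: List[Row]):
        self._rows = rows
        self._steps: List[Callable[[Row], Row]] = []

    def map(self, fn: Callable[[Row], Row]) -> "LazyRows":
        # Only record the step; nothing is computed here.
        new = LazyRows(self._rows)
        new._steps = self._steps + [fn]
        return new

    def take(self, n: int) -> List[Row]:
        # Consuming the data triggers the recorded steps.
        out = []
        for row in self._rows[:n]:
            for fn in self._steps:
                row = fn(row)
            out.append(row)
        return out

calls = []

def add_one(row: Row) -> Row:
    calls.append(row["x"])  # Record each invocation to show when work happens.
    return {"x": row["x"] + 1}

lazy = LazyRows([{"x": 1}, {"x": 2}, {"x": 3}]).map(add_one)
calls_before_take = len(calls)  # map() has not run add_one yet
first = lazy.take(1)            # execution happens here
calls_after_take = len(calls)

print(calls_before_take, first, calls_after_take)  # 0 [{'x': 2}] 1
```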
The most important parameter of map(fn) is fn, a custom callable that transforms each input row and returns an output row. In this example, we define the function transform_row to extract the duration, distance, and fare of each trip, ignoring the other fields. Since fn transforms a single row, its input is a key-value Dict whose keys are the field names of the schema.
def transform_row(row: Dict[str, Any]) -> Dict[str, Any]:
    result = {}
    # Subtracting two datetimes gives a timedelta; convert it to seconds.
    result["trip_duration"] = (row["tpep_dropoff_datetime"] - row["tpep_pickup_datetime"]).total_seconds()
    result["trip_distance"] = row["trip_distance"]
    result["fare_amount"] = row["fare_amount"]
    return result

row_ds = dataset.map(transform_row)
row_ds.take(1)
2024-02-15 19:10:29,497 INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[Map(transform_row)] -> LimitOperator[limit=1]
[{'trip_duration': 1253.0, 'trip_distance': 3.4, 'fare_amount': 21.9}]
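Because fn is an ordinary Python function that receives a dict, it can be checked on a single hand-written row before it is passed to map(). The sketch below repeats the logic of transform_row and applies it to a dict built from the first row printed earlier; sample_row is a name introduced here for illustration, and fields that the function does not read are omitted.

```python
from datetime import datetime
from typing import Any, Dict

def transform_row(row: Dict[str, Any]) -> Dict[str, Any]:
    # Same logic as the function passed to map() above.
    return {
        "trip_duration": (row["tpep_dropoff_datetime"] - row["tpep_pickup_datetime"]).total_seconds(),
        "trip_distance": row["trip_distance"],
        "fare_amount": row["fare_amount"],
    }

# Values copied from the first row returned by dataset.take(1).
sample_row = {
    "tpep_pickup_datetime": datetime(2023, 6, 1, 0, 8, 48),
    "tpep_dropoff_datetime": datetime(2023, 6, 1, 0, 29, 41),
    "trip_distance": 3.4,
    "fare_amount": 21.9,
}

result = transform_row(sample_row)
print(result)  # {'trip_duration': 1253.0, 'trip_distance': 3.4, 'fare_amount': 21.9}
```

The trip lasts 20 minutes and 53 seconds, that is, 1253 seconds, which matches the value Ray Data returned.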
Unlike map(), map_batches() processes an entire batch at a time. It is intended to ease the migration of existing single-node programs: code already written against NumPy, pandas, or Arrow can be reused with Ray Data and scaled out to a cluster. Each batch passed to map_batches() is a Dict[str, np.ndarray], a pd.DataFrame, or a pyarrow.Table, corresponding to NumPy, pandas, and Arrow, respectively.
The following example achieves functionality similar to the map() example.
def transform_df(input_df: pd.DataFrame) -> pd.DataFrame:
    result_df = pd.DataFrame()
    # .dt.seconds is the seconds component of each timedelta (days are excluded),
    # so it equals the full duration only for trips shorter than one day.
    result_df["trip_duration"] = (input_df["tpep_dropoff_datetime"] - input_df["tpep_pickup_datetime"]).dt.seconds
    result_df["trip_distance"] = input_df["trip_distance"]
    result_df["fare_amount"] = input_df["fare_amount"]
    return result_df

batch_ds = dataset.map_batches(transform_df, batch_format="pandas")
batch_ds.take(10)
2024-02-15 19:10:31,936 INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(transform_df)] -> LimitOperator[limit=10]
[{'trip_duration': 1253, 'trip_distance': 3.4, 'fare_amount': 21.9},
{'trip_duration': 614, 'trip_distance': 3.4, 'fare_amount': 15.6},
{'trip_duration': 1123, 'trip_distance': 10.2, 'fare_amount': 40.8},
{'trip_duration': 1406, 'trip_distance': 9.83, 'fare_amount': 39.4},
{'trip_duration': 514, 'trip_distance': 1.17, 'fare_amount': 9.3},
{'trip_duration': 796, 'trip_distance': 3.6, 'fare_amount': 18.4},
{'trip_duration': 1136, 'trip_distance': 3.08, 'fare_amount': 19.8},
{'trip_duration': 527, 'trip_distance': 1.1, 'fare_amount': 10.0},
{'trip_duration': 237, 'trip_distance': 0.99, 'fare_amount': 6.5},
{'trip_duration': 171, 'trip_distance': 0.73, 'fare_amount': 5.1}]
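The map() example computes the duration with total_seconds(), while the map_batches() example uses the .dt.seconds accessor. The two agree for the rows shown here, but they are not interchangeable in general. The pandas accessor follows the same convention as the seconds attribute of the standard library timedelta, so the difference can be seen with a small standard-library sketch; long_trip is a made-up value used only for illustration.

```python
from datetime import timedelta

short_trip = timedelta(minutes=20, seconds=53)
long_trip = timedelta(days=1, minutes=20, seconds=53)  # hypothetical trip longer than a day

# .seconds is only the seconds component (0 to 86399); whole days are stored separately.
# .total_seconds() is the full duration as a float.
print(short_trip.seconds, short_trip.total_seconds())  # 1253 1253.0
print(long_trip.seconds, long_trip.total_seconds())    # 1253 87653.0
```

If trips longer than a day can occur in the data, total_seconds() is the safer choice for a duration column.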
When implementing map() or map_batches(), you can also use a Python lambda expression, that is, an anonymous Python function. For example:
filtered_dataset = dataset.map_batches(lambda df: df[df["trip_distance"] > 4], batch_format="pandas")
print(f"Filtered: {filtered_dataset.count()}")
2024-02-15 19:10:32,500 INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(<lambda>)]
Filtered: 730352
Task and Actor
As observed, a transformation executes fn, a function that takes an input, transforms it, and produces an output. By default, Ray Data executes transformations with Ray Tasks, which are stateless. If the computation is stateful, Ray Actors should be used instead. A typical case is loading a machine learning model once and reusing it to predict on different inputs. The following example simulates prediction with a machine learning model; since the model is reused across batches, the computation is stateful. For demonstration purposes, the model is the identity transformation torch.nn.Identity(), which returns its input unchanged.
class TorchPredictor:
    def __init__(self):
        # State is created once per actor and reused for every batch.
        self.model = torch.nn.Identity()
        self.model.eval()

    def __call__(self, df: pd.DataFrame) -> Dict[str, np.ndarray]:
        pred = {}
        inputs = torch.as_tensor(df['trip_distance'].to_numpy(), dtype=torch.float32)
        with torch.inference_mode():
            pred["output"] = self.model(inputs).detach().numpy()
        return pred

# batch_format="pandas" makes each batch a DataFrame, matching the type hint above.
pred_ds = batch_ds.limit(100).map_batches(
    TorchPredictor,
    batch_format="pandas",
    compute=ray.data.ActorPoolStrategy(size=2),
)
pred_ds.take(3)
2024-02-15 19:10:33,990 WARNING util.py:546 -- The argument ``compute`` is deprecated in Ray 2.9. Please specify argument ``concurrency`` instead. For more information, see https://docs.ray.io/en/master/data/transforming-data.html#stateful-transforms.
2024-02-15 19:10:33,993 INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(transform_df)] -> LimitOperator[limit=100] -> ActorPoolMapOperator[MapBatches(TorchPredictor)] -> LimitOperator[limit=3]
2024-02-15 19:10:34,007 INFO actor_pool_map_operator.py:114 -- MapBatches(TorchPredictor): Waiting for 2 pool actors to start...
2024-02-15 19:10:35,525 WARNING actor_pool_map_operator.py:278 -- To ensure full parallelization across an actor pool of size 2, the Dataset should consist of at least 2 distinct blocks. Consider increasing the parallelism when creating the Dataset.
[{'output': 3.4000000953674316},
{'output': 3.4000000953674316},
{'output': 10.199999809265137}]
Using actors typically involves three steps:
1. Create a class that includes an __init__() method and a __call__() method. The __init__() method initializes the state, and the __call__() method implements the transformation. See the TorchPredictor class above for reference.
2. Create an ActorPoolStrategy, specifying the total number of workers.
3. Call the map_batches() method, passing the ActorPoolStrategy to the compute parameter.
As the warning in the output above indicates, Ray 2.9 deprecates the compute argument in favor of concurrency.
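The benefit of the three steps above is that the expensive setup in __init__() runs once per worker rather than once per batch. The following plain-Python sketch emulates this locally under simplifying assumptions: ScalePredictor and run_with_pool are hypothetical names introduced for illustration, batches are plain dicts of lists, and the round-robin dispatch is only a stand-in for how an actor pool might distribute work.

```python
from itertools import cycle
from typing import Dict, List

Batch = Dict[str, List[float]]

class ScalePredictor:
    """Hypothetical stateful transform: setup in __init__, per-batch work in __call__."""

    instances_created = 0  # class-level counter, only for this demonstration

    def __init__(self):
        ScalePredictor.instances_created += 1
        self.scale = 2.0  # stands in for loading a model

    def __call__(self, batch: Batch) -> Batch:
        return {"output": [v * self.scale for v in batch["trip_distance"]]}

def run_with_pool(cls, batches: List[Batch], size: int) -> List[Batch]:
    # Local emulation of an actor pool: build `size` instances once,
    # then hand batches to them in round-robin order.
    workers = cycle([cls() for _ in range(size)])
    return [next(workers)(batch) for batch in batches]

batches = [
    {"trip_distance": [3.4, 3.4]},
    {"trip_distance": [10.2]},
    {"trip_distance": [9.83]},
]
outputs = run_with_pool(ScalePredictor, batches, size=2)

print(ScalePredictor.instances_created)  # 2: setup ran once per worker, not once per batch
print(len(outputs))                      # 3
```

With a stateless function, the equivalent of the setup step would have to run inside every call, which is what makes actors preferable for model inference.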
Grouping
Another frequently used primitive in data processing is grouping and aggregation. Ray Data first uses groupby() to group the data by certain fields, and then uses map_groups() to aggregate the grouped data. Unlike map() and map_batches(), grouping requires exchanging data across blocks, which appears as an AllToAllOperator in the execution log of the example.
The key parameter of groupby(key) specifies the field to group by, and the fn parameter of map_groups(fn) operates on the data within one group. Ray Data also provides predefined aggregation functions such as sum(), max(), and mean(). In the following example, mean() aggregates the value field.
ds = ray.data.from_items([
    {"group": 1, "value": 1},
    {"group": 1, "value": 2},
    {"group": 2, "value": 3},
    {"group": 2, "value": 4},
])
mean_ds = ds.groupby("group").mean("value")
mean_ds.show()
2024-02-15 19:10:35,568 INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Aggregate] -> LimitOperator[limit=20]
{'group': 1, 'mean(value)': 1.5}
{'group': 2, 'mean(value)': 3.5}
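The group-then-apply pattern can be reproduced in plain Python to see what a function passed to map_groups() is expected to do. This is a sketch of the semantics only, not Ray's implementation: local_map_groups and group_mean are hypothetical helpers, groups are lists of row dicts, and groups are visited in sorted key order as a simplifying assumption.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]

def local_map_groups(
    rows: List[Row],
    key: str,
    fn: Callable[[List[Row]], List[Row]],
) -> List[Row]:
    # Hypothetical helper, not a Ray API: bucket rows by key, then apply fn per group.
    groups: Dict[Any, List[Row]] = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    result: List[Row] = []
    for group_key in sorted(groups):
        result.extend(fn(groups[group_key]))
    return result

def group_mean(group: List[Row]) -> List[Row]:
    # Receives all rows of one group and returns one aggregated row.
    values = [row["value"] for row in group]
    return [{"group": group[0]["group"], "mean(value)": sum(values) / len(values)}]

items = [
    {"group": 1, "value": 1},
    {"group": 1, "value": 2},
    {"group": 2, "value": 3},
    {"group": 2, "value": 4},
]

means = local_map_groups(items, "group", group_mean)
print(means)  # [{'group': 1, 'mean(value)': 1.5}, {'group': 2, 'mean(value)': 3.5}]
```

The result matches the output of the predefined mean() aggregation shown above; a custom function is only needed when no predefined aggregation fits.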