8.3. Data Transformation

Data processing is, at its core, a series of transformations. This section covers:

  • How to transform individual rows or batches of data.

  • How to perform grouping with groupby.

Transformation

map() and map_batches()

Ray Data provides two types of data transformation operations, as shown in Fig. 8.4. Both are typical examples of embarrassingly parallel computation: each row or batch is processed independently, so there is no data shuffle overhead.

  • To transform individual rows, use Dataset.map() or Dataset.flat_map(), which are similar to the row-wise APIs of other big data frameworks such as Spark or Flink. With map(), one input row produces one output row; with flat_map(), one input row can produce zero or more output rows.

  • To package multiple rows into a batch and transform the batch as a whole, use Dataset.map_batches(). One input batch produces one output batch.

Fig. 8.4 map() vs. map_batches()
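
The input/output contract of the row-wise and batch-wise APIs can be illustrated without Ray at all. The following is a minimal pure-Python sketch, not Ray Data's implementation: the helpers map_rows, flat_map_rows, and map_in_batches are hypothetical names that only mimic what the user function receives and returns in Dataset.map(), Dataset.flat_map(), and Dataset.map_batches().

```python
from typing import Any, Callable, Dict, Iterable, List

Row = Dict[str, Any]


def map_rows(rows: Iterable[Row], fn: Callable[[Row], Row]) -> List[Row]:
    """Row-wise: fn receives one row and returns exactly one row."""
    return [fn(row) for row in rows]


def flat_map_rows(rows: Iterable[Row], fn: Callable[[Row], List[Row]]) -> List[Row]:
    """Row-wise: fn receives one row and returns a list of zero or more rows."""
    return [out for row in rows for out in fn(row)]


def map_in_batches(rows: List[Row], fn: Callable[[List[Row]], List[Row]],
                   batch_size: int) -> List[Row]:
    """Batch-wise: fn receives a whole batch and returns a whole batch."""
    result: List[Row] = []
    for start in range(0, len(rows), batch_size):
        result.extend(fn(rows[start:start + batch_size]))
    return result


rows = [{"x": 1}, {"x": 2}, {"x": 3}]

# One row in, one row out.
doubled = map_rows(rows, lambda r: {"x": r["x"] * 2})
# One row in, r["x"] copies of that row out.
repeated = flat_map_rows(rows, lambda r: [r] * r["x"])
# The function sees the whole batch, so it can use batch-level information.
batched = map_in_batches(rows, lambda b: [{"x": r["x"] + len(b)} for r in b], batch_size=2)

print(doubled)        # [{'x': 2}, {'x': 4}, {'x': 6}]
print(len(repeated))  # 6
print(batched)        # [{'x': 3}, {'x': 4}, {'x': 4}]
```

In this sketch a batch is a list of rows for simplicity; in Ray Data a batch is a columnar structure such as a pandas DataFrame, which is what makes vectorized operations on a batch efficient.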

Example: NYC Taxi

We will use the New York City taxi dataset to demonstrate how to use these two types of transformation operations.

import os
import sys
from typing import Any, Dict

sys.path.append("..")
from datasets import nyc_taxi

import numpy as np
import pandas as pd
import torch
import ray

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

if ray.is_initialized():
    ray.shutdown()

ray.init()
2024-02-15 19:10:27,440	INFO worker.py:1724 -- Started a local Ray instance.

Read the data into a Dataset and first examine the original data format. The fields tpep_pickup_datetime and tpep_dropoff_datetime hold the passenger pickup and drop-off times, each including both date and time.

dataset_path = nyc_taxi()
dataset = ray.data.read_parquet(dataset_path)
dataset.take(1)
2024-02-15 19:10:28,931	INFO set_read_parallelism.py:115 -- Using autodetected parallelism=200 for stage ReadParquet to satisfy DataContext.get_current().min_parallelism=200.
2024-02-15 19:10:28,933	INFO set_read_parallelism.py:122 -- To satisfy the requested parallelism of 200, each read task output is split into 50 smaller blocks.
2024-02-15 19:10:28,933	INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=1]
2024-02-15 19:10:28,933	INFO streaming_executor.py:113 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-02-15 19:10:28,934	INFO streaming_executor.py:115 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[{'VendorID': 1,
  'tpep_pickup_datetime': datetime.datetime(2023, 6, 1, 0, 8, 48),
  'tpep_dropoff_datetime': datetime.datetime(2023, 6, 1, 0, 29, 41),
  'passenger_count': 1,
  'trip_distance': 3.4,
  'RatecodeID': 1,
  'store_and_fwd_flag': 'N',
  'PULocationID': 140,
  'DOLocationID': 238,
  'payment_type': 1,
  'fare_amount': 21.9,
  'extra': 3.5,
  'mta_tax': 0.5,
  'tip_amount': 6.7,
  'tolls_amount': 0.0,
  'improvement_surcharge': 1.0,
  'total_amount': 33.6,
  'congestion_surcharge': 2.5,
  'Airport_fee': 0.0}]

Note

Transformations in Ray Data are lazy: they are not executed when they are defined. Instead, they are executed when a data viewing or saving operation such as show(), take(), iter_rows(), or write_parquet() is encountered.
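
Lazy execution can be illustrated with a plain Python generator. This is only an analogy and assumes nothing about Ray Data's internals: lazy_map and the calls list are hypothetical names used to show that no work happens until results are consumed.

```python
from typing import Callable, Iterable, Iterator, List

calls: List[int] = []  # records every input the transformation actually touched


def lazy_map(fn: Callable[[int], int], items: Iterable[int]) -> Iterator[int]:
    """Return a generator; fn is not called until the generator is iterated."""
    for item in items:
        calls.append(item)
        yield fn(item)


# Defining the pipeline does not run it.
pipeline = lazy_map(lambda x: x * 10, range(5))
calls_before = list(calls)

# Consuming two results triggers exactly two calls of the transformation.
first_two = [next(pipeline), next(pipeline)]
calls_after = list(calls)

print(calls_before)  # []
print(first_two)     # [0, 10]
print(calls_after)   # [0, 1]
```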

The most important parameter of map(fn) is the user-defined callable fn, which transforms each input row and returns an output row. In this example, we define the function transform_row to extract the duration, distance, and price of each trip, ignoring the other fields. Because fn transforms a single row, its input is a key-value Dict whose keys are the field names of the schema.

def transform_row(row: Dict[str, Any]) -> Dict[str, Any]:
    result = {}
    result["trip_duration"] = (row["tpep_dropoff_datetime"] - row["tpep_pickup_datetime"]).total_seconds()
    result["trip_distance"] = row["trip_distance"]
    result["fare_amount"] = row["fare_amount"]
    return result

row_ds = dataset.map(transform_row)
row_ds.take(1)
2024-02-15 19:10:29,497	INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[Map(transform_row)] -> LimitOperator[limit=1]
[{'trip_duration': 1253.0, 'trip_distance': 3.4, 'fare_amount': 21.9}]

Unlike map(), map_batches() processes an entire batch at a time. It is designed to make it easy to migrate existing single-node programs: code that already works on a pandas DataFrame or NumPy array on one machine can be run on a cluster with Ray Data. Each batch passed to fn in map_batches() is a Dict[str, np.ndarray], a pd.DataFrame, or a pyarrow.Table, corresponding to NumPy, pandas, and Arrow, respectively; the format is selected with the batch_format parameter.

The following example implements the same transformation as the map() example above, this time operating on a pandas DataFrame.

def transform_df(input_df: pd.DataFrame) -> pd.DataFrame:
    result_df = pd.DataFrame()
    result_df["trip_duration"] = (input_df["tpep_dropoff_datetime"] - input_df["tpep_pickup_datetime"]).dt.seconds
    result_df["trip_distance"] = input_df["trip_distance"]
    result_df["fare_amount"] = input_df["fare_amount"]
    return result_df

batch_ds = dataset.map_batches(transform_df, batch_format="pandas")
batch_ds.take(10)
2024-02-15 19:10:31,936	INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(transform_df)] -> LimitOperator[limit=10]
[{'trip_duration': 1253, 'trip_distance': 3.4, 'fare_amount': 21.9},
 {'trip_duration': 614, 'trip_distance': 3.4, 'fare_amount': 15.6},
 {'trip_duration': 1123, 'trip_distance': 10.2, 'fare_amount': 40.8},
 {'trip_duration': 1406, 'trip_distance': 9.83, 'fare_amount': 39.4},
 {'trip_duration': 514, 'trip_distance': 1.17, 'fare_amount': 9.3},
 {'trip_duration': 796, 'trip_distance': 3.6, 'fare_amount': 18.4},
 {'trip_duration': 1136, 'trip_distance': 3.08, 'fare_amount': 19.8},
 {'trip_duration': 527, 'trip_distance': 1.1, 'fare_amount': 10.0},
 {'trip_duration': 237, 'trip_distance': 0.99, 'fare_amount': 6.5},
 {'trip_duration': 171, 'trip_distance': 0.73, 'fare_amount': 5.1}]
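
One detail of transform_df is worth noting: it uses .dt.seconds, whereas transform_row uses total_seconds(). In pandas, .dt.seconds returns only the seconds component of each timedelta (at least 0 and less than one day) as an integer, so the two agree only for trips shorter than a day. The standard library's datetime.timedelta offers the same pair of accessors, which the sketch below uses to show the difference. The pickup and drop-off times are taken from the first row shown earlier; the trip that is one day longer is a made-up value for illustration.

```python
from datetime import datetime, timedelta

# Pickup and drop-off times of the first row in the dataset.
pickup = datetime(2023, 6, 1, 0, 8, 48)
dropoff = datetime(2023, 6, 1, 0, 29, 41)
short_trip = dropoff - pickup

# Hypothetical trip lasting one day longer, to expose the difference.
long_trip = short_trip + timedelta(days=1)

# .seconds drops whole days; total_seconds() includes them.
print(short_trip.seconds, short_trip.total_seconds())  # 1253 1253.0
print(long_trip.seconds, long_trip.total_seconds())    # 1253 87653.0
```

This also explains why the map() output shows 1253.0 while the map_batches() output shows 1253. If trips longer than a day must be handled correctly, .dt.total_seconds() is the safer choice in the pandas version.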

When implementing map() or map_batches(), you can also use a Python lambda expression, that is, an anonymous Python function. For example:

filtered_dataset = dataset.map_batches(lambda df: df[df["trip_distance"] > 4], batch_format="pandas")
print(f"Filtered: {filtered_dataset.count()}")
2024-02-15 19:10:32,500	INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(<lambda>)]
Filtered: 730352

Task and Actor

As shown above, a transformation executes a function fn that takes an input, transforms it, and produces an output. By default, Ray Data executes transformations with Ray Tasks, which are stateless. If the computation is stateful, use Ray Actors instead. A typical case is loading a machine learning model once and reusing it to run predictions on many inputs. The following example simulates this process: because the model is reused across batches, the computation is stateful. For demonstration purposes, the model is torch.nn.Identity(), an identity transformation that returns its input as its output.

class TorchPredictor:

    def __init__(self):
        self.model = torch.nn.Identity()
        self.model.eval()

    def __call__(self, df: pd.DataFrame) -> Dict[str, np.ndarray]:
        pred = {}
        inputs = torch.as_tensor(df['trip_distance'], dtype=torch.float32)
        with torch.inference_mode():
            pred["output"] = self.model(inputs).detach().numpy()
        return pred

pred_ds = batch_ds.limit(100).map_batches(TorchPredictor, compute=ray.data.ActorPoolStrategy(size=2))
pred_ds.take(3)
2024-02-15 19:10:33,990	WARNING util.py:546 -- The argument ``compute`` is deprecated in Ray 2.9. Please specify argument ``concurrency`` instead. For more information, see https://docs.ray.io/en/master/data/transforming-data.html#stateful-transforms.
2024-02-15 19:10:33,993	INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(transform_df)] -> LimitOperator[limit=100] -> ActorPoolMapOperator[MapBatches(TorchPredictor)] -> LimitOperator[limit=3]
2024-02-15 19:10:34,007	INFO actor_pool_map_operator.py:114 -- MapBatches(TorchPredictor): Waiting for 2 pool actors to start...
2024-02-15 19:10:35,525	WARNING actor_pool_map_operator.py:278 -- To ensure full parallelization across an actor pool of size 2, the Dataset should consist of at least 2 distinct blocks. Consider increasing the parallelism when creating the Dataset.
[{'output': 3.4000000953674316},
 {'output': 3.4000000953674316},
 {'output': 10.199999809265137}]

Using actors typically involves three steps:

  1. Create a class with an __init__() method and a __call__() method. The __init__() method initializes the state, and the __call__() method implements the transformation. See the TorchPredictor class above for an example.

  2. Create an ActorPoolStrategy, specifying the number of actors in the pool with size.

  3. Call map_batches(), passing the class as fn and the ActorPoolStrategy as the compute parameter. As the warning in the output above indicates, compute is deprecated in Ray 2.9 in favor of the concurrency argument.
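
The reason for splitting the work between __init__() and __call__() can be sketched in plain Python. This is an illustration only, not how Ray schedules actors: CountingPredictor and run_with_pool are hypothetical names, and the round-robin assignment of batches is an assumption made to keep the sketch short. The point is that the expensive setup runs once per pool member, not once per batch.

```python
from typing import List


class CountingPredictor:
    """Stand-in for a model wrapper: __init__ holds the expensive setup."""

    instances_created = 0  # class-level counter, for demonstration only

    def __init__(self) -> None:
        CountingPredictor.instances_created += 1
        self.scale = 2.0  # pretend this is a loaded model

    def __call__(self, batch: List[float]) -> List[float]:
        # Reuses the state created in __init__ for every batch.
        return [self.scale * x for x in batch]


def run_with_pool(cls, batches: List[List[float]], size: int) -> List[List[float]]:
    """Create `size` instances once, then reuse them round-robin across batches."""
    pool = [cls() for _ in range(size)]
    return [pool[i % size](batch) for i, batch in enumerate(batches)]


batches = [[1.0], [2.0], [3.0], [4.0]]
outputs = run_with_pool(CountingPredictor, batches, size=2)

print(outputs)                              # [[2.0], [4.0], [6.0], [8.0]]
print(CountingPredictor.instances_created)  # 2
```

Four batches are processed, but the constructor runs only twice. With a stateless function, any setup inside the function body would be repeated for every batch.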

Grouping

Another frequently used primitive in data processing is grouping and aggregation. In Ray Data, you first call groupby() to group the data by a given field, and then aggregate each group, either with map_groups() or with one of the predefined aggregation functions.

The key parameter of groupby(key) specifies the field to group by, and the fn parameter of map_groups(fn) is applied to the rows that belong to the same group. Ray Data also provides predefined aggregation functions such as sum(), max(), and mean(). The following example uses mean() to aggregate the value field within each group.

ds = ray.data.from_items([
    {"group": 1, "value": 1},
    {"group": 1, "value": 2},
    {"group": 2, "value": 3},
    {"group": 2, "value": 4}])
mean_ds = ds.groupby("group").mean("value")
mean_ds.show()
2024-02-15 19:10:35,568	INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Aggregate] -> LimitOperator[limit=20]
{'group': 1, 'mean(value)': 1.5}
{'group': 2, 'mean(value)': 3.5}
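
To make the group-then-aggregate pattern concrete, here is a minimal pure-Python sketch of what the example above computes. It is not Ray Data's implementation: group_rows, apply_to_groups, and mean_value are hypothetical helpers, and sorting the groups by key is an assumption made so the output order is deterministic.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def group_rows(rows: List[Row], key: str) -> Dict[Any, List[Row]]:
    """Collect rows that share the same value of `key` into one list."""
    groups: Dict[Any, List[Row]] = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    return dict(groups)


def apply_to_groups(groups: Dict[Any, List[Row]],
                    fn: Callable[[List[Row]], List[Row]]) -> List[Row]:
    """Apply fn to each group (in sorted key order) and concatenate the results."""
    result: List[Row] = []
    for group_key in sorted(groups):
        result.extend(fn(groups[group_key]))
    return result


def mean_value(group: List[Row]) -> List[Row]:
    """Aggregate one group into a single row holding the mean of `value`."""
    values = [row["value"] for row in group]
    return [{"group": group[0]["group"], "mean(value)": sum(values) / len(values)}]


items = [
    {"group": 1, "value": 1},
    {"group": 1, "value": 2},
    {"group": 2, "value": 3},
    {"group": 2, "value": 4},
]
means = apply_to_groups(group_rows(items, "group"), mean_value)
print(means)  # [{'group': 1, 'mean(value)': 1.5}, {'group': 2, 'mean(value)': 3.5}]
```

Unlike map() and map_batches(), grouping needs all rows with the same key to end up together, which is why the DAG in the log above contains an AllToAllOperator rather than a map operator.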