8.2. Data Loading, Inspection, and Saving#

Ray Data is compatible with various data sources, including files, in-memory data, and databases.

Hide code cell content
import os
import shutil
from pathlib import Path

import sys
sys.path.append("..")
from datasets import nyc_taxi

import ray

if ray.is_initialized:
    ray.shutdown()

ray.init()
2024-04-23 15:58:57,243	INFO worker.py:1740 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8268 

Loading Data#

Ray Data provides numerous built-in methods for loading data, including reading files, reading in-memory data such as pandas DataFrames, and reading data from databases. Here, we demonstrate reading a Parquet file using New York City taxi dataset.

Download the data, use the ray.data.read_parquet() method to read the data, and get a Dataset.

dataset_path = nyc_taxi()
dataset = ray.data.read_parquet(dataset_path)
dataset.take(1)
Hide code cell output
2024-04-23 15:59:00,761	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-58-55_565716_64002/logs
2024-04-23 15:59:00,761	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=1]
[{'VendorID': 2,
  'tpep_pickup_datetime': datetime.datetime(2023, 1, 1, 0, 32, 10),
  'tpep_dropoff_datetime': datetime.datetime(2023, 1, 1, 0, 40, 36),
  'passenger_count': 1.0,
  'trip_distance': 0.97,
  'RatecodeID': 1.0,
  'store_and_fwd_flag': 'N',
  'PULocationID': 161,
  'DOLocationID': 141,
  'payment_type': 2,
  'fare_amount': 9.3,
  'extra': 1.0,
  'mta_tax': 0.5,
  'tip_amount': 0.0,
  'tolls_amount': 0.0,
  'improvement_surcharge': 1.0,
  'total_amount': 14.3,
  'congestion_surcharge': 2.5,
  'airport_fee': 0.0}]

To inspect the schema of this dataset:

dataset.schema()
Column                 Type
------                 ----
VendorID               int64
tpep_pickup_datetime   timestamp[us]
tpep_dropoff_datetime  timestamp[us]
passenger_count        double
trip_distance          double
RatecodeID             double
store_and_fwd_flag     string
PULocationID           int64
DOLocationID           int64
payment_type           int64
fare_amount            double
extra                  double
mta_tax                double
tip_amount             double
tolls_amount           double
improvement_surcharge  double
total_amount           double
congestion_surcharge   double
airport_fee            double

For other types of file formats (CSV, TFRecord, etc.), the reading methods are illustrated in Table 8.1.

Table 8.1 Methods for data reading#

Parquet

Text

CSV

TFRecord

Binary

Method

read_parquet()

read_text()

read_csv()

read_tfrecords()

read_binary_files()

Column and Row Pruning#

The original file contains many columns. If we are only interested in specific columns, such as passenger_count, tip_amount, payment_type, etc., we can filter unnecessary columns by using the columns parameter of the read_parquet() method.

dataset = ray.data.read_parquet(
    dataset_path, 
    columns=["passenger_count", "tip_amount", "payment_type"]
)
dataset.schema()
Column           Type
------           ----
passenger_count  double
tip_amount       double
payment_type     int64

After adding the columns restriction, only the columns of interest are read, and other columns are not. This is known as column pruning.

In addition to column pruning, Ray Data also supports row pruning. For example, rows where tip_amount is greater than 6.0 are filtered out:

import pyarrow as pa

dataset = ray.data.read_parquet(
    dataset_path, 
    columns=["passenger_count", "tip_amount", "payment_type"],
    filter=pa.dataset.field("tip_amount") > 6.0
)
dataset.show(limit=2)
2024-04-23 15:59:02,302	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-58-55_565716_64002/logs
2024-04-23 15:59:02,302	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=2]
{'passenger_count': 1.0, 'tip_amount': 15.0, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 10.0, 'payment_type': 1}

Parallelism#

As mentioned in Section 8.1, Ray Data uses task or actor to parallelize the data processing. During data reading, you can set the parallelism parameter to optimize the parallel data processing. For various data reading methods provided by Ray Data, including read_parquet(), you can set the parallelism parameter to control the underlying parallel execution process. If parallelism is not set, Ray Data probes parallelism in the following way:

  1. Ray gets the available number of CPU cores in the cluster.

  2. parallelism is set to twice the number of CPU cores. If parallelism is less than 8, it is set to 8.

  3. Estimate the size of each Block. If the average size of each Block is greater than 512 MiB, Ray increases parallelism until each Block is less than 512 MiB.

Users can also manually set parallelism based on the actual data size. For example, ray.data.read_parquet(path, parallelism=512) will force the generation of 512 Ray tasks to read data in parallel.

Inspecting Data#

Inspecting data involves examining the schema, specific rows, or batches of data. This includes the show() method used earlier, as well as upcoming methods such as count() and take().

Get the number of samples of this Dataset:

dataset.count()
2615778

To view rows of data, you can utilize the Dataset.take() or Dataset.take_all() methods. The take() method extracts a particular row from the Dataset and prints it in the form of a dictionary, where the keys represent field names, and the values correspond to their respective values.

dataset.take(limit=1)
2024-04-23 15:59:03,707	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-58-55_565716_64002/logs
2024-04-23 15:59:03,707	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=1]
[{'passenger_count': 1.0, 'tip_amount': 15.0, 'payment_type': 1}]

Another option is to split the Dataset into smaller batches using the Dataset.take_batch() method. To inspect the data in a batch, the take_batch() method is employed. An essential parameter of this method is batch_size, which determines the size of each batch.

batch = dataset.take_batch(batch_size=2)
batch
2024-04-23 15:59:03,903	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-58-55_565716_64002/logs
2024-04-23 15:59:03,903	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=2]
{'passenger_count': array([1., 1.]),
 'tip_amount': array([15., 10.]),
 'payment_type': array([1, 1])}

Iterating Over Data#

Sequential Iterating#

Ray Data offers methods for iterating through data, namely Dataset.iter_rows() and Dataset.iter_batches(). iter_rows() iterates through each row, while iter_batches() iterates through each batch.

For example, to iterate through the first 5 rows:

cnt = 0
for row in dataset.iter_rows():
    cnt += 1
    if cnt > 5:
        break
    print(row)
{'passenger_count': 1.0, 'tip_amount': 15.0, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 10.0, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 10.74, 'payment_type': 1}
{'passenger_count': 4.0, 'tip_amount': 7.75, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 6.22, 'payment_type': 1}

To iterate through the first 5 batches:

cnt = 0
for batch in dataset.iter_batches(batch_size=2):
    cnt += 1
    if cnt > 5:
        break
    print(batch)
{'passenger_count': array([1., 1.]), 'tip_amount': array([15., 10.]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 4.]), 'tip_amount': array([10.74,  7.75]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 2.]), 'tip_amount': array([ 6.22, 13.26]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 1.]), 'tip_amount': array([15.85,  6.36]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 1.]), 'tip_amount': array([ 6.6 , 15.05]), 'payment_type': array([1, 1])}

In deep learning frameworks like PyTorch and TensorFlow, training or inference often involves processing data in batches. To seamlessly integrate with these frameworks, Ray Data provides Dataset.iter_torch_batches() and Dataset.iter_tf_batches() methods. These methods convert the data into PyTorch and TensorFlow Tensor formats.

cnt = 0
for batch in dataset.iter_torch_batches(batch_size=2):
    cnt += 1
    if cnt > 5:
        break
    print(batch)
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([15., 10.], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 4.], dtype=torch.float64), 'tip_amount': tensor([10.7400,  7.7500], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 2.], dtype=torch.float64), 'tip_amount': tensor([ 6.2200, 13.2600], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([15.8500,  6.3600], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([ 6.6000, 15.0500], dtype=torch.float64), 'payment_type': tensor([1, 1])}

Randomly Iterating#

In machine learning, it is common to shuffle sample data, and Ray Data provides two methods for this purpose:

  • Full Data Shuffle

  • Local Buffer Shuffle

The Dataset.random_shuffle() method shuffles the entire dataset, implying that data scattered across different compute nodes will be exchanged, incurring significant inter-node communication overhead and resulting in slow performance.

Local Cache Shuffle involves using a local cache on each compute node, shuffling the data within the cache. This reduces randomness but significantly improves performance compared to full data shuffle, and also reduces inter-node communication overhead. To implement this, you only need to use the local_shuffle_buffer_size parameter in the iteration method, and set the random seed with local_shuffle_seed.

In the following example, a cache region is set up, ensuring that the cache contains at least 250 rows of data, meaning shuffling is performed on a subset of at least 250 rows of data.

cnt = 0
for batch in dataset.iter_torch_batches(batch_size=2, local_shuffle_buffer_size=250):
    cnt += 1
    if cnt > 5:
        break
    print(batch)
{'passenger_count': tensor([2., 1.], dtype=torch.float64), 'tip_amount': tensor([10.9700, 16.1100], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 2.], dtype=torch.float64), 'tip_amount': tensor([23.5900, 16.1100], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([12.3900, 11.6600], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([2., 1.], dtype=torch.float64), 'tip_amount': tensor([ 6.0200, 14.2400], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 4.], dtype=torch.float64), 'tip_amount': tensor([14.8000, 12.6300], dtype=torch.float64), 'payment_type': tensor([1, 1])}

Note

Local Shuffle is a strategy that strikes a balance between randomness and performance. In machine learning, higher randomness often leads to higher model accuracy.

Saving Data#

Data saving can be categorized into two types:

  • Saving data to a local or shared file system, such as the local filesystem or S3.

  • Transforming data into other formats or writing to specific databases, such as pandas or MongoDB.

Writing to Filesystems#

When using HDFS, S3, or other filesystems, Ray Data adheres to the URI and file system scheme standards mentioned in Table 4.2. It is essential to explicitly specify the scheme information in the URI.

Table 8.2 lists several APIs for saving the Dataset in various file formats.

Table 8.2 Writing Dataset into filesystems#

Parquet

CSV

JSON

TFRecord

Method

Dataset.write_parquet()

Dataset.write_csv()

Dataset.write_json()

Dataset.write_tfrecords()

When persisting data to the filesystem, do not forget to specify the filesystem scheme.

dataset.write_parquet("file:///tmp/trip")
2024-04-23 15:59:04,257	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-58-55_565716_64002/logs
2024-04-23 15:59:04,257	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[Write]

By default, Ray Data writes data to the filesystem in the form of multiple chunked files, where each Block corresponds to one file. The number of files is equal to the number of Blocks. You can use repartition() to modify the number of files.

if os.path.exists("/tmp/files/"):
    shutil.rmtree("/tmp/files/")
dataset.repartition(3).write_csv("/tmp/files/")
print(os.listdir("/tmp/files/"))
2024-04-23 15:59:04,750	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-58-55_565716_64002/logs
2024-04-23 15:59:04,750	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> AllToAllOperator[Repartition] -> TaskPoolMapOperator[Write]
['10_000002_000000.csv', '10_000000_000000.csv', '10_000001_000000.csv']

Converting to Other Framework Formats#

Ray Data allows the conversion of its data into single-node pandas DataFrame or a distributed Dask DataFrame, as illustrated in Table 8.3.

Table 8.3 Converting for other frameworks#

pandas

Dask

Spark

Method

Dataset.to_pandas()

Dataset.to_dask()

Dataset.to_spark()