8.2. Data Loading, Inspection, and Saving#
Ray Data is compatible with various data sources, including files, in-memory data, and databases.
import os
import shutil
from pathlib import Path
import sys
sys.path.append("..")
from datasets import nyc_taxi
import ray
if ray.is_initialized():
    ray.shutdown()
ray.init()
2024-04-23 15:58:57,243 INFO worker.py:1740 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8268
Loading Data#
Ray Data provides numerous built-in methods for loading data, including reading files, reading in-memory data such as pandas DataFrames, and reading data from databases. Here, we demonstrate reading a Parquet file using the New York City taxi dataset.
Download the data, read it with the ray.data.read_parquet() method, and get a Dataset.
dataset_path = nyc_taxi()
dataset = ray.data.read_parquet(dataset_path)
dataset.take(1)
2024-04-23 15:59:00,761 INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-58-55_565716_64002/logs
2024-04-23 15:59:00,761 INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=1]
[{'VendorID': 2,
'tpep_pickup_datetime': datetime.datetime(2023, 1, 1, 0, 32, 10),
'tpep_dropoff_datetime': datetime.datetime(2023, 1, 1, 0, 40, 36),
'passenger_count': 1.0,
'trip_distance': 0.97,
'RatecodeID': 1.0,
'store_and_fwd_flag': 'N',
'PULocationID': 161,
'DOLocationID': 141,
'payment_type': 2,
'fare_amount': 9.3,
'extra': 1.0,
'mta_tax': 0.5,
'tip_amount': 0.0,
'tolls_amount': 0.0,
'improvement_surcharge': 1.0,
'total_amount': 14.3,
'congestion_surcharge': 2.5,
'airport_fee': 0.0}]
To inspect the schema of this dataset:
dataset.schema()
Column Type
------ ----
VendorID int64
tpep_pickup_datetime timestamp[us]
tpep_dropoff_datetime timestamp[us]
passenger_count double
trip_distance double
RatecodeID double
store_and_fwd_flag string
PULocationID int64
DOLocationID int64
payment_type int64
fare_amount double
extra double
mta_tax double
tip_amount double
tolls_amount double
improvement_surcharge double
total_amount double
congestion_surcharge double
airport_fee double
For other types of file formats (CSV, TFRecord, etc.), the reading methods are illustrated in Table 8.1.
| | Parquet | Text | CSV | TFRecord | Binary |
|---|---|---|---|---|---|
| Method | read_parquet() | read_text() | read_csv() | read_tfrecords() | read_binary_files() |
Column and Row Pruning#
The original file contains many columns. If we are only interested in specific columns, such as passenger_count, tip_amount, and payment_type, we can skip the unnecessary columns by using the columns parameter of the read_parquet() method.
dataset = ray.data.read_parquet(
    dataset_path,
    columns=["passenger_count", "tip_amount", "payment_type"]
)
dataset.schema()
Column Type
------ ----
passenger_count double
tip_amount double
payment_type int64
After adding the columns restriction, only the columns of interest are read, and the other columns are skipped. This is known as column pruning.
In addition to column pruning, Ray Data also supports row pruning. For example, the following code keeps only the rows where tip_amount is greater than 6.0:
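# Import the dataset submodule explicitly; it is not guaranteed to be
# available as an attribute after a plain "import pyarrow".
import pyarrow.dataset as pds
dataset = ray.data.read_parquet(
    dataset_path,
    columns=["passenger_count", "tip_amount", "payment_type"],
    filter=pds.field("tip_amount") > 6.0
)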
dataset.show(limit=2)
2024-04-23 15:59:02,302 INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-58-55_565716_64002/logs
2024-04-23 15:59:02,302 INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=2]
{'passenger_count': 1.0, 'tip_amount': 15.0, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 10.0, 'payment_type': 1}
Parallelism#
As mentioned in Section 8.1, Ray Data uses tasks or actors to parallelize data processing. All data reading methods provided by Ray Data, including read_parquet(), accept a parallelism parameter that controls the underlying parallel execution. If parallelism is not set, Ray Data determines it in the following way:
1. Ray gets the number of available CPU cores in the cluster.
2. parallelism is set to twice the number of CPU cores. If parallelism is less than 8, it is set to 8.
3. Ray estimates the size of each Block. If the average size of each Block is greater than 512 MiB, Ray increases parallelism until each Block is smaller than 512 MiB.
Users can also set parallelism manually based on the actual data size. For example, ray.data.read_parquet(path, parallelism=512) forces the generation of 512 Ray tasks to read data in parallel.
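The three steps above can be sketched in plain Python. This is only an illustration of the described heuristic, not Ray's actual implementation: the function name estimate_parallelism and its two arguments are hypothetical, and the dataset size is assumed to be known in bytes.

```python
import math

MIN_PARALLELISM = 8                # lower bound from step 2
MAX_BLOCK_BYTES = 512 * 1024 ** 2  # 512 MiB threshold from step 3


def estimate_parallelism(num_cpus: int, dataset_bytes: int) -> int:
    """Hypothetical helper that mirrors the steps described in the text."""
    # Steps 1 and 2: twice the CPU count, but never fewer than 8.
    parallelism = max(2 * num_cpus, MIN_PARALLELISM)
    # Step 3: if the average block would exceed 512 MiB, raise parallelism
    # so that each block holds at most 512 MiB.
    if dataset_bytes / parallelism > MAX_BLOCK_BYTES:
        parallelism = math.ceil(dataset_bytes / MAX_BLOCK_BYTES)
    return parallelism


# A small dataset on a 2-core machine falls back to the minimum.
print(estimate_parallelism(num_cpus=2, dataset_bytes=1024 ** 2))
# A 100 GiB dataset on 4 cores needs many more blocks than 2 * 4.
print(estimate_parallelism(num_cpus=4, dataset_bytes=100 * 1024 ** 3))
```

The sketch shows why large inputs end up with far more read tasks than CPU cores: the block size limit, not the core count, becomes the deciding factor.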
Inspecting Data#
Inspecting data involves examining the schema, specific rows, or batches of data. This includes the show() method used earlier, as well as the count() and take() methods introduced below.
Get the number of samples in this Dataset:
dataset.count()
2615778
To view rows of data, you can use the Dataset.take() or Dataset.take_all() methods. The take() method fetches up to limit rows from the Dataset and returns them as a list of dictionaries, where the keys are field names and the values are the corresponding field values.
dataset.take(limit=1)
2024-04-23 15:59:03,707 INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-58-55_565716_64002/logs
2024-04-23 15:59:03,707 INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=1]
[{'passenger_count': 1.0, 'tip_amount': 15.0, 'payment_type': 1}]
Another option is to inspect a batch of data with the Dataset.take_batch() method. An essential parameter of this method is batch_size, which sets the maximum number of rows in the returned batch.
batch = dataset.take_batch(batch_size=2)
batch
2024-04-23 15:59:03,903 INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-58-55_565716_64002/logs
2024-04-23 15:59:03,903 INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=2]
{'passenger_count': array([1., 1.]),
'tip_amount': array([15., 10.]),
'payment_type': array([1, 1])}
Iterating Over Data#
Sequential Iterating#
Ray Data offers two methods for iterating through data: Dataset.iter_rows() and Dataset.iter_batches(). iter_rows() iterates through each row, while iter_batches() iterates through each batch.
For example, to iterate through the first 5 rows:
cnt = 0
for row in dataset.iter_rows():
    cnt += 1
    if cnt > 5:
        break
    print(row)
{'passenger_count': 1.0, 'tip_amount': 15.0, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 10.0, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 10.74, 'payment_type': 1}
{'passenger_count': 4.0, 'tip_amount': 7.75, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 6.22, 'payment_type': 1}
To iterate through the first 5 batches:
cnt = 0
for batch in dataset.iter_batches(batch_size=2):
    cnt += 1
    if cnt > 5:
        break
    print(batch)
{'passenger_count': array([1., 1.]), 'tip_amount': array([15., 10.]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 4.]), 'tip_amount': array([10.74, 7.75]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 2.]), 'tip_amount': array([ 6.22, 13.26]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 1.]), 'tip_amount': array([15.85, 6.36]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 1.]), 'tip_amount': array([ 6.6 , 15.05]), 'payment_type': array([1, 1])}
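The relationship between rows and batches shown in the two outputs above can be sketched without Ray. The helper iter_column_batches below is hypothetical and only mimics the shape of the result: each batch is a dictionary that maps a column name to the values of up to batch_size consecutive rows. It uses plain lists, whereas Ray Data returns NumPy arrays by default.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List


def iter_column_batches(
    rows: Iterable[Dict[str, Any]], batch_size: int
) -> Iterator[Dict[str, List[Any]]]:
    """Group row dictionaries into column-oriented batches (illustrative only)."""
    it = iter(rows)
    while True:
        # Take the next batch_size rows; the last chunk may be shorter.
        chunk = list(islice(it, batch_size))
        if not chunk:
            return
        # Pivot the rows of this chunk into one list per column.
        yield {key: [row[key] for row in chunk] for key in chunk[0]}


# The first three rows from the iter_rows() output above.
rows = [
    {"passenger_count": 1.0, "tip_amount": 15.0, "payment_type": 1},
    {"passenger_count": 1.0, "tip_amount": 10.0, "payment_type": 1},
    {"passenger_count": 1.0, "tip_amount": 10.74, "payment_type": 1},
]
for batch in iter_column_batches(rows, batch_size=2):
    print(batch)
```

With three rows and batch_size=2, the second batch contains a single row, which is why batch_size is best read as an upper bound.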
In deep learning frameworks like PyTorch and TensorFlow, training or inference often involves processing data in batches. To integrate seamlessly with these frameworks, Ray Data provides the Dataset.iter_torch_batches() and Dataset.iter_tf_batches() methods. These methods convert the data into PyTorch and TensorFlow Tensor formats.
cnt = 0
for batch in dataset.iter_torch_batches(batch_size=2):
    cnt += 1
    if cnt > 5:
        break
    print(batch)
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([15., 10.], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 4.], dtype=torch.float64), 'tip_amount': tensor([10.7400, 7.7500], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 2.], dtype=torch.float64), 'tip_amount': tensor([ 6.2200, 13.2600], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([15.8500, 6.3600], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([ 6.6000, 15.0500], dtype=torch.float64), 'payment_type': tensor([1, 1])}
Randomly Iterating#
In machine learning, it is common to shuffle sample data, and Ray Data provides two methods for this purpose:
Full Data Shuffle
Local Buffer Shuffle
The Dataset.random_shuffle() method shuffles the entire dataset. Data scattered across different compute nodes must be exchanged, which incurs significant inter-node communication overhead and makes this method slow.
Local Buffer Shuffle uses a local buffer on each compute node and shuffles the data within that buffer. This reduces randomness but, compared to a full data shuffle, significantly improves performance and reduces inter-node communication overhead. To use it, set the local_shuffle_buffer_size parameter of the iteration method; the random seed can be set with local_shuffle_seed.
In the following example, the buffer is configured to hold at least 250 rows, so shuffling is performed on a subset of at least 250 rows of data.
cnt = 0
for batch in dataset.iter_torch_batches(batch_size=2, local_shuffle_buffer_size=250):
    cnt += 1
    if cnt > 5:
        break
    print(batch)
{'passenger_count': tensor([2., 1.], dtype=torch.float64), 'tip_amount': tensor([10.9700, 16.1100], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 2.], dtype=torch.float64), 'tip_amount': tensor([23.5900, 16.1100], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([12.3900, 11.6600], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([2., 1.], dtype=torch.float64), 'tip_amount': tensor([ 6.0200, 14.2400], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 4.], dtype=torch.float64), 'tip_amount': tensor([14.8000, 12.6300], dtype=torch.float64), 'payment_type': tensor([1, 1])}
Note
Local Buffer Shuffle is a strategy that strikes a balance between randomness and performance. In machine learning, higher randomness often leads to higher model accuracy.
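To make the trade-off concrete, the following is a minimal sketch of a generic shuffle buffer in plain Python. It is not Ray Data's implementation; the function buffered_shuffle and its parameters are hypothetical names chosen for illustration. Rows are collected in a bounded buffer, and once the buffer is full, a randomly chosen row is emitted for every new row that arrives.

```python
import random
from typing import Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def buffered_shuffle(
    rows: Iterable[T], buffer_size: int, seed: Optional[int] = None
) -> Iterator[T]:
    """Shuffle a stream using a bounded buffer (illustrative sketch only)."""
    rng = random.Random(seed)
    buffer: List[T] = []
    for row in rows:
        buffer.append(row)
        if len(buffer) >= buffer_size:
            # Move a random row to the end of the buffer and emit it.
            idx = rng.randrange(len(buffer))
            buffer[idx], buffer[-1] = buffer[-1], buffer[idx]
            yield buffer.pop()
    # The input is exhausted: emit the remaining rows in random order.
    rng.shuffle(buffer)
    yield from buffer


shuffled = list(buffered_shuffle(range(1000), buffer_size=250, seed=42))
print(shuffled[:10])
```

Because a row can only be emitted after it has entered the buffer, the row at output position k always comes from the first k + buffer_size input rows. This is why a small buffer yields weaker randomness than a full shuffle, while never requiring more than buffer_size rows in memory.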
Saving Data#
Data saving can be categorized into two types:
Saving data to a local or shared file system, such as the local filesystem or S3.
Transforming data into other formats or writing to specific databases, such as pandas or MongoDB.
Writing to Filesystems#
When using HDFS, S3, or other filesystems, Ray Data adheres to the URI and file system scheme standards mentioned in Table 4.2. It is essential to explicitly specify the scheme information in the URI.
Table 8.2 lists several APIs for saving the Dataset
in various file formats.
| | Parquet | CSV | JSON | TFRecord |
|---|---|---|---|---|
| Method | write_parquet() | write_csv() | write_json() | write_tfrecords() |
When persisting data to the filesystem, do not forget to specify the filesystem scheme.
dataset.write_parquet("file:///tmp/trip")
2024-04-23 15:59:04,257 INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-58-55_565716_64002/logs
2024-04-23 15:59:04,257 INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[Write]
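As a small illustration of what the scheme part of a URI is, the sketch below splits a few paths with Python's standard urllib.parse module. It does not involve Ray; the helper name split_uri and the bucket name my-bucket are hypothetical.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_uri(uri: str) -> Tuple[str, str, str]:
    """Return (scheme, netloc, path) for a URI or plain path."""
    parts = urlparse(uri)
    return parts.scheme, parts.netloc, parts.path


# "my-bucket" is a made-up bucket name used only for illustration.
for uri in ["file:///tmp/trip", "s3://my-bucket/trip", "/tmp/trip"]:
    print(uri, "->", split_uri(uri))
```

The first two URIs carry an explicit scheme (file and s3), while the plain path has an empty scheme, so the target filesystem is not stated in the path itself.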
By default, Ray Data writes data to the filesystem as multiple files, where each Block corresponds to one file, so the number of files equals the number of Blocks. You can use repartition() to change the number of files.
if os.path.exists("/tmp/files/"):
    shutil.rmtree("/tmp/files/")
dataset.repartition(3).write_csv("/tmp/files/")
print(os.listdir("/tmp/files/"))
2024-04-23 15:59:04,750 INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-58-55_565716_64002/logs
2024-04-23 15:59:04,750 INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> AllToAllOperator[Repartition] -> TaskPoolMapOperator[Write]
['10_000002_000000.csv', '10_000000_000000.csv', '10_000001_000000.csv']
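The one-file-per-block behavior can be imitated with the standard library alone. The sketch below is a simplified model, not how Ray Data writes files: split_into_blocks and write_blocks_as_csv are hypothetical helpers, and the block_NNNNNN.csv file names are invented for this example.

```python
import csv
import os
import tempfile
from typing import Any, Dict, List


def split_into_blocks(rows: List[Dict[str, Any]], num_blocks: int) -> List[List[Dict[str, Any]]]:
    """Split rows into num_blocks contiguous blocks of near-equal size."""
    base, extra = divmod(len(rows), num_blocks)
    blocks, start = [], 0
    for i in range(num_blocks):
        # The first `extra` blocks receive one additional row.
        end = start + base + (1 if i < extra else 0)
        blocks.append(rows[start:end])
        start = end
    return blocks


def write_blocks_as_csv(blocks: List[List[Dict[str, Any]]], out_dir: str, fieldnames: List[str]) -> List[str]:
    """Write each block to its own CSV file and return the file paths."""
    os.makedirs(out_dir, exist_ok=True)
    paths = []
    for i, block in enumerate(blocks):
        path = os.path.join(out_dir, f"block_{i:06d}.csv")
        with open(path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(block)
        paths.append(path)
    return paths


rows = [{"tip_amount": float(i), "payment_type": 1} for i in range(10)]
with tempfile.TemporaryDirectory() as tmp_dir:
    write_blocks_as_csv(split_into_blocks(rows, 3), tmp_dir, ["tip_amount", "payment_type"])
    print(sorted(os.listdir(tmp_dir)))
```

Changing the second argument of split_into_blocks changes the number of files written, which is the role repartition() plays in the Ray Data example above.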
Converting to Other Framework Formats#
Ray Data can convert its data into a single-node pandas DataFrame or into a distributed Dask or Spark DataFrame, as illustrated in Table 8.3.
| | pandas | Dask | Spark |
|---|---|---|---|
| Method | to_pandas() | to_dask() | to_spark() |