6.3. Distributed Object Storage#
In Ray, shared data can be stored in the distributed object store, and data placed in the distributed object store is referred to as a remote object. We can use ray.put() and ray.get() to write and read these remote objects. Unlike in-memory Python object instances, which are mutable, remote objects are immutable: they cannot be changed in place.
ray.put() and ray.get()#
import logging
import random
from typing import Tuple
import numpy as np
import pandas as pd
import ray
import torch
if ray.is_initialized():
    ray.shutdown()
ray.init(logging_level=logging.ERROR)
As shown in Fig. 6.4, working with remote objects involves two APIs: ray.put() and ray.get().
ray.put() serializes the object and writes it into the distributed object store, returning an ObjectRef, which is a reference (pointer) to the remote object. By passing this ObjectRef around, we can use the data object in a distributed manner in remote functions or remote classes. ray.get() retrieves the data from the distributed object store via the ObjectRef and deserializes it.
def create_rand_tensor(size: Tuple[int, int, int]) -> torch.Tensor:
    return torch.randn(size=size, dtype=torch.float)

torch.manual_seed(42)
# create 15 tensors; the i-th has shape (i, 8, 8)
tensor_obj_ref_list = [ray.put(create_rand_tensor((i, 8, 8))) for i in range(1, 16)]
tensor_obj_ref_list[0], len(tensor_obj_ref_list)
(ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001e1f505), 15)
Get the data from the distributed object store with ray.get():
val = ray.get(tensor_obj_ref_list[0])
val.size(), val
(torch.Size([1, 8, 8]),
tensor([[[ 1.9269, 1.4873, 0.9007, -2.1055, 0.6784, -1.2345, -0.0431,
-1.6047],
[-0.7521, 1.6487, -0.3925, -1.4036, -0.7279, -0.5594, -0.7688,
0.7624],
[ 1.6423, -0.1596, -0.4974, 0.4396, -0.7581, 1.0783, 0.8008,
1.6806],
[ 1.2791, 1.2964, 0.6105, 1.3347, -0.2316, 0.0418, -0.2516,
0.8599],
[-1.3847, -0.8712, -0.2234, 1.7174, 0.3189, -0.4245, 0.3057,
-0.7746],
[-1.5576, 0.9956, -0.8798, -0.6011, -1.2742, 2.1228, -1.2347,
-0.4879],
[-0.9138, -0.6581, 0.0780, 0.5258, -0.4880, 1.1914, -0.8140,
-0.7360],
[-1.4032, 0.0360, -0.0635, 0.6756, -0.0978, 1.8446, -1.1845,
1.3835]]]))
Or you can fetch a whole list of ObjectRefs at once:
results = ray.get(tensor_obj_ref_list)
results[0].size(), results[0]
(torch.Size([1, 8, 8]),
tensor([[[ 1.9269, 1.4873, 0.9007, -2.1055, 0.6784, -1.2345, -0.0431,
-1.6047],
[-0.7521, 1.6487, -0.3925, -1.4036, -0.7279, -0.5594, -0.7688,
0.7624],
[ 1.6423, -0.1596, -0.4974, 0.4396, -0.7581, 1.0783, 0.8008,
1.6806],
[ 1.2791, 1.2964, 0.6105, 1.3347, -0.2316, 0.0418, -0.2516,
0.8599],
[-1.3847, -0.8712, -0.2234, 1.7174, 0.3189, -0.4245, 0.3057,
-0.7746],
[-1.5576, 0.9956, -0.8798, -0.6011, -1.2742, 2.1228, -1.2347,
-0.4879],
[-0.9138, -0.6581, 0.0780, 0.5258, -0.4880, 1.1914, -0.8140,
-0.7360],
[-1.4032, 0.0360, -0.0635, 0.6756, -0.0978, 1.8446, -1.1845,
1.3835]]]))
Example 1: Transforming Data#
Data in a remote object cannot be modified in place. The following in-place assignment works on a tensor in local memory, but it cannot be applied directly to a remote object.
a = torch.rand(size=(1, 8, 8))
a[0] = torch.ones(8, 8)
a
tensor([[[1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1.]]])
If you want transformed data, use a remote function or remote class to perform the transformation on the remote object, generating a new remote object.
@ray.remote
def transform_tensor(tensor: torch.Tensor) -> torch.Tensor:
    return torch.transpose(tensor, 0, 1)
transformed_object_list = [transform_tensor.remote(t_obj_ref) for t_obj_ref in tensor_obj_ref_list]
ray.get(transformed_object_list[0]).size()
torch.Size([8, 1, 8])
Passing Parameters#
Remote objects can be passed between tasks and actors via their ObjectRef.
Automatic De-referencing#
You can directly pass an ObjectRef as a parameter when calling a task or actor. In the example below, x_obj_ref is an ObjectRef, and the echo() remote function automatically gets the value of x from x_obj_ref; this is called de-referencing the ObjectRef.
@ray.remote
def echo(x):
    print(f"current value of argument x: {x}")
    return x
x = list(range(5))
x_obj_ref = ray.put(x)
x_obj_ref
ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)
ray.get(echo.remote(x_obj_ref))
[0, 1, 2, 3, 4]
ray.get(echo.remote(x))
[0, 1, 2, 3, 4]
Complex Data Structures#
If an ObjectRef is wrapped in a complex data structure, Ray does not automatically get its value. In other words, de-referencing is not automatic for complex data structures, including:
When an ObjectRef is in a dict, for example: .remote({"obj": x_obj_ref})
When an ObjectRef is in a list, for example: .remote([x_obj_ref])
ray.get(echo.remote({"obj": x_obj_ref}))
(echo pid=95333) current value of argument x: [0, 1, 2, 3, 4]
{'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)}
ray.get(echo.remote([x_obj_ref]))
(echo pid=95325) current value of argument x: {'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)}
[ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)]
Note that the (echo pid=…) lines are worker logs flushed asynchronously, so each printed line here lags one cell behind the call that produced it. The return values are what matter: the ObjectRef nested in the dict or list is not de-referenced.
Implementation#
Each computing node in the Ray cluster has a shared memory object store. The data of a remote object is stored in the object store of one or more computing nodes in the cluster. The shared memory of all computing nodes collectively forms the distributed object store.
When a remote object is small (<= 100 KB), it is stored directly in the memory of the owning worker process. Larger objects are stored in the distributed shared memory. When the shared memory space of the cluster is insufficient, data is spilled to persistent storage, such as a local disk or S3.
ray.shutdown()