6.3. Distributed Object Storage

In Ray, shared data lives in the distributed object store, and a piece of data placed there is called a remote object. ray.put() writes a remote object and ray.get() reads it back. Unlike ordinary in-memory Python objects, which are usually mutable, remote objects are immutable: they cannot be changed in place.

ray.put() and ray.get()

import logging
import random
from typing import Tuple
import numpy as np
import pandas as pd
import ray
import torch

if ray.is_initialized():
    ray.shutdown()
ray.init(logging_level=logging.ERROR)

As shown in Fig. 6.4, working with remote objects involves two APIs: ray.put() and ray.get().

  • ray.put() serializes the object data and writes it into the distributed object store. It returns an ObjectRef, which acts as a pointer to the remote object. By passing this ObjectRef around, remote functions and remote classes can use the data in a distributed manner.

  • ray.get() retrieves the data from the distributed object store via the ObjectRef and deserializes it.

../_images/put-get-object-store.svg

Fig. 6.4 Ray distributed object store
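
The put/get round trip described above can be mimicked in a few lines of plain Python. The sketch below is a single-process toy, not Ray's implementation: ToyObjectStore, its put() and get() methods, and the use of pickle and uuid are all assumptions made for illustration. It only shows the idea that put() serializes a value and hands back an opaque handle, while get() deserializes a fresh copy.

```python
import pickle
import uuid
from typing import Any, Dict


class ToyObjectStore:
    """Hypothetical single-process stand-in for an object store."""

    def __init__(self) -> None:
        # Maps an opaque handle to the serialized bytes of the object.
        self._blobs: Dict[str, bytes] = {}

    def put(self, value: Any) -> str:
        """Serialize the value, store the bytes, return an opaque handle."""
        ref = uuid.uuid4().hex
        self._blobs[ref] = pickle.dumps(value)
        return ref

    def get(self, ref: str) -> Any:
        """Look up the bytes by handle and deserialize a fresh copy."""
        return pickle.loads(self._blobs[ref])


store = ToyObjectStore()
ref = store.put([1, 2, 3])

local_copy = store.get(ref)
local_copy.append(4)        # mutates only the local, deserialized copy

fresh = store.get(ref)      # the stored object is unchanged
print(local_copy, fresh)
```

Because get() returns a deserialized copy, appending to local_copy leaves the stored object untouched, which loosely mirrors the immutability of remote objects.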

def create_rand_tensor(size: Tuple[int, int, int]) -> torch.Tensor:
    return torch.randn(size=size, dtype=torch.float)

torch.manual_seed(42)
# create 15 tensors with shapes (1, 8, 8) through (15, 8, 8)
tensor_obj_ref_list = [ray.put(create_rand_tensor((i, 8, 8))) for i in range(1, 16)]
tensor_obj_ref_list[0], len(tensor_obj_ref_list)
(ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001e1f505), 15)

Get the data from the distributed object store with ray.get():

val = ray.get(tensor_obj_ref_list[0])
val.size(), val
(torch.Size([1, 8, 8]),
 tensor([[[ 1.9269,  1.4873,  0.9007, -2.1055,  0.6784, -1.2345, -0.0431,
           -1.6047],
          [-0.7521,  1.6487, -0.3925, -1.4036, -0.7279, -0.5594, -0.7688,
            0.7624],
          [ 1.6423, -0.1596, -0.4974,  0.4396, -0.7581,  1.0783,  0.8008,
            1.6806],
          [ 1.2791,  1.2964,  0.6105,  1.3347, -0.2316,  0.0418, -0.2516,
            0.8599],
          [-1.3847, -0.8712, -0.2234,  1.7174,  0.3189, -0.4245,  0.3057,
           -0.7746],
          [-1.5576,  0.9956, -0.8798, -0.6011, -1.2742,  2.1228, -1.2347,
           -0.4879],
          [-0.9138, -0.6581,  0.0780,  0.5258, -0.4880,  1.1914, -0.8140,
           -0.7360],
          [-1.4032,  0.0360, -0.0635,  0.6756, -0.0978,  1.8446, -1.1845,
            1.3835]]]))

You can also pass a list of ObjectRefs to ray.get() to fetch all of the values at once:

results = ray.get(tensor_obj_ref_list)
results[0].size(), results[0]
(torch.Size([1, 8, 8]),
 tensor([[[ 1.9269,  1.4873,  0.9007, -2.1055,  0.6784, -1.2345, -0.0431,
           -1.6047],
          [-0.7521,  1.6487, -0.3925, -1.4036, -0.7279, -0.5594, -0.7688,
            0.7624],
          [ 1.6423, -0.1596, -0.4974,  0.4396, -0.7581,  1.0783,  0.8008,
            1.6806],
          [ 1.2791,  1.2964,  0.6105,  1.3347, -0.2316,  0.0418, -0.2516,
            0.8599],
          [-1.3847, -0.8712, -0.2234,  1.7174,  0.3189, -0.4245,  0.3057,
           -0.7746],
          [-1.5576,  0.9956, -0.8798, -0.6011, -1.2742,  2.1228, -1.2347,
           -0.4879],
          [-0.9138, -0.6581,  0.0780,  0.5258, -0.4880,  1.1914, -0.8140,
           -0.7360],
          [-1.4032,  0.0360, -0.0635,  0.6756, -0.0978,  1.8446, -1.1845,
            1.3835]]]))

Example 1: Transforming Data

Data in a remote object cannot be modified in place. The following in-place assignment works on a tensor in local memory, but it cannot be applied directly to a remote object.

a = torch.rand(size=(1, 8, 8))
a[0] = torch.ones(8, 8)
a
tensor([[[1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.]]])

To obtain modified data, use a remote function or remote class to transform the remote object. The transformation produces a new remote object and leaves the original unchanged.

@ray.remote
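def transform_tensor(tensor: torch.Tensor) -> torch.Tensor:
    return torch.transpose(tensor, 0, 1)

# each call returns an ObjectRef to a new remote object holding the transposed tensor
transformed_object_list = [transform_tensor.remote(t_obj_ref) for t_obj_ref in tensor_obj_ref_list]
# note: ObjectRef.size() is the length of the object ID in bytes, not the tensor shape
transformed_object_list[0].size()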
28
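
The "transform into a new object" pattern can also be sketched without Ray. The example below is a toy under stated assumptions: the store dictionary, toy_put(), toy_get(), and transform() are hypothetical names, and the write-once check is a simplification of immutability rather than a description of Ray's internals. The transformation reads the source object, computes a transposed copy, and stores the result under a new key.

```python
import pickle
from typing import Any, Dict, List

# Hypothetical stand-in for the object store: key -> serialized bytes.
store: Dict[str, bytes] = {}


def toy_put(key: str, value: Any) -> str:
    """Store a value once; an existing key is never overwritten."""
    if key in store:
        raise KeyError(f"{key!r} already exists; stored objects are write-once")
    store[key] = pickle.dumps(value)
    return key


def toy_get(key: str) -> Any:
    """Return a freshly deserialized copy of the stored value."""
    return pickle.loads(store[key])


def transform(src_key: str, dst_key: str) -> str:
    """Transpose the matrix stored under src_key into a new object."""
    matrix: List[List[int]] = toy_get(src_key)
    transposed = [list(row) for row in zip(*matrix)]
    return toy_put(dst_key, transposed)


src = toy_put("m", [[1, 2, 3], [4, 5, 6]])
dst = transform(src, "m_T")

print(toy_get(src))  # original object is unchanged
print(toy_get(dst))  # transposed data lives in a separate object
```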

Passing Parameters

Remote objects can be passed between tasks and actors via their ObjectRef.

Automatic De-referencing

When an ObjectRef is passed directly as an argument to a task or actor call, Ray resolves it to the underlying value before the function runs. In the example below, x_obj_ref is an ObjectRef, and the echo() remote function automatically receives the value of x rather than the reference. This is called de-referencing the ObjectRef.

@ray.remote
def echo(x):
    print(f"current value of argument x: {x}")
    return x

x = list(range(5))
x_obj_ref = ray.put(x)
x_obj_ref
ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)
ray.get(echo.remote(x_obj_ref))
[0, 1, 2, 3, 4]
ray.get(echo.remote(x))
[0, 1, 2, 3, 4]

Complex Data Structures

If an ObjectRef is nested inside a complex data structure, Ray does not automatically resolve it: the function receives the ObjectRef itself and must call ray.get() on it explicitly. In other words, de-referencing is not automatic for complex data structures, including:

  • When an ObjectRef is in a dict, for example: .remote({"obj": x_obj_ref})

  • When an ObjectRef is in a list, for example: .remote([x_obj_ref])

ray.get(echo.remote({"obj": x_obj_ref}))
(echo pid=95333) current value of argument x: [0, 1, 2, 3, 4]
{'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)}
ray.get(echo.remote([x_obj_ref]))
(echo pid=95325) current value of argument x: {'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)}
[ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)]
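
The difference between top-level and nested references can be illustrated with a small toy model. Everything here is hypothetical and written only for illustration: ToyRef, toy_put(), toy_get(), and toy_remote_call() are not Ray APIs, and the dispatcher is an assumption about the behavior described above, not Ray's actual code path. The dispatcher resolves only arguments that are themselves references; a function that receives a container has to resolve nested references on its own.

```python
import pickle
from dataclasses import dataclass
from typing import Any, Callable, Dict

_STORE: Dict[int, bytes] = {}  # hypothetical object store


@dataclass(frozen=True)
class ToyRef:
    """Hypothetical handle pointing at an entry in _STORE."""
    key: int


def toy_put(value: Any) -> ToyRef:
    ref = ToyRef(len(_STORE))
    _STORE[ref.key] = pickle.dumps(value)
    return ref


def toy_get(ref: ToyRef) -> Any:
    return pickle.loads(_STORE[ref.key])


def toy_remote_call(fn: Callable[..., Any], *args: Any) -> Any:
    # Only arguments that are references at the top level are resolved.
    resolved = [toy_get(a) if isinstance(a, ToyRef) else a for a in args]
    return fn(*resolved)


def echo(x: Any) -> Any:
    return x


def echo_resolving(x: Dict[str, Any]) -> Dict[str, Any]:
    # The function resolves nested references explicitly.
    return {k: toy_get(v) if isinstance(v, ToyRef) else v for k, v in x.items()}


x_ref = toy_put(list(range(5)))
top_level = toy_remote_call(echo, x_ref)                     # value
nested = toy_remote_call(echo, {"obj": x_ref})               # still a reference
resolved = toy_remote_call(echo_resolving, {"obj": x_ref})   # value again
```

In this model, top_level holds the list, nested still holds a ToyRef inside the dict, and resolved holds the list again because echo_resolving() looked the reference up itself.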

Implementation

Each computing node in the Ray cluster has a shared memory object store. The data of a remote object is stored in the object store of one or more computing nodes in the cluster. The shared memory of all computing nodes collectively forms the distributed object store.

When a remote object is small (<= 100 KB), it is stored in the in-process memory of the worker process that owns it. Larger objects are stored in the distributed shared memory. When the shared memory space of the cluster is insufficient, objects are spilled to external storage, such as a local disk or S3.
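
The placement rule above can be sketched as a simple decision function. This is a toy under stated assumptions, not Ray's implementation: ToyTieredStore and SMALL_OBJECT_LIMIT are hypothetical names, the threshold is applied to the pickled size, and the sketch spills the incoming object when shared memory is full, whereas a real system may choose which objects to spill differently.

```python
import pickle
from typing import Any, Dict, List

# Threshold quoted in the text; measuring it on the pickled size is an assumption.
SMALL_OBJECT_LIMIT = 100 * 1024


class ToyTieredStore:
    """Hypothetical three-tier store: in-process, shared memory, spilled."""

    def __init__(self, shared_capacity: int) -> None:
        self.shared_capacity = shared_capacity
        self.shared_used = 0
        self.tiers: Dict[str, Dict[str, bytes]] = {
            "in_process": {},
            "shared_memory": {},
            "spilled": {},
        }

    def put(self, key: str, value: Any) -> str:
        """Store the value and return the name of the tier it landed in."""
        blob = pickle.dumps(value)
        if len(blob) <= SMALL_OBJECT_LIMIT:
            tier = "in_process"
        elif self.shared_used + len(blob) <= self.shared_capacity:
            tier = "shared_memory"
            self.shared_used += len(blob)
        else:
            tier = "spilled"  # shared memory is full
        self.tiers[tier][key] = blob
        return tier


store = ToyTieredStore(shared_capacity=300 * 1024)
placements: List[str] = [
    store.put("small", bytes(1024)),          # about 1 KB
    store.put("large_1", bytes(200 * 1024)),  # about 200 KB, fits in shared memory
    store.put("large_2", bytes(200 * 1024)),  # about 200 KB, no room left
]
print(placements)
```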

ray.shutdown()