6.4. Ray Remote Classes#

The example in Section 6.2 demonstrates how to scale a stateless function. However, in real-world scenarios, there is often a need for stateful computations. The simplest form of stateful computation involves maintaining a counter that increments upon a specific condition. Different from stateless functions, stateful computations may not yield deterministic outputs for a given input, as the output is also dependent on the state. To implement stateful computations, Ray introduces Remote Classes, also known as Actors. Actors, named after the actor programming model [Hewitt et al., 1973], are a common paradigm in distributed computing, widely applied in big data and artificial intelligence domains. We will start by exploring a case study involving a counter to illustrate it.

Example 1: Distributed Counter#

Hide code cell content
import logging
from typing import Dict, List, Tuple
import ray

if ray.is_initialized:
    ray.shutdown()
ray.init(logging_level=logging.ERROR)

Ray’s remote class is decorated using ray.remote().

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def get_counter(self):
        return self.value

Initialize an object instance of this remote class by adding remote() after the class name Counter. The object instance is a remote actor.

counter = Counter.remote()

Next, we’ll call the increment() function of the Counter class. This function should also be called with remote(). Calling functions on an object instance should look like object_instance.function_name.remote().

obj_ref = counter.increment.remote()
print(ray.get(obj_ref))
1

It’s worth noting that multiple instances of actors can be created using the same class. Different actors can be executed in parallel, offering concurrency. However, within the same actor, member function calls are executed sequentially.

# create 10 actor instances
counters = [Counter.remote() for _ in range(10)]

# call increment of every actor
# these function callings can be executed in parallel
results = ray.get([c.increment.remote() for c in counters])
print(results)
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

Within the same actor instance, state is shared. Shared state implies that an actor may be scheduled across different nodes, and no matter which node it is scheduled to, any operation on the actor instance behaves just like operations on a single-node Python class. The data in the member variables of the object instance is accessible, modifiable, and updated in real-time.

# perform 5 increment operations on the first Actor
# these 5 increment operations are sequentially executed, sharing the state data 'value'
results = ray.get([counters[0].increment.remote() for _ in range(5)])
print(results)
[2, 3, 4, 5, 6]

Actor Programming Model#

The actor programming model is a paradigm for distributed programming, and each programming language or framework has its own implementation. The fundamental element of the Actor programming model is the actor instance, where each actor instance is unique. An individual actor instance can be seen as a process with an address. Each actor has an address, allowing us to send messages to this actor from other actors, similar to sending messages using email addresses. An actor can have one or multiple addresses, and multiple actors can share the same address. The actor addresses depend on how we want to send and receive data. Multiple actors sharing the same address is analogous to a group email address in a company, where the group includes multiple people. Sending an email to this group address allows every individual in the group to receive the message.

With an address and memory space, an actor can:

  • Store data, such as state data.

  • Receive messages from other actors.

  • Send messages to other actors.

  • Create new actors.

An actor’s state data can only be managed by the actor itself and cannot be modified by other actors. This is somewhat similar to instances of classes in object-oriented programming languages. If we want to modify the data inside an actor, we should send a message to the actor. The actor, upon receiving the message, makes decisions based on its stored data, deciding whether to modify the state data or send messages to other actors. In the counter example, the actor receives the increment() message and, based on its stored state, performs the increment operation.

To ensure consistency in the distributed environment, multiple requests sent to the same actor are executed sequentially. In the counter example, performing 5 increment() operations on the same actor are executed sequentially.

The actor programming model is message-driven. Sending a message to an actor prompts it to respond by modifying its own state or to send messages to other actors. The actor programming model does not require explicit synchronization of data between multiple processes, avoiding lock issues and the waiting time for synchronization. This makes the actor programming model suitable for scenarios involving a large number of asynchronous operations.

Example 2: Leaderboard Ranking#

Next, let’s implement a more complex case based on the actor model: a leaderboard. The state of this leaderboard is a key-value pair named board. The keys are names (name), stored as str, and the values are scores (score), stored as float.

@ray.remote
class Ranking:
    def __init__(self, minimal_score: float = 60.0):
        self.minimal = minimal_score
        self.board = dict()

    def add(self, name: str, score: float) -> Dict[str, float]:
        try:
            score = float(score)
            if score < self.minimal:
                return
            if name in self.board:
                self.board[name] = max(score, self.board[name])
            else:
                self.board[name] = score
            self.board = dict(sorted(self.board.items(), key=lambda item: item[1]))
            return self.board
        except Exception as e:
            print(f"The data type of score should be float but we receive {type(score)}.")
            return self.board

    def top(self, n: int = 1) -> List[Tuple[str, float]]:
        n = min(n, len(self.board))
        results = list(self.board.items())[:n]
        return results

    def pop(self) -> Dict[str, float]:
        if len(self.board) <= 0:
            raise Exception("The board is empty.")
        else:
            _, _ = self.board.popitem()
        return self.board

In this leaderboard example, there are three functions:

  • __init__(): Constructor.

  • add(): Adds a new record, parsing the input. If the score cannot be converted to float, an exception is thrown. After a record is added, sort all the existing items.

  • pop(): Removes the record with the highest score. If board is empty, an exception is thrown.

.remote() is used to create an instance of the actor.

# create the ranking
ranking = Ranking.remote()

Here, ranking is an actor reference (actor handle), somewhat resembling ObjectRef. We use the ranking actor handle to manage the associated actor. Once the actor handle is destroyed, the corresponding actor and its state are also destroyed.

Multiple actor instances can be created, with each instance managing its own state. Additionally, ActorClass.options can be used to set options for these Actor instances, such as naming, and configuring CPU or GPU computing resources.

# create a math leaderboard 'math_ranking'
# independent of the previously created 'ranking'
math_ranking = Ranking.remote(minimal_score=80)

# create a chemistry leaderboard 'chem_ranking'
chem_ranking = Ranking.options(name="Chemistry").remote()

The actor handle can be obtained using ray.get_actor() with the name.

# get the 'Chemistry' actor handle
cr = ray.get_actor("Chemistry")

Add a new record to the ranking leaderboard by calling the ‘add()’ function. Remember to include .remote() when invoking class member functions, or an error will occur.

# add new records
ranking.add.remote("Alice", 90)
ranking.add.remote("Bob", 60)

print(f"Current ranking: {ray.get(ranking.top.remote(3))}")
Current ranking: [('Bob', 60.0), ('Alice', 90.0)]
ray.get(ranking.add.remote("Mark", 'a88'))
{'Bob': 60.0, 'Alice': 90.0}

In the above example, some calls may raise exceptions, such as inserting a string. Ray handles exceptions and prints them, but for safety, you can manually use try/except when calling these remote class member methods:

try:
    ray.get(ranking.pop.remote())
    ray.get(ranking.pop.remote())
    ray.get(ranking.pop.remote())
except Exception as e:
    print(e)
(Ranking pid=93896) The data type of score should be float but we receive <class 'str'>.
ray::Ranking.pop() (pid=93896, ip=127.0.0.1, actor_id=d61a503564d89fed72678f3b01000000, repr=<__main__.Ranking object at 0x107c62310>)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/4n/v40br47s46ggrjm9bdm64lwh0000gn/T/ipykernel_93860/1570506600.py", line 29, in pop
Exception: The board is empty.

Example 3: Actor Pool#

In practice, it’s common to create an (ActorPool). An ActorPool is somewhat similar to multiprocessing.Pool. It contains multiple actors, each with the same functionality, and can be efficiently distributed across multiple computing nodes.

from ray.util import ActorPool


@ray.remote
class PoolActor:
    def add(self, operands):
        (a, b) = operands
        return a + b

    def double(self, operand):
        return operand * 2

# add the created Actors to the ActorPool
a1, a2, a3 = PoolActor.remote(), PoolActor.remote(), PoolActor.remote()
pool = ActorPool([a1, a2, a3])

If we want to call the actors added to the ActorPool, we can use the map(fn, values) and submit(fn, value) methods. These two methods are quite similar, taking a function fn and either a parameter value or a list of parameters values. map() takes a list of values, distributing the function in parallel to multiple actors for processing, while submit() takes a single value, selecting an actor from the ActorPool to execute the function. fn is a lambda expression or an anonymous function. This lambda expression has two parameters: actor and value. actor is the function call of an individual actor that we defined, and value is the parameter for this function.

The first parameter of the function is the actor in the ActorPool, and the second parameter is the function’s argument.

pool.map(lambda a, v: a.double.remote(v), [3, 4, 5, 4])

pool.submit(lambda a, v: a.double.remote(v), 3)
pool.submit(lambda a, v: a.double.remote(v), 4)

map() and submit() submit the computation tasks to the ActorPool. The ActorPool doesn’t directly return the results but asynchronously distributes them to different actors in the background. get_next() can retrieve the results in a blocking manner.

try:
    print(pool.get_next())
    print(pool.get_next())
    print(pool.get_next())
except Exception as e:
    print(e)
6
8
10

If all results have already been retrieved and an additional get_next() is attempted, an exception will be thrown.

Here, the value parameter of the submit() must be a single object and cannot be a parameter list. If you want to pass multiple parameters, you can wrap them in a tuple. For example, if the add() method performs a calculation on two operands, we can wrap the two operands into a tuple. When implementing the add() function, use (a, b) = operands to unpack this tuple.

当然,如果已经把所有结果都取回,仍然再去 get_next(),将会抛出异常。

在这里,value 只能是单个对象,不能是参数列表,如果想传入多个参数,可以把参数包裹成元组。比如 add() 方法对两个操作数做计算,我们把两个操作数包裹为一个元组,实现 add() 函数时使用 (a, b) = operands 解析这个元组。

pool.submit(lambda a, v: a.add.remote(v), (1, 10))
print(pool.get_next())
8
ray.shutdown()