ruby ractor experiments: safe async communication
(This post is also available in 🇯🇵 Japanese thanks to @hachi8833.)
Ractors (API documentation, design documentation) are a new concurrency abstraction for Ruby 3.0 inspired by the actor model.
From the point of view of a Ractor that wants to send some information to another, communication can either be:
- asynchronous (or non-blocking): a Ractor can send information to another using Ractor#send, placing it into an unbounded queue that can be read by the destination Ractor with Ractor.receive
- synchronous (or blocking): a Ractor can use Ractor.yield to block until another Ractor calls Ractor#take
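As a minimal sketch of the asynchronous style only, assuming Ruby 3.0 or later: the worker below is handed a reference to the main Ractor (an illustrative choice, not something the rest of this post relies on) so that replies can also travel through Ractor#send and Ractor.receive.

```ruby
# The worker receives the main Ractor as an argument so it can reply to it.
worker = Ractor.new(Ractor.current) do |main|
  3.times do
    number = Ractor.receive   # blocks until a message is in the incoming queue
    main.send(number * 2)     # non-blocking reply to the main Ractor
  end
end

# None of these calls waits for the worker to process anything.
[1, 2, 3].each { |n| worker.send(n) }

# Collect the three replies from the main Ractor's own incoming queue.
replies = Array.new(3) { Ractor.receive }
puts "Replies: #{replies.inspect}"
```

The sender returns from each send immediately; only Ractor.receive blocks, and only on the side that is reading.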
Let’s consider the async case: say we want to send information to another Ractor, but don’t want to block while it finishes processing it. What happens if the receiving Ractor is too slow to process the data?
receiver_ractor = Ractor.new do
  loop do
    message = Ractor.receive
    sleep 1
    puts "Processed #{message}"
  end
end
counter = 0
while true
  counter += 1
  receiver_ractor.send(counter)
end
As expected, if the receiver cannot keep up with the sender, more and more memory will be used, until the system memory is exhausted and the application crashes.
Looking at the Ractor API, there’s no built-in way for the sender to check if the receiver is falling behind or not, so I came up with the following approach:
receiver_ractor = Ractor.new do
  processing_queue = Queue.new
  Thread.new do
    sleep(1) # simulate a slow start for this thread
    loop do
      message = processing_queue.pop
      puts "Processed from queue: #{message}"
    end
  end
  loop do
    queue_size = processing_queue.size
    sender, message = Ractor.select(Ractor.current, yield_value: queue_size)
    if sender != :yield
      processing_queue << message
      puts "Added message to queue: #{message}"
    else
      puts "Sent queue status: #{queue_size}"
    end
  end
end
receiver_ractor.send(1)
receiver_ractor.send(2)
receiver_ractor.send(3)
puts "Finished submissions"
sleep(0.5)
receiver_ractor.take # force refresh status
puts "Receiver queue length: #{receiver_ractor.take}"
sleep(1)
receiver_ractor.take # force refresh status
puts "Receiver queue length: #{receiver_ractor.take}"
Here’s how this looks when it executes:
<internal:ractor>:267: warning: Ractor is experimental, and the behavior may change in future versions of Ruby! Also there are many implementation issues.
Finished submissions
Added message to queue: 1
Added message to queue: 2
Added message to queue: 3
Sent queue status: 3
Sent queue status: 3
Receiver queue length: 3
Processed from queue: 1
Processed from queue: 2
Processed from queue: 3
Sent queue status: 3
Sent queue status: 0
Receiver queue length: 0
And here’s the timeline:
- The main Ractor sends three messages to the receiver_ractor, containing 1, 2 and 3
- The main Ractor goes to sleep
- The receiver Ractor wakes up, reads the three messages, and redirects them to the processing_queue
- The main Ractor wakes up, forces a refresh of the queue_size (see below for more details)
- The main Ractor calls take, observes the queue_size is 3
- The main Ractor goes to sleep
- The receiver Ractor’s second thread wakes up and processes the three messages: 1, 2 and 3
- The main Ractor wakes up, forces a refresh of the queue_size (again); notice that this refresh was needed, because the value was outdated (goes from 3 to 0)
- The main Ractor observes that the queue is empty!
Inside the receiver Ractor, this strategy works as follows: There are now two threads. One of the threads uses Ractor.select to do two things at once: either receive new items for processing, putting them on a regular thread-safe queue, or return the current size of the queue. The second thread just processes items from the thread-safe queue.
The sender can now either use send to send items, or call take twice to get the size of the queue. Why twice? Because this value is only refreshed right before select gets called. If a long time passes between select being entered and any call to send or take, this value can become outdated, as happened in the example above. Calling take twice in a row guarantees that we get a "fresh" value: we know the value was just refreshed for the second take.
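To make the "why twice" argument concrete, here is a small plain-Ruby model of the staleness, with no Ractors involved. SnapshotReceiver and fresh_queue_size are hypothetical names invented for this sketch; the class only imitates the "take a snapshot, then wait in select" behaviour described above.

```ruby
# Hypothetical stand-in for the receiver Ractor (not a real Ractor).
class SnapshotReceiver
  def initialize(queue)
    @queue = queue
    @snapshot = queue.size   # captured before "entering select"
  end

  # Imitates take: hands out the snapshot captured on the previous loop
  # iteration, then captures a new one for the next caller.
  def take
    previous = @snapshot
    @snapshot = @queue.size
    previous
  end
end

# Discards the possibly outdated value; the second value was captured
# only after the first take returned.
def fresh_queue_size(receiver)
  receiver.take
  receiver.take
end

items = [1, 2, 3]
single = SnapshotReceiver.new(items)
double = SnapshotReceiver.new(items)
items.clear   # the worker thread drains the queue in the meantime

stale_size = single.take
fresh_size = fresh_queue_size(double)
puts "one take: #{stale_size}, two takes: #{fresh_size}"
# prints "one take: 3, two takes: 0"
```

Since fresh_queue_size only calls take, the same helper should also work when handed the receiver_ractor from the example above.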
Building atop this construction, we can implement a number of strategies for better communication between two Ractors. For instance, we can revise the original example to make sure that the sender never runs "too far ahead" of the receiver:
receiver_ractor = Ractor.new do
  processing_queue = Queue.new
  Thread.new do
    loop do
      message = processing_queue.pop
      sleep(1)
      puts "Processed #{message}"
    end
  end
  loop do
    queue_size = processing_queue.size
    sender, message = Ractor.select(Ractor.current, yield_value: queue_size)
    if sender != :yield
      processing_queue << message
      puts "Added message to queue: #{message}"
    else
      puts "Sent queue status: #{queue_size}"
    end
  end
end
counter = 0
while true
  counter += 1
  receiver_ractor.send(counter)
  if counter % 10 == 0
    receiver_ractor.take # force refresh status
    queue_size = receiver_ractor.take
    if queue_size > 5
      puts "Ractor is falling behind (#{queue_size} elements unprocessed); sleeping for a while"
      sleep(1) while (receiver_ractor.take && receiver_ractor.take > 1)
    end
  end
end
And here’s how it looks:
<internal:ractor>:267: warning: Ractor is experimental, and the behavior may change in future versions of Ruby! Also there are many implementation issues.
Added message to queue: 1
Added message to queue: 2
Added message to queue: 3
Added message to queue: 4
Added message to queue: 5
Added message to queue: 6
Added message to queue: 7
Added message to queue: 8
Added message to queue: 9
Added message to queue: 10
Sent queue status: 10
Sent queue status: 9
Ractor is falling behind (9 elements unprocessed); sleeping for a while
Sent queue status: 9
Sent queue status: 9
Processed 1
Sent queue status: 9
Sent queue status: 8
Processed 2
Sent queue status: 8
Sent queue status: 7
Processed 3
Sent queue status: 7
Sent queue status: 6
Processed 4
Sent queue status: 6
Sent queue status: 5
Processed 5
Sent queue status: 5
Sent queue status: 4
Processed 6
Sent queue status: 4
Sent queue status: 3
Processed 7
Sent queue status: 3
Sent queue status: 2
Processed 8
Sent queue status: 2
Sent queue status: 1
Added message to queue: 11
Added message to queue: 12
Added message to queue: 13
Added message to queue: 14
Added message to queue: 15
Added message to queue: 16
Added message to queue: 17
Added message to queue: 18
Added message to queue: 19
Added message to queue: 20
Sent queue status: 11
Sent queue status: 11
Ractor is falling behind (11 elements unprocessed); sleeping for a while
Sleeping is a very simplistic solution, but gets the job done. Other alternatives could be to switch to synchronous communication, or to instead submit the work to a different Ractor/code path.
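The sender-side check could also be pulled out into a small helper, which makes the two thresholds explicit. This is only a sketch: wait_for_receiver, high_water, low_water and pause are hypothetical names, and read_size stands for any callable that returns a fresh queue size. The demo feeds it canned sizes instead of a real Ractor.

```ruby
# Hypothetical helper: if the receiver is above high_water, sleep until it
# drops to low_water or below. Returns true if the sender had to wait.
def wait_for_receiver(read_size, high_water: 5, low_water: 1, pause: 1)
  return false unless read_size.call > high_water

  sleep(pause) while read_size.call > low_water
  true
end

# With the receiver from this post, read_size could be something like:
#   -> { receiver_ractor.take; receiver_ractor.take }
# Here canned sizes simulate a receiver that is catching up.
sizes = [9, 8, 4, 1]
waited = wait_for_receiver(-> { sizes.shift }, pause: 0)

# A receiver that is keeping up never triggers the wait.
skipped = wait_for_receiver(-> { 2 }, pause: 0)

puts "waited: #{waited}, skipped: #{skipped}"
# prints "waited: true, skipped: false"
```

Using a higher threshold to start waiting and a lower one to resume keeps the sender from flipping between sending and sleeping on every message, which is the same behaviour as the 5 and 1 used in the loop above.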
That’s it for my first experiment with Ractors!