(This post is also available in 🇯🇵 Japanese thanks to @hachi8833.)

Ractors (API documentation, design documentation) are a new concurrency abstraction in Ruby 3.0, inspired by the actor model.

From the point of view of a Ractor that wants to send some information to another, communication can either be:

  • asynchronous (or non-blocking): a Ractor can send information to another using Ractor#send, which places it in an unbounded queue that the destination Ractor reads with Ractor.receive

  • synchronous (or blocking): a Ractor can use Ractor.yield to block until another Ractor calls Ractor#take

Let’s consider the async case. Say we want to send information to another Ractor without blocking until it finishes processing it. What happens if the receiving Ractor is too slow to keep up?

receiver_ractor = Ractor.new do
  loop do
    message = Ractor.receive
    sleep 1
    puts "Processed #{message}"
  end
end

counter = 0
while true
  counter += 1
  receiver_ractor.send(counter)
end
(Figure: ractor unbounded memory)

As expected, if the receiver cannot keep up with the sender, more and more memory will be used, until the system memory is exhausted and the application crashes.

Looking at the Ractor API, there’s no built-in way for the sender to check if the receiver is falling behind or not, so I came up with the following approach:

receiver_ractor = Ractor.new do
  processing_queue = Queue.new

  Thread.new do
    sleep(1) # simulate a slow start for this thread

    loop do
      message = processing_queue.pop
      puts "Processed from queue: #{message}"
    end
  end

  loop do
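    # Snapshot the queue size before blocking; this is the value a taker will see.
    queue_size = processing_queue.size
    # Wait for either an incoming message (:receive) or a taker for queue_size (:yield).
    sender, message = Ractor.select(Ractor.current, yield_value: queue_size)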

    if sender != :yield
      processing_queue << message
      puts "Added message to queue: #{message}"
    else
      puts "Sent queue status: #{queue_size}"
    end
  end
end

receiver_ractor.send(1)
receiver_ractor.send(2)
receiver_ractor.send(3)
puts "Finished submissions"

sleep(0.5)

receiver_ractor.take # force refresh status
puts "Receiver queue length: #{receiver_ractor.take}"

sleep(1)

receiver_ractor.take # force refresh status
puts "Receiver queue length: #{receiver_ractor.take}"

Here’s how this looks when it executes:

<internal:ractor>:267: warning: Ractor is experimental, and the behavior may change in future versions of Ruby! Also there are many implementation issues.
Finished submissions
Added message to queue: 1
Added message to queue: 2
Added message to queue: 3
Sent queue status: 3
Sent queue status: 3
Receiver queue length: 3
Processed from queue: 1
Processed from queue: 2
Processed from queue: 3
Sent queue status: 3
Sent queue status: 0
Receiver queue length: 0

And here’s the timeline:

  1. The main Ractor sends three messages to the receiver_ractor, containing 1, 2 and 3

  2. The main Ractor goes to sleep

  3. The receiver Ractor wakes up, reads the three messages, and redirects them to the processing_queue

  4. The main Ractor wakes up, forces a refresh of the queue_size (see below for more details)

  5. The main Ractor calls take, observes the queue_size is 3

  6. The main Ractor goes to sleep

  7. The receiver Ractor’s second thread wakes up and processes the three messages: 1, 2 and 3

  8. The main Ractor wakes up, forces a refresh of the queue_size (again) — notice that this refresh was needed, because the value was outdated (goes from 3 to 0)

  9. The main Ractor observes that the queue is empty!

Inside the receiver Ractor, the strategy works as follows. There are now two threads. The first uses Ractor.select to do two things at once: either receive new items for processing and put them on a regular thread-safe queue, or hand back the current size of the queue. The second thread just processes items from the thread-safe queue.

The sender can now either use send to submit items, or call take twice to get the size of the queue. Why twice? Because this value is only refreshed just before select is called. If a long time passes between select being entered and the next call to send or take, the value can become outdated, as happened in the example above. Calling take twice in a row guarantees that we get a "fresh" value: the first take forces the receiver to loop and recompute the size, so the second take returns a value computed after the first one completed.

Building atop this construction, we can implement a number of strategies for better communication between two Ractors. For instance, we can revisit the original example to make sure that the sender never runs "too far ahead" of the receiver:

receiver_ractor = Ractor.new do
  processing_queue = Queue.new

  Thread.new do
    loop do
      message = processing_queue.pop
      sleep(1)
      puts "Processed #{message}"
    end
  end

  loop do
    queue_size = processing_queue.size
    sender, message = Ractor.select(Ractor.current, yield_value: queue_size)

    if sender != :yield
      processing_queue << message
      puts "Added message to queue: #{message}"
    else
      puts "Sent queue status: #{queue_size}"
    end
  end
end

counter = 0
while true
  counter += 1
  receiver_ractor.send(counter)

  if counter % 10 == 0
    receiver_ractor.take # force refresh status
    queue_size = receiver_ractor.take
    if queue_size > 5
      puts "Ractor is falling behind (#{queue_size} elements unprocessed); sleeping for a while"
      # The first take discards the stale size; the second returns the fresh one.
      sleep(1) while (receiver_ractor.take && receiver_ractor.take > 1)
    end
  end
end

And here’s how it looks:

<internal:ractor>:267: warning: Ractor is experimental, and the behavior may change in future versions of Ruby! Also there are many implementation issues.
Added message to queue: 1
Added message to queue: 2
Added message to queue: 3
Added message to queue: 4
Added message to queue: 5
Added message to queue: 6
Added message to queue: 7
Added message to queue: 8
Added message to queue: 9
Added message to queue: 10
Sent queue status: 10
Sent queue status: 9
Ractor is falling behind (9 elements unprocessed); sleeping for a while
Sent queue status: 9
Sent queue status: 9
Processed 1
Sent queue status: 9
Sent queue status: 8
Processed 2
Sent queue status: 8
Sent queue status: 7
Processed 3
Sent queue status: 7
Sent queue status: 6
Processed 4
Sent queue status: 6
Sent queue status: 5
Processed 5
Sent queue status: 5
Sent queue status: 4
Processed 6
Sent queue status: 4
Sent queue status: 3
Processed 7
Sent queue status: 3
Sent queue status: 2
Processed 8
Sent queue status: 2
Sent queue status: 1
Added message to queue: 11
Added message to queue: 12
Added message to queue: 13
Added message to queue: 14
Added message to queue: 15
Added message to queue: 16
Added message to queue: 17
Added message to queue: 18
Added message to queue: 19
Added message to queue: 20
Sent queue status: 11
Sent queue status: 11
Ractor is falling behind (11 elements unprocessed); sleeping for a while
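
The sender above uses two thresholds: it pauses once more than 5 items are pending, and resumes only when the backlog drops to 1 or fewer. That decision can be pulled out into a small object and tested without any Ractors. This is only a sketch; the Throttle class and its high and low parameters are hypothetical names, not part of the Ractor API.

```ruby
# Hypothetical helper: tracks whether the sender should currently wait.
class Throttle
  def initialize(high:, low:)
    @high = high     # start waiting when the backlog exceeds this
    @low = low       # stop waiting once the backlog is at or below this
    @paused = false
  end

  # Feed in the latest observed queue size.
  # Returns true while the sender should wait.
  def paused?(queue_size)
    if @paused
      @paused = false if queue_size <= @low
    elsif queue_size > @high
      @paused = true
    end
    @paused
  end
end

throttle = Throttle.new(high: 5, low: 1)
[3, 9, 4, 2, 1, 3].each do |size|
  puts "#{size}: #{throttle.paused?(size) ? 'wait' : 'send'}"
end
# 3: send
# 9: wait
# 4: wait
# 2: wait
# 1: send
# 3: send
```

Using two thresholds instead of one keeps the sender from flipping between sending and waiting on every check when the backlog hovers around a single limit.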

Sleeping is a simplistic solution, but it gets the job done. Alternatives include switching to synchronous communication, or submitting the work to a different Ractor or code path instead.

That’s it for my first experiment with Ractors!