An exploration of local gen_server multi_call in Elixir

By: on January 30, 2020

OTP’s gen_server module provides a convenient multi_call function that forwards a given request to all gen_server processes that are locally registered under a given name on a given list of nodes. multi_call subsequently awaits the responses and returns a tuple {Replies, BadNodes}: the successful replies (as {Node, Reply} pairs) and the nodes that failed to respond.

I would like a variant of multi_call that works with a list of pids on the local node. Of course, such a variant may already exist (though not in a place I have checked).

My hypothetical use case (which could be approached in a number of different ways) is to shard an in-memory similarity search index across a number of processes. To answer a query, the shards would be queried via a local multi_call, and the collected replies would be merged to yield the final result.
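The merge step of this use case could look like the minimal sketch below. It assumes, hypothetically, that each shard replies with a list of {score, doc_id} pairs where a higher score means "more similar"; ShardMerge and top_k/2 are illustrative names, not part of any library. Enum.sort_by/3 with :desc requires Elixir 1.10 or later.

```elixir
defmodule ShardMerge do
  # Hypothetical merge step: each shard reply is assumed to be a list of
  # {score, doc_id} pairs, higher score = more similar.
  # Returns the k best hits across all shards.
  def top_k(shard_replies, k) when is_list(shard_replies) and is_integer(k) and k >= 0 do
    shard_replies
    |> Enum.concat()
    |> Enum.sort_by(fn {score, _doc_id} -> score end, :desc)
    |> Enum.take(k)
  end
end

ShardMerge.top_k([[{0.9, "a"}, {0.2, "b"}], [{0.7, "c"}]], 2)
# => [{0.9, "a"}, {0.7, "c"}]
```

Sorting the concatenated list is the simplest option; if every shard already returned its hits in sorted order, a k-way merge would avoid re-sorting.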

A brief look at OTP’s gen_server:multi_call

The existing gen_server:multi_call implementation has two execution paths: one with and one without timeout. In the case where a timeout is specified, communication is routed through a middleman process, which is terminated on completion or when the timeout occurs. Any late replies will be sent to the no longer existing middleman process and thus be dropped, shielding the original caller from unwanted messages. The case without timeout does not require a middleman process, because the caller waits until every target has either replied or terminated, so no reply can arrive late.
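The property the middleman relies on can be observed directly: sending a message to a process that has already exited is not an error, and the message is simply discarded. A minimal sketch, in which a short-lived process stands in for the middleman (the variable names are illustrative):

```elixir
# Stand-in for the middleman: a process that exits immediately.
middleman = spawn(fn -> :ok end)

# Wait until it is gone. Monitoring works even if it has already exited
# (the :DOWN reason is then :noproc).
ref = Process.monitor(middleman)

receive do
  {:DOWN, ^ref, :process, ^middleman, _reason} -> :ok
end

# A "late reply" to the dead middleman: send/2 does not raise, it returns
# the message, and the runtime drops it because no process can receive it.
late = send(middleman, {make_ref(), :late_reply})

Process.alive?(middleman)
# => false
```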

All message passing in Erlang is asynchronous. Synchronous calls such as gen_server:call are implemented on top of this by sending the request (tagged with a unique reference) and immediately afterwards entering a selective receive block that ‘waits’ for a response tagged with the same reference. There are two further ways to leave the receive block: through a timeout and through termination of the process whose reply is being awaited. The latter is achieved through process monitoring. A monitor is unidirectional: when the monitored process terminates (or does not exist in the first place), the monitoring process receives a {'DOWN', MonitorRef, process, Pid, Reason} message. A link, by contrast, is bidirectional and propagates exit signals between the linked processes.
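To make this concrete, here is a minimal sketch of a single synchronous call assembled from these primitives. SyncCallSketch is a hypothetical module: it mimics the internal message format gen_server understands (a :"$gen_call" request answered with a {tag, reply} message), but it is not OTP’s actual call implementation. Here the monitor reference doubles as the request tag.

```elixir
defmodule SyncCallSketch do
  # Hypothetical helper: one synchronous call built from an asynchronous send,
  # a monitor and a selective receive. Returns {:ok, reply} or {:error, reason}.
  def call(pid, request, timeout \\ 5_000) do
    # The monitor reference doubles as the unique tag for this request.
    ref = Process.monitor(pid)
    send(pid, {:"$gen_call", {self(), ref}, request})

    receive do
      # The reply carries our tag.
      {^ref, reply} ->
        # Remove the monitor and any :DOWN message that raced the reply.
        Process.demonitor(ref, [:flush])
        {:ok, reply}

      # Target terminated (or never existed: reason :noproc).
      {:DOWN, ^ref, :process, ^pid, reason} ->
        {:error, reason}
    after
      timeout ->
        Process.demonitor(ref, [:flush])
        {:error, :timeout}
    end
  end
end
```

Unlike gen_server:call, this sketch returns error tuples instead of exiting, and a reply that arrives after the timeout would remain in the caller’s mailbox. That leftover-message problem is what the middleman process avoids.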

Briefly summarized, gen_server:multi_call:

  • creates a unique reference
  • starts a timer (if a timeout is given)
  • for each target process, sets up a monitor and sends the original request (tagged with the unique reference)
  • for each outstanding request, awaits either the reply or the termination of the target process, demonitoring the process as each reply arrives
  • if no timeout occurred, cancels the timer and returns the results
  • if a timeout occurs, demonitors the processes and returns the responses received so far plus the responses currently queued in the mailbox
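The timer in these steps is an ordinary Erlang timer. As a small standalone sketch (variable names are illustrative): :erlang.start_timer/3 delivers {:timeout, timer_ref, msg} to the target process when it fires, and cancelling a timer that has not fired yet prevents that message from being sent.

```elixir
# A short timer that is allowed to fire.
timer_ref = :erlang.start_timer(20, self(), :ok)

fired =
  receive do
    {:timeout, ^timer_ref, :ok} -> :fired
  after
    1_000 -> :not_fired
  end

# A long timer that is cancelled before it fires. Process.cancel_timer/1
# returns the remaining milliseconds, or false if the timer was not found
# (for example because it had already fired).
long_ref = :erlang.start_timer(10_000, self(), :ok)
remaining = Process.cancel_timer(long_ref)

{fired, is_integer(remaining)}
# => {:fired, true}
```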

Interactive experimentation in Elixir

First of all, here is a simple gen_server module that can be the target of the new local multi_call:

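defmodule DummyShard do
  use GenServer

  @impl true
  def init([id, sleep_millis]) do
    {:ok, {id, sleep_millis}}
  end

  @impl true
  def handle_call(:do_work, _from, {id, sleep_millis} = state) do
    # Simulate work. The is_integer/1 check matters: atoms sort after numbers
    # in Erlang term order, so `nil > 0` is true and would otherwise reach
    # :timer.sleep(nil), which raises.
    case sleep_millis do
      n when is_integer(n) and n > 0 ->
        :timer.sleep(n)

      _ ->
        nil
    end

    {:reply, id, state}
  end

  def start_link(id, sleep_millis \\ nil) do
    GenServer.start_link(__MODULE__, [id, sleep_millis])
  end

  def do_work(pid) do
    GenServer.call(pid, :do_work)
  end
end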

DummyShards are started with an id and an optional sleep duration in milliseconds. When their do_work function is invoked, they sleep for that duration (if one was given) and then return their id. In a real application, these gen_servers would be supervised and perform meaningful work. For experimentation, DummyShard gen_servers can be started in iex.

iex(1)> {:ok, pid1} = DummyShard.start_link(1)
{:ok, #PID<0.191.0>}

iex(2)> {:ok, pid2} = DummyShard.start_link(2)
{:ok, #PID<0.193.0>}

iex(3)> DummyShard.do_work(pid1)
1

iex(4)> DummyShard.do_work(pid2)
2

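Making calls without going through gen_server:call, by hand-crafting the internal {:"$gen_call", {caller_pid, tag}, request} message that gen_server processes expect (an undocumented implementation detail, fine for exploration):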

iex(5)> ref = make_ref()
#Reference<0.3038621705.935854087.188656>

iex(6)> send(pid1, {:"$gen_call", {self(), ref}, :do_work})
{:"$gen_call", {#PID<0.189.0>, #Reference<0.3038621705.935854087.188656>},
:do_work}

iex(7)> send(pid2, {:"$gen_call", {self(), ref}, :do_work})
{:"$gen_call", {#PID<0.189.0>, #Reference<0.3038621705.935854087.188656>},
:do_work}

Flushing iex’s mailbox to show the queued responses:

iex(8)> flush
{#Reference<0.3038621705.935854087.188656>, 1}
{#Reference<0.3038621705.935854087.188656>, 2}
:ok
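The same exchange can be written as code. The following is a deliberately naive sketch (NaiveCollect is a hypothetical module) that sends the hand-crafted :"$gen_call" message to every pid with one shared tag and collects replies until all have arrived or a single deadline passes. It has no monitors, so a dead target is only noticed by running into the deadline; replies cannot be attributed to their senders; and replies arriving after the deadline remain in the caller’s mailbox. These are the weaknesses that the design of gen_server:multi_call addresses.

```elixir
defmodule NaiveCollect do
  # Hypothetical, deliberately naive scatter/gather: one shared tag,
  # no monitors, one overall deadline in milliseconds.
  def call_all(pids, request, timeout) when is_list(pids) do
    tag = make_ref()
    Enum.each(pids, fn pid -> send(pid, {:"$gen_call", {self(), tag}, request}) end)
    deadline = System.monotonic_time(:millisecond) + timeout
    collect(tag, length(pids), deadline, [])
  end

  # All expected replies arrived: return them in arrival order.
  defp collect(_tag, 0, _deadline, acc), do: Enum.reverse(acc)

  defp collect(tag, n, deadline, acc) do
    remaining = max(deadline - System.monotonic_time(:millisecond), 0)

    receive do
      {^tag, reply} -> collect(tag, n - 1, deadline, [reply | acc])
    after
      # Deadline passed: give up on the rest; late replies stay queued.
      remaining -> Enum.reverse(acc)
    end
  end
end
```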

Local multi_call

With the knowledge gleaned so far, and following the gen_server:multi_call source code, a local variant of multi_call can be put together.

defmodule Local do

First, the case where no timeout is specified:

  def multi_call(pids, request, :infinity) when is_list(pids) do
    tag = make_ref()
    monitors = send_multi(pids, tag, request, [])
    receive_multi(tag, monitors, :no_timer)
  end

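send_multi/4 sets up a monitor for each pid and asynchronously sends the request, tagged with {monitor_ref, tag} so that each reply can be attributed to the process that sent it. It returns a list of {monitor_ref, pid} tuples (in reverse order of the input pids):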

  defp send_multi([], _, _, monitors), do: monitors

  defp send_multi([pid | pids], tag, request, monitors) when is_pid(pid) do
    monitor = Process.monitor(pid)
    send(pid, {:"$gen_call", {self(), {monitor, tag}}, request})
    send_multi(pids, tag, request, [{monitor, pid} | monitors])
  end
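send_multi/4 does not check whether a pid is still alive, and it does not need to. As a standalone aside (not part of the Local module), this sketch shows that monitoring a process that has already terminated still succeeds and immediately yields a :DOWN message with reason :noproc, so the receiving side treats a dead target like any other termination. The variable names are illustrative.

```elixir
# A process that has certainly terminated by the time we look at it.
dead = spawn(fn -> :ok end)
first = Process.monitor(dead)

receive do
  {:DOWN, ^first, :process, ^dead, _} -> :ok
end

# Monitoring it again still succeeds; the runtime answers right away
# with a :DOWN message whose reason is :noproc.
second = Process.monitor(dead)

reason =
  receive do
    {:DOWN, ^second, :process, ^dead, reason} -> reason
  after
    1_000 -> :no_down_message
  end

reason
# => :noproc
```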

receive_multi/3 sets up two empty accumulators (for successful and unsuccessful calls) and delegates the work of collecting replies to a helper function:

  defp receive_multi(tag, monitors, timer_id) do
    rec_m(tag, monitors, [], [], timer_id)
  end

The helper function rec_m/5 has three clauses. If all requests have been answered (indicated by the empty list of monitors in the second argument) and a timeout was given, the underlying timer is cancelled and the accumulated results are returned:

  defp rec_m(_tag, [], successes, failures, timer_id) when is_reference(timer_id) do
    Process.cancel_timer(timer_id)
    {successes, failures}
  end

If no timeout was given, no timer needs to be cancelled. The accumulated results are returned:

  defp rec_m(_tag, [], successes, failures, :no_timer) do
    {successes, failures}
  end

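The third clause is rec_m/5’s workhorse. It takes the first {monitor, pid} tuple out of the list of outstanding requests and performs a selective receive against the mailbox. Three cases are handled. Firstly, if the monitored process has terminated, the pid and its exit reason are added to the failure accumulator; no demonitor is needed here, because a monitor is removed automatically once its :DOWN message has been delivered. Secondly, if there is a proper reply, it is added to the success accumulator. In both cases, rec_m recurses with the remaining outstanding requests. Thirdly, if the timer has fired, the current pid is added to the failure accumulator with reason :timeout, and the remaining requests are handed to the rec_m_rest/4 helper, which scans the mailbox for replies from the other processes that may already have arrived. In the reply and timeout cases, the monitor is removed with Process.demonitor(monitor, [:flush]); the :flush option also discards a :DOWN message that may already be queued if the process terminated right after replying.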

  defp rec_m(tag, [{monitor, pid} | monitors], successes, failures, timer_id) do
    receive do
      {:DOWN, ^monitor, :process, ^pid, reason} ->
        rec_m(tag, monitors, successes, [{pid, reason} | failures], timer_id)

      {{^monitor, ^tag}, reply} ->
        Process.demonitor(monitor, [:flush])
        rec_m(tag, monitors, [reply | successes], failures, timer_id)

      {:timeout, ^timer_id, _} ->
        Process.demonitor(monitor, [:flush])
        rec_m_rest(tag, monitors, successes, [{pid, :timeout} | failures])
    end
  end
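The effect of [:flush] can be checked in isolation. As a standalone aside (not part of the Local module), this sketch monitors a process that exits immediately and then removes the monitor with :flush; whether or not the corresponding :DOWN message had already been queued, none is left behind, so stale :DOWN messages do not accumulate in the caller’s mailbox. The variable names are illustrative.

```elixir
# A process that exits right away; we wait for it via a second monitor.
pid = spawn(fn -> :ok end)
ref = Process.monitor(pid)
waiter = Process.monitor(pid)

receive do
  {:DOWN, ^waiter, :process, ^pid, _} -> :ok
end

# Whether or not the :DOWN for `ref` has already been queued, demonitoring
# with :flush guarantees none is left in (or later reaches) the mailbox.
true = Process.demonitor(ref, [:flush])

leftover =
  receive do
    {:DOWN, ^ref, _, _, _} -> :leaked
  after
    100 -> :clean
  end

leftover
# => :clean
```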

rec_m_rest/4 has a similar structure. It walks the remaining {monitor, pid} tuples and checks the mailbox for each one without blocking. Three cases are handled. Firstly, if the respective process has terminated, a failure is recorded. Secondly, if a reply from the process is present, it is recorded as a success. Thirdly, if no corresponding message is present (detected by the receive’s after 0 clause, which returns immediately instead of waiting), the process is demonitored and a :timeout failure is recorded. Once the list of outstanding requests has been processed recursively, the accumulated results are returned:

  defp rec_m_rest(_tag, [], successes, failures) do
    {successes, failures}
  end

  defp rec_m_rest(tag, [{monitor, pid} | monitors], successes, failures) do
    receive do
      {:DOWN, ^monitor, :process, ^pid, reason} ->
        rec_m_rest(tag, monitors, successes, [{pid, reason} | failures])

      {{^monitor, ^tag}, reply} ->
        Process.demonitor(monitor, [:flush])
        rec_m_rest(tag, monitors, [reply | successes], failures)
    after
      0 ->
        Process.demonitor(monitor, [:flush])
        rec_m_rest(tag, monitors, successes, [{pid, :timeout} | failures])
    end
  end
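The after 0 clause is what makes rec_m_rest/4 non-blocking: receive first scans the messages already in the mailbox and, if none matches, returns immediately. A minimal standalone illustration (the poll function is illustrative, not part of Local):

```elixir
# poll takes a matching message if one is already queued, otherwise
# returns :empty without waiting.
poll = fn ->
  receive do
    {:reply, value} -> {:ok, value}
  after
    0 -> :empty
  end
end

send(self(), {:reply, 1})
first = poll.()
second = poll.()

{first, second}
# => {{:ok, 1}, :empty}
```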

Finally, the code for multi_call with a timeout follows the gen_server:multi_call implementation very closely. send_multi and receive_multi are now performed from within a freshly spawned middleman process, which traps exits so that ordinary exit signals from other processes do not terminate it. The middleman starts the timer whose timeout message receive_multi already understands. The original caller monitors the middleman, and the middleman communicates the results back by exiting with them as its exit reason, which reaches the caller inside the :DOWN message.

For this to work reliably, the original caller must have established its monitor before the middleman can possibly exit with the result; otherwise the monitor would only report :noproc and the result would be lost. This is achieved by having the middleman wait for a start message, which the original caller sends only after setting up its monitor. The original caller could also terminate before sending that start message. The middleman detects this through its own monitor on the caller and exits normally without doing any work. The resulting code is shown below:

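  def multi_call(pids, request, timeout)
      when is_list(pids) and is_integer(timeout) and timeout >= 0 do
    tag = make_ref()
    caller = self()

    # Middleman: traps exits so that ordinary exit signals do not kill it.
    receiver =
      spawn(fn ->
        Process.flag(:trap_exit, true)
        m_caller = Process.monitor(caller)

        receive do
          # Go-ahead from the caller; ^tag pins the tag instead of rebinding it.
          {^caller, ^tag} ->
            monitors = send_multi(pids, tag, request, [])
            timer_id = :erlang.start_timer(timeout, self(), :ok)
            result = receive_multi(tag, monitors, timer_id)
            # Hand the result back to the caller through the exit reason.
            exit({self(), tag, result})

          # Caller died before sending the go-ahead: give up silently.
          {:DOWN, ^m_caller, _, _, _} ->
            exit(:normal)
        end
      end)

    # Monitor first, then send the go-ahead, so the result cannot be missed.
    m_receiver = Process.monitor(receiver)
    send(receiver, {self(), tag})

    receive do
      {:DOWN, ^m_receiver, _, _, {^receiver, ^tag, result}} ->
        result

      # The middleman failed or was killed: propagate its exit reason.
      {:DOWN, ^m_receiver, _, _, reason} ->
        exit(reason)
    end
  end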

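When the fragments are assembled into a single file, the Elixir compiler warns that the multi_call/3 clauses are not grouped together; placing the second clause directly below the first avoids the warning. This concludes the Local module: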

end
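Stripped of the multi-call logic, the handshake and the exit-reason result channel used by the middleman reduce to the standalone sketch below. ExitChannel and run/1 are hypothetical names. Unlike Local.multi_call/3, the sketch neither traps exits nor monitors the caller, so its worker would wait forever if the caller died before sending the go-ahead.

```elixir
defmodule ExitChannel do
  # Hypothetical helper: run `fun` in a fresh process and hand its result back
  # through the exit reason, the same channel the middleman uses.
  def run(fun) when is_function(fun, 0) do
    tag = make_ref()
    caller = self()

    worker =
      spawn(fn ->
        # Do nothing until the caller signals that its monitor is in place.
        receive do
          {^caller, ^tag} -> exit({self(), tag, fun.()})
        end
      end)

    mref = Process.monitor(worker)
    send(worker, {caller, tag})

    receive do
      {:DOWN, ^mref, :process, ^worker, {^worker, ^tag, result}} -> {:ok, result}
      {:DOWN, ^mref, :process, ^worker, reason} -> {:error, reason}
    end
  end
end

ExitChannel.run(fn -> 1 + 2 end)
# => {:ok, 3}
```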

A quick interactive trial in iex:

iex(9)> {:ok, pid1} = DummyShard.start_link(1, 100)
{:ok, #PID<0.321.0>}

iex(10)> {:ok, pid2} = DummyShard.start_link(2, 200)
{:ok, #PID<0.323.0>}

iex(11)> res = Local.multi_call([pid1, pid2], :do_work, :infinity)
{[1, 2], []}

iex(12)> res = Local.multi_call([pid1, pid2], :do_work, 50)
{[], [{#PID<0.321.0>, :timeout}, {#PID<0.323.0>, :timeout}]}

iex(13)> res = Local.multi_call([pid1, pid2], :do_work, 100)
{[], [{#PID<0.321.0>, :timeout}, {#PID<0.323.0>, :timeout}]}

iex(14)> res = Local.multi_call([pid1, pid2], :do_work, 101)
{[1], [{#PID<0.323.0>, :timeout}]}

iex(15)> res = Local.multi_call([pid1, pid2], :do_work, 500)
{[1, 2], []}

Conclusion

With OTP you do not normally interact directly with raw processes, monitors, links and asynchronous message passing. However, these building blocks are still available for creating your own abstractions, and peeking under the hood of OTP can be quite instructive.
