
Making an Asynchronous Function Synchronous with GenServer.reply


I encountered an interesting problem while developing the absinthe_websocket library: I wanted to make an asynchronous function call appear synchronous. Specifically, I wanted to run queries through the WebSocket connection [1]. The problem is that a WebSocket uses a single persistent connection, and responses can arrive out of order. How can we hide the asynchronous nature of the connection from the client? As is often the case when I need a solution in Elixir, reading the documentation showed that OTP already provides one: GenServer.reply/2.

The following code is based on AbsintheWebSocket.QueryServer. While it’s not a complex file to begin with, I simplified it a bit to demonstrate the approach. The entire code is below, followed by commentary on each section:

defmodule QueryServer do
  use GenServer

  # Client

  def start_link(opts \\ []) do
    state = %{
      queries: %{},
    }
    GenServer.start_link(__MODULE__, state, opts)
  end

  def init(state) do
    {:ok, state}
  end

  def post(pid, query) do
    GenServer.call(pid, {:post, query})
  end

  # Server (callbacks)

  def handle_call({:post, query}, from, %{queries: queries} = state) do
    ref = make_ref()

    AsynchronousServer.query(self(), ref, query)

    queries = Map.put(queries, ref, from)
    state = Map.put(state, :queries, queries)

    {:noreply, state}
  end

  def handle_cast({:query_response, ref, response}, %{queries: queries} = state) do
    {from, queries} = Map.pop(queries, ref)

    GenServer.reply(from, response)

    state = Map.put(state, :queries, queries)

    {:noreply, state}
  end
end

defmodule AsynchronousServer do
  def query(pid, ref, query) do
    spawn fn ->
      :rand.uniform(2000)
      |> :timer.sleep()

      GenServer.cast(pid, {:query_response, ref, "RESPONSE: #{query}"})
    end

    IO.puts "Waiting for a response"
  end
end

{:ok, pid} = QueryServer.start_link()
IO.puts QueryServer.post(pid, "TEST")
# Waiting for a response
# RESPONSE: TEST

Breaking down each component:

defmodule QueryServer do
  use GenServer

  # Client

  def start_link(opts \\ []) do
    state = %{
      queries: %{},
    }
    GenServer.start_link(__MODULE__, state, opts)
  end

  def init(state) do
    {:ok, state}
  end

  def post(pid, query) do
    GenServer.call(pid, {:post, query})
  end

The first section is a standard GenServer client. start_link/1 starts the server with an initial state: a map holding an empty queries map. We’ll use that later to route responses to the correct caller. The post/2 function is what we want to appear synchronous, so it uses GenServer.call/3, which blocks the caller until a reply arrives or the call times out.

  def handle_call({:post, query}, from, %{queries: queries} = state) do
    ref = make_ref()

    AsynchronousServer.query(self(), ref, query)

    queries = Map.put(queries, ref, from)
    state = Map.put(state, :queries, queries)

    {:noreply, state}
  end

Here’s the callback that handles the call from post/2. What makes this different from most handle_call/3 examples is that we don’t return {:reply, reply, new_state}; we return {:noreply, state} instead. The documentation gives a couple of reasons why you might not reply, including our case: “To reply after returning from the callback because the response is not yet available”. To handle the asynchronous response, we create a unique reference with [make_ref/0](https://hexdocs.pm/elixir/Kernel.html#make_ref/0) and use it as a key in the queries map, with the from parameter as the value. The asynchronous function we’ll use is AsynchronousServer.query/3. In AbsintheWebSocket it’s a WebSockex implementation, but the following code is enough to demonstrate the concept:

defmodule AsynchronousServer do
  def query(pid, ref, query) do
    spawn fn ->
      :rand.uniform(2000)
      |> :timer.sleep()

      GenServer.cast(pid, {:query_response, ref, "RESPONSE: #{query}"})
    end

    IO.puts "Waiting for a response"
  end
end

When you run this code you’ll see “Waiting for a response” printed to the console immediately. query/3 uses spawn to start another process that sleeps for up to 2 seconds, then calls GenServer.cast/2 on the QueryServer process with the original ref and a simple response.

  def handle_cast({:query_response, ref, response}, %{queries: queries} = state) do
    {from, queries} = Map.pop(queries, ref)

    GenServer.reply(from, response)

    state = Map.put(state, :queries, queries)

    {:noreply, state}
  end

The final step is to send a reply to the original caller. We pop the ref off the queries map to get the original from parameter. We then combine that with the response from the AsynchronousServer and call GenServer.reply/2. To see it in action you can run this code:

{:ok, pid} = QueryServer.start_link()
IO.puts QueryServer.post(pid, "TEST")
#> Waiting for a response
#> RESPONSE: TEST

Notice that this is used as a normal synchronous function by the caller. It was a bit of work, but this setup allowed the WebSocket Caller in CommonGraphQLClient to implement the same post behaviour as the HTTP Caller.
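To illustrate why the ref-to-from map matters, here is a minimal sketch that posts several queries concurrently and checks that each caller gets its own response, even though the simulated backend answers in a random order. DemoQueryServer is a hypothetical, condensed stand-in for the QueryServer above, with the fake asynchronous backend inlined into handle_call/3; it is not the code from AbsintheWebSocket.

```elixir
defmodule DemoQueryServer do
  use GenServer

  # Client

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, %{queries: %{}}, opts)
  end

  def post(pid, query), do: GenServer.call(pid, {:post, query})

  # Server (callbacks)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:post, query}, from, %{queries: queries} = state) do
    ref = make_ref()
    # Capture the server pid here; inside spawn/1, self() would be the new process.
    server = self()

    # Stand-in for the asynchronous backend: respond after a random delay,
    # so responses can come back in a different order than the requests.
    spawn(fn ->
      Process.sleep(:rand.uniform(200))
      GenServer.cast(server, {:query_response, ref, "RESPONSE: #{query}"})
    end)

    # Remember which caller is waiting on this ref, and do not reply yet.
    {:noreply, %{state | queries: Map.put(queries, ref, from)}}
  end

  @impl true
  def handle_cast({:query_response, ref, response}, %{queries: queries} = state) do
    # Look up the waiting caller by ref and reply to it.
    {from, queries} = Map.pop(queries, ref)
    GenServer.reply(from, response)
    {:noreply, %{state | queries: queries}}
  end
end

{:ok, pid} = DemoQueryServer.start_link()

# Each task is a separate caller blocked in post/2 at the same time.
results =
  ["A", "B", "C"]
  |> Enum.map(fn q -> Task.async(fn -> {q, DemoQueryServer.post(pid, q)} end) end)
  |> Enum.map(&Task.await/1)

IO.inspect(results)
```

Every tuple in results pairs a query with its matching response, regardless of which simulated response finished first, because each reply is routed through the ref stored for that caller.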

The only point for further consideration is what to do if the response takes too long. GenServer.call/3 has a default timeout of 5 seconds, after which the calling process exits. Depending on your use case, you might catch that exit or let the caller crash. For AbsintheWebSocket, I’ll likely use the connection state to respond early if the connection is down (or when it crashes).
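As one possible way to handle the timeout, the sketch below catches the exit raised by GenServer.call/3 and turns it into an error tuple. SlowServer and its post/3 wrapper are hypothetical names used only for illustration, and the server deliberately never replies so that the timeout path is exercised. This is one option under those assumptions, not what AbsintheWebSocket does.

```elixir
defmodule SlowServer do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, %{}, opts)

  # Hypothetical wrapper: returns {:error, :timeout} instead of exiting
  # when no reply arrives within `timeout` milliseconds.
  def post(pid, query, timeout \\ 5_000) do
    {:ok, GenServer.call(pid, {:post, query}, timeout)}
  catch
    :exit, {:timeout, _} -> {:error, :timeout}
  end

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:post, _query}, _from, state) do
    # Never replies, simulating a response that does not arrive in time.
    {:noreply, state}
  end
end

{:ok, pid} = SlowServer.start_link()

# Use a short timeout so the example finishes quickly.
result = SlowServer.post(pid, "TEST", 100)
IO.inspect(result)
```

With the QueryServer design above, a caller that gives up this way still leaves its entry in the queries map until the late response arrives, so a long-running server may also want some cleanup for abandoned refs.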

[1] It’s a little difficult to say why you’d want to do this, though. HTTP connections are undoubtedly a better solution, as they scale over multiple connections and follow a simple request-response model.

