Mechanics of Async Await / Habr

Mechanics of Async Await / Habr

This post explores the async await mechanic in Elixir. The competitive model adopted in Elixir is excellent as a platform for implementing such mechanics. However, don’t treat this post as a guide to developing real-world applications on Elixir.

→ The code for this position is posted on GitHub


Introduction

Many of us first encountered async await when we were learning JavaScript. Since JavaScript offers one of the most popular implementations of this technique, it is the JavaScript approach that usually sticks in memory as the basic variant of async await. Therefore, we unwittingly associate the fundamental principles of asynchronous programming with the concrete implementation and concrete decisions made in JavaScript.

One of the most common misconceptions about async await is that the paradigm requires a single-threaded, non-blocking runtime. The JavaScript runtime is non-blocking, first and foremost agothat it is single-stream. However, asynchronous programming models are fully implemented in both single-threaded and multi-threaded execution environments, and in both blocking and non-blocking forms.

From Sync to Async

The path from synchronous to asynchronous programming is essentially a transition from sequential to concurrent execution. At each moment of time, we will perform not one task, but several.

Let’s try to think about synchronous and asynchronous programming in terms of pairs of events that occur during execution. We assume that the following events of interest to us are traced in the program trace:

  • With synchronous programming

o

invoke function

and

return value

.

  • In asynchronous programming

o

invoke function

and

return promise

,

o

await promise

and

return value

.

Synchronous execution

Synchronous execution is a one-way operation in which a “call a function and return a value” type of interaction occurs between the calling party and the called party. The calling party stops execution until the called party completes its task and returns a value (Fig. 1., left).

Asynchronous execution and promises

Asynchronous execution is a round trip, two interactions. The first operation is that we call and return a promise, and the second is that we expect and return a value from the interaction between the calling party and the callee.

A promise can be interpreted either as a representation of the called party or as a representation of a future value (future). A promise can either be in a suspended state (meaning that the callee is still busy doing work and has not returned a value), or as completed — in the latter case, the callee has already completed work and returned a value.

If the promise is in a suspended state at the await stage, then all callers stop execution and wait until the called party completes its work and returns a value (Fig. 1, center). If the promise ends at the await stage, the calling party continues with the returned value (Fig. 1., Right).

Cycle of events

The async await execution environment is usually called the event loop (Event Loop). An event loop is a scheduler that allows an asynchronously executing program to register interest in a specific event. For the purposes of this article, we are only interested in completing the promise. When registration has occurred, execution stops. When the promise is completed, the execution of the event loop resumes.

The rest of this article explains how to implement async await and event loop capabilities in Elixir. Why on Elixir? First, it is easy to design async await on Elixir, it turns out a great illustrative implementation. More importantly, this mapping of async await to Elixir dispels the common myth that the async await mechanism is inherently non-blocking. It is actually possible to map asynchronous operations to Elixir processes and purposefully block the process while waiting for a transaction.

A brief introduction to Elixir

Elixir is a dynamically typed functional programming language that runs on the Erlang Virtual Machine (BEAM).

A key abstraction in the Elixir language is called a “process”. It is the smallest independently executable unit that has a unique identifier. To find out the ID of the currently running process, you need to execute self(). All code is executed in those processes. Elixir processes run concurrently, but each process executes its instructions sequentially.

# Создаём новый процесс – воспользовавшись идентификатором процесса, можно отправлять этому процессу сообщения 
pid = spawn(fn ->
  # Этот блок кода будет выполняться в отдельном процессе
end)

Processes exchange information and coordinate actions by exchanging messages. Sending a message is a non-blocking operation, so the execution of the process can continue.

# Отправить сообщение процессу с идентификатором pid (неблокирующая операция)
send(pid, {:Hello, "World"})

In contrast, receiving a message is a blocking operation, and execution of the process will be suspended until the corresponding message arrives.

# Получить сообщение (блокирующая операция)
receive do
  {:Hello, name} -> 
    IO.puts("Hello, #{name}!")
end

Perhaps the most popular abstraction in Elixir is GenServer. GenServer is a process just like any other process in Elixir. GenServer abstracts away all the boilerplate code needed to build a stateful server.

defmodule Counter do
  use GenServer

  # Клиентский API

  # Запускает GenServer
  def start_link() do
    GenServer.start_link(__MODULE__, 0, name: __MODULE__)
  end

  # Синхронный вызов, при помощи которого мы получаем текущее значение счётчика 
  def value do
    GenServer.call(__MODULE__, :value)
  end

  # Асинхронный вызов для инкремента счётчика
  def increment do
    GenServer.cast(__MODULE__, :increment)
  end

  # Обратные вызовы сервера

  # Инициализируем GenServer с исходным значением
  def init(value) do
    {:ok, value}
  end
  
  # Обрабатываем синхронные вызовы
  def handle_call(:value, _from, state) do
    {:reply, state, state}
  end

  # Обрабатываем асинхронные сообщения
  def handle_cast(:increment, state) do
    {:noreply, state + 1}
  end
end

Async Await in Elixir

With the help of the Async Await module, the developer can

express

competitive computing structure, while the event loop

implements

competitive computing structure.

Let’s display the asynchronous execution of the function, the Elixir process performs this function. Using the process identifier (pid), it is convenient to refer to both the execution of this process and the promise announcing such execution.

We aim to accomplish something like this:

# outer относится к pid внешнего процесса Elixir 
outer = Async.invoke(fn ->
 
  # inner относится к pid внутреннего процесса Elixir 
  inner = Async.invoke(fn ->

    42
  
  end)

  # Воспользуемся pid, и с его помощью будем ожидать внутреннего промиса 
  v = Async.await(inner)

  2 * v

end)

# Воспользуемся pid, и с его помощью будем ожидать внешнего промиса
IO.puts(Async.await(outer))

Library

Let’s start with a simple library component. Let me remind you that we have only two interactions, in the first we call a function and return a promise, and in the second we expect a promise and return a value. During the Invoke operation, the work of the calling party does not stop, but during the await operation, it can be suspended if the promise has not yet been granted.

defmodule Async do
  def invoke(func, args \\ []) do
    # вызвать функцию, вернуть промис
    # ни в коем случае не заблокирует вызывателя
    GenServer.call(EventLoop, {:invoke, func, args})
  end

  def await(p) do
    # ожидать промис, вернуть значение
    # может заблокировать вызывателя
    GenServer.call(EventLoop, {:await, p})
  end
end

In Elixir terms:

  • GenServer.call(EventLoop, {:invoke, func, args}) – This is a blocking call. But as we can see, this method always returns immediately, so it cannot stop the rogue party in any case.
  • GenServer.call(EventLoop, {:await, p}) – This is a blocking call. As we’ll see below, the function doesn’t always return immediately; therefore, the call may suspend the calling party.

Cycle of events

Let’s move on to a more complex component – the cycle of events.

State

Entities of two types are tracked in the event cycle: promises and expectations.

%State{
  promises: %{#PID<0.269.0> => :pending, #PID<0.270.0> => {:completed, 42}},
  awaiters: %{
    #PID<0.269.0> => [
      # При помощи этой структуры данных можно отложить отклик на запрос 
      # см. GenServer.reply
      {#PID<0.152.0>,
       [:alias | #Reference<0.0.19459.4203495588.2524250117.118052>]}
    ],
    #PID<0.270.0> => []
  }
}

Trades

promises associates the promise identifier with the asynchronous execution status of the function represented in the promise. The promise can be in one of two states:

  • :pendingmeans that execution is still in progress, or
  • {:completed, result}Indicates that execution has completed with receipt result.

Waiters

awaiters associates a promise ID with a list of execution IDs waiting for the promise to resolve. Each pid in the list corresponds to a process that performed an await operation on the promise and is now terminated, waiting for the promise to be fulfilled. When the promise is resolved, pending processes are notified about it, and their execution can continue, but already taking into account the result of the promise.

Behavior

When tracking the current state of each operation, which is executed asynchronously through promises, as well as dependencies between execution operations through waitors, the event cycle comes to our aid. It helps orchestrate competitive code execution. For this we need only three methods:

defmodule EventLoop do
  use GenServer

  alias State

  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, State.new(), name: __MODULE__)
  end

  def init(state) do
    {:ok, state}
  end

  def handle_call({:invoke, func, args}, {caller, _} = _from, state) do
    # ...
  end

  def handle_call({:await, promise}, {caller, _} = from, state) do
    # ...
  end

  def handle_call({:return, callee, result}, {caller, _} = _from, state) do
    # ...
  end

end

Challenge

The invoke method spawns a new Elixir process and uses the assigned process ID callee as a promise identifier. A process performs a function using apply(func, args) and then calls the return method associated with the event loop. With this, the result of the function from which the transaction is executed is returned.

def handle_call({:invoke, func, args}, {caller, _} = _from, state) do
    # Здесь мы используем id процесса, одновременно являющийся id промиса 
    callee =
      spawn(fn ->
        GenServer.call(EventLoop, {:return, self(), apply(func, args)})
      end)

    new_state =
      state
      |> State.add_promise(callee)

    {:reply, callee, new_state}
  end

Expectation

This is the essence of the cycle of events. When calling await, two cases should be distinguished:

  • If the promise is resolved, we immediately respond to the party (without suspending its work) and return the result.
  • If the promise is suspended, there is no immediate response to the calling party (the party is suspended), and the calling party is registered as a unit waiting for a promise.
def handle_call({:await, promise}, {caller, _} = from, state) do
  # Центральный if-оператор
  case State.get_promise(state, promise) do
    # Промис приостановлен, отклик откладывается до его завершения 
    :pending ->
      new_state =
        state
        |> State.add_awaiter(promise, from)

      {:noreply, new_state}

    # Промис завершён, сразу же отвечаем
    {:completed, result} ->
      {:reply, result, state}
  end
end

Return

When the process completes, we iterate through the waitlist and respond to the request (resuming work on the caller), then return the result. Additionally, we update the state of the promise from Pending to Completed.

def handle_call({:return, callee, result}, {caller, _} = _from, state) do
    Enum.each(State.get_awaiter(state, callee), fn {cid, _} = caller ->
      GenServer.reply(caller, result)
    end)

    new_state =
      state
      |> State.set_promise(callee, result)

    {:reply, nil, new_state}
  end

Launching the program

Now everything is ready to run the program. In case you are unable to reproduce my results, I leave you the code in the form of an Elixir Livebook, it is located at

GitHub

.

IO.inspect(self())

outer = Async.invoke(fn ->
  
  IO.inspect(self())

  inner = Async.invoke(fn ->

    IO.inspect(self())

    42
  
  end)

  v = Async.await(inner)

  2 * v

end)

IO.puts(Async.await(outer))

As a result of the program, we will get approximately the following conclusion:

#PID<0.152.0>
#PID<0.269.0>
#PID<0.270.0>
84

In addition, look at the entity diagram and the sequence diagram, which illustrate the entire structure and all the behavior options for the execution of functions and promises.

Entity diagram

Sequence diagram

Review

In this article, we’ve explored the basic mechanics of async await, but that’s not the end of the story. For example, there are mechanisms for combining promises, such as Promise.all (wait until all promises from the list are fulfilled) or Promise.one (wait for at least one promise from the list to be fulfilled). Another interesting topic is the binding of promises, when the function does not return a value, but a promise. Study these topics yourself.

Conclusion

Async await is a programming model in which concurrency becomes paramount. With async await, it is convenient for the developer to express the concurrency structure of calculations, while the event loop will perform these calculations.

Related posts