MixとOTP 03: GenServer

gumitech

gumi TECH

Posted on March 26, 2019

MixとOTP 03: GenServer

本稿はElixir公式サイトの許諾を得て「GenServer」の解説にもとづき、加筆補正を加えて、ElixirでGenServerを使ったプロセスの登録や監視の仕方についてご説明します。

Agentはプロセスです。プロセスはPIDをもつものの、名前がありません。もっとも、名前をアトムで登録することはできます(「Elixir入門 11: プロセス」「状態」参照)。また、Agent.start_link/2は、第2引数に:nameオプションでアトムの名前がつけられるのです。

iex> Agent.start_link(fn -> %{} end, name: :shopping)
{:ok, #PID<0.139.0>}
iex> KV.Bucket.put(:shopping, "milk", 1)
:ok
iex> KV.Bucket.get(:shopping, "milk")
1
Enter fullscreen mode Exit fullscreen mode

けれど、動的なプロセスにアトムで名前をつけるのはやめるべきです。それは、外部クライアントから受け取ることもよくあるプロセス名を、アトムにすることになります。ユーザー入力はアトムにすべきではありません。アトムはガベージコレクションされないからです。ひとたびつくられたアトムは、再利用されないままになります。ユーザー入力をすべてアトムにしていけば、システムメモリがたちまち使い切られるでしょう。

実際には、メモリが足りなくなるより先に、おそらくErlang VMがもてるアトムの最大数に達してしまいます(デフォルトでは1,048,576です)。そうなれば、システムが落ちてしまうはずです。

組み込みの名前づけ機能は安易に使わず、プロセスが名前に関連づけられる独自の登録機能をつくりましょう。登録はつねに最新の状態であるようにしなければなりません。たとえば、あるプロセスがバグでクラッシュしたとき、登録の機能がこの変化を認識し、古い入力は処理しないようにする必要があります。Elixirでは、プロセスひとつひとつの登録を監視すべきだとされるのです。

GenServerを使えば、プロセスを登録して監視するプロセスがつくれます。GenServerには、ElixirとOTPのサーバーを構築するための業務用の強力な機能が備わっているのです。Registryというモジュールを用いて、ローカルにもつ名前を動的に生成することもできます。

はじめてのGenServer

GenServerはふたつの要素で組み立てられます。[1]クライアントAPIと[2]サーバーコールバックです。ふたつをひとつのモジュールにまとめてもよいですし、クライアントとサーバーのふたつのモジュールに分けても構いません。クライアントとサーバーはそれぞれ別のプロセスで動きます。クライアントは関数が呼ばれると、メッセージをサーバーとやり取りします。

ここでは、クライアントAPIとサーバーコールバックは、ひとつのモジュールを使って行うことにします。新たなファイルlib/kv/registry.exに、つぎのコードを書いてください。

defmodule KV.Registry do
  use GenServer

  ## クライアントAPI
  @doc """
  登録を始める。
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  @doc """
  `server`に納められた`name`のプロセスのpidを探す。

  プロセスがあれば`{:ok、pid}`を返し、見つからないときは`:error`を返す。
  """
  def lookup(server, name) do
    GenServer.call(server, {:lookup, name})
  end

  @doc """
  `server`に`name`の与えられたプロセスがあることを請け合う。
  """
  def create(server, name) do
    GenServer.cast(server, {:create, name})
  end

  ## サーバーコールバック
  def init(:ok) do
    {:ok, %{}}
  end

  def handle_call({:lookup, name}, _from, names) do
    {:reply, Map.fetch(names, name), names}
  end

  def handle_cast({:create, name}, names) do
    if Map.has_key?(names, name) do
      {:noreply, names}
    else
      {:ok, bucket} = KV.Bucket.start_link([])
      {:noreply, Map.put(names, name, bucket)}
    end
  end
end
Enter fullscreen mode Exit fullscreen mode

クライアントAPI

関数start_link/1
GenServer.start_link/3につぎの3つの引数を渡して呼び出し、現行プロセスにリンクした新たなGenServerプロセスを始めます。

  • 第1引数: サーバーコールバックが実装されているモジュール。
  • 第2引数: 初期化のための値。
  • 第3引数: サーバー名などを定めるためのオプションのリストです。ここでは、start_link/1で受け取ったリストが渡されます。デフォルト値は空のリスト[]です。

関数lookup/2create/2
GenServerに送れるリクエストは、callcastのふたつです。

  • call: 同期的で、サーバーはそれらのリクエストにはレスポンスを返さなければなりません。
  • cast: 非同期で、サーバーはレスポンスを返しません。

ふたつの関数lookup/2およびcreate/2は、それぞれGenServer.call/3GenServer.cast/2でサーバーにリクエストを送ります。渡されるのは第2引数のタプルで、lookup/2{:lookup, name}create/2{:create, name}です。リクエストには、複数の値を送れるタプルがよく用いられます。その場合、初めの要素でリクエストするアクションを定め、残りの要素をアクションのための引数にあてることが多いです。なお、リクエストのタプルは、それぞれサーバーコールバックhandle_call/3handle_cast/2の第1引数とマッチングしなければなりません。

サーバーコールバック

サーバー側のさまざまなコールバックを実装して、サーバーの初期化や終了を確かめたり、あるいはリクエストを扱ったりします。コールバックは省くこともできます。前掲モジュールには、とりあえず試したい処理だけ加えました。

コールバックinit/1
GenServer.start_link/3の第2引数が渡されて、{:ok, state}を返します。stateは新しい空のマップ%{}です。GenServerのAPIがクライアント/サーバーをどのように分けているかよくわかるでしょう。GenServer.start_link/3はクライアントで呼ばれます。それに対して、サーバーで実行されるコールバックがinit/1です。

コールバックhandle_call/3
call/2のリクエストに対して実装されたコールバックです。

  • 第1引数: 送られたリクエスト。
  • 第2引数: リクエストを受け取ったプロセス。
  • 第3引数: 現行サーバーの状態。

戻り値は{:reply, reply, new_state}というかたちのタプルです。タプルの第1要素:replyは、クライアントに返信しなければならないことを示します。第2要素replyはクライアントに送られる中身で、第3要素new_stateがサーバーの新たな状態です。

コールバックhandle_cast/2
cast/2のリクエストに対して実装されたコールバックです。

  • 第1引数: 送られたリクエスト。
  • 第2引数: 現行サーバーの状態。

戻り値は{:noreply, new_state}というかたちのタプルです。実際のアプリケーションでは、おそらく非同期のcastでなく、同期のcall:createにコールバックを実装するでしょう。castのコールバックがどう実装されるかご説明するために、castを用いました。

コールバックhandle_call/3handle_cast/2が返すタプルの形式はほかにもありえます。また、コールバックには、ほかにGenServer.terminate/2GenServer.code_change/3などもあります。詳しくは「GenServer behaviour」をご覧ください。

GenServerをテストする

では、GenServerが意図したとおりに動くかどうか、テストを書いて確かめましょう。GenServerのテストはAgentの場合とほぼ変わりません。setup/1のコールバックでサーバーを起動して、テストはそのサーバーで行います。テストのファイルtest/kv/registry_test.exsに、つぎのようなコードを書いてください。

defmodule KV.RegistryTest do
  use ExUnit.Case, async: true

  setup do
    registry = start_supervised!(KV.Registry)
    %{registry: registry}
  end

  test "spawns buckets", %{registry: registry} do
    assert KV.Registry.lookup(registry, "shopping") == :error

    KV.Registry.create(registry, "shopping")
    assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

    KV.Bucket.put(bucket, "milk", 1)
    assert KV.Bucket.get(bucket, "milk") == 1
  end
end
Enter fullscreen mode Exit fullscreen mode
$ mix test test/kv/registry_test.exs
.

Finished in 0.03 seconds
1 test, 0 failures

Randomized with seed 710912
Enter fullscreen mode Exit fullscreen mode

Agentを使ったモジュールのテスト(「MixとOTP 02: Agent」「ExUnitのコールバックによるテストの設定」参照)とは、setupブロックにひとつ重要な違いがあります。start_link/1でプロセスの登録はせず、関数start_supervised!/2を呼んで始めていることです。引数にモジュールを渡します。

start_supervised!/2関数が、start_link/1の呼び出しによりモジュールのプロセスを始めてくれます。この関数を使うとよいのは、ExUnitがつぎのテストを始める前に、登録プロセスは必ずシャットダウンされることです。そうすると、テストした状態がつぎのテストと共有するリソースに依存していても、つぎの状態に影響を与えてしまうことが避けられます。

テストでプロセスを始めるときは、start_supervised!関数を使うことが望ましいといえます。前に試したAgentのテストも、start_supervised!/2を使うようにsetupブロックは書き替えた方がよいでしょう。

アプリケーションロジックの中でGenServerを止めたい場合には、関数GenServer.stop/3が使えます。:

## クライアントAPI

@doc """
プロセスを止める。
"""
def stop(server) do
  GenServer.stop(server)
end
Enter fullscreen mode Exit fullscreen mode

モニタリングする

つくったモジュールには問題がひとつあります。サーバーにつくったプロセスが止まったりクラッシュしたら使えなくなります。けれど、それがわかりません。確かめるために、test/kv/registry_test.exsにテストを加えましょう。

test "removes buckets on exit", %{registry: registry} do
  KV.Registry.create(registry, "shopping")
  {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
  Agent.stop(bucket)
  assert KV.Registry.lookup(registry, "shopping") == :error
end
Enter fullscreen mode Exit fullscreen mode

Agent.stop/3でプロセスを止めても名前は残ったままなので、最後のassert/1が失敗します。

$ mix test test/kv/registry_test.exs
.

  1) test removes buckets on exit (KV.RegistryTest)
     test/kv/registry_test.exs:19
     Assertion with == failed
     code:  assert KV.Registry.lookup(registry, "shopping") == :error
     left:  {:ok, #PID<0.145.0>}
     right: :error
     stacktrace:
       test/kv/registry_test.exs:23: (test)



Finished in 0.04 seconds
2 tests, 1 failure

Randomized with seed 348772
Enter fullscreen mode Exit fullscreen mode

この問題を解決するために必要なのは、つくられたプロセスひとつひとつを監視する登録です。モニターを定めれば、プロセスが終わるごとにその通知を受け取れます。そうすれば、その登録を消してしまうこともできるのです。コンソールをiex -S mixで起ち上げて試しましょう。

Process.monitor/1が返すのは、送られるメッセージと監視している参照をマッチさせる一意の参照です。Agentを止めたあと、すべてのメッセージはflush/0でクリアできます。返されたのは:DOWNメッセージで、監視している参照と止めたプロセスのPID、そして終了の原因:normalが示されます。

iex> {:ok, pid} = KV.Bucket.start_link([])
{:ok, #PID<0.131.0>}
iex> Process.monitor(pid)
#Reference<0.5749347.3730309123.136703>
iex> Agent.stop(pid)
:ok
iex> flush()
{:DOWN, #Reference<0.5749347.3730309123.136703>, :process, #PID<0.131.0>,
 :normal}
:ok
Enter fullscreen mode Exit fullscreen mode

前掲のテストがとおるように、lib/kv/registry.exのサーバーコールバックを実装し直しましょう。まず、GenServerの状態には辞書をふたつもたせます。ひとつはname -> pidで、もうひとつはref -> nameです。つぎに、handle_cast/2Process.monitor/1の呼び出しにより監視を始めます。handle_info/2もメッセージの新しい状態が扱えるように書き替えなければなりません。そして、コールバックhandle_info/2が新たに加わります。:DOWNメッセージを受け取ったら、プロセスを削除するためです。そのほかのメッセージは破棄されます。

## サーバーコールバック
def init(:ok) do
  names = %{}
  refs = %{}
  # {:ok, %{}}
  {:ok, {names, refs}}
end

# def handle_call({:lookup, name}, _from, names) do
def handle_call({:lookup, name}, _from, {names, _} = state) do
  # {:reply, Map.fetch(names, name), names}
  {:reply, Map.fetch(names, name), state}
end  

# def handle_cast({:create, name}, names) do
def handle_cast({:create, name}, {names, refs}) do
  if Map.has_key?(names, name) do
    # {:noreply, names}
    {:noreply, {names, refs}}
  else
    # {:ok, bucket} = KV.Bucket.start_link([])
    {:ok, pid} = KV.Bucket.start_link([])
    ref = Process.monitor(pid)
    refs = Map.put(refs, ref, name)
    names = Map.put(names, name, pid)
    # {:noreply, Map.put(names, name, bucket)}
    {:noreply, {names, refs}}
  end
end

# 追加
def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
  {name, refs} = Map.pop(refs, ref)
  names = Map.delete(names, name)
  {:noreply, {names, refs}}
end

# 追加
def handle_info(_msg, state) do
  {:noreply, state}
end
Enter fullscreen mode Exit fullscreen mode
$ mix test test/kv/registry_test.exs
Compiling 3 files (.ex)
Generated kv app
..

Finished in 0.03 seconds
2 tests, 0 failures

Randomized with seed 405864
Enter fullscreen mode Exit fullscreen mode

クライアントAPIにはまったく触れず、サーバーの実装だけ大きく手を入れることで済みました。サーバーとクライアントをはっきりと分ける利点のひとつです。クライアントAPIも含めたKV.Registryのコード(lib/kv/registry.ex)をつぎにまとめて掲げます(ドキュメントとコメントは基本的に省きました)。

defmodule KV.Registry do
  use GenServer

  ## クライアントAPI
  def start_link(opts) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  def lookup(server, name) do
    GenServer.call(server, {:lookup, name})
  end

  def create(server, name) do
    GenServer.cast(server, {:create, name})
  end

  ## サーバーコールバック
  def init(:ok) do
    names = %{}
    refs = %{}
    {:ok, {names, refs}}
  end

  def handle_call({:lookup, name}, _from, {names, _} = state) do
    {:reply, Map.fetch(names, name), state}
  end  

  def handle_cast({:create, name}, {names, refs}) do
    if Map.has_key?(names, name) do
      {:noreply, {names, refs}}
    else
      {:ok, pid} = KV.Bucket.start_link([])
      ref = Process.monitor(pid)
      refs = Map.put(refs, ref, name)
      names = Map.put(names, name, pid)
      {:noreply, {names, refs}}
    end
  end

  def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
    {name, refs} = Map.pop(refs, ref)
    names = Map.delete(names, name)
    {:noreply, {names, refs}}
  end

  def handle_info(_msg, state) do
    {:noreply, state}
  end
end
Enter fullscreen mode Exit fullscreen mode

callcastおよびinfo

これまでコールバックとして、handle_call/3handle_cast/2およびhandle_info/2の3つを用いました。それぞれをいつ使うか決めるために、考えるべきことはつぎのとおりです。

  • handle_call/3: 同期のリクエストに使わなければなりません。サーバーの応答を待つときにデフォルトの選択です。バックプレッシャーの仕組みに役立ちます。
  • handle_cast/2: 応答を待たなくてよい非同期のリクエストに使わなければなりません。castはサーバーがメッセージを受け取ったか、確かめさえしません。ですから、あまり使わない方がよいでしょう。たとえば、前掲モジュールのcreate/2関数の定めで、cast/2を用いたのはよくない例です。call/2を使うのが適切でした。
  • handle_info/2: サーバーが受け取るメッセージのうち、GenServer.call/2GenServer.cast/2で送られたのでないものに使います。send/3により送信される通常のメッセージも含まれます。前掲モジュールが監視した:DOWNメッセージもそのひとつです。

call/2により送られたものを含めて、すべてのメッセージはhandle_info/2に渡されます。中には予期しないメッセージがサーバーに届くかもしれません。そのため、前掲モジュールのように、すべてのケースを受け取るhandle_info/2を設けなければならないのです。いずれにもマッチしないメッセージが来たとき、プロセスがクラッシュするのを防げます。handle_call/3handle_cast/2については、そのような心配はありません。メッセージがGenServerのAPIで送られるからです。わからないメッセージは、おそらく開発者の誤りでしょう。

callcastおよびinfoの違いや、戻り値などがわかるように、Benjamin Tan Wei Hao氏が「THE GenServer CHEATSHEET」(PDF)を公開しています。

モニターとリンク

モニターとリンクはどのように使い分ければよいでしょうか。

まず、リンクは双方向です。ふたつのプロセスをリンクし、どちらかがクラッシュすれば、もうひとつもクラッシュします(終了をトラップしないかぎり)。それに対して、モニターは一方向です。モニターをしているプロセスは、監視対象から通知を受け取るにすぎません。つまり、クラッシュを連携したいならリンク、クラッシュや終了などを知りたいだけであればモニターを使うということです。

前掲モジュールのhandle_cast/2では、プロセスのリンクとモニターをともに実装しています。

{:ok, pid} = KV.Bucket.start_link([])
ref = Process.monitor(pid)
Enter fullscreen mode Exit fullscreen mode

プロセスをモニターしているにもかかわらず、クラッシュが連携してしまうので、これは適切ではありません。通常は、プロセスを直接つくることは避け、スーパーバイザーに委ねます。

MixとOTPもくじ

💖 💪 🙅 🚩
gumitech
gumi TECH

Posted on March 26, 2019

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related