MixとOTP 03: GenServer
gumi TECH
Posted on March 26, 2019
本稿はElixir公式サイトの許諾を得て「GenServer」の解説にもとづき、加筆補正を加えて、ElixirでGenServer
を使ったプロセスの登録や監視の仕方についてご説明します。
Agent
はプロセスです。プロセスはPIDをもつものの、名前がありません。もっとも、名前をアトムで登録することはできます(「Elixir入門 11: プロセス」「状態」参照)。また、Agent.start_link/2
は、第2引数に:name
オプションでアトムの名前がつけられるのです。
iex> Agent.start_link(fn -> %{} end, name: :shopping)
{:ok, #PID<0.139.0>}
iex> KV.Bucket.put(:shopping, "milk", 1)
:ok
iex> KV.Bucket.get(:shopping, "milk")
1
けれど、動的なプロセスにアトムで名前をつけるのはやめるべきです。それは、外部クライアントから受け取ることもよくあるプロセス名を、アトムにすることになります。ユーザー入力はアトムにすべきではありません。アトムはガベージコレクションされないからです。ひとたびつくられたアトムは、再利用されないままになります。ユーザー入力をすべてアトムにしていけば、システムメモリがたちまち使い切られるでしょう。
実際には、メモリが足りなくなるより先に、おそらくErlang VMがもてるアトムの最大数に達してしまいます(デフォルトでは1,048,576です)。そうなれば、システムが落ちてしまうはずです。
組み込みの名前づけ機能は安易に使わず、プロセスが名前に関連づけられる独自の登録機能をつくりましょう。登録はつねに最新の状態であるようにしなければなりません。たとえば、あるプロセスがバグでクラッシュしたとき、登録の機能がこの変化を認識し、古い入力は処理しないようにする必要があります。Elixirでは、プロセスひとつひとつの登録を監視すべきだとされるのです。
GenServer
を使えば、プロセスを登録して監視するプロセスがつくれます。GenServer
には、ElixirとOTPのサーバーを構築するための業務用の強力な機能が備わっているのです。Registry
というモジュールを用いて、ローカルにもつ名前を動的に生成することもできます。
はじめてのGenServer
GenServer
はふたつの要素で組み立てられます。[1]クライアントAPIと[2]サーバーコールバックです。ふたつをひとつのモジュールにまとめてもよいですし、クライアントとサーバーのふたつのモジュールに分けても構いません。クライアントとサーバーはそれぞれ別のプロセスで動きます。クライアントは関数が呼ばれると、メッセージをサーバーとやり取りします。
ここでは、クライアントAPIとサーバーコールバックは、ひとつのモジュールを使って行うことにします。新たなファイルlib/kv/registry.ex
に、つぎのコードを書いてください。
defmodule KV.Registry do
use GenServer
## クライアントAPI
@doc """
登録を始める。
"""
def start_link(opts) do
GenServer.start_link(__MODULE__, :ok, opts)
end
@doc """
`server`に納められた`name`のプロセスのpidを探す。
プロセスがあれば`{:ok、pid}`を返し、見つからないときは`:error`を返す。
"""
def lookup(server, name) do
GenServer.call(server, {:lookup, name})
end
@doc """
`server`に`name`の与えられたプロセスがあることを請け合う。
"""
def create(server, name) do
GenServer.cast(server, {:create, name})
end
## サーバーコールバック
def init(:ok) do
{:ok, %{}}
end
def handle_call({:lookup, name}, _from, names) do
{:reply, Map.fetch(names, name), names}
end
def handle_cast({:create, name}, names) do
if Map.has_key?(names, name) do
{:noreply, names}
else
{:ok, bucket} = KV.Bucket.start_link([])
{:noreply, Map.put(names, name, bucket)}
end
end
end
クライアントAPI
関数start_link/1
GenServer.start_link/3
につぎの3つの引数を渡して呼び出し、現行プロセスにリンクした新たなGenServer
プロセスを始めます。
- 第1引数: サーバーコールバックが実装されているモジュール。
-
__MODULE__/0
: 現行モジュール。
-
- 第2引数: 初期化のための値。
- 第3引数: サーバー名などを定めるためのオプションのリストです。ここでは、
start_link/1
で受け取ったリストが渡されます。デフォルト値は空のリスト[]
です。
関数lookup/2
とcreate/2
GenServer
に送れるリクエストは、call
とcast
のふたつです。
-
call
: 同期的で、サーバーはそれらのリクエストにはレスポンスを返さなければなりません。 -
cast
: 非同期で、サーバーはレスポンスを返しません。
ふたつの関数lookup/2
およびcreate/2
は、それぞれGenServer.call/3
とGenServer.cast/2
でサーバーにリクエストを送ります。渡されるのは第2引数のタプルで、lookup/2
は{:lookup, name}
、create/2
が{:create, name}
です。リクエストには、複数の値を送れるタプルがよく用いられます。その場合、初めの要素でリクエストするアクションを定め、残りの要素をアクションのための引数にあてることが多いです。なお、リクエストのタプルは、それぞれサーバーコールバックhandle_call/3
とhandle_cast/2
の第1引数とマッチングしなければなりません。
サーバーコールバック
サーバー側のさまざまなコールバックを実装して、サーバーの初期化や終了を確かめたり、あるいはリクエストを扱ったりします。コールバックは省くこともできます。前掲モジュールには、とりあえず試したい処理だけ加えました。
コールバックinit/1
GenServer.start_link/3
の第2引数が渡されて、{:ok, state}
を返します。state
は新しい空のマップ%{}
です。GenServer
のAPIがクライアント/サーバーをどのように分けているかよくわかるでしょう。GenServer.start_link/3
はクライアントで呼ばれます。それに対して、サーバーで実行されるコールバックがinit/1
です。
コールバックhandle_call/3
call/2
のリクエストに対して実装されたコールバックです。
- 第1引数: 送られたリクエスト。
- 第2引数: リクエストを受け取ったプロセス。
- 第3引数: 現行サーバーの状態。
戻り値は{:reply, reply, new_state}
というかたちのタプルです。タプルの第1要素:reply
は、クライアントに返信しなければならないことを示します。第2要素reply
はクライアントに送られる中身で、第3要素new_state
がサーバーの新たな状態です。
コールバックhandle_cast/2
cast/2
のリクエストに対して実装されたコールバックです。
- 第1引数: 送られたリクエスト。
- 第2引数: 現行サーバーの状態。
戻り値は{:noreply, new_state}
というかたちのタプルです。実際のアプリケーションでは、おそらく非同期のcast
でなく、同期のcall
で:create
にコールバックを実装するでしょう。cast
のコールバックがどう実装されるかご説明するために、cast
を用いました。
コールバックhandle_call/3
とhandle_cast/2
が返すタプルの形式はほかにもありえます。また、コールバックには、ほかにGenServer.terminate/2やGenServer.code_change/3などもあります。詳しくは「GenServer behaviour」をご覧ください。
GenServerをテストする
では、GenServer
が意図したとおりに動くかどうか、テストを書いて確かめましょう。GenServer
のテストはAgent
の場合とほぼ変わりません。setup/1
のコールバックでサーバーを起動して、テストはそのサーバーで行います。テストのファイルtest/kv/registry_test.exs
に、つぎのようなコードを書いてください。
defmodule KV.RegistryTest do
use ExUnit.Case, async: true
setup do
registry = start_supervised!(KV.Registry)
%{registry: registry}
end
test "spawns buckets", %{registry: registry} do
assert KV.Registry.lookup(registry, "shopping") == :error
KV.Registry.create(registry, "shopping")
assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
KV.Bucket.put(bucket, "milk", 1)
assert KV.Bucket.get(bucket, "milk") == 1
end
end
$ mix test test/kv/registry_test.exs
.
Finished in 0.03 seconds
1 test, 0 failures
Randomized with seed 710912
Agent
を使ったモジュールのテスト(「MixとOTP 02: Agent」「ExUnitのコールバックによるテストの設定」参照)とは、setup
ブロックにひとつ重要な違いがあります。start_link/1
でプロセスの登録はせず、関数start_supervised!/2
を呼んで始めていることです。引数にモジュールを渡します。
start_supervised!/2
関数が、start_link/1
の呼び出しによりモジュールのプロセスを始めてくれます。この関数を使うとよいのは、ExUnit
がつぎのテストを始める前に、登録プロセスは必ずシャットダウンされることです。そうすると、テストした状態がつぎのテストと共有するリソースに依存していても、つぎの状態に影響を与えてしまうことが避けられます。
テストでプロセスを始めるときは、start_supervised!
関数を使うことが望ましいといえます。前に試したAgent
のテストも、start_supervised!/2
を使うようにsetup
ブロックは書き替えた方がよいでしょう。
アプリケーションロジックの中でGenServer
を止めたい場合には、関数GenServer.stop/3
が使えます。:
## クライアントAPI
@doc """
プロセスを止める。
"""
def stop(server) do
GenServer.stop(server)
end
モニタリングする
つくったモジュールには問題がひとつあります。サーバーにつくったプロセスが止まったりクラッシュしたら使えなくなります。けれど、それがわかりません。確かめるために、test/kv/registry_test.exs
にテストを加えましょう。
test "removes buckets on exit", %{registry: registry} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(registry, "shopping")
Agent.stop(bucket)
assert KV.Registry.lookup(registry, "shopping") == :error
end
Agent.stop/3
でプロセスを止めても名前は残ったままなので、最後のassert/1
が失敗します。
$ mix test test/kv/registry_test.exs
.
1) test removes buckets on exit (KV.RegistryTest)
test/kv/registry_test.exs:19
Assertion with == failed
code: assert KV.Registry.lookup(registry, "shopping") == :error
left: {:ok, #PID<0.145.0>}
right: :error
stacktrace:
test/kv/registry_test.exs:23: (test)
Finished in 0.04 seconds
2 tests, 1 failure
Randomized with seed 348772
この問題を解決するために必要なのは、つくられたプロセスひとつひとつを監視する登録です。モニターを定めれば、プロセスが終わるごとにその通知を受け取れます。そうすれば、その登録を消してしまうこともできるのです。コンソールをiex -S mix
で起ち上げて試しましょう。
Process.monitor/1
が返すのは、送られるメッセージと監視している参照をマッチさせる一意の参照です。Agent
を止めたあと、すべてのメッセージはflush/0
でクリアできます。返されたのは:DOWN
メッセージで、監視している参照と止めたプロセスのPID、そして終了の原因:normal
が示されます。
iex> {:ok, pid} = KV.Bucket.start_link([])
{:ok, #PID<0.131.0>}
iex> Process.monitor(pid)
#Reference<0.5749347.3730309123.136703>
iex> Agent.stop(pid)
:ok
iex> flush()
{:DOWN, #Reference<0.5749347.3730309123.136703>, :process, #PID<0.131.0>,
:normal}
:ok
前掲のテストがとおるように、lib/kv/registry.ex
のサーバーコールバックを実装し直しましょう。まず、GenServer
の状態には辞書をふたつもたせます。ひとつはname -> pid
で、もうひとつはref -> name
です。つぎに、handle_cast/2
でProcess.monitor/1
の呼び出しにより監視を始めます。handle_info/2
もメッセージの新しい状態が扱えるように書き替えなければなりません。そして、コールバックhandle_info/2
が新たに加わります。:DOWN
メッセージを受け取ったら、プロセスを削除するためです。そのほかのメッセージは破棄されます。
## サーバーコールバック
def init(:ok) do
names = %{}
refs = %{}
# {:ok, %{}}
{:ok, {names, refs}}
end
# def handle_call({:lookup, name}, _from, names) do
def handle_call({:lookup, name}, _from, {names, _} = state) do
# {:reply, Map.fetch(names, name), names}
{:reply, Map.fetch(names, name), state}
end
# def handle_cast({:create, name}, names) do
def handle_cast({:create, name}, {names, refs}) do
if Map.has_key?(names, name) do
# {:noreply, names}
{:noreply, {names, refs}}
else
# {:ok, bucket} = KV.Bucket.start_link([])
{:ok, pid} = KV.Bucket.start_link([])
ref = Process.monitor(pid)
refs = Map.put(refs, ref, name)
names = Map.put(names, name, pid)
# {:noreply, Map.put(names, name, bucket)}
{:noreply, {names, refs}}
end
end
# 追加
def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
{name, refs} = Map.pop(refs, ref)
names = Map.delete(names, name)
{:noreply, {names, refs}}
end
# 追加
def handle_info(_msg, state) do
{:noreply, state}
end
$ mix test test/kv/registry_test.exs
Compiling 3 files (.ex)
Generated kv app
..
Finished in 0.03 seconds
2 tests, 0 failures
Randomized with seed 405864
クライアントAPIにはまったく触れず、サーバーの実装だけ大きく手を入れることで済みました。サーバーとクライアントをはっきりと分ける利点のひとつです。クライアントAPIも含めたKV.Registry
のコード(lib/kv/registry.ex
)をつぎにまとめて掲げます(ドキュメントとコメントは基本的に省きました)。
defmodule KV.Registry do
use GenServer
## クライアントAPI
def start_link(opts) do
GenServer.start_link(__MODULE__, :ok, opts)
end
def lookup(server, name) do
GenServer.call(server, {:lookup, name})
end
def create(server, name) do
GenServer.cast(server, {:create, name})
end
## サーバーコールバック
def init(:ok) do
names = %{}
refs = %{}
{:ok, {names, refs}}
end
def handle_call({:lookup, name}, _from, {names, _} = state) do
{:reply, Map.fetch(names, name), state}
end
def handle_cast({:create, name}, {names, refs}) do
if Map.has_key?(names, name) do
{:noreply, {names, refs}}
else
{:ok, pid} = KV.Bucket.start_link([])
ref = Process.monitor(pid)
refs = Map.put(refs, ref, name)
names = Map.put(names, name, pid)
{:noreply, {names, refs}}
end
end
def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
{name, refs} = Map.pop(refs, ref)
names = Map.delete(names, name)
{:noreply, {names, refs}}
end
def handle_info(_msg, state) do
{:noreply, state}
end
end
call
とcast
およびinfo
これまでコールバックとして、handle_call/3
とhandle_cast/2
およびhandle_info/2
の3つを用いました。それぞれをいつ使うか決めるために、考えるべきことはつぎのとおりです。
-
handle_call/3
: 同期のリクエストに使わなければなりません。サーバーの応答を待つときにデフォルトの選択です。バックプレッシャーの仕組みに役立ちます。 -
handle_cast/2
: 応答を待たなくてよい非同期のリクエストに使わなければなりません。cast
はサーバーがメッセージを受け取ったか、確かめさえしません。ですから、あまり使わない方がよいでしょう。たとえば、前掲モジュールのcreate/2
関数の定めで、cast/2
を用いたのはよくない例です。call/2
を使うのが適切でした。 -
handle_info/2
: サーバーが受け取るメッセージのうち、GenServer.call/2
やGenServer.cast/2
で送られたのでないものに使います。send/3
により送信される通常のメッセージも含まれます。前掲モジュールが監視した:DOWN
メッセージもそのひとつです。
call/2
により送られたものを含めて、すべてのメッセージはhandle_info/2
に渡されます。中には予期しないメッセージがサーバーに届くかもしれません。そのため、前掲モジュールのように、すべてのケースを受け取るhandle_info/2
を設けなければならないのです。いずれにもマッチしないメッセージが来たとき、プロセスがクラッシュするのを防げます。handle_call/3
とhandle_cast/2
については、そのような心配はありません。メッセージがGenServer
のAPIで送られるからです。わからないメッセージは、おそらく開発者の誤りでしょう。
call
とcast
およびinfo
の違いや、戻り値などがわかるように、Benjamin Tan Wei Hao氏が「THE GenServer CHEATSHEET」(PDF)を公開しています。
モニターとリンク
モニターとリンクはどのように使い分ければよいでしょうか。
まず、リンクは双方向です。ふたつのプロセスをリンクし、どちらかがクラッシュすれば、もうひとつもクラッシュします(終了をトラップしないかぎり)。それに対して、モニターは一方向です。モニターをしているプロセスは、監視対象から通知を受け取るにすぎません。つまり、クラッシュを連携したいならリンク、クラッシュや終了などを知りたいだけであればモニターを使うということです。
前掲モジュールのhandle_cast/2
では、プロセスのリンクとモニターをともに実装しています。
{:ok, pid} = KV.Bucket.start_link([])
ref = Process.monitor(pid)
プロセスをモニターしているにもかかわらず、クラッシュが連携してしまうので、これは適切ではありません。通常は、プロセスを直接つくることは避け、スーパーバイザーに委ねます。
MixとOTPもくじ
Posted on March 26, 2019
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.