MixとOTP 06: ETS
gumi TECH
Posted on April 23, 2019
本稿はElixir公式サイトの許諾を得て「ETS」の解説にもとづき、加筆補正を加えて、ElixirにおけるETSの使い方についてご説明します。
プロセスを探すには、それらの登録先プロセスにメッセージを送らなければなりません。複数のプロセスから同時にアクセスされると、登録先プロセスがボトルネックになってしまいます。そこで、ETS(Erlang Term Storage)について学習し、これをキャッシュメカニズムとして使ってみましょう。
ETSをいきなりキャッシュに使わないようにしてください。アプリケーションのパフォーマンスを記録および分析し、どの部分がボトルネックなのかを明らかにするのが先です。そのうえで、そもそもキャッシュすべきか、何をキャッシュするかを決めなければなりません。これから述べるのは、キャッシュが必要とされた場合に、ETSをどう使うかという説明です。
ETSでキャッシュする
ETSを使うと、Elixirのどのような項目でもメモリ内のテーブルに納められます。ETSテーテブルはErlangの:ets
モジュールで操作します。
ETSテーブルをつくるets.new/2
には、ふたつの引数を渡します。テーブルの名前とオプションのリストです。つぎのコードはオプションとして、テーブルのタイプとアクセス権限を定めています。タイプ:set
のテーブルはキーが複製できません。アクセス権限:protected
のテーブルに書き込みができるのは、テーブルをつくったプロセスだけです。他のプロセスからテーブルを読むことはできます。もっとも、このふたつはデフォルト値です。したがって、このあとはこれらを省きます。
iex> table = :ets.new(:buckets_registry, [:set, :protected])
#Reference<0.2154069976.707395585.80154>
iex> self()
#PID<0.133.0>
iex> :ets.insert(table, {"foo", self()})
true
iex> :ets.lookup(table, "foo")
[{"foo", #PID<0.133.0>}]
つくるETSテーブルに:named_table
のオプションを与えれば、テーブルが名前で参照できるようになります。
iex> :ets.new(:buckets_registry, [:named_table])
:buckets_registry
iex> :ets.insert(:buckets_registry, {"foo", self()})
true
iex> :ets.lookup(:buckets_registry, "foo")
[{"foo", #PID<0.133.0>}]
KV.Registry
モジュールがETSテーブルを使えるように書き替えましょう。まず、登録プロセスを始めるstart_link/1
に、必須の引数として名前を渡すことにします。これがETSテーブルとプロセス自身の名前になるのです。ふたつの名前はそれぞれ異なる場所に納められますので、重複の問題は生じません。lib/kv/registry.ex
のクライアントAPIの実装はつぎのように改めてください。
## クライアントAPI
@doc """
登録を始める。
引数のオプションには`:name`が必須。
"""
def start_link(opts) do
# [1]GenServerのintに名前を渡す。
server = Keyword.fetch!(opts, :name)
# GenServer.start_link(__MODULE__, :ok, opts)
GenServer.start_link(__MODULE__, server, opts)
end
@doc """
`server`に納められた`name`のプロセスのpidを探す。
プロセスがあれば`{:ok、pid}`を返し、見つからないときは`:error`を返す。
"""
def lookup(server, name) do
# [2]lookupはサーバーでなく、ETSで直接行う。
# GenServer.call(server, {:lookup, name})
case :ets.lookup(server, name) do
[{^name, pid}] -> {:ok, pid}
[] -> :error
end
end
KV.Registry.lookup/2
はリクエストをサーバーに送るのでなく、ETSテーブルから直に読み取るようにしました。ETSテーブルはすべてのプロセスで共有します。これが、これから実装するキャッシュの仕組みの基本です。
キャッシュが正しく働くようにETSテーブルは、サーバーコールバックのinit/1
がデフォルトのアクセス権限:protected
で開始します。つまり、クライアントのプロセスからは読み取り専用です。KV.Registry
プロセスだけが書き込みできます。read_concurrency: true
オプションは、同時読み取りの操作を最適化する定めです。
## サーバーコールバック
# def init(:ok) do
def init(table) do
# [3]マップnamesをETSテーブルで置き替える。
# names = %{}
names = :ets.new(table, [:named_table, read_concurrency: true])
refs = %{}
{:ok, {names, refs}}
end
# [4]lookupのコールバックhandle_callは削除。
# def handle_call({:lookup, name}, _from, {names, _} = state) do
# {:reply, Map.fetch(names, name), state}
# end
def handle_cast({:create, name}, {names, refs}) do
# [5]マップでなくETSテーブルに読み書きする。
# if Map.has_key?(names, name) do
case lookup(names, name) do
{:ok, _pid} ->
{:noreply, {names, refs}}
# else
:error ->
{:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
ref = Process.monitor(pid)
refs = Map.put(refs, ref, name)
# names = Map.put(names, name, pid)
:ets.insert(names, {name, pid})
{:noreply, {names, refs}}
end
end
def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
# [6]マップでなくETSテーブルから削除する。
{name, refs} = Map.pop(refs, ref)
# names = Map.delete(names, name)
:ets.delete(names, name)
{:noreply, {names, refs}}
end
登録のプロセスを起動するには、:name
オプションが求められることになりました。また、lookup/2
などの操作には、PIDでなく名前を引数として渡さなければなりません。その名前でETSテーブルを参照するためです。そこで、test/kv/registry_test.exs
のテストも、setup
関数をつぎのように書き替えてください。
# setup do
setup context do
# registry = start_supervised!(KV.Registry)
_ = start_supervised!({KV.Registry, name: context.test})
# %{registry: registry}
%{registry: context.test}
end
この手直しだけではテストはとおりません。たとえば、"spawns buckets"のテストは、以下のようなエラーメッセージが示されるでしょう。KV.Registry.lookup/2
が期待した値を返していません。これは、つぎのふたつの間に食い違いがあるからです。
- ETSテーブルをキャッシュにして早まった最適化をしました。
-
lookup/2
がcast/2
を呼び出しています(call/2
を使うのが適切です)。
2) test spawns buckets (KV.RegistryTest)
test/kv/registry_test.exs:
match (=) failed
code: assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
right: :error
stacktrace:
test/kv/registry_test.exs: (test)
競合状態
Elixirの開発でも競合状態は起こります。けれど、デフォルトで共有はしないという抽象化がされているので、競合しているおおもとの原因がつきとめやすくなっているのです。
テストでは、ETSテーブルに加えた操作とその変化が表れるまでに時間差が生じます。テストで期待されたのは、つぎのように処理が進むことです。
-
KV.Registry.create(registry, "shopping")
を呼び出します。 - 登録プロセスは子プロセスをつくってキャッシュテーブルが更新されます。
-
KV.Registry.lookup(registry, "shopping")
でテーブルから情報を参照します。 -
{:ok, bucket}
が返されます。
ところが、KV.Registry.create/2
はcast/2
の操作でした。すると、実際にテーブルに書き込まないうちに、コマンドが返ってしまいます。つまり、処理はつぎのように進んでいたのです。
-
KV.Registry.create(registry, "shopping")
を呼び出します。 -
KV.Registry.lookup(registry, "shopping")
でテーブルから情報を参照します。 - まだ情報がないので返るのは
:error
です。 - 登録プロセスは子プロセスをつくってキャッシュテーブルが更新されます。
この失敗を修正する方法は、KV.Registry.create/2
はcast/2
でなくcall/2
を使って同期処理にすることです。これで、クライアントが処理を進めるのは、テーブルが変更され終えてからになります。KV.Registry
モジュールのコールバックは以下のように書き替えてください。
コールバックhandle_cast/2
はhandle_call/3
に改めます。戻り値にはつくったプロセスのPIDを含めなければなりません。一般にElixirの開発には、call/2
の方がcast/2
よりも好まれます。レスポンスを確実に得て、処理が進められるからです。十分な必要性なくcast/2
を用いるのは、早まった最適化でしょう。
def create(server, name) do
# GenServer.cast(server, {:create, name})
GenServer.call(server, {:create, name})
end
# def handle_cast({:create, name}, {names, refs}) do
def handle_call({:create, name}, _from, {names, refs}) do
# [5]マップでなくETSテーブルに読み書きする。
case lookup(names, name) do
# {:ok, _pid} ->
{:ok, pid} ->
# {:noreply, {names, refs}}
{:reply, pid, {names, refs}}
:error ->
{:ok, pid} = DynamicSupervisor.start_child(KV.BucketSupervisor, KV.Bucket)
ref = Process.monitor(pid)
refs = Map.put(refs, ref, name)
:ets.insert(names, {name, pid})
# {:noreply, {names, refs}}
{:reply, pid, {names, refs}}
end
end
改めてテストしてみしましょう。mix test
コマンドに--trace
オプションを添えて試します。このオプションは、テストがデッドロックしたり、競合状態になる場合に有効です。すべてのテストを同期的に行い(async: true
は無効)、テストの詳しい情報を示します。すると、つぎのような障害が生じるかもしれません。
$ mix test test/kv/registry_test.exs --trace
1) test removes buckets on exit (KV.RegistryTest)
test/kv/registry_test.exs:19
Assertion with == failed
code: KV.Registry.lookup(registry, "shopping") == :error
lhs: {:ok, #PID<0.109.0>}
rhs: :error
stacktrace:
test/kv/registry_test.exs:23
エラーメッセージによれば、テーブルからプロセスはなくなっているはずなのに、それが残っているようです。この問題は先に解決した競合状態の逆になります。先ほどの問題は、プロセスをつくるコマンドからテーブルが更新されるまでの遅れでした。今回は、プロセスを停止したのち、テーブルから除かれるまでに遅れが生じています。つまり、これもまた競合状態です。結果として、競合による障害は起こらないかもしれません。けれど、遅れがあることは確かです。
残念ながら、handle_info/2
に対応する同期処理の関数はありません。そのため、ETSテーブルの更新を、同期的な操作に置き替えるという解決はできないのです。プロセスが停止したとき、登録プロセスから:DOWN
の通知が必ず送られるような方法を考えなければなりません。
簡単なのは、登録のプロセスに同期的な要求を送ることです。すると、メッセージは順番に処理されます。Agent.stop/3
の呼び出しのあと送信された要求に登録プロセスが応答すれば、:DOWN
メッセージが処理されたことになるでしょう。KV.Registry.create/2
は同期的な要求です。つぎのふたつのテストで、Agent.stop/3
が呼び出される前にダミーのプロセスをつくりましょう。これで競合状態による障害は起こりません。
test "removes buckets on exit", %{registry: registry} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(registry, "shopping")
Agent.stop(bucket)
# 登録プロセスに:DOWNメッセージを遅らせるための呼び出し
_ = KV.Registry.create(registry, "bogus")
assert KV.Registry.lookup(registry, "shopping") == :error
end
test "removes bucket on crash", %{registry: registry} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(registry, "shopping")
# プロセスを正常でない理由で止める
Agent.stop(bucket, :shutdown)
# 登録プロセスに:DOWNメッセージを遅らせるための呼び出し
_ = KV.Registry.create(registry, "bogus")
assert KV.Registry.lookup(registry, "shopping") == :error
end
以上が、キャッシュメカニズムとしてのETSの使い方です。テーブルのデータはどのプロセスからも読み取りでき、書き込みはひとつのプロセスが順に行いました。そして大切なのは、データが非同期に読み込まれる場合、競合状態が起こりうることに気をつけなければならないということです。
実際には、動的プロセスの登録が必要になったら、ElixirのモジュールのひとつRegistry
を用いるべきです。このモジュールには、GenServer
と:ets
を使ってここでつくったアプリケーションと似た機能が備わっています。さらに、読み書きも同時に行えるのです。40コアのマシンでも、すべてのコアにスケールできることがベンチマークされました(「Elixir v1.4 released」の「Registry」にリンクされた「Comparing registries」)。
MixとOTPもくじ
Posted on April 23, 2019
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.