MixとOTP 10: 分散処理のタスクと設定
gumi TECH
Posted on June 11, 2019
本稿はElixir公式サイトの許諾を得て「Distributed tasks and configuration」の解説にもとづき、加筆補正を加えて、タスクの分散処理とアプリケーションの設定についてご説明します。
今回は、kv
アプリケーションにルーティングのレイヤーを加えます。リクエストをプロセス名にもとづいてノード間でやり取りできるようにするのです。ルーティングレイヤーは、つぎのような形式でルーティングテーブルを受け取ります。
[
{?a..?m, :"foo@computer-name"},
{?n..?z, :"bar@computer-name"}
]
ルータはプロセス名の最初のバイトをテーブルで確かめ、それにもとづいて適切なノードに配信します。たとえば、文字"a"で始まるプロセスは、ノードfoo@computer-name
に送られます。なお、?a
は文字"a"のUnicodeコードポイントです(「Elixir入門 06: バイナリと文字列および文字リスト」「UTF-8とUnicode」参照)。
マッチングするエントリーがリクエストを評価しているノードに当たるときは、ルーティングはせず、そのノードが要求された操作を実行します。マッチングするエントリーが別のノードを示すときは、要求をそのノードに渡します。ノードは自身のルーティングテーブル(はじめのノードとは異なるでしょう)を見て、それに応じて動作します。マッチングするエントリーがなければエラーです。
ルーティングテーブルで見つかったノードに、要求された操作を直に実行はさせません。ルーティングの要求をノードに渡して処理するのです。上記のような単純なルーティングテーブルであれば、すべてのノードで共有してもよいでしょう。けれど、ルーティングの要求を渡すかたちにすれば、アプリケーションが拡大したとき、ルーティングテーブルが簡単に小さく分けられます。ある程度大きくなったら、foo@computer-name
はプロセスに要求をルーティングさせる役割だけを担い、要求を扱うプロセスが他のノードに送られることになるでしょう。そうすれば、bar@computer-name
は変更については知らずに済みます。
本稿では、ひとつのマシンでふたつのノードを使います。同じネットワークで、複数のマシンを使うことも可能です。その場合には準備が要ります。第1に、すべてのマシンにファイル
~/.erlang.cookie
があり、その値はまったく同じでなければなりません。第2に、epmd
がブロックされていないポートで動いていることを確かめます(epmd -d
でデバッグ情報が得られます)。さらに詳しくは、「Learn You Some Erlang for Great Good!」の「Distribunomicon」の章をお読みください。
分散処理のコードを書く
Elixirには、ノードに接続して互いに情報をやり取りする機能が備わっています。実際、分散環境でプロセスやメッセージの送受信には同じ考え方が用いられます。Elixirのプロセスは、場所を問わないからです。つまり、メッセージを送るとき、受け手のプロセスが同じノードかどうかは問いません。VMはどちらであってもメッセージを届けます。
分散処理のコードを実行するには、VMに名前をつけて起動しなければなりません。名前は短く(同じネットワークの場合)も長く(完全なコンピュータアドレス)もできます。IExをつぎのように開始してください。
$ iex --sname foo
プロンプトの表示が少し変わります。ノード名のあとに@
つきで示されるのはコンピュータ名です。
Erlang/OTP 21 [erts-10.0.5] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe] [dtrace]
Interactive Elixir (1.7.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(foo@computer-name)1>
シェルでモジュールをつぎのように定めます。
iex> defmodule Hello do
...> def world, do: IO.puts "hello world"
...> end
{:module, Hello,
<<70, 79, 82, 49, 0, 0, 4, 60, 66, 69, 65, 77, 65, 116, 85, 56, 0, 0, 0, 140,
0, 0, 0, 15, 12, 69, 108, 105, 120, 105, 114, 46, 72, 101, 108, 108, 111, 8,
95, 95, 105, 110, 102, 111, 95, 95, 7, ...>>, {:world, 0}}
同じネットワークの別のコンピュータにErlangとElixirがインストールされていれば、別のシェルを起動できます。そうでない場合は、IExのセッションを別のターミナルから開けばよいでしょう。どちらでも、つぎのように短い別の名前をつけます。
$ iex --sname bar
新たな別セッションからは、先ほど定めたモジュールの関数は呼び出せません。
iex(bar@computer-name)> Hello.world
** (UndefinedFunctionError) function Hello.world/0 is undefined (module Hello is not available)
Hello.world()
けれど、Node.spawn_link/2
を用いて、bar@computer-name
からfoo@computer-name
に新たなプロセスがつくれます(ノード名の中のcomputer-nameはプロンプトに示された名前に置き替えてください)。
iex(bar@computer-name)> Node.spawn_link :"foo@computer-name", fn -> Hello.world end
hello world
#PID<10577.117.0>
Elixirは別ノードにプロセスをつくり、そのPIDが返されました。そして、コードは関数が定められている別ノードで実行され、関数は呼び出されたのです。関数の出力が示されたのは、現在のノードで、別ノードではないことにご注目ください。つまり、メッセージは別ノードから送り返され、それが現ノードに出力されたということです。これは別ノードにつくられたプロセスが、現ノードと同じグループリーダーをもつからです(「Elixir入門 12: 入出力とファイルシステム」「プロセスとグループリーダー」参照)。
Node.spawn_link/2
が返すPIDを使えば、メッセージを送って、さらに受け取れます。
iex(bar@computer-name)> pid = Node.spawn_link :"foo@computer-name", fn ->
...(bar@computer-name)> receive do
...(bar@computer-name)> {:ping, client} -> send client, :pong
...(bar@computer-name)> end
...(bar@computer-name)> end
#PID<10577.119.0>
iex(bar@computer-name)> send pid, {:ping, self()}
{:ping, #PID<0.106.0>}
iex(bar@computer-name)> flush()
:pong
:ok
分散処理を行うには、処理のたびにNode.spawn_link/2
でリモートノードにプロセスを生成すればよいということがわかりました。けれども、監視ツリーの外にプロセスをつくるのは、できるだけ避けるべきです。今回の実装でNode.spawn_link/2
を用いるより望ましいやり方は3つ考えられます。
- Erlangの
:rpc
モジュールを使うと、リモートノードの関数が実行できます。たとえば、シェルから:rpc.call(:"foo@computer-name", Hello, :world, [])
とすれば、別ノードの関数Hello.world/0
が呼び出せます。 -
GenServer
のAPIにより他のノードでサーバーを起ち上げれば、要求が送れます。たとえば、GenServer.call({name, node}, arg)
によりリモートノードのサーバーが呼び出せます。第1引数はリモートプロセスのPIDにしても構いません。 -
Task
を用いて、ローカルとリモートの両ノードに生成することもできます(「MixとOTP 08: タスクとgen_tcp」参照)。
:rpc
とGenServer
を使うと、要求はひとつのサーバーにシリアライズされます。他方、Task
はリモートノードで効率的に非同期処理されます。シリアライズされるポイントは、スーパーバイザーによる生成だけです。今回のルーティングレイヤーでは、Task
を用いることにします。けれど、他のやり方でも問題はありません。
async/await
これまでは、開始したTask
は単独で実行しました。また、戻り値も確かめていません。しかし、タスクで値を処理し、あとで結果を見ると役立つ場合があります。そのために、Task
に備わっているのがasync/await
バターンです。
async/await
は、値を同時に処理するシンプルな仕組みです。それだけでなく、async/await
を同じTask.Supervisor
で使うこともできます(「MixとOTP 08: タスクとgen_tcp」「タスクスーパーバイザー」参照)。Task.Supervisor.start_child/3
の替わりにTask.Supervisor.async/3
を呼び出して、あとからTask.await/2
で結果を読み取るだけです。
task = Task.async(fn -> compute_something_expensive end)
res = compute_something_else()
res + Task.await(task)
タスクの分散処理
タスクの分散処理は、タスクの監視と基本的に変わりません。違いは、スーパーバイザーにタスクをつくるとき、ノード名を渡すことです。:kv
アプリケーションのlib/kv/supervisor.ex
を開いて、init/1
のchildren
リストの最後につぎのようにTask.Supervisor
を加えてください。
def init(:ok) do
children = [
{DynamicSupervisor, name: KV.BucketSupervisor, strategy: :one_for_one},
{KV.Registry, name: KV.Registry},
{Task.Supervisor, name: KV.RouterTasks} # 追加
]
Supervisor.init(children, strategy: :one_for_all)
end
改めて、名前つきのノードを起動します。ただし、:kv
アプリケーションのディレクトリから開いてください。
$ cd apps/kv
$ iex --sname foo -S mix
もうひとつのノードについても同様です。
$ iex --sname bar -S mix
これで、スーパーバイザーによりひとつのノードから、もうひとつのノードに直接タスクがつくれます。つぎの分散処理タスクは、タスクが実行されているノード名を取得します。
iex(bar@computer-name)> task = Task.Supervisor.async {KV.RouterTasks, :"foo@computer-name"}, fn ->
...(bar@computer-name)> {:ok, node()}
...(bar@computer-name)> end
%Task{
owner: #PID<0.140.0>,
pid: #PID<15195.164.0>,
ref: #Reference<0.1065362951.4240965635.67142>
}
iex(bar@computer-name)> Task.await(task)
{:ok, :"foo@computer-name"}
上のコードではTask.Supervisor.async/3
に無名関数を与えました。けれど、分散処理ではモジュールと関数および引数を明示的に与える方が望ましいです。無名関数では、ターゲットノードが呼び出しもととまったく同じコードバージョンをもたなければなりません。Task.Supervisor.async/4
を用いれば、引数に渡したモジュールにアリティの合致する関数があればよいので、より堅牢といえます。
iex(bar@computer-name)> task = Task.Supervisor.async {KV.RouterTasks, :"foo@computer-name"}, Kernel, :node, []
%Task{
owner: #PID<0.140.0>,
pid: #PID<15195.165.0>,
ref: #Reference<0.1065362951.4240965635.67197>
}
iex(bar@computer-name)> Task.await(task)
:"foo@computer-name"
ルーティングレイヤー
ファイルlib/kv/router.ex
にルーターのモジュールKV.Router
をつぎのように定めます。なお、computer-nameはローカルマシン名に書き替えてください。
defmodule KV.Router do
@doc """
与えられた`mod`の`fun`に`args`が渡された要求を
プロセス`bucket`にもとづいて適切なノードに送る。
"""
def route(bucket, mod, fun, args) do
# バイナリの最初のバイトを得る
first = :binary.first(bucket)
# table()からエントリーを探してなければエラー
entry =
Enum.find(table(), fn {enum, _node} ->
first in enum
end) || no_entry_error(bucket)
# エントリーが現ノードの場合
if elem(entry, 1) == node() do
apply(mod, fun, args)
else
{KV.RouterTasks, elem(entry, 1)}
|> Task.Supervisor.async(KV.Router, :route, [bucket, mod, fun, args])
|> Task.await()
end
end
defp no_entry_error(bucket) do
raise "could not find entry for #{inspect bucket} in table #{inspect table()}"
end
@doc """
ルーティングテーブル
"""
def table do
# computer-nameはローカルマシン名に置き替える
[{?a..?m, :"foo@computer-name"}, {?n..?z, :"bar@computer-name"}]
end
end
ルーターの動作をテストで確かめましょう。test/kv/router_test.exs
に、つぎのふたつのテストを書きます。
defmodule KV.RouterTest do
use ExUnit.Case, async: true
test "route requests across nodes" do
assert KV.Router.route("hello", Kernel, :node, []) ==
:"foo@computer-name"
assert KV.Router.route("world", Kernel, :node, []) ==
:"bar@computer-name"
end
test "raises on unknown entries" do
assert_raise RuntimeError, ~r/could not find entry/, fn ->
KV.Router.route(<<0>>, Kernel, :node, [])
end
end
end
はじめのテストでは、Kernel.node/0
の呼び出しにより、実行ノード名を得ます。プロセス名として“hello”と“world”を渡しているので、ルーティングテーブルからそれぞれfoo@computer-name
とbar@computer-name
が返されるでしょう。
ふたつ目のテストは、知らないエントリに対してエラーが起こるかどうか確かめます。
はじめのテストを走らせるには、ノードがふたつ実行されていなければなりません。ディレクトリapps/kv
に移って、テストに使われるもうひとつのノードをつぎのように起動してください。
$ iex --sname bar -S mix
そして、もとのノードから、つぎのようにテストを実行すると、正しくとおるはずです。
$ elixir --sname foo -S mix test
テストのフィルタとタグ
前項のテストはとおりました。けれど、テストの構成はもっと込み入ってくるでしょう。実際今も、mix test
で試すと、失敗してしまいます。他のノードへの接続を求めるテストが含まれているからです。
1) test route requests across nodes (KV.RouterTest)
test/kv/router_test.exs:4
** (exit) exited in: GenServer.call({KV.RouterTasks, :"foo@computer-name"}, {:start_task, [#PID<0.169.0>, :monitor, {:nonode@nohost, #PID<0.169.0>}, {KV.Router,:route, ["hello", Kernel, :node, []]}], :temporary, nil}, :infinity)
** (EXIT) no connection to foo@computer-name
code: assert KV.Router.route("hello", Kernel, :node, []) ==
stacktrace:
(elixir) lib/gen_server.ex:924: GenServer.call/3
(elixir) lib/task/supervisor.ex:377: Task.Supervisor.async/6
(kv) lib/kv/router.ex:21: KV.Router.route/4
test/kv/router_test.exs:5: (test)
幸いにも、ExUnitにはテストにタグづけする機能が備わっています。特定のコールバックだけ実行したり、タグにもとづいてフィルタリングすることもできるのです。なお、:capture_log
のように予めExUnitに定められているタグもあります(「MixとOTP 09: DocTestとwithのパターンマッチング」「コマンドを実行する」参照)。
そこで、test/kv/router_test.exs
につぎのようにタグづけしましょう。@tag :distributed
は@tag distributed: true
と書くのと同じです。
@tag :distributed # 追加
test "route requests across nodes" do
assert KV.Router.route("hello", Kernel, :node, []) ==
:"foo@computer-name"
assert KV.Router.route("world", Kernel, :node, []) ==
:"bar@computer-name"
end
テストに正しくタグづけしてあれば、ネットワークにノードがあるかどうかNode.alive?/0
でテストのときに確かめればよいでしょう。なければ、分散処理のテストを省きます。そのために、:kv
アプリケーション(apps/kv
)のtest/test_helper.exs
につぎのように書き加えてください。
exclude =
if Node.alive?, do: [], else: [distributed: true] # 追加
# ExUnit.start()
ExUnit.start(exclude: exclude)
これで、mix test
が失敗なく走るようになります。ExUnitが分散処理のテストひとつを省くからです。
$ mix test
Excluding tags: [distributed: true]
......
Finished in 2.0 seconds
7 tests, 0 failures, 1 excluded
ノードbar@computer-name
が使えるときは、つぎのようにテストすると分散処理も含めてとおるでしょう。
$ elixir --sname foo -S mix test
.......
Finished in 2.0 seconds
7 tests, 0 failures
mix test
コマンドには、タグを動的に含めたり、除いたりすることができます。たとえば、mix test --include distributed
と入力すれば、test/test_helper.exs
の設定にかかわらず、分散処理のテストが含められるのです。あるいは、コマンドラインに--exclude
を渡すと、特定のタグが除けます。さらに、つぎのように--only
を用いて、特定タグのテストだけ実行することもできます。
$ elixir --sname foo -S mix test --only distributed
Including tags: [:distributed]
Excluding tags: [:test]
.
Finished in 0.06 seconds
7 tests, 0 failures, 6 excluded
フィルタについて詳しくは、ExUnit.Case
モジュールの「Filters」をお読みください。
アプリケーション環境と設定
ルーティングテーブルはKV.Router
につぎのように直打ちしてありました。このテーブルを動的にしましょう。そうすれば、開発とテストあるいはプロダクションの設定だけでなく、異なるエントリで動いている異なるノードをルーティングテーブルに加えることもできるようになるのです。OTPにはまさにその機能があります。それがアプリケーション環境です。
def table do
# computer-nameはローカルマシン名に置き替える
[{?a..?m, :"foo@computer-name"}, {?n..?z, :"bar@computer-name"}]
end
環境はアプリケーションごとにあり、固有の設定がキーに与えられます。たとえば、ルーティングテーブルを:kv
アプリケーション環境に納めることができます。これがデフォルト値となり、他のアプリケーションは必要に応じてテーブルが変えられるのです。
apps/kv/mix.exs
を開いて、関数application/0
の戻り値につぎのように書き加えてください。アプリケーションに与えたのはキー:env
です。これがアプリケーションのデフォルト環境となります。エントリーがキー:routing_table
で、値は空のリストです。これで、アプリケーション環境は出荷時に空のテーブルとなります。使われるテーブルは、テストと開発の構成によって変わります。
def application do
[
extra_applications: [:logger],
env: [routing_table: []], # 追加
mod: {KV, []}
]
end
アプリケーション環境をコードで使うには、KV.Router.table/0
の定めをつぎのように書き替えてください。Application.fetch_env!/2
は、:routing_table
は:kv
の環境の中のエントリーを読み込みます。アプリケーション環境の操作について詳しくは、「Application behaviour
」をご参照ください。
def table do
# [{?a..?m, :"foo@computer-name"}, {?n..?z, :"bar@computer-name"}]
Application.fetch_env!(:kv, :routing_table)
end
ルーティングテーブルが空になったので、分散処理のテストは失敗するはずです。改めてテストを実行して確かめましょう。
$ iex --sname bar -S mix
$ elixir --sname foo -S mix test --only distributed
1) test route requests across nodes (KV.RouterTest)
test/kv/router_test.exs:5
** (RuntimeError) could not find entry for "hello" in table []
code: assert KV.Router.route("hello", Kernel, :node, []) ==
stacktrace:
(kv) lib/kv/router.ex:27: KV.Router.no_entry_error/1
(kv) lib/kv/router.ex:14: KV.Router.route/4
test/kv/router_test.exs:6: (test)
アプリケーション環境のよいところは、現行のアプリケーションだけでなく、アプリケーション全体にも設定できることです。全体の設定は、config/config.exs
で行います。たとえば、apps/kv/config/config.exs
を開いて、つぎのコードを最後に加えてください。これは、IExのデフォルトプロンプトの定めです。
config :iex, default_prompt: ">>>"
$ iex -S mix
Erlang/OTP 21 [erts-10.0.5] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe] [dtrace]
Compiling 5 files (.ex)
Generated kv app
Interactive Elixir (1.7.3) - press Ctrl+C to exit (type h() ENTER for help)
>>>
同じように、apps/kv/config/config.exs
ファイルに:routing_table
をつぎのように定めることができます。
# computer-nameはローカルマシン名に置き替える
config :kv, :routing_table, [{?a..?m, :"foo@computer-name"}, {?n..?z, :"bar@computer-name"}]
改めてノードを起動して分散処理をテストすれば、今度はとおるはずです。
:kv
アプリケーションはアンブレラプロジェクトの中のひとつでした。Elixir v1.2から、アンブレラアプリケーションはconfig/config.exs
の設定を共有します。プロジェクトルートのconfig/config.exs
のつぎの記述により、すべての子の設定が読み込まれるからです。
import_config "../apps/*/config/config.exs"
mix run
コマンドは--config
フラグを加えると、必要に応じた設定ファイルが与えられます。異なるノードを起動して、それぞれに異なる設定ができるのです(たとえば、ルーティングテーブルを変えるなど)。
アプリケーション設定のために組み込まれた機能により、アンブレラアプリケーションをデプロイするときにさまざまなやり方が考えられます。
- アンブレラアプリケーションをノードにデプロイして、TCPサーバーとキー/値データ保存の両方に使う
-
:kv_server
アプリケーションをデプロイしてTCPサーバーにのみ使い、ルーティングテーブルには他のノードの指定のみさせる -
:kv
アプリケーションだけをデプロイして、ノードはデータ保存にのみ使う(TCPアクセスはしない)
今後アプリケーションが増えても、デプロイの粒度は同じくらいのレベルに保つことができます。プロダクションのとき、どのアプリケーションをどの設定にするか検討してください。
複数のリリースをビルドするときは、Distilleryのようなツールを使うことも考えられるでしょう。アプリケーションと設定を選んで、現行のErlangとElixirも含めてパッケージにできます。そうすれば、ターゲットシステムにランタイムがインストールされていなくても、アプリケーションがデプロイできるのです。
今回のようなキー/値データ保存の分散処理をプロダクションで使うときには、Riakがお勧めです。RiakもErlang VMで動作します。子プロセスは複製され、データを失うことが避けられます。ルーターの替わりにコンシステントハッシュ法を用いて、プロセスはノードにマップされるのです。このりアルゴリズムは、プロセスを保存するノードが新たに加わって、データを移行しなければならなくなっても、その量を抑えるのに役立ちたます。
MixとOTPもくじ
Posted on June 11, 2019
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.