MixとOTP 08: タスクとgen_tcp
gumi TECH
Posted on May 14, 2019
本稿はElixir公式サイトの許諾を得て「Task and gen_tcp」の解説にもとづき、加筆補正を加えて、Erlangの:gen_tcp
モジュールによりリクエストをどのように処理するかについてご説明します。また、Taskモジュールの使い方もご紹介します。
エコーサーバー
まずはエコーサーバーを実装して、TCPサーバーを起ち上げましょう。リクエストで受け取ったテキストを、レスポンスで送ります。少しずつ改善を重ねて目指すのは、監視を受けながら複数の接続が扱えるサーバーです。
TCPサーバーは、大きくつぎの手順を踏みます。
- ポートが使えるまで待ってソケットを保持します。
- そのポートにクライアントが接続するのを待って受け入れます。
- クライアントのリクエストを読み込んで、返すレスポンスを書き込みます。
これらの手順を実装しましょう。apps/kv_server
アプリケーションに移って、lib/kv_server.ex
を開きます。モジュールKVServer
をつぎのように書き替えてください。
require Logger
defmodule KVServer do
def accept(port) do
# オプションの機能はつぎのとおり:
#
# 1. `:binary` - データをバイナリとして受け取る(リストでなく)
# 2. `packet: :line` - データを1行ずつ受け取る
# 3. `active: false` - データが受け取れるようになるまで`:gen_tcp.recv/2`を待たせる
# 4. `reuseaddr: true` - リスナーが落ちたときアドレスを再利用できるようにする
#
{:ok, socket} =
:gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true])
Logger.info("Accepting connections on port #{port}")
loop_acceptor(socket)
end
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
serve(client)
loop_acceptor(socket)
end
defp serve(socket) do
socket
|> read_line()
|> write_line(socket)
serve(socket)
end
defp read_line(socket) do
{:ok, data} = :gen_tcp.recv(socket, 0)
data
end
defp write_line(line, socket) do
:gen_tcp.send(socket, line)
end
end
KVServer
モジュールは、accept/1
の呼び出しによりサーバーを起ち上げます。引数はポート番号で、渡す値は4040です。accept/1
はまずポートを監視して、ソケットが使えるまで待ちます。つぎに呼び出すのはloop_acceptor/1
です。ループしながら、クライアントの接続を待ち受けます。そして、接続があるたび呼び出すのはserve/1
です。
serve/1
もループ処理です。1行ずつソケットから読み込んでは、書き戻します。serve/1
関数がこの処理の流れを、パイプ演算子|>
で組み立てていることにご注目ください。パイプ演算子の左辺の値を右辺の関数が第1引数に受け取り、その戻り値はつぎのパイプ演算子によりさらに右辺の関数に渡されます。
socket |> read_line() |> write_line(socket)
通常の引数の書き方では関数の入れ子になる処理が、パイプ演算子を用いるとわかりやすく書けるのです。
write_line(read_line(socket), socket)
read_line/1
はソケットの読み書きを、:gen_tcp
モジュールの関数により実装しています。読み取りが:gen_tcp.recv/2
、書き込みは:gen_tcp.send/2
です。
serve/1
はずっとループして実行され続けます。すると、この関数を呼び出したloop_acceptor/1
の本体は末尾呼び出しにたどりつかないので、ループしなくてもよいのではないでしょうか。けれど、あとでserve/1
を別のプロセスに分けるため、末尾呼び出しが必要になります。
これでエコーサーバーはできました。kv_server
アプリケーションの中でIExのセッションをiex -S mix
で起動してください。そして、KVServer.accept/1
をつぎのように呼び出します。
iex> KVServer.accept(4040)
00:00:00.000 [info] Accepting connections on port 4040
サーバーが起ち上がると接続待ちの状態になります。ここでは、Telnetクライアントを使ってサーバーにアクセスしましょう。コマンドラインツールでつぎのように入力してください。他のクライアントでも、入力するコマンドラインはほぼ同じです。
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
エコーサーバーは入力待ちになります。キー入力して[enter]キーを押せば、そのテキストがつぎの行に表示されるはずです。
hello
hello
world
world
Telnetを終了するには、[control]/[Ctrl] + ]でコマンドモードに切り替え、quitに続けて[enter]を入力してください(終了の仕方はクライアントにより異なります)。Telnetが閉じると、IExのセッションにはつぎのようなエラーが表れるでしょう。:gen_tcp.recv/2
がデータを受け取ろうとしたところ、クライアントの接続が閉じてしまったからです。この問題は、あとでサーバーを改定するときに扱うことにします。
** (MatchError) no match of right hand side value: {:error, :closed}
(kv_server) lib/kv_server.ex:33: KVServer.read_line/1
(kv_server) lib/kv_server.ex:26: KVServer.serve/1
(kv_server) lib/kv_server.ex:20: KVServer.loop_acceptor/1
差し当たり、先に直さなければならないのは、TCPアクセプタがクラッシュした場合の問題です。サーバーにはまだ監視プロセスがありません。すると、サーバーが落ちたら再起動されないので、そのあとリクエストは扱えなくなります。ですから、サーバーは監視ツリーに移さなければなりません。
タスク
Agent
やGenServer
、そしてSupervisor
は、いずれも複数のメッセージを扱い、状態が管理できました。けれど、やりたいことが簡単なタスクのときは、Task
モジュールが使えます。
たとえば、引数に渡した無名関数を新しいプロセスの中で実行し、監視ツリーに加えるのがstart_link/1
関数です。lib/kv_server/application.ex
を開いて、start/2
関数につぎのように書き加えてください。いつもは、子のリストに2要素のタプルを渡します。その代わりに、Task.start_link/1
をタスクとして呼び出すのです。
def start(_type, _args) do
children = [
{Task, fn -> KVServer.accept(4040) end} # 追加
]
opts = [strategy: :one_for_one, name: KVServer.Supervisor]
Supervisor.start_link(children, opts)
end
これで、サーバーが監視ツリーに含められ、アプリケーションとともに起動します。アプリケーションは、mix run
コマンドでつぎのように起ち上げてください。
$ mix run --no-halt
00:00:00.000 [info] Accepting connections on port 4040
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
say you
say you
say me
say me
上のコードでは、ポート番号は直打ちしました。たとえば、つぎのように書き替えると、アプリケーションが起ち上がるとき、ポート番号をシステム環境から読み取ります。
port = String.to_integer(System.get_env("PORT") || "4040")
children = [
# {Task, fn -> KVServer.accept(4040) end}
{Task, fn -> KVServer.accept(port) end}
]
ポート番号は、アプリケーションを起動するmix run
コマンドにつぎのように変数で加えてください。
$ PORT=4321 mix run --no-halt
前掲ポートの設定にはデフォルト値(4040)を与えました。ですから、ポート番号を省くと、その番号で接続を待ちます。
$ mix run --no-halt
00:00:00.000 [info] Accepting connections on port 4040
Telnetクライアントを増やすとどうでしょう。ひとつ目のクライアントを接続したまま、もうひとつTelnetクライアントを開いてみてください。ふたつ目のクライアントは、エコーが示されません。
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello?
HELLOOOOOO?
これは、リクエストを同じプロセスで扱っているからです。ひとつのクライアントが接続されると、つながっている間は、別のクライアントは受け入れられません。
タスクスーパーバイザー
サーバーが同時接続を扱えるようにするには、ひとつのプロセスをアクセプタとして、別につくったプロセスにリクエストを処理させなければなりません。ひとつ思いつくのは、KVServer. loop_acceptor/1
でTask.start_link/1
を使うように書き替えることです。これで、アクセプタのプロセスからリンクしたタスクが開始します。
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
# serve(client)
Task.start_link(fn -> serve(client) end)
loop_acceptor(socket)
end
けれど、serve(client)
のタスクをアクセプタにリンクしてしまうのはいただけません。リクエストの処理が落ちると、アクセプタまで巻き込んで、その結果すべての接続が切れてしまうことになるからです。
そうならないための方策は、プロセスごとにスーパーバイザーを設けることでした(「MixとOTP 05: ダイナミックスーパーバイザー」参照)。暫定のタスクをスーパーバイザーに開始させ、監視ツリーに加えるのです。KVServer.Application
のstart/2
関数を改めてつぎのように書き替えましょう。
def start(_type, _args) do
port = String.to_integer(System.get_env("PORT") || "4040")
children = [
{Task.Supervisor, name: KVServer.TaskSupervisor}, # 追加
{Task, fn -> KVServer.accept(port) end}
]
opts = [strategy: :one_for_one, name: KVServer.Supervisor]
Supervisor.start_link(children, opts)
end
これで、Task.Supervisor
がKVServer.TaskSupervisor
という名前で起動します。アクセプタはこのスーパーバイザーに依存することにお気をつけください。つまり、スーパーバイザーは先に開始しなければならないのです。そこで、KVServer
のloop_acceptor/1
は、Task.Supervisor
がリクエストを扱うように書き替えます。
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
# serve(client)
{:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end) # 追加
:ok = :gen_tcp.controlling_process(client, pid) # 追加
loop_acceptor(socket)
end
:gen_tcp.controlling_process/2
の呼び出しに気づいたでしょうか。これは子プロセスをクライアントソケットの「制御プロセス」にします。そうしないと、アクセプタがクラッシュしたとき、すべてのクライアントを落としてしまいます。ソケットは、受けつけられたプロセスに結びついているからです(これはデフォルトの動作です)。
改めてサーバーを起ち上げると、Telnetクライアントが同時にいくつも開けます。ひとつのクライアントを閉じてもアクセプタは落ちないので、他のクライアントの接続は生きているでしょう。
$ mix run --no-halt
00:00:00.000 [info] Accepting connections on port 4040
スーパーバイザーが加わり、監視戦略は正しく定められました。アクセプタがクラッシュしても、すでにある接続を落とすことはありません。また、タスクスーパーバイザーがクラッシュしたとき、アクセプタを落とす必要もないでしょう。
もうひとつ考えなければならないのは再起動戦略です。タスクはデフォルトでは:restart
の値が:temporary
に設定されています。これは再起動しないということです。Task.Supervisor
が開始した接続については適切といえます。失敗した接続を再開する意味はないからです。けれど、アクセプタが落ちたら、再起動させるべきでしょう。
ひとつのやり方は、新たに定義するモジュールでuse Task
を使い、restart: :permanent
としてstart_link
関数を呼び出すことにより、タスクを再起動することです。けれど、他の開発者のライブラリと統合するときは、エージェント、タスク、およびサーバーの定義方法は変えられません。そこで、子プロセスの仕様を動的にカスタマイズすることにしましょう。そのためには、KVServer.Application
のstart/2
でSupervisor.child_spec/2
を呼び出すように書き替えます。
def start(_type, _args) do
port = String.to_integer(System.get_env("PORT") || "4040")
children = [
{Task.Supervisor, name: KVServer.TaskSupervisor},
# {Task, fn -> KVServer.accept(port) end}
Supervisor.child_spec({Task, fn -> KVServer.accept(port) end}, restart: :permanent)
]
opts = [strategy: :one_for_one, name: KVServer.Supervisor]
Supervisor.start_link(children, opts)
end
Supervisor.child_spec/2
は、モジュールやタプルから子の仕様がつくれます。さらに、子の仕様をオーバーライドすることもできるのです。これで、アクセプタはつねに動作します。そして、一時的なタスクプロセスを、つねに動作しているスーパーバイザーのもとで開始できるのです。以下にKVServer.Application
とKVServer
の定義を掲げましょう。
defmodule KVServer.Application do
@moduledoc false
use Application
def start(_type, _args) do
port = String.to_integer(System.get_env("PORT") || "4040")
children = [
{Task.Supervisor, name: KVServer.TaskSupervisor},
Supervisor.child_spec({Task, fn -> KVServer.accept(port) end}, restart: :permanent)
]
opts = [strategy: :one_for_one, name: KVServer.Supervisor]
Supervisor.start_link(children, opts)
end
end
require Logger
defmodule KVServer do
@doc """
定められた`port`で接続の受け入れを始める。
"""
def accept(port) do
{:ok, socket} =
:gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true])
Logger.info("Accepting connections on port #{port}")
loop_acceptor(socket)
end
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
{:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
:ok = :gen_tcp.controlling_process(client, pid)
loop_acceptor(socket)
end
defp serve(socket) do
socket
|> read_line()
|> write_line(socket)
serve(socket)
end
defp read_line(socket) do
{:ok, data} = :gen_tcp.recv(socket, 0)
data
end
defp write_line(line, socket) do
:gen_tcp.send(socket, line)
end
end
MixとOTPもくじ
Posted on May 14, 2019
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.