concurrent.futures を使った並列処理
Fomalhaut Weisszwerg
Posted on December 15, 2023
現在所属している FLINTERS が 2024 年 1 月で 10 周年らしく、ブログリレー企画が立ったので久しぶりに日本語オンリー記事を書くことにしました。
ブログリレー 98 日目らしいです。
1. Python 標準の並列処理モジュール
Python 標準モジュールには並列処理に関連した複数のモジュールが存在します。
よく知られているものとしては
-
_thread
モジュール- マルチスレッドを実装するためのモジュール
- Python 2 における
thread
モジュールとほぼ同じ - 直接使われることはほぼない
-
threading
モジュール- マルチスレッドを実装するためのモジュール
- 低レイヤー部分をラップしてある程度使いやすくしたモジュール
- I/O 待ちが多いプログラムの効率化などで稀に使われることがある
-
_multiprocessing
モジュール- 直接使うことはまずない
-
multiprocessing
モジュール- マルチプロセスを実装するためのモジュール
- 低レイヤー部分をラップしてある程度使いやすくしたモジュール
- 標準モジュールの中では一番使用されることが多い
-
concurrent.futures
モジュール- Future パターンに特化したモジュール
- 今回の主題
2. subprocess
モジュールはどうなの?
Python 3.10 からデフォルトでは内部処理で vfork()
システムコールが使用されるようになりました。vfork()
システムコールは子プロセスの処理が終了するまで親プロセスの動作を停止させます。
このため subprocess
は並列処理の実装にはあまり向かないモジュールになったと言えます。
3. concurrent.futures
モジュール
3.1. concurrent.futures
モジュールって何?
Future パターン に特化したモジュールです。
threading
, multiprocess
をさらにラップして、並列処理をより簡単に実装できるようにしています。
concurrent.futures
モジュールを利用した実装は OS ごとの差異をほとんど気にする必要がないので、プラットフォームポータビリティが高くメンテコストを抑えたコードを書くことが可能です。
スレッドとプロセスをほぼ同じように扱えるので、マルチスレッドとマルチプロセスどちらにしたほうがより効率的になるのかを実際に動かして比較検討するといったことも容易にできます。
また、 concurrent.futures
モジュールを活用することで分散並列処理 (distributed computing) も可能になります。実際に
dask.distributed というライブラリは concurrent.futures
と dask を拡張することで実装されています。
3.2. 実装の流れ
concurrent.future
を使った実装は
- 関数またはメソッドを実装
- Executor に関数またはメソッドを渡して実行させる
- 結果を評価
という流れになります。
3.3. 実装例
GitHub topics と dev.to rss を HTTP GET してブラウザで開くだけの単純なプログラムです。
"""
Sample with `concurrent.futures` module.
"""
import concurrent.futures
import multiprocessing
import tempfile
import webbrowser
import httpx
class SampleExecutor:
def __init__(self) -> None:
self.list_rss = []
self.executor = concurrent.futures.ProcessPoolExecutor(
max_workers=int(multiprocessing.cpu_count() / 4),
# In Python 3.14, `ProcessPoolExecutor` should specify `multiprocessing.get_context("fork")`
# for `mp_context` explicitly. See:
# https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor
mp_context=multiprocessing.get_context("fork"),
)
def precede_get_rss(self, urls: list[str]) -> None:
"""
method:
precede_get_rss(URLs)
description:
Get RSS from URLs.
"""
self.futures = [self.executor.submit(httpx.get, url) for url in urls]
def postprocess_get_rss(self) -> None:
"""
method:
postprocess_get_rss()
description:
Postprocess results of HTTP GET.
"""
for future in concurrent.futures.as_completed(self.futures):
if future.exception() is None:
http_response = future.result()
if http_response.status_code == httpx.codes.OK:
self.list_rss.append(http_response.text)
def _open_default_browser(self) -> None:
"""
method:
_open_default_browser()
description:
Open RSS in the default browser.
note:
- On Windows, this method might not work correctly...
"""
for rss_data in self.list_rss:
with tempfile.NamedTemporaryFile(mode="wt", delete=False) as temp_file:
temp_file.write(rss_data)
temp_file.flush()
webbrowser.open(temp_file.name)
if __name__ == "__main__":
target_urls = ("https://github.com/topics/rss", "https://dev.to/t/rss")
sample_executor = SampleExecutor()
sample_executor.precede_get_rss(target_urls)
sample_executor.postprocess_get_rss()
sample_executor._open_default_browser()
precede_get_rss()
メソッド内部で concurrent.futures
を使ってレスポンスを待つことなく複数の URL へアクセスしています。
Future パターンでは結果処理を任意のタイミングにできるので、URL へのアクセスに使用したメソッドとは異なる postprocess_get_rss()
に分けています。
上の例では precede_get_rss
の直後に postprocess_get_rss
を呼び出していますが、この 2 つのメソッドの間に全く別の処理を挟んだとしても何も問題はありません。
3.4. submit()
or map()
Executor にタスクを渡すメソッドとして submit()
と map()
がありますが、これらはどうやって使い分けたらいいのでしょうか?
個人的には次のように考えています
-
submit()
- すべてのタスクを同時に開始する必要がないとき
- on demand でプールにタスクを入れていきたいとき
-
map()
- 複数のタスクの開始をなるべく同時にしたいとき
- すべてのタスクの完了を待つ必要があるとき
3.5. as_completed()
or wait()
submit()
メソッドは即座に Future
オブジェクトを返しますが、オブジェクトが返された時点ではタスクが完了していないことがほとんどです。
結果を処理するためにはタスクの完了を待つ必要がありますが、タスクが完了するまで待つための関数として as_completed()
と wait()
の 2 つが用意されています。
as_completed()
と wait()
はどのように使い分ければいいでしょうか?
個人的には下記のように使い分けています
-
as_completed()
- タスクの追加順序を考慮する必要がない場合
- 複数のタスクの結果を同時に使用する必要がない場合
- 完了したタスクの結果から順に処理したい場合
-
wait()
- すべてのタスクの完了を待ってから次に進みたい場合
- タスクのいずれか一つでもエラーが発生したら即座にエラーハンドリングしたい場合
3.6. 問題が発生した場合にプロセスを停止させるには?
concurrent.futures.ProcessPoolExecutor
を使用したマルチプロセスプログラムの欠点として Future.cancel()
メソッドでは「実行中のプロセスはキャンセルできない」ということが挙げられます。
デッドロックが発生してしまった場合に、そのプロセスに対応する Future オブジェクトの cancel()
を呼び出してもプロセスを終了させることはできません。
プール内のプロセスでデッドロックが発生してしまった場合は ProcessPoolExecutor._processes
からプロセスオブジェクトにアクセスできるので、そこからプロセスを kill
します。
デッドロックなどの問題が発生してしまったなどでプール内のプロセスを終了させたい場合は下記のようになります。
pool_executor = concurrent.futures.ProcessPoolExecutor()
futures = [pool_executor.submit(task, arg) for arg in args]
try:
for future in concurrent.futures.as_completed(futures, timeout=3600):
# 処理結果を使ってやりたいことをここに実装
except concurrent.futures.TimeoutError:
for process in pool_executor._processes:
process.kill()
# プール内のプロセスを強制終了させた場合、パイプなどが破壊されるので
# ProcessPoolExecuter もシャットダウンさせてつくりなおしたほうがいい
pool_executor.shutdown()
デッドロックなどの問題が発生した際のプロセス制御に難があるものの、それを除けば concurrent.futures
モジュールを使うことで比較的簡単にマルチスレッド、マルチプロセスプログラミングが可能になったと思います。
一方で細かい制御を行いたい場合はこれまで通り threading
モジュール、 multiprocessing
モジュールを使うことになると思います。
Posted on December 15, 2023
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.