社内のElixir勉強会で↓のtalkを観た。
GenRouter
, GenStage
という新たなアイデアを検証中で、elixir-lang/gen_brokerにコードがあるとのこと。
見どころはたくさんあるけれど、今日はGenRouter
。
GenRouter
のmoduledocの最初の方
読んでく。
複数のproducerからconsumerにイベントをルーティングするためのbehaviourモジュール。
GenRouter
を使えば、複数のproducerからイベントを受け取ったり、複数のconsumerへイベントを送ったり、その両方ができる。producerとconsumerの接続は、consumerがproducerをsubscribeすることで確立される。
1つのプロセスがproducerとconsumerの両方を兼ねることもよくある。データを送る専門のproducerのことを"source"、データを受け取る専門のconsumerのことを"sink"と呼ぶ。
プロセスA, B, Cを考える
[A] -> [B] -> [C]
AはBのproducer、BはCのproducer。つまり、CはBのconsumer、BはAのconsumer。Aはsourceで、Cはsink。
IncomingとOutgoing
Routerは2つのパーツに分けられる。Bを2つに分けよう。
[A] -> [consumer | producer] -> [C]
consumer部分は上流からメッセージを受け取るがproducerはそれを下流へ流す。言い換えれば、consumerは入力をハンドルし、producerは出力をハンドルする。これらの部分はそれぞれ別のモジュールで定義される。開発者は任意の2つを組み合わせることができる。例えば、複数プロセスからのメッセージを受け付けるGenRouter.DynamicIn
と、メッセージを複数のconsumerにブロードキャストするGenRouter.BroadcastOut
を組み合わせたり、自作のRoundRobinOut
と組み合わせることもできる。
実装例を見よう。A, B, Cに当てはめて作っていく。
始めにsinkであるCを定義する。受け取ったものをterminalに表示する。
defmodule MySink do use GenRouter.Sink def start_link do GenRouter.Sink.start_link(__MODULE__, []) end def handle_event(event, state) do IO.puts "Got #{inspect event}" {:ok, state} end end
次はsourceであるAとrouterであるB。それらをsinkにつなぐ。
# Aはイベントを外から受け取りブロードキャストするsource iex> {:ok, producer} = GenRouter.start_link(GenRouter.DynamicIn, [], GenRouter.BroadcastOut, []) # Bは1つのproducerからイベントを受けてそれをブロードキャストするrouter iex> {:ok, router} = GenRouter.start_link(GenRouter.SingleIn, [], GenRouter.BroadcastOut, []) # Cはconsumer iex> {:ok, consumer} = MySink.start_link() # subscribeして3つをつなぐ iex> GenRouter.subscribe(router, to: producer) iex> GenRouter.subscribe(consumer, to: router) # routerにイベントを送るTaskをspawnする iex> Task.start_link fn -> GenRouter.sync_notify(producer, :hello) end # consumerがイベントを画面に出力する
@moduledoc今日はここまで。
感想
発表ではCSVのパースが例に出ていた。Enum
でやるとデータのコピーがたくさんで、メモリ食って仕方ない。そこでStream
を使って問題を解決。次はGenRouter
で並列化して速度を上げるぞ。ということらしい。ここまではなるほど〜という感じ。
[CSVパースするやつ] -> [filterするやつ] -> [mapするやつ] -> [画面に出力するやつ]
ちょっと気になったのは、途中でクラッシュしたらどうなるんだろうということ。CSVの真ん中くらいによくわからんことが書いてあって、その行をfilterしようとしてクラッシュしたときどうなるのか。consumerが生きているかどうかproducerは気にかけておくべきなのか、こいつらを全部monitorしてる存在がうまくやるのか、とりあえず全部やりなおすのか。いずれにせよ、途中からresumeみたいなことは工夫しないと難しそう。