mmag

ハマったことメモなど

GenRouterってなんぞ

社内のElixir勉強会で↓のtalkを観た。

GenRouter, GenStageという新たなアイデアを検証中で、elixir-lang/gen_brokerにコードがあるとのこと。

github.com

見どころはたくさんあるけれど、今日はGenRouter

GenRouterのmoduledocの最初の方

読んでく。


複数のproducerからconsumerにイベントをルーティングするためのbehaviourモジュール。

GenRouterを使えば、複数のproducerからイベントを受け取ったり、複数のconsumerへイベントを送ったり、その両方ができる。producerとconsumerの接続は、consumerがproducerをsubscribeすることで確立される。

1つのプロセスがproducerとconsumerの両方を兼ねることもよくある。データを送る専門のproducerのことを"source"、データを受け取る専門のconsumerのことを"sink"と呼ぶ。

プロセスA, B, Cを考える

[A] -> [B] -> [C]

AはBのproducer、BはCのproducer。つまり、CはBのconsumer、BはAのconsumer。Aはsourceで、Cはsink。

IncomingとOutgoing

Routerは2つのパーツに分けられる。Bを2つに分けよう。

[A] -> [consumer | producer] -> [C]

consumer部分は上流からメッセージを受け取るがproducerはそれを下流へ流す。言い換えれば、consumerは入力をハンドルし、producerは出力をハンドルする。これらの部分はそれぞれ別のモジュールで定義される。開発者は任意の2つを組み合わせることができる。例えば、複数プロセスからのメッセージを受け付けるGenRouter.DynamicInと、メッセージを複数のconsumerにブロードキャストするGenRouter.BroadcastOutを組み合わせたり、自作のRoundRobinOutと組み合わせることもできる。

実装例を見よう。A, B, Cに当てはめて作っていく。

始めにsinkであるCを定義する。受け取ったものをterminalに表示する。

defmodule MySink do
  use GenRouter.Sink

  def start_link do
    GenRouter.Sink.start_link(__MODULE__, [])
  end

  def handle_event(event, state) do
    IO.puts "Got #{inspect event}"
    {:ok, state}
  end
end

次はsourceであるAとrouterであるB。それらをsinkにつなぐ。

# Aはイベントを外から受け取りブロードキャストするsource
iex> {:ok, producer} = GenRouter.start_link(GenRouter.DynamicIn, [], GenRouter.BroadcastOut, [])

# Bは1つのproducerからイベントを受けてそれをブロードキャストするrouter
iex> {:ok, router} = GenRouter.start_link(GenRouter.SingleIn, [], GenRouter.BroadcastOut, [])

# Cはconsumer
iex> {:ok, consumer} = MySink.start_link()

# subscribeして3つをつなぐ
iex> GenRouter.subscribe(router, to: producer)
iex> GenRouter.subscribe(consumer, to: router)

# routerにイベントを送るTaskをspawnする
iex> Task.start_link fn -> GenRouter.sync_notify(producer, :hello) end

# consumerがイベントを画面に出力する

@moduledoc今日はここまで。

感想

発表ではCSVのパースが例に出ていた。Enumでやるとデータのコピーがたくさんで、メモリ食って仕方ない。そこでStreamを使って問題を解決。次はGenRouterで並列化して速度を上げるぞ。ということらしい。ここまではなるほど〜という感じ。

[CSVパースするやつ] -> [filterするやつ] -> [mapするやつ] -> [画面に出力するやつ]

ちょっと気になったのは、途中でクラッシュしたらどうなるんだろうということ。CSVの真ん中くらいによくわからんことが書いてあって、その行をfilterしようとしてクラッシュしたときどうなるのか。consumerが生きているかどうかproducerは気にかけておくべきなのか、こいつらを全部monitorしてる存在がうまくやるのか、とりあえず全部やりなおすのか。いずれにせよ、途中からresumeみたいなことは工夫しないと難しそう。