mmag

ハマったことメモなど

「GenStage.Flowをちょっと触ってみた」っていうエントリ書こうとしたらよくわかんなくなったけどアッて気づいて解決した

んですよ。

GenStage.Flowのドキュメントには、

alias Experimental.GenStage.Flow

File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

こういうことを2コアのマシンでやると、2つコアを使ってflat_mapreduceするって書いてあるんで、あー別プロセスでやってくれるんかーと思って、

alias Experimental.GenStage.Flow

["a b c", "d e f"]
|> Flow.from_enumerable()
|> Flow.flat_map(fn s -> IO.inspect self; String.split(s, " ") end)
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  IO.inspect self; Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

というコードを試してみたら、

#PID<0.142.0>
#PID<0.142.0>
#PID<0.142.0>
#PID<0.142.0>
#PID<0.142.0>
#PID<0.142.0>
#PID<0.142.0>
#PID<0.142.0>

って出ちゃうんですよ。1プロセスじゃないんすかこれ、っていう。ソース追うかなーと思ったところで、ちょっと思い出したのですよ。

consumerはproducerに対して、「今の俺は余裕あるからこのくらいデータくれ」という要求をするんですが、これがデータの数を上回っていて、1プロセスが全部要求して処理してそれで終わりになっているのではと。てことで、与えるデータをどーんと増やしてみたら、みんなで頑張っている様子が見られました。

["a b c", "d e f"]
|> List.duplicate(10000)
|> List.flatten
|> Flow.from_enumerable
|> Flow.flat_map(fn i -> IO.inspect self; String.split(i, " ") end)
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  IO.inspect self;  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()
|> IO.inspect

多くてよくわからんわ!という場合は、max_demandオプションを1にしてあげると良さそう。

Flow.new(max_demand: 1)
|> Flow.from_enumerable(["a b c", "d e f"])
|> Flow.flat_map(fn i -> IO.inspect self; String.split(i, " ") end)
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  IO.inspect self;  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()
|> IO.inspect

これハマってる人いないのって思ったらいた。

github.com