「GenStage.Flowをちょっと触ってみた」っていうエントリ書こうとしたらよくわかんなくなったけどアッて気づいて解決した
んですよ。
alias Experimental.GenStage.Flow File.stream!("path/to/some/file") |> Flow.from_enumerable() |> Flow.flat_map(&String.split(&1, " ")) |> Flow.reduce(fn -> %{} end, fn word, acc -> Map.update(acc, word, 1, & &1 + 1) end) |> Enum.to_list()
こういうことを2コアのマシンでやると、2つコアを使ってflat_map
とreduce
するって書いてあるんで、あー別プロセスでやってくれるんかーと思って、
alias Experimental.GenStage.Flow ["a b c", "d e f"] |> Flow.from_enumerable() |> Flow.flat_map(fn s -> IO.inspect self; String.split(s, " ") end) |> Flow.reduce(fn -> %{} end, fn word, acc -> IO.inspect self; Map.update(acc, word, 1, & &1 + 1) end) |> Enum.to_list()
というコードを試してみたら、
#PID<0.142.0> #PID<0.142.0> #PID<0.142.0> #PID<0.142.0> #PID<0.142.0> #PID<0.142.0> #PID<0.142.0> #PID<0.142.0>
って出ちゃうんですよ。1プロセスじゃないんすかこれ、っていう。ソース追うかなーと思ったところで、ちょっと思い出したのですよ。
consumerはproducerに対して、「今の俺は余裕あるからこのくらいデータくれ」という要求をするんですが、これがデータの数を上回っていて、1プロセスが全部要求して処理してそれで終わりになっているのではと。てことで、与えるデータをどーんと増やしてみたら、みんなで頑張っている様子が見られました。
["a b c", "d e f"] |> List.duplicate(10000) |> List.flatten |> Flow.from_enumerable |> Flow.flat_map(fn i -> IO.inspect self; String.split(i, " ") end) |> Flow.reduce(fn -> %{} end, fn word, acc -> IO.inspect self; Map.update(acc, word, 1, & &1 + 1) end) |> Enum.to_list() |> IO.inspect
多くてよくわからんわ!という場合は、max_demand
オプションを1にしてあげると良さそう。
Flow.new(max_demand: 1) |> Flow.from_enumerable(["a b c", "d e f"]) |> Flow.flat_map(fn i -> IO.inspect self; String.split(i, " ") end) |> Flow.reduce(fn -> %{} end, fn word, acc -> IO.inspect self; Map.update(acc, word, 1, & &1 + 1) end) |> Enum.to_list() |> IO.inspect
これハマってる人いないのって思ったらいた。