Task.asny_stream/2
で行えるつまり、GenStageは需要トリガーを元にストリームイベントを処理するライブラリなのです。需要トリガーで供給を引っ張ることを、ストリーム界隈(?)ではバックプレッシャー(背圧)といいます。
NG: [Producer] -> [Step 1] -> [Step 2] -> [Step 3]
OK:
[Consumer]
/
[Producer]-<-[Consumer]
\
[Consumer]
if your domain has to process the data in multiple steps, you should write that logic in separate modules and not directly in a GenStage.
callbacks | producer | producerconsumer | consumer |
---|---|---|---|
init |
must | must | must |
handledemand/2 |
must | - | - |
handle_event/2 |
- | must | must |
GenServer’s | available | available | available |
イベントの配布方法。consumer
からevent
が流れることはないので、producer
とproducer_cosumer
にて設定する。
1. DemandDispatcher
: BackPressureによるDemandトリガーなモデル
2. PartitionDispatcher
: Eventに応じてDispatcherを変動
3. BroadcastDispatcher
: 全ConsumerにBroadcast
consumer
/producer_consumer
は「handle_event
の終了タイミング」=「再度producer
への要求タイミング」と認識して、demand
をproducer
へ自動的に投げる複数consumerを定義する場合は、上記処理を手動で行う必要がある
handle_subscribe
を定義{:manual, event}
をreturn。デフォルトは{:automatic, state}
handle_subscribe
はConsumerがProducerにsubscribeするタイミングに実行されるので、各producer毎に初回の1回ずつのみの実行となる。Producer
への要求はGenStage.ask/3
で行うhandle_subscribe
を定義max_demand
/ min_demand
optionを設定すること
Back-Pressureとして、量を調節する機構はConsumer
にて行い、レートリミッタとして実現する
> 時間間隔ごとに限られた数のイベントを処理できるコンシューマを実装しましょう。これらはレートリミッタと呼ばれることがよくあります。
Elixir London June 2016 w/ José Valim
- 基本的な流れ
Back-Pressure
→後ろから始まるからBack-Pressure