* Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned
* by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized
* arbitrary many times and each of the materializations will feed the elements into the original [[Source]].
* Every new materialization of the [[Source]] results in a new, independent hub, which materializes to its own
* [[Sink]] for feeding that materialization.
* If one of the inputs fails the [[Sink]], the [[Source]] is failed in turn (possibly jumping over already buffered
* elements). Completed [[Sink]]s are simply removed. Once the [[Source]] is cancelled, the Hub is considered closed
* and any new producers using the [[Sink]] will be cancelled.
* @param perProducerBufferSize Buffer space used per producer. Default value is 16.
def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] =
Source.fromGraph(new MergeHub[T](perProducerBufferSize))
val fixedSink = Sink.foreach(println)
val sinkGraph: RunnableGraph[Sink[Any,NotUsed]] = MergeHub.source(perProducerBufferSize = 16).to(fixedSink)
val inGate: Sink[Any,NotUsed] = sinkGraph.run() //common input
//now connect any number of source
val (killSwitch,_) = (Source(Stream.from(0)).delay(1.second,DelayOverflowStrategy.backpressure)
val (killSwitch2,_) = (Source(List("a","b","c","d","e")).delay(2.second,DelayOverflowStrategy.backpressure)
val (killSwitch3,_) = (Source(List("AA","BB","CC","DD","EE")).delay(3.second,DelayOverflowStrategy.backpressure)
* Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set
* of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized
* value. This [[Source]] can be materialized an arbitrary number of times and each materialization will receive the
* broadcast elements from the original [[Sink]].
* Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own
* [[Source]] for consuming the [[Sink]] of that materialization.
* If the original [[Sink]] is failed, then the failure is immediately propagated to all of its materialized
* [[Source]]s (possibly jumping over already buffered elements). If the original [[Sink]] is completed, then
* all corresponding [[Source]]s are completed. Both failure and normal completion is "remembered" and later
* materializations of the [[Source]] will see the same (failure or completion) state. [[Source]]s that are
* cancelled are simply removed from the dynamic set of consumers.
* @param bufferSize Buffer size used by the producer. Gives an upper bound on how "far" from each other two
* concurrent consumers can be in terms of element. If this buffer is full, the producer
* is backpressured. Must be a power of two and less than 4096.
def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = Sink.fromGraph(new BroadcastHub[T](bufferSize))
val killAll = KillSwitches.shared("terminator")
val fixedSource=Source(Stream.from(100)).delay(1.second,DelayOverflowStrategy.backpressure)
val sourceGraph = fixedSource.via(killAll.flow).toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).async
val outPort = sourceGraph.run() //shared source
//now connect any number of sink to outPort
outPort.to(Sink.foreach{c =>println(s"A: $c")}).run()
outPort.to(Sink.foreach{c =>println(s"B: $c")}).run()
outPort.to(Sink.foreach{c =>println(s"C: $c")}).run()
val (sink, source) = MergeHub.source[Int](perProducerBufferSize = 16)
.toMat(BroadcastHub.sink(bufferSize = 16))(Keep.both).run()
现在我们可以用Flow.fromSinkAndSource(sink, source)来构建一个Flow[I,O,?]:
def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] =
fromSinkAndSourceMat(sink, source)(Keep.none)
val channel: Flow[Int, Int, UniqueKillSwitch] =
Flow.fromSinkAndSource(sink, source)
.joinMat(KillSwitches.singleBidi[Int, Int])(Keep.right)
* If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
* the stream is failed with a [[scala.concurrent.TimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
* '''Emits when''' upstream emits an element
* '''Backpressures when''' downstream backpressures
* '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand.
* '''Cancels when''' downstream cancels
def backpressureTimeout(timeout: FiniteDuration): Repr[Out] = via(new Timers.BackpressureTimeout[Out](timeout))
val killChannel1 = fixedSource.viaMat(channel)(Keep.right).to(fixedSink).run()
val killChannel2 = Source.repeat(888)
def sink[T](partitioner: (Int, T) ⇒ Int, startAfterNrOfConsumers: Int,
bufferSize: Int = defaultBufferSize): Sink[T, Source[T, NotUsed]] =
statefulSink(() ⇒ (info, elem) ⇒ info.consumerIdByIdx(partitioner(info.size, elem)), startAfterNrOfConsumers, bufferSize)
* This `statefulSink` should be used when there is a need to keep mutable state in the partition function,
* e.g. for implemening round-robin or sticky session kind of routing. If state is not needed the [[#sink]] can
* be more convenient to use.
* @param partitioner Function that decides where to route an element. It is a factory of a function to
* to be able to hold stateful variables that are unique for each materialization. The function
* takes two parameters; the first is information about active consumers, including an array of consumer
* identifiers and the second is the stream element. The function should return the selected consumer
* identifier for the given element. The function will never be called when there are no active consumers,
* i.e. there is always at least one element in the array of identifiers.
* @param startAfterNrOfConsumers Elements are buffered until this number of consumers have been connected.
* This is only used initially when the stage is starting up, i.e. it is not honored when consumers have
* been removed (canceled).
* @param bufferSize Total number of elements that can be buffered. If this buffer is full, the producer
* is backpressured.
@ApiMayChange def statefulSink[T](partitioner: () ⇒ (ConsumerInfo, T) ⇒ Long, startAfterNrOfConsumers: Int,
bufferSize: Int = defaultBufferSize): Sink[T, Source[T, NotUsed]] =
Sink.fromGraph(new PartitionHub[T](partitioner, startAfterNrOfConsumers, bufferSize))
//interupted temination
val killAll = KillSwitches.shared("terminator")
//fix a producer
val fixedSource = Source.tick(1.second, 1.second, "message")
.zipWith(Source(1 to 100))((a, b) => s"$a-$b")
//connect to PartitionHub which uses function to select sink
val sourceGraph = fixedSource.via(killAll.flow).toMat(PartitionHub.sink(
(size, elem) => math.abs(elem.hashCode) % size,
startAfterNrOfConsumers = 2, bufferSize = 256))(Keep.right)
//materialize the source
val fromSource = sourceGraph.run()
//connect to fixedSource freely
fromSource.runForeach(msg => println("subs1: " + msg))
fromSource.runForeach(msg => println("subs2: " + msg))
//partitioner function
def roundRobin(): (PartitionHub.ConsumerInfo, String) ⇒ Long = {
var i = -1L
(info, elem) => {
i += 1
info.consumerIdByIdx((i % info.size).toInt)
val roundRobinGraph = fixedSource.via(killAll.flow).toMat(PartitionHub.statefulSink(
() => roundRobin(),startAfterNrOfConsumers = 2,bufferSize = 256)
val roundRobinSource = roundRobinGraph.run()
roundRobinSource.runForeach(msg => println("roundRobin1: " + msg))
roundRobinSource.runForeach(msg => println("roundRobin2: " + msg))
val producer = Source(0 until 100)
// ConsumerInfo.queueSize is the approximate number of buffered elements for a consumer.
// Note that this is a moving target since the elements are consumed concurrently.
val runnableGraph: RunnableGraph[Source[Int, NotUsed]] =
() => (info, elem) ⇒ info.consumerIds.minBy(id ⇒ info.queueSize(id)),
startAfterNrOfConsumers = 2, bufferSize = 16))(Keep.right)
val fromProducer: Source[Int, NotUsed] = runnableGraph.run()
fromProducer.runForeach(msg => println("fast1: " + msg))
fromProducer.throttle(10, 100.millis, 10, ThrottleMode.Shaping)
.runForeach(msg => println("fast2: " + msg))
