Akka(22): Stream:实时操控:动态管道连接-MergeHub,BroadcastHub and PartitionHub详解编程语言

  在现实中我们会经常遇到这样的场景:有一个固定的数据源Source,我们希望按照程序运行状态来接驳任意数量的下游接收方subscriber、又或者我需要在程序运行时(runtime)把多个数据流向某个固定的数据流终端Sink推送。这就涉及到动态连接合并型Merge或扩散型Broadcast的数据流连接点junction。从akka-stream的技术文档得知:一对多,多对一或多对多类型的复杂数据流组件必须用GraphDSL来设计,产生Graph类型结果。前面我们提到过:Graph就是一种运算预案,要求所有的运算环节都必须是预先明确指定的,如此应该是无法实现动态的管道连接的。但akka-stream提供了MergeHub,BroadcastHub和PartitionHub来支持这样的功能需求。

1、MergeHub:多对一合并类型。支持动态的多个上游publisher连接

2、BroadcastHub:一对多扩散类型。支持动态的多个下游subscriber连接

3、PartitionHub:实际上是一对多扩散类型。通过一个函数来选择数据派送目的地

MergeHub对象中有个source函数:

 /** 
   * Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned 
   * by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized 
   * arbitrary many times and each of the materializations will feed the elements into the original [[Source]]. 
   * 
   * Every new materialization of the [[Source]] results in a new, independent hub, which materializes to its own 
   * [[Sink]] for feeding that materialization. 
   * 
   * If one of the inputs fails the [[Sink]], the [[Source]] is failed in turn (possibly jumping over already buffered 
   * elements). Completed [[Sink]]s are simply removed. Once the [[Source]] is cancelled, the Hub is considered closed 
   * and any new producers using the [[Sink]] will be cancelled. 
   * 
   * @param perProducerBufferSize Buffer space used per producer. Default value is 16. 
   */ 
  def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] = 
    Source.fromGraph(new MergeHub[T](perProducerBufferSize))

MergeHub.source函数的返回结果类型是Source[T,Sink[T,NotUsed]],本质上MergeHub就是一个共用的Sink,如下所示:

  val fixedSink = Sink.foreach(println) 
  val sinkGraph: RunnableGraph[Sink[Any,NotUsed]] = MergeHub.source(perProducerBufferSize = 16).to(fixedSink) 
  val inGate: Sink[Any,NotUsed] = sinkGraph.run()   //common input 
 
  //now connect any number of source 
  val (killSwitch,_) = (Source(Stream.from(0)).delay(1.second,DelayOverflowStrategy.backpressure) 
      .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run() 
 
  val (killSwitch2,_) = (Source(List("a","b","c","d","e")).delay(2.second,DelayOverflowStrategy.backpressure) 
    .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run() 
 
  val (killSwitch3,_) = (Source(List("AA","BB","CC","DD","EE")).delay(3.second,DelayOverflowStrategy.backpressure) 
    .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run() 
 
  scala.io.StdIn.readLine() 
  killSwitch.shutdown() 
  killSwitch2.shutdown() 
  killSwitch3.shutdown() 
  actorSys.terminate()

同样,BroadcastHub就是一种共用的Source,可以连接任何数量的下游subscriber。下面是BroadcastHub.sink的定义:

  /** 
   * Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set 
   * of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized 
   * value. This [[Source]] can be materialized an arbitrary number of times and each materialization will receive the 
   * broadcast elements from the original [[Sink]]. 
   * 
   * Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own 
   * [[Source]] for consuming the [[Sink]] of that materialization. 
   * 
   * If the original [[Sink]] is failed, then the failure is immediately propagated to all of its materialized 
   * [[Source]]s (possibly jumping over already buffered elements). If the original [[Sink]] is completed, then 
   * all corresponding [[Source]]s are completed. Both failure and normal completion is "remembered" and later 
   * materializations of the [[Source]] will see the same (failure or completion) state. [[Source]]s that are 
   * cancelled are simply removed from the dynamic set of consumers. 
   * 
   * @param bufferSize Buffer size used by the producer. Gives an upper bound on how "far" from each other two 
   *                   concurrent consumers can be in terms of element. If this buffer is full, the producer 
   *                   is backpressured. Must be a power of two and less than 4096. 
   */ 
  def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = Sink.fromGraph(new BroadcastHub[T](bufferSize))

BroadcastHub.sink返回结果类型:Sink[T,Source[T,NotUsed]],就是个可连接任何数量下游的共用Source: 

  val killAll = KillSwitches.shared("terminator") 
  val fixedSource=Source(Stream.from(100)).delay(1.second,DelayOverflowStrategy.backpressure) 
  val sourceGraph = fixedSource.via(killAll.flow).toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).async 
  val outPort = sourceGraph.run()  //shared source 
  //now connect any number of sink to outPort 
  outPort.to(Sink.foreach{c =>println(s"A: $c")}).run() 
  outPort.to(Sink.foreach{c =>println(s"B: $c")}).run() 
  outPort.to(Sink.foreach{c =>println(s"C: $c")}).run()

还有一种做法是把MergeHub和BroadcastHub背对背连接起来形成一种多对多的形状。理论上应该能作为一种集散中心容许连接任何数量的上游publisher和下游subscriber。我们先把它们连接起来获得一个Sink和一个Source:

val (sink, source)  = MergeHub.source[Int](perProducerBufferSize = 16) 
           .toMat(BroadcastHub.sink(bufferSize = 16))(Keep.both).run()

理论上我们现在可以对sink和source进行任意连接了。但有个特殊情况是:当下游没有任何subscriber时上游所有producer都无法发送任何数据。这是由于backpressure造成的:作为一个合成的节点,下游速率跟不上则通过backpressure制约上游数据发布。我们可以安装一个泄洪机制来保证上游publisher数据推送的正常进行:

  source.runWith(Sink.ignore)

这样在没有任何下游subscriber的情况下,上游producer还是能够正常运作。

现在我们可以用Flow.fromSinkAndSource(sink, source)来构建一个Flow[I,O,?]:

  def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] = 
    fromSinkAndSourceMat(sink, source)(Keep.none)

我们还可以把上篇提到的KillSwitches.singleBidi用上:

 val channel: Flow[Int, Int, UniqueKillSwitch] = 
    Flow.fromSinkAndSource(sink, source) 
      .joinMat(KillSwitches.singleBidi[Int, Int])(Keep.right) 
      .backpressureTimeout(3.seconds)

上面backpressureTimeout保证了任何下游subscriber阻塞超时的话都会被强力终止。如下:

  /** 
   * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, 
   * the stream is failed with a [[scala.concurrent.TimeoutException]]. The timeout is checked periodically, 
   * so the resolution of the check is one period (equals to timeout value). 
   * 
   * '''Emits when''' upstream emits an element 
   * 
   * '''Backpressures when''' downstream backpressures 
   * 
   * '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand. 
   * 
   * '''Cancels when''' downstream cancels 
   */ 
  def backpressureTimeout(timeout: FiniteDuration): Repr[Out] = via(new Timers.BackpressureTimeout[Out](timeout))

好了,下面我们可以把channel当作Flow来使用了:

  val killChannel1 = fixedSource.viaMat(channel)(Keep.right).to(fixedSink).run() 
  val killChannel2 = Source.repeat(888) 
        .delay(2.second,DelayOverflowStrategy.backpressure) 
        .viaMat(channel)(Keep.right).to(fixedSink).run()

上面我们提到:PartitionHub就是一种特殊的BroadcastHub。功能是扩散型的。不过PartitionHub用了一个函数来选择下游的subscriber。从PartitionHub.sink函数款式可以看出:

 def sink[T](partitioner: (Int, T) ⇒ Int, startAfterNrOfConsumers: Int, 
              bufferSize: Int = defaultBufferSize): Sink[T, Source[T, NotUsed]] = 
    statefulSink(() ⇒ (info, elem) ⇒ info.consumerIdByIdx(partitioner(info.size, elem)), startAfterNrOfConsumers, bufferSize)

可以看出:partitioner函数就是一种典型的状态转换函数款式,实际上sink调用了statefulSink方法并固定了partitioner函数:

   * This `statefulSink` should be used when there is a need to keep mutable state in the partition function, 
   * e.g. for implemening round-robin or sticky session kind of routing. If state is not needed the [[#sink]] can 
   * be more convenient to use. 
   * 
   * @param partitioner Function that decides where to route an element. It is a factory of a function to 
   *   to be able to hold stateful variables that are unique for each materialization. The function 
   *   takes two parameters; the first is information about active consumers, including an array of consumer 
   *   identifiers and the second is the stream element. The function should return the selected consumer 
   *   identifier for the given element. The function will never be called when there are no active consumers, 
   *   i.e. there is always at least one element in the array of identifiers. 
   * @param startAfterNrOfConsumers Elements are buffered until this number of consumers have been connected. 
   *   This is only used initially when the stage is starting up, i.e. it is not honored when consumers have 
   *   been removed (canceled). 
   * @param bufferSize Total number of elements that can be buffered. If this buffer is full, the producer 
   *   is backpressured. 
   */ 
  @ApiMayChange def statefulSink[T](partitioner: () ⇒ (ConsumerInfo, T) ⇒ Long, startAfterNrOfConsumers: Int, 
                                    bufferSize: Int = defaultBufferSize): Sink[T, Source[T, NotUsed]] = 
    Sink.fromGraph(new PartitionHub[T](partitioner, startAfterNrOfConsumers, bufferSize))

与BroadcastHub相同,我们首先构建一个共用的数据源producer,然后连接PartitionHub形成一个通往下游终端的通道让任何下游subscriber可以连接这个通道:

 //interupted temination 
  val killAll = KillSwitches.shared("terminator") 
  //fix a producer 
  val fixedSource = Source.tick(1.second, 1.second, "message") 
    .zipWith(Source(1 to 100))((a, b) => s"$a-$b") 
  //connect to PartitionHub which uses function to select sink 
  val sourceGraph = fixedSource.via(killAll.flow).toMat(PartitionHub.sink( 
    (size, elem) => math.abs(elem.hashCode) % size, 
    startAfterNrOfConsumers = 2, bufferSize = 256))(Keep.right) 
  //materialize the source 
  val fromSource = sourceGraph.run() 
  //connect to fixedSource freely 
  fromSource.runForeach(msg => println("subs1: " + msg)) 
  fromSource.runForeach(msg => println("subs2: " + msg)) 
   
  scala.io.StdIn.readLine() 
  killAll.shutdown() 
  actorSys.terminate()

可以看到:上游数据流向多个下游中哪个subscriber是通过partitioner函数选定的。从这项功能来讲:PartitionHub又是某种路由Router。下面的例子实现了仿Router的RoundRobin推送策略: 

  //partitioner function 
  def roundRobin(): (PartitionHub.ConsumerInfo, String) ⇒ Long = { 
    var i = -1L 
 
    (info, elem) => { 
      i += 1 
      info.consumerIdByIdx((i % info.size).toInt) 
    } 
  } 
  val roundRobinGraph = fixedSource.via(killAll.flow).toMat(PartitionHub.statefulSink( 
    () => roundRobin(),startAfterNrOfConsumers = 2,bufferSize = 256) 
  )(Keep.right) 
  val roundRobinSource = roundRobinGraph.run() 
 
  roundRobinSource.runForeach(msg => println("roundRobin1: " + msg)) 
  roundRobinSource.runForeach(msg => println("roundRobin2: " + msg))

上面例子里数据源流动方向是由roundRobin函数确定的。

而在下面这个例子里数据流向速率最快的subscriber:

  val producer = Source(0 until 100) 
 
  // ConsumerInfo.queueSize is the approximate number of buffered elements for a consumer. 
  // Note that this is a moving target since the elements are consumed concurrently. 
  val runnableGraph: RunnableGraph[Source[Int, NotUsed]] = 
  producer.via(killAll.flow).toMat(PartitionHub.statefulSink( 
    () => (info, elem) ⇒ info.consumerIds.minBy(id ⇒ info.queueSize(id)), 
    startAfterNrOfConsumers = 2, bufferSize = 16))(Keep.right) 
 
  val fromProducer: Source[Int, NotUsed] = runnableGraph.run() 
 
  fromProducer.runForeach(msg => println("fast1: " + msg)) 
  fromProducer.throttle(10, 100.millis, 10, ThrottleMode.Shaping) 
    .runForeach(msg => println("fast2: " + msg))

上面这个例子里partitioner函数是根据众下游的缓冲数量(queueSize)来确定数据应该流向哪个subscriber,queueSize数值越大则表示速率越慢。

下面是以上示范中MergeHub及BroadcastHub示范的源代码:

import akka.NotUsed 
import akka.stream.scaladsl._ 
import akka.stream._ 
import akka.actor._ 
 
import scala.concurrent.duration._ 
object HubsDemo extends App { 
  implicit val actorSys = ActorSystem("sys") 
  implicit val ec = actorSys.dispatcher 
  implicit val mat = ActorMaterializer( 
    ActorMaterializerSettings(actorSys) 
      .withInputBuffer(16,16) 
  ) 
 
  val fixedSink = Sink.foreach(println) 
  val sinkGraph: RunnableGraph[Sink[Any,NotUsed]] = MergeHub.source(perProducerBufferSize = 16).to(fixedSink).async 
  val inGate: Sink[Any,NotUsed] = sinkGraph.run()   //common input 
 
  //now connect any number of source 
  val (killSwitch,_) = (Source(Stream.from(0)).delay(1.second,DelayOverflowStrategy.backpressure) 
      .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run() 
 
  val (killSwitch2,_) = (Source(List("a","b","c","d","e")).delay(2.second,DelayOverflowStrategy.backpressure) 
    .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run() 
 
  val (killSwitch3,_) = (Source(List("AA","BB","CC","DD","EE")).delay(3.second,DelayOverflowStrategy.backpressure) 
    .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run() 
 
 
  val killAll = KillSwitches.shared("terminator") 
  val fixedSource=Source(Stream.from(100)).delay(1.second,DelayOverflowStrategy.backpressure) 
  val sourceGraph = fixedSource.via(killAll.flow).toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).async 
  val outPort = sourceGraph.run()  //shared source 
  //now connect any number of sink to outPort 
  outPort.to(Sink.foreach{c =>println(s"A: $c")}).run() 
  outPort.to(Sink.foreach{c =>println(s"B: $c")}).run() 
  outPort.to(Sink.foreach{c =>println(s"C: $c")}).run() 
 
 
  val (sink, source)  = MergeHub.source[Int](perProducerBufferSize = 16) 
           .toMat(BroadcastHub.sink(bufferSize = 16))(Keep.both).run() 
 
  source.runWith(Sink.ignore) 
 
  val channel: Flow[Int, Int, UniqueKillSwitch] = 
    Flow.fromSinkAndSource(sink, source) 
      .joinMat(KillSwitches.singleBidi[Int, Int])(Keep.right) 
      .backpressureTimeout(3.seconds) 
 
  val killChannel1 = fixedSource.viaMat(channel)(Keep.right).to(fixedSink).run() 
  val killChannel2 = Source.repeat(888) 
        .delay(2.second,DelayOverflowStrategy.backpressure) 
        .viaMat(channel)(Keep.right).to(fixedSink).run() 
 
 
  scala.io.StdIn.readLine() 
  killSwitch.shutdown() 
  killSwitch2.shutdown() 
  killSwitch3.shutdown() 
  killAll.shutdown() 
  killChannel1.shutdown() 
  killChannel2.shutdown() 
  scala.io.StdIn.readLine() 
  actorSys.terminate() 
 
 
}

下面是PartitionHub示范源代码:

import akka.NotUsed 
import akka.stream.scaladsl._ 
import akka.stream._ 
import akka.actor._ 
 
import scala.concurrent.duration._ 
object PartitionHubDemo extends App { 
  implicit val actorSys = ActorSystem("sys") 
  implicit val ec = actorSys.dispatcher 
  implicit val mat = ActorMaterializer( 
    ActorMaterializerSettings(actorSys) 
      .withInputBuffer(16,16) 
  ) 
 
  //interupted temination 
  val killAll = KillSwitches.shared("terminator") 
  //fix a producer 
  val fixedSource = Source.tick(1.second, 1.second, "message") 
    .zipWith(Source(1 to 100))((a, b) => s"$a-$b") 
  //connect to PartitionHub which uses function to select sink 
  val sourceGraph = fixedSource.via(killAll.flow).toMat(PartitionHub.sink( 
    (size, elem) => math.abs(elem.hashCode) % size, 
    startAfterNrOfConsumers = 2, bufferSize = 256))(Keep.right) 
  //materialize the source 
  val fromSource = sourceGraph.run() 
  //connect to fixedSource freely 
  fromSource.runForeach(msg => println("subs1: " + msg)) 
  fromSource.runForeach(msg => println("subs2: " + msg)) 
 
  //partitioner function 
  def roundRobin(): (PartitionHub.ConsumerInfo, String) ⇒ Long = { 
    var i = -1L 
 
    (info, elem) => { 
      i += 1 
      info.consumerIdByIdx((i % info.size).toInt) 
    } 
  } 
  val roundRobinGraph = fixedSource.via(killAll.flow).toMat(PartitionHub.statefulSink( 
    () => roundRobin(),startAfterNrOfConsumers = 2,bufferSize = 256) 
  )(Keep.right) 
  val roundRobinSource = roundRobinGraph.run() 
 
  roundRobinSource.runForeach(msg => println("roundRobin1: " + msg)) 
  roundRobinSource.runForeach(msg => println("roundRobin2: " + msg)) 
 
 
  val producer = Source(0 until 100) 
 
  // ConsumerInfo.queueSize is the approximate number of buffered elements for a consumer. 
  // Note that this is a moving target since the elements are consumed concurrently. 
  val runnableGraph: RunnableGraph[Source[Int, NotUsed]] = 
  producer.via(killAll.flow).toMat(PartitionHub.statefulSink( 
    () => (info, elem) ⇒ info.consumerIds.minBy(id ⇒ info.queueSize(id)), 
    startAfterNrOfConsumers = 2, bufferSize = 16))(Keep.right) 
 
  val fromProducer: Source[Int, NotUsed] = runnableGraph.run() 
 
  fromProducer.runForeach(msg => println("fast1: " + msg)) 
  fromProducer.throttle(10, 100.millis, 10, ThrottleMode.Shaping) 
    .runForeach(msg => println("fast2: " + msg)) 
 
 
  scala.io.StdIn.readLine() 
  killAll.shutdown() 
  actorSys.terminate() 
 
 
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/12839.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论