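An akka-stream data flow is assembled from components. These components are collectively called graphs (Graph); a Graph describes where the data flows and which processing steps it passes through. Source, Flow, and Sink are the most basic Graphs, and basic Graphs can be combined into more complex composite Graphs. If every port (inlet and outlet) of a Graph is connected, it is a closed graph, a RunnableGraph; otherwise it is an open graph, a partial graph. A complete (runnable) data flow is a RunnableGraph. The input and output ports of a Graph are described by a Shape: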
/** 
 * A Shape describes the inlets and outlets of a [[Graph]]. In keeping with the 
 * philosophy that a Graph is a freely reusable blueprint, everything that 
 * matters from the outside are the connections that can be made with it, 
 * otherwise it is just a black box. 
 */ 
abstract class Shape { 
  /** 
   * Scala API: get a list of all input ports 
   */ 
  def inlets: immutable.Seq[Inlet[_]] 
 
  /** 
   * Scala API: get a list of all output ports 
   */ 
  def outlets: immutable.Seq[Outlet[_]] 
 
...
The abstract methods inlets and outlets of Shape represent the input and output ports of a Graph's shape. Below are several of the ready-made Shapes that akka-stream provides:
final case class SourceShape[+T](out: Outlet[T @uncheckedVariance]) extends Shape {...} 
final case class FlowShape[-I, +O](in: Inlet[I @uncheckedVariance], out: Outlet[O @uncheckedVariance]) extends Shape {...} 
final case class SinkShape[-T](in: Inlet[T @uncheckedVariance]) extends Shape {...} 
sealed abstract class ClosedShape extends Shape 
/** 
 * A bidirectional flow of elements that consequently has two inputs and two 
 * outputs, arranged like this: 
 * 
 * {{{ 
 *        +------+ 
 *  In1 ~>|      |~> Out1 
 *        | bidi | 
 * Out2 <~|      |<~ In2 
 *        +------+ 
 * }}} 
 */ 
final case class BidiShape[-In1, +Out1, -In2, +Out2]( 
  in1:  Inlet[In1 @uncheckedVariance], 
  out1: Outlet[Out1 @uncheckedVariance], 
  in2:  Inlet[In2 @uncheckedVariance], 
  out2: Outlet[Out2 @uncheckedVariance]) extends Shape {...} 
object UniformFanInShape { 
  def apply[I, O](outlet: Outlet[O], inlets: Inlet[I]*): UniformFanInShape[I, O] = 
    new UniformFanInShape(inlets.size, FanInShape.Ports(outlet, inlets.toList)) 
} 
object UniformFanOutShape { 
  def apply[I, O](inlet: Inlet[I], outlets: Outlet[O]*): UniformFanOutShape[I, O] = 
    new UniformFanOutShape(outlets.size, FanOutShape.Ports(inlet, outlets.toList)) 
}
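To make the port lists concrete, here is a small sketch that counts the inlets and outlets of a few stock components. The object name ShapeInspection and the helper ports are made up for illustration. Nothing is materialized here, so no ActorSystem is needed.

```scala
import akka.stream._
import akka.stream.scaladsl._

object ShapeInspection {
  // Stock components; each one exposes its ports through `shape`
  val source = Source(1 to 3)          // SourceShape: no inlet, one outlet
  val flow   = Flow[Int].map(_ * 2)    // FlowShape: one inlet, one outlet
  val sink   = Sink.ignore             // SinkShape: one inlet, no outlet
  val bcast  = Broadcast[Int](2)       // UniformFanOutShape with 2 outlets
  val merge  = Merge[Int](3)           // UniformFanInShape with 3 inlets

  // Illustrative helper: (number of inlets, number of outlets)
  def ports(s: Shape): (Int, Int) = (s.inlets.size, s.outlets.size)

  def main(args: Array[String]): Unit = {
    println(ports(source.shape)) // (0,1)
    println(ports(flow.shape))   // (1,1)
    println(ports(sink.shape))   // (1,0)
    println(ports(bcast.shape))  // (1,2)
    println(ports(merge.shape))  // (3,1)
  }
}
```

A graph whose shape still reports any inlet or outlet is open; only when all of them are connected does the result become a ClosedShape.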
Shape is a type parameter of Graph:
trait Graph[+S <: Shape, +M] { 
  /** 
   * Type-level accessor for the shape parameter of this graph. 
   */ 
  type Shape = S @uncheckedVariance 
  /** 
   * The shape of a graph is all that is externally visible: its inlets and outlets. 
   */ 
  def shape: S 
...
The Shape of a RunnableGraph is ClosedShape:
/** 
 * Flow with attached input and output, can be executed. 
 */ 
final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBuilder) extends Graph[ClosedShape, Mat] { 
  override def shape = ClosedShape 
 
  /** 
   * Transform only the materialized value of this RunnableGraph, leaving all other properties as they were. 
   */ 
  def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): RunnableGraph[Mat2] = 
    copy(traversalBuilder.transformMat(f.asInstanceOf[Any ⇒ Any])) 
 
  /** 
   * Run this flow and return the materialized instance from the flow. 
   */ 
  def run()(implicit materializer: Materializer): Mat = materializer.materialize(this) 
...
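As a rough illustration of the fact that a RunnableGraph is only a closed blueprint until run() is called, here is a minimal sketch. It assumes the ActorMaterializer-style setup used elsewhere in this post; the names RunnableGraphDemo, sumGraph, and runOnce are illustrative only.

```scala
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object RunnableGraphDemo {
  // A blueprint only: all ports are connected, but nothing runs yet.
  // Keep.right selects the sink's materialized value (the folded sum).
  val sumGraph: RunnableGraph[Future[Int]] =
    Source(1 to 10)
      .via(Flow[Int].map(_ * 2))
      .toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

  // Materializes the blueprint once and waits for its result
  def runOnce(): Int = {
    implicit val sys: ActorSystem = ActorSystem("runnableGraphDemo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(sumGraph.run(), 5.seconds)
    finally sys.terminate()
  }

  def main(args: Array[String]): Unit =
    println(runOnce()) // 110
}
```

Because the blueprint is immutable, sumGraph can be run any number of times, and each run() yields an independent materialized value.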
We can build Graphs with the GraphDSL that akka-stream provides. GraphDSL inherits the create methods from GraphApply, and GraphDSL.create(…) is how a Graph is built:
object GraphDSL extends GraphApply {...} 
trait GraphApply { 
  /** 
   * Creates a new [[Graph]] by passing a [[GraphDSL.Builder]] to the given create function. 
   */ 
  def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] ⇒ S): Graph[S, NotUsed] = { 
    val builder = new GraphDSL.Builder 
    val s = buildBlock(builder) 
 
    createGraph(s, builder) 
  } 
... 
def create[S <: Shape, Mat](g1: Graph[Shape, Mat])(buildBlock: GraphDSL.Builder[Mat] ⇒ (g1.Shape) ⇒ S): Graph[S, Mat] = {...} 
def create[S <: Shape, Mat, M1, M2](g1: Graph[Shape, M1], g2: Graph[Shape, M2])(combineMat: (M1, M2) ⇒ Mat)(buildBlock: GraphDSL.Builder[Mat] ⇒ (g1.Shape, g2.Shape) ⇒ S): Graph[S, Mat] = {...} 
... 
def create[S <: Shape, Mat, M1, M2, M3, M4, M5](g1: Graph[Shape, M1], g2: Graph[Shape, M2], g3: Graph[Shape, M3], g4: Graph[Shape, M4], g5: Graph[Shape, M5])(combineMat: (M1, M2, M3, M4, M5) ⇒ Mat)(buildBlock: GraphDSL.Builder[Mat] ⇒ (g1.Shape, g2.Shape, g3.Shape, g4.Shape, g5.Shape) ⇒ S): Graph[S, Mat] = { 
...}
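The buildBlock function has the type buildBlock: GraphDSL.Builder[Mat] ⇒ (g1.Shape, g2.Shape,…,g5.Shape) ⇒ S, where each g? is an open graph to be merged into the result; its shape is handed to buildBlock. Here are a few of the most basic Graph construction examples: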
import akka.actor._ 
import akka.stream._ 
import akka.stream.scaladsl._ 
 
object SimpleGraphs extends App{ 
 
  implicit val sys = ActorSystem("streamSys") 
  implicit val ec = sys.dispatcher 
  implicit val mat = ActorMaterializer() 
 
  val source = Source(1 to 10) 
  val flow = Flow[Int].map(_ * 2) 
  val sink = Sink.foreach(println) 
 
 
  val sourceGraph = GraphDSL.create(){implicit builder => 
    import GraphDSL.Implicits._ 
    val src = source.filter(_ % 2 == 0) 
    val pipe = builder.add(Flow[Int]) 
    src ~> pipe.in 
    SourceShape(pipe.out) 
  } 
 
  Source.fromGraph(sourceGraph).runWith(sink).andThen{case _ => } // sys.terminate()} 
 
  val flowGraph = GraphDSL.create(){implicit builder => 
    import GraphDSL.Implicits._ 
 
    val pipe = builder.add(Flow[Int]) 
    FlowShape(pipe.in,pipe.out) 
  } 
 
  val (_,fut) = Flow.fromGraph(flowGraph).runWith(source,sink) 
  fut.andThen{case _ => } //sys.terminate()} 
 
 
  val sinkGraph = GraphDSL.create(){implicit builder => 
     import GraphDSL.Implicits._ 
     val pipe = builder.add(Flow[Int]) 
     pipe.out.map(_ * 3) ~> Sink.foreach(println) 
     SinkShape(pipe.in) 
  } 
 
  val fut1 = Sink.fromGraph(sinkGraph).runWith(source) 
 
  Thread.sleep(1000) 
  sys.terminate()
}
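The three examples in this listing all use the no-argument create(), so the resulting Graphs materialize to NotUsed. The overloads that take g1, g2, … can be sketched as follows: passing a sink to create imports it into the builder, hands its shape to buildBlock, and lets the resulting Graph carry the sink's materialized value. The names MatValueGraph, foldSink, and triple are illustrative, and the setup again assumes ActorMaterializer.

```scala
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MatValueGraph {
  val foldSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // foldSink is passed as g1, so buildBlock receives its already-imported
  // shape, and the Graph's materialized value type becomes Future[Int]
  val sinkGraph: Graph[SinkShape[Int], Future[Int]] =
    GraphDSL.create(foldSink) { implicit builder => sinkShape =>
      import GraphDSL.Implicits._
      val triple = builder.add(Flow[Int].map(_ * 3))
      triple.out ~> sinkShape.in
      SinkShape(triple.in) // the only port left open
    }

  def run(): Int = {
    implicit val sys: ActorSystem = ActorSystem("matValueGraph")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(1 to 4).runWith(Sink.fromGraph(sinkGraph)), 5.seconds)
    finally sys.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run()) // (1 + 2 + 3 + 4) * 3 = 30
}
```

Had foldSink been added with builder.add inside a no-argument create(), the stream would still run, but the Future[Int] would not be reachable from outside the Graph.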
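Above we demonstrated how to write Graphs for Source, Flow, and Sink, using Flow[Int] as the common building block. As we know, an akka-stream Graph can be composed from simpler partial graphs, and every Graph is ultimately assembled from core graphs (Core-Graph) such as Source, Flow, and Sink. In the examples above we use builder.add(…) to add a Flow Graph to an empty Graph template; builder.add returns the Shape pipe, which exposes the inlets and outlets of the added Graph. We then connect pipe's ports as the target Graph requires, and the design of the data-flow graph is complete. Test runs confirm that these Graphs behave as expected. Next we can try defining a similar Pipe-type Graph of our own to understand the composition process in more detail. Every Core-Graph must define a Shape describing its input and output ports, and a GraphStageLogic inside its GraphStage describing exactly how stream elements are read and written.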
import akka.actor._ 
import akka.stream._ 
import akka.stream.scaladsl._ 
import scala.collection.immutable 
 
case class PipeShape[In,Out]( 
    in: Inlet[In], 
    out: Outlet[Out]) extends Shape { 
 
  override def inlets: immutable.Seq[Inlet[_]] = in :: Nil 
 
  override def outlets: immutable.Seq[Outlet[_]] = out :: Nil 
 
  override def deepCopy(): Shape =  
    PipeShape( 
      in = in.carbonCopy(), 
      out = out.carbonCopy() 
    ) 
}
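PipeShape has one input port and one output port. Because it extends Shape, it must implement Shape's abstract methods. Suppose we design a Graph that applies a user-supplied function to each input element, for example: source.via(ApplyPipe(myFunc)).runWith(sink). Of course we could simply write source.map(myFunc).runWith(sink), but what we want is this: ApplyPipe may contain a good deal of preset, shared functionality, and myFunc is only one part of that code. With map(…) the user would have to supply all of the code. The shape of ApplyPipe is PipeShape, and here is its GraphStage design: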
  class Pipe[In, Out](f: In => Out) extends GraphStage[PipeShape[In, Out]] { 
    val in = Inlet[In]("Pipe.in") 
    val out = Outlet[Out]("Pipe.out") 
 
    override def shape = PipeShape(in, out) 
 
    override def initialAttributes: Attributes = Attributes.none 
 
    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
      new GraphStageLogic(shape) with InHandler with OutHandler { 
 
        private def decider = 
          inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) 
         
        override def onPull(): Unit = pull(in) 
 
        override def onPush(): Unit = { 
          try { 
            push(out, f(grab(in))) 
          } 
          catch { 
            case NonFatal(ex) ⇒ decider(ex) match { 
              case Supervision.Stop ⇒ failStage(ex) 
              case _ ⇒ pull(in) 
            } 
          } 
        } 
 
        setHandlers(in,out, this) 
      } 
  }
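This Pipe GraphStage first defines the input and output ports in and out, then uses createLogic to define the GraphStageLogic together with its InHandler and OutHandler. InHandler and OutHandler describe how element events are handled on the input and output ports respectively: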
/** 
 * Collection of callbacks for an input port of a [[GraphStage]] 
 */ 
trait InHandler { 
  /** 
   * Called when the input port has a new element available. The actual element can be retrieved via the 
   * [[GraphStageLogic.grab()]] method. 
   */ 
  @throws(classOf[Exception]) 
  def onPush(): Unit 
 
  /** 
   * Called when the input port is finished. After this callback no other callbacks will be called for this port. 
   */ 
  @throws(classOf[Exception]) 
  def onUpstreamFinish(): Unit = GraphInterpreter.currentInterpreter.activeStage.completeStage() 
 
  /** 
   * Called when the input port has failed. After this callback no other callbacks will be called for this port. 
   */ 
  @throws(classOf[Exception]) 
  def onUpstreamFailure(ex: Throwable): Unit = GraphInterpreter.currentInterpreter.activeStage.failStage(ex) 
} 
 
/** 
 * Collection of callbacks for an output port of a [[GraphStage]] 
 */ 
trait OutHandler { 
  /** 
   * Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]] 
   * is now allowed to be called on this port. 
   */ 
  @throws(classOf[Exception]) 
  def onPull(): Unit 
 
  /** 
   * Called when the output port will no longer accept any new elements. After this callback no other callbacks will 
   * be called for this port. 
   */ 
  @throws(classOf[Exception]) 
  def onDownstreamFinish(): Unit = { 
    GraphInterpreter 
      .currentInterpreter 
      .activeStage 
      .completeStage() 
  } 
}
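Input and output handling in an akka-stream Graph implements the Reactive Streams protocol, so it is best to implement the abstract methods onPull and onPush with the pull and push operations that akka-stream already provides. setHandlers then registers the inlet, the outlet, and the handler for this GraphStage: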
  /** 
   * Assign callbacks for linear stage for both [[Inlet]] and [[Outlet]] 
   */ 
  final protected def setHandlers(in: Inlet[_], out: Outlet[_], handler: InHandler with OutHandler): Unit = { 
    setHandler(in, handler) 
    setHandler(out, handler) 
  } 
  /**
   * Assigns callbacks for the events for an [[Inlet]] 
   */ 
  final protected def setHandler(in: Inlet[_], handler: InHandler): Unit = { 
    handlers(in.id) = handler 
    if (_interpreter != null) _interpreter.setHandler(conn(in), handler) 
  } 
  /** 
   * Assigns callbacks for the events for an [[Outlet]] 
   */ 
  final protected def setHandler(out: Outlet[_], handler: OutHandler): Unit = { 
    handlers(out.id + inCount) = handler 
    if (_interpreter != null) _interpreter.setHandler(conn(out), handler) 
  }
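setHandlers is a convenience for stages with exactly one inlet and one outlet; the two setHandler overloads can also be called separately with anonymous handlers. The sketch below shows that style on a hypothetical filtering stage (KeepIf is an invented name, not part of akka-stream). It also shows the demand-driven protocol at work: when an element is rejected, the stage answers the outstanding downstream demand by pulling again instead of pushing.

```scala
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import scala.concurrent.Await
import scala.concurrent.duration._

object SeparateHandlers {
  // Hypothetical stage: forwards only the elements that satisfy `p`
  class KeepIf[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
    val in: Inlet[A] = Inlet[A]("KeepIf.in")
    val out: Outlet[A] = Outlet[A]("KeepIf.out")
    override val shape: FlowShape[A, A] = FlowShape(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) {
        setHandler(in, new InHandler {
          // Upstream delivered an element: emit it, or ask for the next one
          override def onPush(): Unit = {
            val elem = grab(in)
            if (p(elem)) push(out, elem) else pull(in)
          }
        })
        setHandler(out, new OutHandler {
          // Downstream signalled demand: pass the demand upstream
          override def onPull(): Unit = pull(in)
        })
      }
  }

  def run(): Seq[Int] = {
    implicit val sys: ActorSystem = ActorSystem("separateHandlers")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(
      Source(1 to 10).via(new KeepIf[Int](_ % 2 == 0)).runWith(Sink.seq), 5.seconds)
    finally sys.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run()) // Vector(2, 4, 6, 8, 10)
}
```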
With the Shape and the GraphStage in place, we can build a Graph:
def applyPipe[In,Out](f: In => Out) = GraphDSL.create() {implicit builder => 
    val pipe = builder.add(new Pipe(f)) 
    FlowShape(pipe.in,pipe.out) 
  }
The stage can also be used directly to assemble a composite Graph:
  RunnableGraph.fromGraph( 
    GraphDSL.create(){implicit builder => 
      import GraphDSL.Implicits._ 
 
      val source = Source(1 to 10) 
      val sink = Sink.foreach(println) 
      val f: Int => Int = _ * 3 
      val pipeShape = builder.add(new Pipe[Int,Int](f)) 
      source ~> pipeShape.in 
      pipeShape.out~> sink 
      ClosedShape 
 
    } 
  ).run()
The complete source code of the example follows:
import akka.actor._ 
import akka.stream._ 
import akka.stream.scaladsl._ 
import akka.stream.ActorAttributes._ 
import akka.stream.stage._ 
 
import scala.collection.immutable 
import scala.util.control.NonFatal 
 
object PipeOps { 
 
  case class PipeShape[In, Out]( 
                                 in: Inlet[In], 
                                 out: Outlet[Out]) extends Shape { 
 
    override def inlets: immutable.Seq[Inlet[_]] = in :: Nil 
 
    override def outlets: immutable.Seq[Outlet[_]] = out :: Nil 
 
    override def deepCopy(): Shape = 
      PipeShape( 
        in = in.carbonCopy(), 
        out = out.carbonCopy() 
      ) 
  } 
 
  class Pipe[In, Out](f: In => Out) extends GraphStage[PipeShape[In, Out]] { 
    val in = Inlet[In]("Pipe.in") 
    val out = Outlet[Out]("Pipe.out") 
 
    override def shape = PipeShape(in, out) 
 
    override def initialAttributes: Attributes = Attributes.none 
 
    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
      new GraphStageLogic(shape) with InHandler with OutHandler { 
 
        private def decider = 
          inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) 
 
        override def onPull(): Unit = pull(in) 
 
        override def onPush(): Unit = { 
          try { 
            push(out, f(grab(in))) 
          } 
          catch { 
            case NonFatal(ex) ⇒ decider(ex) match { 
              case Supervision.Stop ⇒ failStage(ex) 
              case _ ⇒ pull(in) 
            } 
          } 
        } 
 
        setHandlers(in,out, this) 
      } 
  } 
 
  def applyPipe[In,Out](f: In => Out) = GraphDSL.create() {implicit builder => 
    val pipe = builder.add(new Pipe(f)) 
    FlowShape(pipe.in,pipe.out) 
  } 
 
} 
 
object ShapeDemo1 extends App { 
  import PipeOps._
  implicit val sys = ActorSystem("streamSys") 
  implicit val ec = sys.dispatcher 
  implicit val mat = ActorMaterializer() 
 
  RunnableGraph.fromGraph( 
    GraphDSL.create(){implicit builder => 
      import GraphDSL.Implicits._ 
 
      val source = Source(1 to 10) 
      val sink = Sink.foreach(println) 
      val f: Int => Int = _ * 3 
      val pipeShape = builder.add(new Pipe[Int,Int](f)) 
      source ~> pipeShape.in 
      pipeShape.out~> sink 
      ClosedShape 
 
    } 
  ).run() 
 
 
  val fut = Source(1 to 10).via(applyPipe[Int,Int](_ * 2)).runForeach(println) 
 
  scala.io.StdIn.readLine() 
 
  sys.terminate() 
 
 
}
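The decider lookup inside Pipe only changes behaviour when the user function throws. The following sketch exercises that path. To stay short it uses a FlowShape-based variant of Pipe rather than the PipeShape version above, and the names PipeSupervision, risky, and run are illustrative. It assumes that a supervision strategy attached with withAttributes reaches the stage through inheritedAttributes.

```scala
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.scaladsl._
import akka.stream.stage._
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NonFatal

object PipeSupervision {
  // FlowShape-based variant of the Pipe stage (same handler logic)
  class Pipe[In, Out](f: In => Out) extends GraphStage[FlowShape[In, Out]] {
    val in: Inlet[In] = Inlet[In]("Pipe.in")
    val out: Outlet[Out] = Outlet[Out]("Pipe.out")
    override val shape: FlowShape[In, Out] = FlowShape(in, out)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) with InHandler with OutHandler {
        private def decider =
          inheritedAttributes.get[SupervisionStrategy].map(_.decider)
            .getOrElse(Supervision.stoppingDecider)

        override def onPull(): Unit = pull(in)

        override def onPush(): Unit =
          try push(out, f(grab(in)))
          catch {
            case NonFatal(ex) => decider(ex) match {
              case Supervision.Stop => failStage(ex) // fail the whole stream
              case _                => pull(in)      // drop the element, go on
            }
          }

        setHandlers(in, out, this)
      }
  }

  val risky: Int => Int = x => 10 / (x - 3) // throws ArithmeticException at x == 3

  def run(decider: Supervision.Decider): Seq[Int] = {
    implicit val sys: ActorSystem = ActorSystem("pipeSupervision")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    val pipe = Flow.fromGraph(new Pipe(risky))
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
    try Await.result(Source(1 to 5).via(pipe).runWith(Sink.seq), 5.seconds)
    finally sys.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run(Supervision.resumingDecider)) // Vector(-5, -10, 10, 5)
}
```

With Supervision.resumingDecider the element 3 is skipped and the stream continues; with Supervision.stoppingDecider the same call fails with the ArithmeticException.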
Original article by 奋斗. If you repost it, please credit the source: https://blog.ytso.com/industrynews/12843.html
