Scalaz(48)- scalaz-stream: 深入了解-Transducer: Process1-tee-wye详解编程语言


def await[F[_], A, O](req: F[A])(rcv: A => Process[F, O]): Process[F, O]  


type Process1[-I,+O] = Process[Env[I,Any]#Is, O]


/** The `Process1` which awaits a single input, emits it, then halts normally. */ 
def await1[I]: Process1[I, I] = 
def receive1[I, O](rcv: I => Process1[I, O]): Process1[I, O] = 


1  import Process._ 
2  def multiplyBy(n: Int): Process1[Int,Int] = 
3     await1[Int].flatMap { i => emit(i * n) }.repeat 
4                                        //> multiplyBy: (n: Int)[Int,Int] 
5  def addPosfix: Process1[Int,String] = 
6    receive1[Int,String]{ i => emit(i.toString + "!") }.repeat 
7                                        //> addPosfix: =>[Int,String]


1  (range(11,16).toSource pipe multiplyBy(5) |> addPosfix) 
2                                     //> res0: Vector[String] = Vector(55!, 60!, 65!, 70!, 75!)


1  import process1._ 
2  (range(11,16).toSource |> lift {(i: Int) => i * 5} |> addPosfix) 
3                                      //> res1: Vector[String] = Vector(55!, 60!, 65!, 70!, 75!)


1  range(11,16).toSource.flatMap { i => 
2   emit(i * 5) }.flatMap { i => 
3   emit(i.toString + "!") }       //> res1: Vector[String] = Vector(55!, 60!, 65!, 70!, 75!)



 1 import process1._ 
 2  (range(1,6).toSource pipe take(2)) 
 3                                      //> res2: Vector[Int] = Vector(1, 2) 
 4  (range(1,10).toSource |> filter {_ % 2 == 0 } 
 5   |> collect { 
 6     case 4 => "the number four" 
 7     case 5 => "the number five" 
 8     case 6 => "the number six" 
 9     case 100 => "the number one hundred" 
10     } 
11  )         //> res3: Vector[String] = Vector(the number four, the number six)


 1 (range(1,6).toSource 
 2   |> fold(Nil:List[Int]){ (b,a) => a :: b } 
 3  )                            //> res5: Vector[List[Int]] = Vector(List(5, 4, 3, 2, 1)) 
 4 (range(1,6).toSource 
 5   |> foldMap { List(_) } 
 6  )                            //> res6: Vector[List[Int]] = Vector(List(1, 2, 3, 4, 5)) 
 7 (range(1,6).toSource 
 8   |> foldMap { identity } 
 9  )                            //> res7: Vector[Int] = Vector(15) 
10 (range(1,6).toSource 
11   |> sum 
12  )                            //> res8: Vector[Int] = Vector(15) 
13 (range(1,6).toSource 
14   |> scan(0){(a,b) => a + b} 
15  )                            //> res9: Vector[Int] = Vector(0, 1, 3, 6, 10, 15)


1  (range(1,6).toSource 
2   |> feed(6 to 10)(lift(identity)) 
3   )                         //> res10: Vector[Int] = Vector(6, 7, 8, 9, 10, 1, 2, 3, 4, 5) 
4  (range(1,6).toSource 
5   |> feed(6 to 10)(lift(identity)) 
6   |> foldMap {identity} 
7   )                         //> res11: Vector[Int] = Vector(55)


   * A stream transducer that can read from one of two inputs, 
   * the 'left' (of type `I`) or the 'right' (of type `I2`). 
   * `Process1[I,O] <: Tee[I,I2,O]`. 
  type Tee[-I,-I2,+O] = Process[Env[I,I2]#T, O]


   * Awaits to receive input from Left side, 
   * than if that request terminates with `End` or is terminated abnormally 
   * runs the supplied `continue` or `cleanup`. 
   * Otherwise `rcv` is run to produce next state. 
   * If  you don't need `continue` or `cleanup` use rather `awaitL.flatMap` 
  def receiveL[I, I2, O](rcv: I => Tee[I, I2, O]): Tee[I, I2, O] = 
    await[Env[I, I2]#T, I, O](L)(rcv) 
   * Awaits to receive input from Right side, 
   * than if that request terminates with `End` or is terminated abnormally 
   * runs the supplied continue. 
   * Otherwise `rcv` is run to produce next state. 
   * If  you don't need `continue` or `cleanup` use rather `awaitR.flatMap` 
  def receiveR[I, I2, O](rcv: I2 => Tee[I, I2, O]): Tee[I, I2, O] = 
    await[Env[I, I2]#T, I2, O](R)(rcv)


case class Env[-I, -I2]() { 
    sealed trait Y[-X] { 
      def tag: Int 
      def fold[R](l: => R, r: => R, both: => R): R 
    sealed trait T[-X] extends Y[X] 
    sealed trait Is[-X] extends T[X] 
    case object Left extends Is[I] { 
      def tag = 0 
      def fold[R](l: => R, r: => R, both: => R): R = l 
    case object Right extends T[I2] { 
      def tag = 1 
      def fold[R](l: => R, r: => R, both: => R): R = r 
    case object Both extends Y[ReceiveY[I, I2]] { 
      def tag = 2 
      def fold[R](l: => R, r: => R, both: => R): R = both 
  private val Left_  = Env[Any, Any]().Left 
  private val Right_ = Env[Any, Any]().Right 
  private val Both_  = Env[Any, Any]().Both 
  def Get[I]: Env[I, Any]#Is[I] = Left_ 
  def L[I]: Env[I, Any]#Is[I] = Left_ 
  def R[I2]: Env[Any, I2]#T[I2] = Right_ 
  def Both[I, I2]: Env[I, I2]#Y[ReceiveY[I, I2]] = Both_


   * Use a `Tee` to interleave or combine the outputs of `this` and 
   * `p2`. This can be used for zipping, interleaving, and so forth. 
   * Nothing requires that the `Tee` read elements from each 
   * `Process` in lockstep. It could read fifty elements from one 
   * side, then two elements from the other, then combine or 
   * interleave these values in some way, etc. 
   * If at any point the `Tee` awaits on a side that has halted, 
   * we gracefully kill off the other side, then halt. 
   * If at any point `t` terminates with cause `c`, both sides are killed, and 
   * the resulting `Process` terminates with `c`. 
  final def tee[F2[x] >: F[x], O2, O3](p2: Process[F2, O2])(t: Tee[O, O2, O3]): Process[F2, O3]

用伪代码表示就是:leftProcess.tee(rightProcess)(teeFunction): newProcess


 /** Alternate emitting elements from `this` and `p2`, starting with `this`. */ 
  def interleave[F2[x] >: F[x], O2 >: O](p2: Process[F2, O2]): Process[F2, O2] = 
  /** Call `tee` with the `zipWith` `Tee[O,O2,O3]` defined in `tee.scala`. */ 
  def zipWith[F2[x] >: F[x], O2, O3](p2: Process[F2, O2])(f: (O, O2) => O3): Process[F2, O3] = 
  /** Call `tee` with the `zip` `Tee[O,O2,O3]` defined in `tee.scala`. */ 
  def zip[F2[x] >: F[x], O2](p2: Process[F2, O2]): Process[F2, (O, O2)] = 
   * When `condition` is `true`, lets through any values in `this` process, otherwise blocks 
   * until `condition` becomes true again. Note that the `condition` is checked before 
   * each and every read from `this`, so `condition` should return very quickly or be 
   * continuous to avoid holding up the output `Process`. Use `condition.forwardFill` to 
   * convert an infrequent discrete `Process` to a continuous one for use with this 
   * function. 
  def when[F2[x] >: F[x], O2 >: O](condition: Process[F2, Boolean]): Process[F2, O2] = 
   * Halts this `Process` as soon as `condition` becomes `true`. Note that `condition` 
   * is checked before each and every read from `this`, so `condition` should return 
   * very quickly or be continuous to avoid holding up the output `Process`. Use 
   * `condition.forwardFill` to convert an infrequent discrete `Process` to a 
   * continuous one for use with this function. 
  def until[F2[x] >: F[x], O2 >: O](condition: Process[F2, Boolean]): Process[F2, O2] = 


/** A `Tee` which ignores all input from left. */ 
  def passR[I2]: Tee[Any, I2, I2] = awaitR[I2].repeat 
  /** A `Tee` which ignores all input from the right. */ 
  def passL[I]: Tee[I, Any, I] = awaitL[I].repeat 
  /** Echoes the right branch until the left branch becomes `true`, then halts. */ 
  def until[I]: Tee[Boolean, I, I] = 
    awaitL[Boolean].flatMap(kill => if (kill) halt else awaitR[I] ++ until) 
  /** Echoes the right branch when the left branch is `true`. */ 
  def when[I]: Tee[Boolean, I, I] = 
    awaitL[Boolean].flatMap(ok => if (ok) awaitR[I] ++ when else when) 
  /** Defined as `zipWith((_,_))` */ 
  def zip[I, I2]: Tee[I, I2, (I, I2)] = zipWith((_, _)) 
  /** Defined as `zipWith((arg,f) => f(arg)` */ 
  def zipApply[I,I2]: Tee[I, I => I2, I2] = zipWith((arg,f) => f(arg)) 
  /** A version of `zip` that pads the shorter stream with values. */ 
  def zipAll[I, I2](padI: I, padI2: I2): Tee[I, I2, (I, I2)] = 
    zipWithAll(padI, padI2)((_, _))


 1 import tee._ 
 2  val source = range(1,6).toSource                 //> source  :[scalaz.concurrent.Task,Int] = Append(Halt(End),Vector(<function1>)) 
 3  val seq = emitAll(Seq("a","b","c"))              //> seq  :[String] = Emit(List(a, b, c)) 
 4  val signalw = Process(true,true,false,true)      //> signalw  :[Boolean] = Emit(WrappedArray(true, true, false, true)) 
 5  val signalu = Process(false,true,false,true)     //> signalu  :[Boolean] = Emit(WrappedArray(false, true,false, true)) 
 7  source.tee(seq)(interleave)           //> res12: Vector[Any] = Vector(1, a, 2, b, 3, c) 
 8  (source interleave seq)               //> res13: Vector[Any] = Vector(1, a, 2, b, 3, c) 
 9  signalu.tee(source)(until)            //> res14: Vector[Int] = Vector(1) 
10  signalw.tee(source)(when)             //> res15: Vector[Int] = Vector(1, 2, 3) 
11  source.tee(seq)(passL)                //> res16: Vector[Int] = Vector(1, 2, 3, 4, 5) 
12  source.tee(seq)(passR)                //> res17: Vector[String] = Vector(a, b, c) 
13  (source zip seq)                      //> res18: Vector[(Int, String)] = Vector((1,a), (2,b), (3,c)) 
14  (seq zip source)                      //> res19: Vector[(String, Int)] = Vector((a,1), (b,2), (c,3)) 
15  (source.zipWith(seq){(a,b) => a.toString + b}) 
16                                                   //> res20: Vector[String] = Vector(1a, 2b, 3c)


/** Feed a sequence of inputs to the left side of a `Tee`. */ 
  def feedL[I, I2, O](i: Seq[I])(p: Tee[I, I2, O]): Tee[I, I2, O] = {...} 
 /** Feed a sequence of inputs to the right side of a `Tee`. */ 
  def feedR[I, I2, O](i: Seq[I2])(p: Tee[I, I2, O]): Tee[I, I2, O] = {...}


1 val ltee = tee.feedL(Seq(1,2,3))(id[Int])        //> ltee  :[Int,Any,Int] = Append(Emit(Vector(1, 2)),Vector(<function1>)) 
2  halt.tee[Task,Int,Int](halt)(ltee)    //> res21: Vector[Int] = Vector(1, 2, 3) 
3  source.tee[Task,Int,Int](halt)(ltee)  //> res22: Vector[Int] = Vector(1, 2, 3, 1, 2, 3, 4, 5)


   * Like `tee`, but we allow the `Wye` to read non-deterministically 
   * from both sides at once. 
   * If `y` is in the state of awaiting `Both`, this implementation 
   * will continue feeding `y` from either left or right side, 
   * until either it halts or _both_ sides halt. 
   * If `y` is in the state of awaiting `L`, and the left 
   * input has halted, we halt. Likewise for the right side. 
   * For as long as `y` permits it, this implementation will _always_ 
   * feed it any leading `Emit` elements from either side before issuing 
   * new `F` requests. More sophisticated chunking and fairness 
   * policies do not belong here, but should be built into the `Wye` 
   * and/or its inputs. 
   * The strategy passed in must be stack-safe, otherwise this implementation 
   * will throw SOE. Preferably use one of the `Strategys.Executor(es)` based strategies 
  final def wye[O2, O3](p2: Process[Task, O2])(y: Wye[O, O2, O3])(implicit S: Strategy): Process[Task, O3] =[O, O2, O3](self, p2)(y)(S)


   * After each input, dynamically determine whether to read from the left, right, or both, 
   * for the subsequent input, using the provided functions `f` and `g`. The returned 
   * `Wye` begins by reading from the left side and is left-biased--if a read of both branches 
   * returns a `These(x,y)`, it uses the signal generated by `f` for its next step. 
  def dynamic[I,I2](f: I => wye.Request, g: I2 => wye.Request): Wye[I,I2,ReceiveY[I,I2]] = { 
    def go(signal: wye.Request): Wye[I,I2,ReceiveY[I,I2]] = signal match { 
      case L => receiveL { i => emit(ReceiveL(i)) ++ go(f(i)) } 
      case R => receiveR { i2 => emit(ReceiveR(i2)) ++ go(g(i2)) } 
      case Both => receiveBoth { 
        case [email protected](i) => emit(t) ++ go(f(i)) 
        case [email protected](i2) => emit(t) ++ go(g(i2)) 
        case HaltOne(rsn) => Halt(rsn) 
   * Non-deterministic interleave of both inputs. Emits values whenever either 
   * of the inputs is available. 
   * Will terminate once both sides terminate. 
  def merge[I]: Wye[I,I,I] = 
    receiveBoth { 
      case ReceiveL(i) => emit(i) ++ merge 
      case ReceiveR(i) => emit(i) ++ merge 
      case HaltL(End)   => awaitR.repeat 
      case HaltR(End)   => awaitL.repeat 
      case HaltOne(rsn) => Halt(rsn) 
   * Nondeterminstic interleave of both inputs. Emits values whenever either 
   * of the inputs is available. 
  def either[I,I2]: Wye[I,I2,I // I2] = 
    receiveBoth { 
      case ReceiveL(i) => emit(left(i)) ++ either 
      case ReceiveR(i) => emit(right(i)) ++ either 
      case HaltL(End)     => awaitR[I2].map(right).repeat 
      case HaltR(End)     => awaitL[I].map(left).repeat 
      case [email protected](rsn) => Halt(rsn) 


1 import wye._ 
2  source.wye(seq)(either)               //> res23: Vector[scalaz.//[Int,String]] = Vector(-//(1), //-(a), //-(b), //-(c), -//(2), -//(3), -//(4), -//(5)) 
3  (source either seq)                   //> res24: Vector[scalaz.//[Int,String]] = Vector(-//(1), //-(a), //-(b), //-(c), -//(2), -//(3), -//(4), -//(5)) 
4  source.wye(seq)(merge)                //> res25: Vector[Any] = Vector(1, a, b, c, 2, 3, 4, 5) 
5  (source merge seq)                    //> res26: Vector[Any] = Vector(1, a, b, c, 2, 3, 4, 5)


1  val w = dynamic((r:Int) => Request.R, (l:String) => Request.L) 
2                                                   //> w  :[Int,String,[Int,String]] = Await(Left,<function1>,<function1>) 
3  source.wye(seq)(w)                    //> res27: Vector[[Int,String]] = Vector(ReceiveL(1), ReceiveR(a), ReceiveL(2), ReceiveR(b), ReceiveL(3), ReceiveR(c), ReceiveL(4)) 
4  val fw = dynamic((r: Int) => if (r % 3 == 0) { 
5    Request.R } else {Request.L}, (l:String) => Request.L) 
6                                                   //> fw  :[Int,String,[Int,String]] = Await(Left,<function1>,<function1>) 
7  source.wye(seq)(fw)                   //> res28: Vector[[Int,String]] = Vector(ReceiveL(1), ReceiveL(2), ReceiveL(3), ReceiveR(a), ReceiveL(4), ReceiveL(5))


1  val lwye = wye.feedL(Seq(1,2,3))(id[Int])        //> lwye  :[Int,Any,Int] = Append(Emit(Vector(1, 2)),Vector(< 
2                                                   //| function1>)) 
3  halt.wye(halt)(lwye)                  //> res29: Vector[Int] = Vector(1, 2, 3) 
4  source.wye(halt)(lwye)                //> res30: Vector[Int] = Vector(1, 2, 3, 1, 2, 3, 4, 5)




























上一篇 2021年7月19日
下一篇 2021年7月19日


