Scalaz(55)- scalaz-stream: fs2 basics, and fs2 stream transformation in detail

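    fs2 is the newest version of scalaz-stream. It keeps scalaz-stream's passive (pull model) data-stream principle but uses an entirely new implementation. Compared with scalaz-stream, fs2 offers a leaner set of basic combinators, stronger type safety and resource safety, and better runtime efficiency. Since fs2 largely keeps scalaz-stream's principles, the discussion below concentrates on how to use fs2. According to the official fs2 documentation, fs2 brings the following new features: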

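1. No external (third-party) dependencies at all

2. Stream elements now come in chunks (the Chunk type), with accompanying operations

3. fs2 is no longer limited to Task as its only effect type; users can supply their own effect type

4. Leaner stream transformation primitives

5. More concurrency primitives

6. Stronger resource safety through the bracket function, especially for cleaning up resources acquired on other threads; onFinalize replaces onComplete

7. Stream state transformation has a completely new implementation built on a new data structure: Pull

8. Stream replaces Process. fs2 no longer has the type aliases Process1, Tee, Wye and Channel; in their place are: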

  • type Pipe[F[_],A,B] = Stream[F,A] => Stream[F,B]
  • type Pipe2[F[_],A,B,C] = (Stream[F,A], Stream[F,B]) => Stream[F,C]
  • Pipe replaces Channel and Process1
  • Pipe2 replaces Tee and Wye
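Because Pipe and Pipe2 are nothing more than function types, the idea can be illustrated without fs2 at all. The sketch below is a hypothetical, stdlib-only model in which a List stands in for a pure Stream and the effect parameter F is dropped; `ToyPipes`, `evens`, `render` and `interleave` are illustrative names, not part of fs2.

```scala
// Hypothetical stdlib-only model: a List stands in for a pure Stream, and the effect type F is dropped.
object ToyPipes {
  // Counterparts of fs2's aliases Pipe and Pipe2
  type Pipe[A, B]     = List[A] => List[B]
  type Pipe2[A, B, C] = (List[A], List[B]) => List[C]

  // Single-input transducers (the old Process1 role) are ordinary functions
  val evens: Pipe[Int, Int]     = _.filter(_ % 2 == 0)
  val render: Pipe[Int, String] = _.map(i => s"#$i")

  // A two-input combinator (the old Tee/Wye role): alternate elements, stopping when either side ends
  def interleave[A]: Pipe2[A, A, A] =
    (left, right) => left.zip(right).flatMap { case (a, b) => List(a, b) }

  // Applying pipes one after another is plain function composition
  val evensRendered: Pipe[Int, String] = evens andThen render
}
```

Since a Pipe is an ordinary function, connecting transducers one after another amounts to function composition, which is exactly what `andThen` does here.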

Let's look at some basic fs2 operations:

```scala
Stream()                       //> res0: fs2.Stream[Nothing,Nothing] = Segment(Emit(Chunk()))
Stream(1,2,3)                  //> res1: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(1, 2, 3)))
Stream.emit(4)                 //> res2: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(4)))
Stream.emits(Seq(1,2,3))       //> res3: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(1, 2, 3)))
```

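The type shape of Stream is Stream[F[_],A]. In every example above F[_] is Nothing; such a stream is called a pure stream. Note also that each stream construction produces a Chunk, i.e. one batch of elements. Chunk is a type newly added in fs2 to make the processing of stream elements more efficient.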
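One rough way to picture this chunk structure is a stream stored as a sequence of chunks. The sketch below assumes a hypothetical `ToyStream` built on `Vector`s, which is not fs2's actual representation: each constructor call creates one chunk, `++` keeps each operand's chunk boundaries, and `toList` flattens everything.

```scala
// Hypothetical model of a pure stream as a sequence of chunks (not fs2's real representation).
object ToyChunks {
  final case class ToyStream[A](chunks: Vector[Vector[A]]) {
    // Concatenation appends chunk lists, so each operand keeps its own chunk boundaries
    def ++(that: ToyStream[A]): ToyStream[A] = ToyStream(chunks ++ that.chunks)

    // map transforms a whole chunk at a time and preserves the grouping
    def map[B](f: A => B): ToyStream[B] = ToyStream(chunks.map(_.map(f)))

    // "Running" a pure stream just flattens the chunks into one list
    def toList: List[A] = chunks.flatten.toList
  }

  object ToyStream {
    // One constructor call produces exactly one chunk, like Stream(1,2,3) in the examples above
    def of[A](as: A*): ToyStream[A] = ToyStream(Vector(as.toVector))
  }
}
```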

We can compute the element values of a pure stream with toList or toVector:

```scala
Stream.emits(Seq(1,2,3)).toList        //> res3: List[Int] = List(1, 2, 3)
Stream.emits(Seq(1,2,3)).toVector      //> res4: Vector[Int] = Vector(1, 2, 3)
```

Pure streams support many List-like operations:

```scala
(Stream(1,2,3) ++ Stream(4,5)).toList             //> res5: List[Int] = List(1, 2, 3, 4, 5)
Stream(1,2,3).map { _ + 1}.toList                 //> res6: List[Int] = List(2, 3, 4)
Stream(1,2,3).filter { _ % 2 == 0}.toList         //> res7: List[Int] = List(2)
Stream(1,2,3).fold(0)(_ + _).toList               //> res8: List[Int] = List(6)
Stream(None,Some(1),Some(3),None).collect {
  case None => 0
  case Some(i) => i
}.toList                                          //> res9: List[Int] = List(0, 1, 3, 0)
Stream.range(1,5).intersperse(42).toList          //> res10: List[Int] = List(1, 42, 2, 42, 3, 42, 4)
Stream(1,2,3).flatMap {x => Stream(x,x)}.toList   //> res11: List[Int] = List(1, 1, 2, 2, 3, 3)
Stream(1,2,3).repeat.take(5).toList               //> res12: List[Int] = List(1, 2, 3, 1, 2)
```

All of the above demonstrate basic List-style operations.
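`repeat` makes the stream infinite, so `take(5)` can only terminate because elements are produced on demand. The same behaviour can be reproduced with the standard library's lazy `Iterator`. The standard collections have no `intersperse`, so the sketch below writes a hypothetical one by hand to match the result of `Stream.range(1,5).intersperse(42)` shown above.

```scala
// Stdlib illustration of laziness: repeat plus take, and a hand-written intersperse (hypothetical helper).
object LazyRepeat {
  // An endless 1,2,3,1,2,3,... sequence; elements are produced only when something asks for them
  def repeated: Iterator[Int] = Iterator.continually(List(1, 2, 3)).flatten

  // Insert sep between neighbouring elements (no separator before the first or after the last)
  def intersperse[A](xs: List[A], sep: A): List[A] = xs match {
    case Nil          => Nil
    case last :: Nil  => List(last)
    case head :: tail => head :: sep :: intersperse(tail, sep)
  }
}
```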

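In scalaz-stream, the state transformation of a stream was the job of Process1, i.e. a transducer. In fs2 a transducer is a Pipe (which also takes over the role of Channel), and transducers are normally attached with through. take, filter and the other operations in the examples above are all transducers, and they can be found in object pipe: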

```scala
object pipe {
  ...
  /** Drop `n` elements of the input, then echo the rest. */
  def drop[F[_],I](n: Long): Stream[F,I] => Stream[F,I] =
    _ pull (h => Pull.drop(n)(h) flatMap Pull.echo)
  ...
  /** Emits `true` as soon as a matching element is received, else `false` if no input matches */
  def exists[F[_], I](p: I => Boolean): Stream[F, I] => Stream[F, Boolean] =
    _ pull { h => Pull.forall[F,I](!p(_))(h) flatMap { i => Pull.output1(!i) }}

  /** Emit only inputs which match the supplied predicate. */
  def filter[F[_], I](f: I => Boolean): Stream[F,I] => Stream[F,I] =
    mapChunks(_ filter f)

  /** Emits the first input (if any) which matches the supplied predicate, to the output of the returned `Pull` */
  def find[F[_],I](f: I => Boolean): Stream[F,I] => Stream[F,I] =
    _ pull { h => Pull.find(f)(h).flatMap { case o #: h => Pull.output1(o) }}

  /**
   * Folds all inputs using an initial value `z` and supplied binary operator,
   * and emits a single element stream.
   */
  def fold[F[_],I,O](z: O)(f: (O, I) => O): Stream[F,I] => Stream[F,O] =
    _ pull { h => Pull.fold(z)(f)(h).flatMap(Pull.output1) }
  ...
  /** Emits all elements of the input except the first one. */
  def tail[F[_],I]: Stream[F,I] => Stream[F,I] =
    drop(1)

  /** Emit the first `n` elements of the input `Handle` and return the new `Handle`. */
  def take[F[_],I](n: Long): Stream[F,I] => Stream[F,I] =
    _ pull Pull.take(n)
  ...
```

We can connect these transducers with through (here in its throughPure form):

```scala
Stream(1,2,3).repeat
  .throughPure(pipe.take(10))
  .throughPure(pipe.filter(_ % 2 == 0))
  .toList                                    //> res13: List[Int] = List(2, 2, 2)
```

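throughPure above is effectively through combined with pure. Pure is an F[_] with no effects at all; it exists only to help the compiler with type inference. We can just as well lift the pure stream with pure first and then use through: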

```scala
Stream(1,2,3).repeat.pure
  .through(pipe.take(10))
  .through(pipe.filter(_ % 2 == 0))
  .toList                                         //> res14: List[Int] = List(2, 2, 2)
```

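With the stream lifted by pure, the compiler no longer reports a type error. The functions in fs2's pipe object are also made available as methods of Stream itself (through method injection or inheritance), so these transducers can be called directly on a Stream: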

```scala
Stream(1,2,3).repeat.take(10).filter(_ % 2 == 0).toList
                                  //> res15: List[Int] = List(2, 2, 2)
```
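The "method injection" mentioned above is the usual Scala implicit-class pattern. The sketch below is a hypothetical, stdlib-only illustration (the names `Src`, `pipe` and `SrcOps` are assumptions, not fs2's code): transducers live as functions in an object, and an implicit class exposes the same functions as methods on the stream type.

```scala
// Hypothetical stdlib-only illustration of "method injection"; none of these names come from fs2.
object ToyInjection {
  // A bare stream wrapper with no transformation methods of its own
  final case class Src[A](elems: List[A])

  // A transducer is just a function between streams
  type Pipe[A, B] = Src[A] => Src[B]

  // Transducers defined as standalone functions, in the spirit of fs2's `object pipe`
  object pipe {
    def take[A](n: Int): Pipe[A, A]            = s => Src(s.elems.take(n))
    def filter[A](f: A => Boolean): Pipe[A, A] = s => Src(s.elems.filter(f))
  }

  // The implicit class injects through, take and filter as methods on Src
  implicit class SrcOps[A](private val s: Src[A]) {
    def through[B](p: Pipe[A, B]): Src[B] = p(s)
    def take(n: Int): Src[A]              = through(pipe.take[A](n))
    def filter(f: A => Boolean): Src[A]   = through(pipe.filter[A](f))
  }
}
```

With `ToyInjection._` imported, `s.take(10)` and `s.through(pipe.take[Int](10))` are the same call; the method form simply forwards to the function in `pipe`.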

As mentioned earlier, fs2 implements transducers with a new approach and new data types. A transducer has the type Pipe:

```scala
type Pipe[F[_],-I,+O] = Stream[F,I] => Stream[F,O]
```

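So Pipe is just a type alias for Function1: a lambda that takes a Stream[F,I] and returns a Stream[F,O]. How, then, does fs2 read the elements of a Stream[F,I]? As mentioned before, through the new data structure Pull. First let's see how fs2 converts Stream >> Pull >> Stream: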

```scala
val pll = Stream(1,2,3).pure.open    //> pll  : fs2.Pull[fs2.Pure,Nothing,fs2.Stream.Handle[fs2.Pure,Int]] = fs2.Pull@de5031f
val strm = pll.close                 //> strm  : fs2.Stream[fs2.Pure,Nothing] = evalScope(Scope(Bind(Eval(Snapshot),<function1>))).flatMap(<function1>)
```

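Applying open to a Stream yields a Pull. pll is a Pull, whose type is defined as:

```scala
class Pull[+F[_],+O,+R](private[fs2] val get: Free[P[F,O]#f,Option[Either[Throwable,R]]])
```

Among Pull's type parameters, F is the effect type, O is the type of the output elements, and R is the result the Pull produces when it completes. In the example above, pll's R is a Handle, and it is from this Handle that elements are read. The Handle type should therefore provide methods for reading elements: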

```scala
implicit class HandleOps[+F[_],+A](h: Handle[F,A]) {
  def push[A2>:A](c: Chunk[A2])(implicit A2: RealSupertype[A,A2]): Handle[F,A2] =
    self.push(h: Handle[F,A2])(c)
  def push1[A2>:A](a: A2)(implicit A2: RealSupertype[A,A2]): Handle[F,A2] =
    self.push1(h: Handle[F,A2])(a)
  def #:[H](hd: H): Step[H, Handle[F,A]] = Step(hd, h)
  def await: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = self.await(h)
  def await1: Pull[F, Nothing, Step[A, Handle[F,A]]] = self.await1(h)
  def awaitNonempty: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = Pull.awaitNonempty(h)
  def echo1: Pull[F,A,Handle[F,A]] = Pull.echo1(h)
  def echoChunk: Pull[F,A,Handle[F,A]] = Pull.echoChunk(h)
  def peek: Pull[F, Nothing, Step[Chunk[A], Handle[F,A]]] = self.peek(h)
  def peek1: Pull[F, Nothing, Step[A, Handle[F,A]]] = self.peek1(h)
  def awaitAsync[F2[_],A2>:A](implicit S: Sub1[F,F2], F2: Async[F2], A2: RealSupertype[A,A2]):
    Pull[F2, Nothing, AsyncStep[F2,A2]] = self.awaitAsync(Sub1.substHandle(h))
  def await1Async[F2[_],A2>:A](implicit S: Sub1[F,F2], F2: Async[F2], A2: RealSupertype[A,A2]):
    Pull[F2, Nothing, AsyncStep1[F2,A2]] = self.await1Async(Sub1.substHandle(h))
  def covary[F2[_]](implicit S: Sub1[F,F2]): Handle[F2,A] = Sub1.substHandle(h)
}

implicit class HandleInvariantEffectOps[F[_],+A](h: Handle[F,A]) {
  def invAwait1Async[A2>:A](implicit F: Async[F], A2: RealSupertype[A,A2]):
    Pull[F, Nothing, AsyncStep1[F,A2]] = self.await1Async(h)
  def invAwaitAsync[A2>:A](implicit F: Async[F], A2: RealSupertype[A,A2]):
    Pull[F, Nothing, AsyncStep[F,A2]] = self.awaitAsync(h)
  def receive1[O,B](f: Step[A,Handle[F,A]] => Pull[F,O,B]): Pull[F,O,B] = h.await1.flatMap(f)
  def receive[O,B](f: Step[Chunk[A],Handle[F,A]] => Pull[F,O,B]): Pull[F,O,B] = h.await.flatMap(f)
}
```

Sure enough, the functions Handle provides include reading functions such as await and receive. Let's try implementing a simple transducer, a filter function:

```scala
import scala.language.higherKinds
import fs2._

def myFilter[F[_],A](f: A => Boolean): Pipe[F, A, A] = {
  // pull one element out of the Handle; emit it if it satisfies f, then continue with the remaining Handle
  def go(h: Stream.Handle[F,A]): Pull[F,A,Unit] = {
    // equivalent: h.receive1 {case Step(a,h) => if(f(a)) Pull.output1(a) >> go(h) else go(h)}
    h.await1.flatMap { case Step(a,h) => if(f(a)) Pull.output1(a) >> go(h) else go(h)}
  }
  // equivalent: sin => sin.open.flatMap {h => go(h)}.close
  sin => sin.pull(go _)
}                                   //> myFilter: [F[_], A](f: A => Boolean)fs2.Pipe[F,A,A]

Stream.range(0,10).pure.through(myFilter(_ % 2 == 0)).toList
                                     //> res17: List[Int] = List(0, 2, 4, 6, 8)
```

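Inside the Pull, await1 (or receive1) pulls a Step, i.e. the next element together with the remaining Handle, out of the Handle; the element is then written to the Pull's output. Closing this Pull gives back the Stream we want. The types and function signatures used in the example are listed below: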

```scala
type Pipe[F[_],-I,+O] = Stream[F,I] => Stream[F,O]

def await1[F[_],I]: Handle[F,I] => Pull[F,Nothing,Step[I,Handle[F,I]]] = {...}

def receive1[F[_],I,O,R](f: Step[I,Handle[F,I]] => Pull[F,O,R]): Handle[F,I] => Pull[F,O,R] =
    _.await1.flatMap(f)

def pull[F[_],F2[_],A,B](s: Stream[F,A])(using: Handle[F,A] => Pull[F2,B,Any])(implicit S: Sub1[F,F2])
  : Stream[F2,B] =
    Pull.close { Sub1.substPull(open(s)) flatMap (h => Sub1.substPull(using(h))) }
```
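To make the Handle / Step / await1 mechanics more concrete, here is a simplified, hypothetical model in plain Scala. It is not fs2's Pull: instead of building a Pull and closing it, `go` recurses directly and accumulates its output, and end of input is signalled by `None`. The Handle is modelled as a cursor over the remaining chunks: `await` returns the next chunk, and `await1` returns one element and pushes the rest of its chunk back.

```scala
// Hypothetical simplified model of Handle/Step/await1 (not fs2's implementation).
object ToyHandle {
  // Step pairs a head with the rest of the input, mirroring fs2's Step(head, tail)
  final case class Step[+H, +T](head: H, tail: T)

  // A read cursor over the not-yet-consumed chunks of a stream
  final case class Handle[A](chunks: List[Vector[A]]) {
    // Next non-empty chunk, or None at the end of input
    def await: Option[Step[Vector[A], Handle[A]]] =
      chunks.dropWhile(_.isEmpty) match {
        case c :: rest => Some(Step(c, Handle(rest)))
        case Nil       => None
      }

    // Next single element; the remainder of its chunk is pushed back onto the cursor
    def await1: Option[Step[A, Handle[A]]] =
      await.map { case Step(c, h) => Step(c.head, Handle(c.tail :: h.chunks)) }
  }

  // myFilter in this model: pull one element, emit it as a one-element chunk if it matches, recurse
  def myFilter[A](f: A => Boolean)(in: List[Vector[A]]): List[Vector[A]] = {
    @annotation.tailrec
    def go(h: Handle[A], out: Vector[Vector[A]]): Vector[Vector[A]] =
      h.await1 match {
        case None                => out
        case Some(Step(a, rest)) => if (f(a)) go(rest, out :+ Vector(a)) else go(rest, out)
      }
    go(Handle(in), Vector.empty).toList
  }
}
```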

Here is another Pipe implementation, take:

```scala
def myTake[F[_],A](n: Int): Pipe[F,A,A] = {
   // stop once n elements have been emitted; otherwise pull one element, emit it and recurse with n-1
   def go(n: Int): Stream.Handle[F,A] => Pull[F,A,Unit] = h => {
      if (n <= 0) Pull.done
      else h.receive1 { case a #: h => Pull.output1(a).flatMap{_ => go(n-1)(h)}}
   }
   sin => sin.pull(go(n))
}                                                 //> myTake: [F[_], A](n: Int)fs2.Pipe[F,A,A]
Stream.range(0,10).pure.through(myTake(3)).toList //> res18: List[Int] = List(0, 1, 2)
```

We mentioned earlier that one of fs2's improvements is the new Chunk data type and its operations. Chunk is a collection used internally by fs2 so that it can process data one chunk at a time. Chunk itself comes with a full set of collection functions:

```scala
/**
 * Chunk represents a strict, in-memory sequence of `A` values.
 */
trait Chunk[+A] { self =>
  def size: Int
  def uncons: Option[(A, Chunk[A])] =
    if (size == 0) None
    else Some(apply(0) -> drop(1))
  def apply(i: Int): A
  def copyToArray[B >: A](xs: Array[B]): Unit
  def drop(n: Int): Chunk[A]
  def take(n: Int): Chunk[A]
  def filter(f: A => Boolean): Chunk[A]
  def foldLeft[B](z: B)(f: (B,A) => B): B
  def foldRight[B](z: B)(f: (A,B) => B): B
  def indexWhere(p: A => Boolean): Option[Int] = {
    val index = iterator.indexWhere(p)
    if (index < 0) None else Some(index)
  }
  def isEmpty = size == 0
  def toArray[B >: A: ClassTag]: Array[B] = {
    val arr = new Array[B](size)
    copyToArray(arr)
    arr
  }
  def toList = foldRight(Nil: List[A])(_ :: _)
  def toVector = foldLeft(Vector.empty[A])(_ :+ _)
  def collect[B](pf: PartialFunction[A,B]): Chunk[B] = {
    val buf = new collection.mutable.ArrayBuffer[B](size)
    iterator.collect(pf).copyToBuffer(buf)
    Chunk.indexedSeq(buf)
  }
  def map[B](f: A => B): Chunk[B] = {
    val buf = new collection.mutable.ArrayBuffer[B](size)
    iterator.map(f).copyToBuffer(buf)
    Chunk.indexedSeq(buf)
  }
  def mapAccumulate[S,B](s0: S)(f: (S,A) => (S,B)): (S,Chunk[B]) = {
    val buf = new collection.mutable.ArrayBuffer[B](size)
    var s = s0
    for { c <- iterator } {
      val (newS, newC) = f(s, c)
      buf += newC
      s = newS
    }
    (s, Chunk.indexedSeq(buf))
  }
  def scanLeft[B](z: B)(f: (B, A) => B): Chunk[B] = {
    val buf = new collection.mutable.ArrayBuffer[B](size + 1)
    iterator.scanLeft(z)(f).copyToBuffer(buf)
    Chunk.indexedSeq(buf)
  }
  def iterator: Iterator[A] = new Iterator[A] {
    var i = 0
    def hasNext = i < self.size
    def next = { val result = apply(i); i += 1; result }
  }
...
```
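The `mapAccumulate` and `uncons` contracts above can be illustrated on a plain `Vector`. The sketch below is hypothetical (`ChunkOps` is not part of fs2): `mapAccumulate` threads a state value through the elements and returns both the final state and the mapped elements, which is the kind of operation a stateful transformation needs when it processes data one chunk at a time.

```scala
// Hypothetical Vector-based versions of two Chunk operations shown above (not fs2's code).
object ChunkOps {
  // Thread state S through the elements; return the final state together with the mapped values
  def mapAccumulate[S, A, B](as: Vector[A], s0: S)(f: (S, A) => (S, B)): (S, Vector[B]) =
    as.foldLeft((s0, Vector.empty[B])) { case ((s, out), a) =>
      val (s1, b) = f(s, a)
      (s1, out :+ b)
    }

  // Split off the first element when the chunk is non-empty, as Chunk.uncons does
  def uncons[A](as: Vector[A]): Option[(A, Vector[A])] =
    if (as.isEmpty) None else Some(as.head -> as.drop(1))
}
```

For example, using the running sum as both the state and the output turns `Vector(1, 2, 3)` into the final state `6` and the running totals `Vector(1, 3, 6)`.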

 

Most of fs2's transformation functions are designed to handle chunked data. First, let's see how fs2 represents chunks:

```scala
(Stream(1,2) ++ Stream(3,4,5) ++ Stream(6,7)).chunks.toList
    //> res16: List[fs2.Chunk[Int]] = List(Chunk(1, 2), Chunk(3, 4, 5), Chunk(6, 7))
```

fs2 divides a stream into chunks according to the batches in which the stream was constructed. Here is how to use Pull's chunk mechanism:

```scala
def myTakeC[F[_],A](n: Int): Pipe[F,A,A] = {
  def go(n: Int): Stream.Handle[F,A] => Pull[F,A,Unit] = h => {
     if ( n <= 0 ) Pull.done
     // awaitLimit(n) pulls the next chunk, capped at n elements
     else Pull.awaitLimit(n)(h).flatMap {case Step(chunk,h) =>
       // output the whole chunk at once instead of element by element
       if (chunk.size <= n) Pull.output(chunk) >> go(n-chunk.size)(h)
       else Pull.output(chunk.take(n)) }
  }
  sin => sin.pull(go(n))
}                       //> myTakeC: [F[_], A](n: Int)fs2.Pipe[F,A,A]
val s1 = (Stream(1,2) ++ Stream(3,4,5) ++ Stream(6,7))
                       //> s1  : fs2.Stream[Nothing,Int] = append(append(Segment(Emit(Chunk(1, 2))), Segment(Emit(Chunk(()))).flatMap(<function1>)), Segment(Emit(Chunk(()))).flatMap(<function1>))
s1.pure.through(myTake(4)).chunks.toList  //> res20: List[fs2.Chunk[Int]] = List(Chunk(1), Chunk(2), Chunk(3), Chunk(4))
s1.pure.through(myTakeC(4)).chunks.toList //> res21: List[fs2.Chunk[Int]] = List(Chunk(1, 2), Chunk(3, 4))
```

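myTake and myTakeC take the same elements but produce different chunk structures: myTake emits each element with output1 as its own single-element chunk, while myTakeC outputs whole chunks and keeps the original chunk boundaries.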
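The difference can be reproduced in a chunk-list model without fs2. In this hypothetical sketch, `takeEach` behaves like myTake (every element becomes a one-element chunk, as with output1) and `takeChunks` behaves like myTakeC (whole chunks pass through and only the chunk that crosses the limit is cut). Neither function is part of fs2.

```scala
// Hypothetical chunk-list model (not fs2's API): a stream is a list of chunks.
object ToyTake {
  type Chunks[A] = List[Vector[A]]

  // Like myTake: take elements one at a time and emit each as its own one-element chunk
  def takeEach[A](n: Int)(in: Chunks[A]): Chunks[A] =
    in.flatten.take(n).map(a => Vector(a))

  // Like myTakeC: pass whole chunks through and cut only the chunk that crosses the limit
  def takeChunks[A](n: Int)(in: Chunks[A]): Chunks[A] = in match {
    case _ if n <= 0            => Nil
    case c :: rest if c.isEmpty => takeChunks[A](n)(rest)
    case c :: rest              =>
      if (c.size <= n) c :: takeChunks[A](n - c.size)(rest)
      else List(c.take(n))
    case Nil                    => Nil
  }
}
```

Both versions yield the same elements; only the grouping differs, which matters because fs2's operations can then work on whole chunks instead of single elements.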

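fs2's real strength is concurrent programming. In the type Stream[F[_],A], F[_] is an effect type whose evaluation may produce side effects. When F[_] is Nothing, Stream[Nothing,A] is a pure stream; with a real effect type, Stream[F,A] is an effectful stream. While transforming an effectful stream we can run F's effects, such as database reads and writes or IO. fs2 is no longer tied to Task: any Monad with a Catchable instance can serve as a Stream's effect type. Still, for a library built primarily around concurrent programming, no effect type fits better than Task.

We can lift a pure stream into an effectful stream:

```scala
val s2 = Stream.emits(Seq(1,2,3)).covary[Task]    //> s2  : fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk(1, 2, 3)))
```

Running this effectful stream with runLog yields a Task; running that Task then produces the values: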

```scala
val t2 = s2.runLog                                //> t2  : fs2.Task[Vector[Int]] = Task
t2.unsafeRun                                      //> res22: Vector[Int] = Vector(1, 2, 3)
```
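This two-step process (runLog gives a Task, unsafeRun executes it) works because a Task is only a description of a computation. The sketch below uses a hypothetical minimal lazy effect `Eff` in place of Task; it is not fs2's Task and has no error handling or asynchrony. It shows that building the runLog result performs no side effects until `unsafeRun()` is called.

```scala
// Hypothetical minimal lazy effect standing in for Task (no error handling, no asynchrony).
object ToyTask {
  final class Eff[A](thunk: () => A) {
    // Only here is the described computation actually executed
    def unsafeRun(): A = thunk()
    def map[B](f: A => B): Eff[B] = new Eff(() => f(thunk()))
    def flatMap[B](f: A => Eff[B]): Eff[B] = new Eff(() => f(thunk()).unsafeRun())
  }

  object Eff {
    // Capture a computation without running it
    def delay[A](a: => A): Eff[A] = new Eff(() => a)
  }

  // runLog-like: combine effectful steps into ONE effect that runs them in order and collects the results
  def runLog[A](steps: List[Eff[A]]): Eff[Vector[A]] =
    steps.foldLeft(Eff.delay(Vector.empty[A])) { (acc, step) =>
      acc.flatMap(log => step.map(a => log :+ a))
    }
}
```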

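With an effectful stream, myFilter and myTake no longer need pure for lifting. (s3 is not defined in the original text; the output suggests a Task stream that repeats 1, 2, 3, so it is assumed below to be s2.repeat.)

```scala
val s3 = s2.repeat   // assumption: definition not shown in the original; consistent with the output below
s3.through(myFilter(_ % 2 == 0)).through(myTake(3)).runLog.unsafeRun
                                                  //> res23: Vector[Int] = Vector(2, 2, 2)
```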

The following example shows an fs2 effectful stream all the way from the source (Source), through a transducer (Transducer), to the end point (Sink):

```scala
def stdOut: Sink[Task,String]  =
  _.evalMap { x => Task.delay{ println(s"milli: $x")}}
                                                  //> stdOut: => fs2.Sink[fs2.Task,String]
Stream.repeatEval(Task.delay{System.currentTimeMillis})
  .map(_.toString)
  .through(myTake(3))
  .to(stdOut)
  .run.unsafeRun                                  //> milli: 1472001934708
                                                  //| milli: 1472001934714
                                                  //| milli: 1472001934714
```

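The example above uses the connecting functions through and to. Because the data ends up in the sink stdOut, there is no need for runLog to collect the results; run is enough.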
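The same source, transducer, sink shape can be sketched with the standard library's lazy `Iterator`. This is a hypothetical model, not fs2's API: `repeatEval` re-evaluates a function for every element, a sink is a function that drains the iterator, and `collectInto` records lines in a buffer instead of printing them so the result can be checked. The clock is passed in as a parameter for the same reason.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stdlib model of source, transducer and sink (not fs2's API).
object ToyPipeline {
  // Source: re-evaluate an effect for every element, lazily (compare Stream.repeatEval)
  def repeatEval[A](eff: () => A): Iterator[A] = Iterator.continually(eff())

  // Sink: consumes every element purely for its side effect
  type Sink[A] = Iterator[A] => Unit

  // A sink that records lines in a buffer instead of printing them, so the output can be inspected
  def collectInto(buf: ListBuffer[String]): Sink[String] =
    _.foreach(s => buf += s"milli: $s")

  // Source, then transducers (map, take), then sink; nothing is evaluated until the sink drains the iterator
  def run(clock: () => Long, sink: Sink[String]): Unit =
    sink(repeatEval(clock).map(_.toString).take(3))
}
```

Because `take(3)` stops pulling after three elements, the clock is evaluated exactly three times even though the source is infinite.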


Original article by 奋斗. If you repost it, please credit the source: https://blog.ytso.com/12895.html
