FunDA(9)- Stream Source:reactive data streams详解编程语言

    上篇我们讨论了静态数据源(Static Source, snapshot)。这种方式只能在预知数据规模有限的情况下使用,对于超大型的数据库表也可以说是不安全的资源使用方式。Slick3.x已经增加了支持Reactive-Streams功能,可以通过Reactive-Streams API来实现有限内存空间内的无限规模数据读取,这正符合了FunDA的设计理念:高效、便捷、安全的后台数据处理工具库。我们在前面几篇讨论里介绍了Iteratee模式,play-iteratees支持Reactive-Streams并且提供与Slick3.x的接口API,我们就在这篇讨论里介绍如何把Slick-Reactive-Streams转换成fs2-Streams。根据Slick官方文档:Slick可以通过db.stream函数用Reactive-Stream方式来读取后台数据,具体的配置如下:

  val disableAutocommit = SimpleDBIO(_.connection.setAutoCommit(false)) 
  val action = queryAction.withStatementParameters(fetchSize = 512) 
  val publisher = db.stream(disableAutocommit andThen action)

首先,我们需要取消自动提交(disableAutocommit)。fetchSize是缓存数据页长度(每批次读取数据字数),然后用db.stream来构成一个Reactive-Streams标准的数据源publisher。Slick官方网页只提供了下面这个使用publisher的例子:

  val fut = publisher.foreach(s => println(s)) 
  Await.ready(fut,Duration.Inf)

除了数据枚举外就没什么用处,也无法提供更细节点的示范。FunDA的具体解决方案是把publisher转换成play-iteratee的Enumerator。play-iteratee支持Reactive-Streams,所以这个Enumerator应该具备协调后台数据和内存缓冲之间关系(back-pressure)的功能。play-iteratee是如下构建Enumerator的;

import play.api.libs.iteratee._ 
val enumerator = streams.IterateeStreams.publisherToEnumerator(publisher)

enumerator从后台数据库表中产生的数据源通过Iteratee把数据元素enqueue推送给一个fs2的queue:

    private def pushData[R](q: async.mutable.Queue[Task,Option[R]]): Iteratee[R,Unit] = Cont { 
      case Input.EOF => { 
        q.enqueue1(None).unsafeRun 
        Done((), Input.Empty) 
      } 
      case Input.Empty => pushData(q) 
      case Input.El(e) => { 
        q.enqueue1(Some(e)).unsafeRun 
        pushData(q) 
      } 
    }

然后fs2进行dequeue后生成fs2的Stream:

      Stream.eval(async.boundedQueue[Task,Option[SOURCE]](queSize)).flatMap { q => 
        Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture() 
        pipe.unNoneTerminate(q.dequeue) 
      }

整个构建Stream的过程在FunDA的fdasources包是这样定义的:

package com.bayakala.funda.fdasources 
import fs2._ 
import play.api.libs.iteratee._ 
import com.bayakala.funda.fdapipes._ 
import slick.driver.JdbcProfile 
 
object FDADataStream { 
 
  class FDAStreamLoader[SOURCE, TARGET](slickProfile: JdbcProfile, convert: SOURCE => TARGET) { 
 
    import slickProfile.api._ 
 
    def fda_typedStream(action: DBIOAction[Iterable[SOURCE],Streaming[SOURCE],Effect.Read])(slickDB: Database)(fetchSize: Int, queSize: Int): FDAPipeLine[TARGET] = { 
      val disableAutocommit = SimpleDBIO(_.connection.setAutoCommit(false)) 
      val action_ = action.withStatementParameters(fetchSize = fetchSize) 
      val publisher = slickDB.stream(disableAutocommit andThen action) 
      val enumerator = streams.IterateeStreams.publisherToEnumerator(publisher) 
 
      Stream.eval(async.boundedQueue[Task,Option[SOURCE]](queSize)).flatMap { q => 
        Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture() 
        pipe.unNoneTerminate(q.dequeue).map {row => convert(row)} 
      } 
 
    } 
    def fda_plainStream(action: DBIOAction[Iterable[SOURCE],Streaming[SOURCE],Effect.Read])(slickDB: Database)(fetchSize: Int, queSize: Int): FDAPipeLine[SOURCE] = { 
      val disableAutocommit = SimpleDBIO(_.connection.setAutoCommit(false)) 
      val action_ = action.withStatementParameters(fetchSize = fetchSize) 
      val publisher = slickDB.stream(disableAutocommit andThen action) 
      val enumerator = streams.IterateeStreams.publisherToEnumerator(publisher) 
 
      Stream.eval(async.boundedQueue[Task,Option[SOURCE]](queSize)).flatMap { q => 
        Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture() 
        pipe.unNoneTerminate(q.dequeue) 
      } 
 
    } 
    private def pushData[R](q: async.mutable.Queue[Task,Option[R]]): Iteratee[R,Unit] = Cont { 
      case Input.EOF => { 
        q.enqueue1(None).unsafeRun 
        Done((), Input.Empty) 
      } 
      case Input.Empty => pushData(q) 
      case Input.El(e) => { 
        q.enqueue1(Some(e)).unsafeRun 
        pushData(q) 
      } 
    } 
 
  } 
  object FDAStreamLoader { 
    def apply[SOURCE, TARGET](slickProfile: JdbcProfile, converter: SOURCE => TARGET): FDAStreamLoader[SOURCE, TARGET] = 
      new FDAStreamLoader[SOURCE, TARGET](slickProfile, converter) 
  } 
}

FDADataStream对象内主要实现了fda_typedStream和fda_plainStream。fda_typedStream提供了SOURCE=>TARGET的转换。从Enumerator转换到Stream整个过程和原理我们在FunDA(7)里已经详细介绍过了。下面我们看看FunDA-Example中fda_typedStream的具体应用例子:

package com.bayakala.funda.fdasources.examples 
import slick.driver.H2Driver.api._ 
import com.bayakala.funda.fdasources.FDADataStream._ 
import com.bayakala.funda.samples._ 
import com.bayakala.funda.fdarows._ 
import com.bayakala.funda.fdapipes._ 
import FDANodes._ 
import FDAValves._ 
object Example2 extends App { 
   val albums = SlickModels.albums 
   val companies = SlickModels.companies 
 
//数据源query 
   val albumsInfo = for { 
     (a,c) <- albums join companies on (_.company === _.id) 
   } yield (a.title,a.artist,a.year,c.name) 
 
//query结果强类型(用户提供) 
  case class Album(title: String, artist: String, year: Int, publisher: String) extends FDAROW 
//转换函数(用户提供) 
  def toTypedRow(row: (String, String, Option[Int], String)): Album = 
    Album(row._1, row._2, row._3.getOrElse(2000), row._4) 
 
  val db = Database.forConfig("h2db") 
 
  val streamLoader = FDAStreamLoader(slick.driver.H2Driver, toTypedRow _) 
  val albumStream = streamLoader.fda_typedStream(albumsInfo.result)(db)(512,128) 
 
//定义一个用户作业函数:列印数据内容 
  def printAlbums: FDATask[FDAROW] = row => { 
    row match { 
      case album: Album => 
        println("____________________") 
        println(s"品名:${album.title}") 
        println(s"演唱:${album.artist}") 
        println(s"年份:${album.year}") 
        println(s"发行:${album.publisher}") 
        fda_next(album) 
      case _ => fda_skip 
    } 
  } 
 
  albumStream.through(fda_execUserTask(printAlbums)).run.unsafeRun 
 
}

运算结果:

品名:Keyboard Cat's Greatest Hits 
演唱:Keyboard Cat 
年份:1999 
发行:Sony Music Inc 
____________________ 
品名:Spice 
演唱:Spice Girls 
年份:1999 
发行:Columbia Records 
____________________ 
品名:Whenever You Need Somebody 
演唱:Rick Astley 
年份:1999 
发行:Sony Music Inc 
____________________ 
品名:The Triumph of Steel 
演唱:Manowar 
年份:1999 
发行:The K-Pops Singers 
____________________ 
品名:Believe 
演唱:Justin Bieber 
年份:1999 
发行:Columbia Records 
 
Process finished with exit code 0

 

 

 

 

 

 

 

 

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/12871.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论