SDP(5):ScalikeJDBC- JDBC-Engine:Streaming详解编程语言

  作为一种通用的数据库编程引擎,用Streaming来应对海量数据的处理是必备功能。同样,我们还是通过一种Context传递产生流的要求。因为StreamingContext比较简单,而且还涉及到数据抽取函数extractor的传递,所以我们分开来定义:

case class JDBCQueryContext[M]( 
                        dbName: Symbol, 
                        statement: String, 
                        parameters: Seq[Any] = Nil, 
                        fetchSize: Int = 100, 
                        autoCommit: Boolean = false, 
                        queryTimeout: Option[Int] = None, 
                        extractor: WrappedResultSet => M)

由于我们会将JDBCQueryContext传给JDBC-Engine去运算,所以Streaming函数的所有参数都必须明确定义,包括extractor函数。实际上JDBCQueryContext也完全满足了jdbcQueryResult函数。我们会在后面重新设计这个函数。JDBCStreaming函数产生一个akka-Source,如下:

def jdbcAkkaStream[A](ctx: JDBCQueryContext[A]) 
                         (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = { 
 
      val publisher: DatabasePublisher[A] = NamedDB('h2) readOnlyStream { 
        val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor("")) 
        ctx.queryTimeout.foreach(rawSql.queryTimeout(_)) 
        val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor) 
 
        sql.iterator 
          .withDBSessionForceAdjuster(session => { 
            session.connection.setAutoCommit(ctx.autoCommit) 
            session.fetchSize(ctx.fetchSize) 
          }) 
       } 
      Source.fromPublisher[A](publisher) 
    }

 我们只需要提供一个Sink就可以使用这个akka-stream了:

import akka.actor._ 
import akka.stream.scaladsl._ 
import akka.stream._ 
import scalikejdbc._ 
import configdbs._ 
import jdbccontext._ 
import JDBCEngine._ 
 
object JDBCStreaming extends App { 
 
  implicit val actorSys = ActorSystem("actor-system") 
  implicit val ec = actorSys.dispatcher 
  implicit val mat = ActorMaterializer() 
 
  ConfigDBsWithEnv("dev").setup('h2) 
  ConfigDBsWithEnv("dev").loadGlobalSettings() 
 
 
  case class DataRow(year: String, state: String, county: String, value: String) 
 
  //data row converter 
  val toRow = (rs: WrappedResultSet) => DataRow( 
    year = rs.string("REPORTYEAR"), 
    state = rs.string("STATENAME"), 
    county = rs.string("COUNTYNAME"), 
    value = rs.string("VALUE") 
  ) 
 
  //construct the context 
  val ctx = JDBCQueryContext[DataRow]( 
    dbName = 'h2, 
    statement = "select * from AIRQM", 
    extractor = toRow 
  ) 
 
  //pass context to construct akka-source 
  val akkaSource = jdbcAkkaStream(ctx) 
  //a sink for display rows 
  val snk = Sink.foreach[(DataRow,Long)] { case (row,idx) => 
    println(s"rec#: $idx - year: ${row.year} location: ${row.state},${row.county} value: ${row.value}")} 
  //can manual terminate stream by kill.shutdown 
  val kill: UniqueKillSwitch = (akkaSource.zipWithIndex).viaMat(KillSwitches.single)(Keep.right).to(snk).run 
 
 
  scala.io.StdIn.readLine() 
  kill.shutdown() 
  actorSys.terminate() 
  println("+++++++++++++++") 
 
}

试运行结果OK。下面是新版本的jdbcQueryResult函数:

    def jdbcQueryResult[C[_] <: TraversableOnce[_], A]( 
         ctx: JDBCQueryContext[A])( 
          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = { 
 
          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor("")) 
          ctx.queryTimeout.foreach(rawSql.queryTimeout(_)) 
          rawSql.fetchSize(ctx.fetchSize) 
          implicit val session = NamedAutoSession(ctx.dbName) 
          val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor) 
          sql.collection.apply[C]() 
 
    }

试运行:

 object SlickDAO { 
    import slick.jdbc.H2Profile.api._ 
 
    case class CountyModel(id: Int, name: String) 
    case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") { 
      def id = column[Int]("ID",O.AutoInc,O.PrimaryKey) 
      def name = column[String]("NAME",O.Length(64)) 
      def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply) 
    } 
    val CountyQuery = TableQuery[CountyTable] 
    val filter = "Kansas" 
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"} 
    val statement = qry.result.statements.head 
  } 
  import SlickDAO._ 
 
  def toRow: WrappedResultSet => CountyModel = rs => 
     CountyModel(id=rs.int("id"),name=rs.string("name")) 
  //construct the context 
  val slickCtx = JDBCQueryContext[CountyModel]( 
    dbName = 'h2, 
    statement = "select * from county where id > ? and id < ?", 
    parameters = Seq(6000,6200), 
    extractor = toRow 
  ) 
 
  val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx) 
  vecCounty.foreach(r => println(s"${r.id},${r.name}"))

下面是本次讨论的示范源代码:

build.sbt

// Scala 2.10, 2.11, 2.12 
libraryDependencies ++= Seq( 
  "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1", 
  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test", 
  "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1", 
  "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1", 
  "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1", 
  "com.h2database"  %  "h2"                % "1.4.196", 
  "mysql" % "mysql-connector-java" % "6.0.6", 
  "org.postgresql" % "postgresql" % "42.2.0", 
  "commons-dbcp" % "commons-dbcp" % "1.4", 
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2", 
  "com.zaxxer" % "HikariCP" % "2.7.4", 
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE", 
  "com.typesafe.slick" %% "slick" % "3.2.1", 
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3", 
  "com.typesafe.akka" %% "akka-actor" % "2.5.4", 
  "com.typesafe.akka" %% "akka-stream" % "2.5.4" 
)

resources/application.conf

# JDBC settings 
test { 
  db { 
    h2 { 
      driver = "org.h2.Driver" 
      url = "jdbc:h2:tcp://localhost/~/slickdemo" 
      user = "" 
      password = "" 
      poolInitialSize = 5 
      poolMaxSize = 7 
      poolConnectionTimeoutMillis = 1000 
      poolValidationQuery = "select 1 as one" 
      poolFactoryName = "commons-dbcp2" 
    } 
  } 
 
  db.mysql.driver = "com.mysql.cj.jdbc.Driver" 
  db.mysql.url = "jdbc:mysql://localhost:3306/testdb" 
  db.mysql.user = "root" 
  db.mysql.password = "123" 
  db.mysql.poolInitialSize = 5 
  db.mysql.poolMaxSize = 7 
  db.mysql.poolConnectionTimeoutMillis = 1000 
  db.mysql.poolValidationQuery = "select 1 as one" 
  db.mysql.poolFactoryName = "bonecp" 
 
  # scallikejdbc Global settings 
  scalikejdbc.global.loggingSQLAndTime.enabled = true 
  scalikejdbc.global.loggingSQLAndTime.logLevel = info 
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true 
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000 
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn 
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false 
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false 
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 
} 
dev { 
  db { 
    h2 { 
      driver = "org.h2.Driver" 
      url = "jdbc:h2:tcp://localhost/~/slickdemo" 
      user = "" 
      password = "" 
      poolFactoryName = "hikaricp" 
      numThreads = 10 
      maxConnections = 12 
      minConnections = 4 
      keepAliveConnection = true 
    } 
    mysql { 
      driver = "com.mysql.cj.jdbc.Driver" 
      url = "jdbc:mysql://localhost:3306/testdb" 
      user = "root" 
      password = "123" 
      poolInitialSize = 5 
      poolMaxSize = 7 
      poolConnectionTimeoutMillis = 1000 
      poolValidationQuery = "select 1 as one" 
      poolFactoryName = "bonecp" 
 
    } 
    postgres { 
      driver = "org.postgresql.Driver" 
      url = "jdbc:postgresql://localhost:5432/testdb" 
      user = "root" 
      password = "123" 
      poolFactoryName = "hikaricp" 
      numThreads = 10 
      maxConnections = 12 
      minConnections = 4 
      keepAliveConnection = true 
    } 
  } 
  # scallikejdbc Global settings 
  scalikejdbc.global.loggingSQLAndTime.enabled = true 
  scalikejdbc.global.loggingSQLAndTime.logLevel = info 
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true 
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000 
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn 
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false 
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false 
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 
}

JDBCEngine.scala

package jdbccontext 
import java.sql.PreparedStatement 
 
import scala.collection.generic.CanBuildFrom 
import akka.stream.scaladsl._ 
import scalikejdbc._ 
import scalikejdbc.streams._ 
import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
 
import scala.util._ 
import scalikejdbc.TxBoundary.Try._ 
 
import scala.concurrent.ExecutionContextExecutor 
 
  object JDBCContext { 
    type SQLTYPE = Int 
    val SQL_SELECT: Int = 0 
    val SQL_EXEDDL= 1 
    val SQL_UPDATE = 2 
    val RETURN_GENERATED_KEYVALUE = true 
    val RETURN_UPDATED_COUNT = false 
 
  } 
 
case class JDBCQueryContext[M]( 
                        dbName: Symbol, 
                        statement: String, 
                        parameters: Seq[Any] = Nil, 
                        fetchSize: Int = 100, 
                        autoCommit: Boolean = false, 
                        queryTimeout: Option[Int] = None, 
                        extractor: WrappedResultSet => M) 
 
 
case class JDBCContext( 
                          dbName: Symbol, 
                          statements: Seq[String] = Nil, 
                          parameters: Seq[Seq[Any]] = Nil, 
                          fetchSize: Int = 100, 
                          queryTimeout: Option[Int] = None, 
                          queryTags: Seq[String] = Nil, 
                          sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT, 
                          batch: Boolean = false, 
                          returnGeneratedKey: Seq[Option[Any]] = Nil, 
                          // no return: None, return by index: Some(1), by name: Some("id") 
                          preAction: Option[PreparedStatement => Unit] = None, 
                          postAction: Option[PreparedStatement => Unit] = None) { 
 
    ctx => 
 
    //helper functions 
 
    def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag) 
 
    def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags) 
 
    def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size) 
 
    def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time) 
 
    def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = { 
      if (ctx.sqlType == JDBCContext.SQL_UPDATE && 
        !ctx.batch && ctx.statements.size == 1) 
        ctx.copy(preAction = action) 
      else 
        throw new IllegalStateException("JDBCContex setting error: preAction not supported!") 
    } 
 
    def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = { 
      if (ctx.sqlType == JDBCContext.SQL_UPDATE && 
        !ctx.batch && ctx.statements.size == 1) 
        ctx.copy(postAction = action) 
      else 
        throw new IllegalStateException("JDBCContex setting error: preAction not supported!") 
    } 
 
    def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = { 
      if (ctx.sqlType == JDBCContext.SQL_EXEDDL) { 
        ctx.copy( 
          statements = ctx.statements ++ Seq(_statement), 
          parameters = ctx.parameters ++ Seq(Seq(_parameters)) 
        ) 
      } else 
        throw new IllegalStateException("JDBCContex setting error: option not supported!") 
    } 
 
    def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = { 
      if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) { 
        ctx.copy( 
          statements = ctx.statements ++ Seq(_statement), 
          parameters = ctx.parameters ++ Seq(_parameters), 
          returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None)) 
        ) 
      } else 
        throw new IllegalStateException("JDBCContex setting error: option not supported!") 
    } 
 
    def appendBatchParameters(_parameters: Any*): JDBCContext = { 
      if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) 
        throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!") 
 
      var matchParams = true 
      if (ctx.parameters != Nil) 
        if (ctx.parameters.head.size != _parameters.size) 
          matchParams = false 
      if (matchParams) { 
        ctx.copy( 
          parameters = ctx.parameters ++ Seq(_parameters) 
        ) 
      } else 
        throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!") 
    } 
 
    def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = { 
      if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) 
         throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!") 
      ctx.copy( 
        returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil 
      ) 
    } 
 
    def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = { 
        ctx.copy( 
          statements = Seq(_statement), 
          parameters = Seq(_parameters), 
          sqlType = JDBCContext.SQL_EXEDDL, 
          batch = false 
        ) 
      } 
 
      def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = { 
        ctx.copy( 
          statements = Seq(_statement), 
          parameters = Seq(_parameters), 
          returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None), 
          sqlType = JDBCContext.SQL_UPDATE, 
          batch = false 
        ) 
      } 
      def setBatchCommand(_statement: String): JDBCContext = { 
        ctx.copy ( 
          statements = Seq(_statement), 
          sqlType = JDBCContext.SQL_UPDATE, 
          batch = true 
        ) 
      } 
  } 
 
  object JDBCEngine { 
 
    import JDBCContext._ 
 
    private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) => 
      throw new IllegalStateException(message) 
    } 
 
    def jdbcAkkaStream[A](ctx: JDBCQueryContext[A]) 
                         (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = { 
 
      val publisher: DatabasePublisher[A] = NamedDB('h2) readOnlyStream { 
        val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor("")) 
        ctx.queryTimeout.foreach(rawSql.queryTimeout(_)) 
        val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor) 
 
        sql.iterator 
          .withDBSessionForceAdjuster(session => { 
            session.connection.setAutoCommit(ctx.autoCommit) 
            session.fetchSize(ctx.fetchSize) 
          }) 
       } 
      Source.fromPublisher[A](publisher) 
    } 
 
 
    def jdbcQueryResult[C[_] <: TraversableOnce[_], A]( 
         ctx: JDBCQueryContext[A])( 
          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = { 
 
          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor("")) 
          ctx.queryTimeout.foreach(rawSql.queryTimeout(_)) 
          rawSql.fetchSize(ctx.fetchSize) 
          implicit val session = NamedAutoSession(ctx.dbName) 
          val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor) 
          sql.collection.apply[C]() 
 
    } 
 
    def jdbcExcuteDDL(ctx: JDBCContext): Try[String] = { 
       if (ctx.sqlType != SQL_EXEDDL) { 
        Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!")) 
      } 
      else { 
        NamedDB(ctx.dbName) localTx { implicit session => 
          Try { 
                ctx.statements.foreach { stm => 
                  val ddl = new SQLExecution(statement = stm, parameters = Nil)( 
                    before = WrappedResultSet => {})( 
                    after = WrappedResultSet => {}) 
 
                  ddl.apply() 
              } 
            "SQL_EXEDDL executed succesfully." 
          } 
        } 
      } 
    } 
 
    def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
      if (ctx.statements == Nil) 
        throw new IllegalStateException("JDBCContex setting error: statements empty!") 
      if (ctx.sqlType != SQL_UPDATE) { 
        Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")) 
      } 
      else { 
        if (ctx.batch) { 
          if (noReturnKey(ctx)) { 
            val usql = SQL(ctx.statements.head) 
              .tags(ctx.queryTags: _*) 
              .batch(ctx.parameters: _*) 
            Try { 
              NamedDB(ctx.dbName) localTx { implicit session => 
                ctx.queryTimeout.foreach(session.queryTimeout(_)) 
                usql.apply[Seq]() 
                Seq.empty[Long].to[C] 
              } 
            } 
          } else { 
            val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None) 
            Try { 
              NamedDB(ctx.dbName) localTx { implicit session => 
                ctx.queryTimeout.foreach(session.queryTimeout(_)) 
                usql.apply[C]() 
              } 
            } 
          } 
 
        } else { 
          Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !")) 
        } 
      } 
    } 
     private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
       implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
       val Some(key) :: xs = ctx.returnGeneratedKey 
       val params: Seq[Any] = ctx.parameters match { 
         case Nil => Nil 
         case p@_ => p.head 
       } 
       val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key) 
       Try { 
         NamedDB(ctx.dbName) localTx { implicit session => 
           session.fetchSize(ctx.fetchSize) 
           ctx.queryTimeout.foreach(session.queryTimeout(_)) 
           val result = usql.apply() 
           Seq(result).to[C] 
         } 
       } 
     } 
 
      private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
        implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
      val params: Seq[Any] = ctx.parameters match { 
        case Nil => Nil 
        case p@_ => p.head 
      } 
      val before = ctx.preAction match { 
        case None => pstm: PreparedStatement => {} 
        case Some(f) => f 
      } 
      val after = ctx.postAction match { 
        case None => pstm: PreparedStatement => {} 
        case Some(f) => f 
      } 
      val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after) 
      Try { 
        NamedDB(ctx.dbName) localTx {implicit session => 
          session.fetchSize(ctx.fetchSize) 
          ctx.queryTimeout.foreach(session.queryTimeout(_)) 
          val result = usql.apply() 
          Seq(result.toLong).to[C] 
        } 
      } 
 
    } 
 
    private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
      if (noReturnKey(ctx)) 
        singleTxUpdateNoReturnKey(ctx) 
      else 
        singleTxUpdateWithReturnKey(ctx) 
    } 
 
    private def noReturnKey(ctx: JDBCContext): Boolean = { 
      if (ctx.returnGeneratedKey != Nil) { 
        val k :: xs = ctx.returnGeneratedKey 
         k match { 
          case None => true 
          case Some(k) => false 
        } 
      } else true 
    } 
 
    def noActon: PreparedStatement=>Unit = pstm => {} 
 
    def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
        Try { 
          NamedDB(ctx.dbName) localTx { implicit session => 
            session.fetchSize(ctx.fetchSize) 
            ctx.queryTimeout.foreach(session.queryTimeout(_)) 
            val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match { 
              case Nil => Seq.fill(ctx.statements.size)(None) 
              case k@_ => k 
            } 
            val sqlcmd = ctx.statements zip ctx.parameters zip keys 
            val results = sqlcmd.map { case ((stm, param), key) => 
              key match { 
                case None => 
                  new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong 
                case Some(k) => 
                  new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong 
              } 
            } 
            results.to[C] 
          } 
        } 
     } 
 
 
    def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
      if (ctx.statements == Nil) 
        throw new IllegalStateException("JDBCContex setting error: statements empty!") 
      if (ctx.sqlType != SQL_UPDATE) { 
        Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")) 
      } 
      else { 
        if (!ctx.batch) { 
          if (ctx.statements.size == 1) 
            singleTxUpdate(ctx) 
          else 
            multiTxUpdates(ctx) 
        } else 
          Failure(new IllegalStateException("JDBCContex setting error: must set batch = false !")) 
 
      } 
    } 
 
  }

JDBCQueryDemo.scala

import akka.actor._ 
import akka.stream.scaladsl._ 
import akka.stream._ 
import scalikejdbc._ 
import configdbs._ 
import jdbccontext._ 
import JDBCEngine._ 
 
object JDBCStreaming extends App { 
 
  implicit val actorSys = ActorSystem("actor-system") 
  implicit val ec = actorSys.dispatcher 
  implicit val mat = ActorMaterializer() 
 
  ConfigDBsWithEnv("dev").setup('h2) 
  ConfigDBsWithEnv("dev").loadGlobalSettings() 
 
 
  case class DataRow(year: String, state: String, county: String, value: String) 
 
  //data row converter 
  val toRow = (rs: WrappedResultSet) => DataRow( 
    year = rs.string("REPORTYEAR"), 
    state = rs.string("STATENAME"), 
    county = rs.string("COUNTYNAME"), 
    value = rs.string("VALUE") 
  ) 
 
  //construct the context 
  val ctx = JDBCQueryContext[DataRow]( 
    dbName = 'h2, 
    statement = "select * from AIRQM", 
    extractor = toRow 
  ) 
 
  //pass context to construct akka-source 
  val akkaSource = jdbcAkkaStream(ctx) 
  //a sink for display rows 
  val snk = Sink.foreach[(DataRow,Long)] { case (row,idx) => 
    println(s"rec#: $idx - year: ${row.year} location: ${row.state},${row.county} value: ${row.value}")} 
  //can manual terminate stream by kill.shutdown 
  val kill: UniqueKillSwitch = (akkaSource.zipWithIndex).viaMat(KillSwitches.single)(Keep.right).to(snk).run 
 
 
  scala.io.StdIn.readLine() 
  kill.shutdown() 
  actorSys.terminate() 
  println("+++++++++++++++") 
 
  object SlickDAO { 
    import slick.jdbc.H2Profile.api._ 
 
    case class CountyModel(id: Int, name: String) 
    case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") { 
      def id = column[Int]("ID",O.AutoInc,O.PrimaryKey) 
      def name = column[String]("NAME",O.Length(64)) 
      def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply) 
    } 
    val CountyQuery = TableQuery[CountyTable] 
    val filter = "Kansas" 
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"} 
    val statement = qry.result.statements.head 
  } 
  import SlickDAO._ 
 
  def toCounty: WrappedResultSet => CountyModel = rs => 
    CountyModel(id=rs.int("id"),name=rs.string("name")) 
  //construct the context 
  val slickCtx = JDBCQueryContext[CountyModel]( 
    dbName = 'h2, 
    statement = "select * from county where id > ? and id < ?", 
    parameters = Seq(6000,6200), 
    extractor = toCounty 
  ) 
 
  val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx) 
  vecCounty.foreach(r => println(s"${r.id},${r.name}")) 
 
 
}

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/12812.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论