SDP(7):Cassandra- Cassandra-Engine:Streaming详解编程语言

  akka在alpakka工具包里提供了对cassandra数据库的streaming功能。简单来讲就是用一个CQL-statement读取cassandra数据并产生akka-stream的Source。这是一个支持reactive-stream协议的流:

object CassandraSource { 
 
  /** 
   * Scala API: creates a [[CassandraSourceStage]] from a given statement. 
   */ 
  def apply(stmt: Statement)(implicit session: Session): Source[Row, NotUsed] = 
    Source.fromGraph(new CassandraSourceStage(Future.successful(stmt), session)) 
 
  /** 
   * Scala API: creates a [[CassandraSourceStage]] from the result of a given Future. 
   */ 
  def fromFuture(futStmt: Future[Statement])(implicit session: Session): Source[Row, NotUsed] = 
    Source.fromGraph(new CassandraSourceStage(futStmt, session)) 
 
}

CassandraSource.apply构建Source[Row,NotUsed]。可以直接接通Flow[Row,Row,NotUsed]和Sink来使用。我们是通过CQLQueryContext来构建这个Source的:

  def cassandraStream[A](ctx: CQLQueryContext[A]) 
       (implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = { 
 
    val prepStmt = session.prepare(ctx.statement) 
    var boundStmt =  prepStmt.bind() 
    val params = processParameters(ctx.parameter) 
    boundStmt = prepStmt.bind(params:_*) 
    ctx.consistency.foreach {consistency => 
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))} 
 
    CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(ctx.extractor) 
   }

CQLQueryContext[A]在上期介绍过: 

case class CQLQueryContext[M]( 
                               statement: String, 
                               extractor: Row => M, 
                               parameter: Seq[Object] = Nil, 
                               consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None, 
                               fetchSize: Int = 100 
                             ) { ctx => 
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext[M] = 
     ctx.copy(consistency = Some(_consistency)) 
  def setFetchSize(pageSize: Int): CQLQueryContext[M] = 
    ctx.copy(fetchSize = pageSize) 
} 
object CQLQueryContext { 
  def apply[M](stmt: String, converter: Row => M): CQLQueryContext[M] = 
    new CQLQueryContext[M](statement = stmt, extractor = converter) 
}

下面是一个流的构建和使用示范:

  case class Model ( 
                       rowid: Long, 
                       measureid: Long, 
                       state: String, 
                       county: String, 
                       year: Int, 
                       value: Int, 
                       createdAt: java.util.Date 
                     ) 
  //data row converter 
  val toModel = (rs: Row) => Model( 
    rowid = rs.getLong("ROWID"), 
    measureid = rs.getLong("MEASUREID"), 
    state = rs.getString("STATENAME"), 
    county = rs.getString("COUNTYNAME"), 
    year = rs.getInt("REPORTYEAR"), 
    value = rs.getInt("VALUE"), 
    createdAt = rs.getTimestamp("CREATED") 
  ) 
 
  //setup context 
  val ctx = CQLQueryContext("select * from testdb.aqmrpt",toModel) 
  //construct source 
  val src = cassandraStream(ctx) 
  //a display sink 
  val snk = Sink.foreach[Model]{ r => 
    println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}") 
  } 
  //run on source 
  src.to(snk).run()

除了通过读取数据构成stream source之外,我们还可以以流元素为数据进行数据库更新操作,因为我们可以用map来运行execute:

  case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true, 
                                  statement: String, prepareParams: R => Seq[Object], 
                                  consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ cas => 
    def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism=parLevel) 
    def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered) 
    def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] = 
      cas.copy(consistency = Some(_consistency)) 
 
    def perform(r: R)(implicit session: Session, ec: ExecutionContext) = { 
      val prepStmt = session.prepare(statement) 
      var boundStmt =  prepStmt.bind() 
      val params = processParameters(prepareParams(r)) 
      boundStmt = prepStmt.bind(params:_*) 
      consistency.foreach { cons => 
        boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons)) 
      } 
      session.executeAsync(boundStmt).map(_ => r) 
    } 
    def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = 
      if (processInOrder) 
        Flow[R].mapAsync(parallelism)(perform) 
      else 
        Flow[R].mapAsyncUnordered(parallelism)(perform) 
 
  } 
  object CassandraActionStream { 
    def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] = 
      new CassandraActionStream[R]( statement=_statement, prepareParams = params) 
  }

CassandraActionStream可以用statement,params构建。它的一个函数performOnRow是一个Flow[R,R,NotUsed],可以把每个R转换成一条CQL后通过map来运行executeAsyn,造成一种批次运算效果。下面是CassandraActionStream的使用示范:

  //pass context to construct akka-source 
  val jdbcSource = jdbcAkkaStream(ctx) 
 
  val cqlInsert = "insert into testdb.AQMRPT(ROWID,MEASUREID,STATENAME," + 
    "COUNTYNAME,REPORTYEAR,VALUE,CREATED) VALUES(?,?,?,?,?,?,?)" 
 
  val toPparams: DataRow => Seq[Object] = r => { 
    Seq[Object](r.rowid.asInstanceOf[Object], 
      r.measureid.asInstanceOf[Object], 
      r.state, 
      r.county, 
      r.year.asInstanceOf[Object], 
      r.value.asInstanceOf[Object], 
      CQLDateTimeNow 
    ) 
  } 
 
  val actionStream = CassandraActionStream(cqlInsert,toPparams).setParallelism(2) 
    .setProcessOrder(false) 
  val actionFlow: Flow[DataRow,DataRow,NotUsed] = actionStream.performOnRow 
 
  val sink = Sink.foreach[DataRow]{ r => 
    println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}") 
  } 
  val sts = jdbcSource.take(100).via(actionFlow).to(sink).run()

下面的例子里我们用CassandraStream的流元素更新h2数据库中的数据,调用了JDBCActionStream: 

  //data row converter 
  val toModel = (rs: Row) => Model( 
    rowid = rs.getLong("ROWID"), 
    measureid = rs.getLong("MEASUREID"), 
    state = rs.getString("STATENAME"), 
    county = rs.getString("COUNTYNAME"), 
    year = rs.getInt("REPORTYEAR"), 
    value = rs.getInt("VALUE"), 
    createdAt = rs.getTimestamp("CREATED") 
  ) 
 
  //setup context 
  val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt",toModel) 
  //construct source 
  val src = cassandraStream(cqlCtx) 
  //a display sink 
  val snk = Sink.foreach[Model]{ r => 
    println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}") 
  } 
  //run on source 
  src.to(snk).run() 
 
  val params: Model => Seq[Any] = row => { 
    Seq((row.value * 10), row.rowid) } 
 
  val jdbcActionStream = JDBCActionStream('h2, "update AQMRPT set total = ? where rowid = ?",params) 
      .setParallelism(2).setProcessOrder(false) 
  val jdbcActionFlow = jdbcActionStream.performOnRow 
 
  //update rows in h2 database from data in cassandra database 
  src.via(jdbcActionFlow).to(snk).run()

下面是本次示范的源代码:

build.sbt

name := "learn_cassandra" 
version := "0.1" 
scalaVersion := "2.12.4" 
libraryDependencies := Seq( 
"com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0", 
"com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0", 
"com.typesafe.akka" %% "akka-actor" % "2.5.4", 
"com.typesafe.akka" %% "akka-stream" % "2.5.4", 
"com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.16", 
"org.scalikejdbc" %% "scalikejdbc"       % "3.2.1", 
"org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test", 
"org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1", 
"org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1", 
"org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1", 
"com.h2database"  %  "h2"                % "1.4.196", 
"mysql" % "mysql-connector-java" % "6.0.6", 
"org.postgresql" % "postgresql" % "42.2.0", 
"commons-dbcp" % "commons-dbcp" % "1.4", 
"org.apache.tomcat" % "tomcat-jdbc" % "9.0.2", 
"com.zaxxer" % "HikariCP" % "2.7.4", 
"com.jolbox" % "bonecp" % "0.8.0.RELEASE", 
"com.typesafe.slick" %% "slick" % "3.2.1", 
"ch.qos.logback"  %  "logback-classic"   % "1.2.3")

resources/application.conf

# JDBC settings 
test { 
db { 
h2 { 
driver = "org.h2.Driver" 
url = "jdbc:h2:tcp://localhost/~/slickdemo" 
user = "" 
password = "" 
poolInitialSize = 5 
poolMaxSize = 7 
poolConnectionTimeoutMillis = 1000 
poolValidationQuery = "select 1 as one" 
poolFactoryName = "commons-dbcp2" 
} 
} 
db.mysql.driver = "com.mysql.cj.jdbc.Driver" 
db.mysql.url = "jdbc:mysql://localhost:3306/testdb" 
db.mysql.user = "root" 
db.mysql.password = "123" 
db.mysql.poolInitialSize = 5 
db.mysql.poolMaxSize = 7 
db.mysql.poolConnectionTimeoutMillis = 1000 
db.mysql.poolValidationQuery = "select 1 as one" 
db.mysql.poolFactoryName = "bonecp" 
# scallikejdbc Global settings 
scalikejdbc.global.loggingSQLAndTime.enabled = true 
scalikejdbc.global.loggingSQLAndTime.logLevel = info 
scalikejdbc.global.loggingSQLAndTime.warningEnabled = true 
scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000 
scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn 
scalikejdbc.global.loggingSQLAndTime.singleLineMode = false 
scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false 
scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 
} 
dev { 
db { 
h2 { 
driver = "org.h2.Driver" 
url = "jdbc:h2:tcp://localhost/~/slickdemo" 
user = "" 
password = "" 
poolFactoryName = "hikaricp" 
numThreads = 10 
maxConnections = 12 
minConnections = 4 
keepAliveConnection = true 
} 
mysql { 
driver = "com.mysql.cj.jdbc.Driver" 
url = "jdbc:mysql://localhost:3306/testdb" 
user = "root" 
password = "123" 
poolInitialSize = 5 
poolMaxSize = 7 
poolConnectionTimeoutMillis = 1000 
poolValidationQuery = "select 1 as one" 
poolFactoryName = "bonecp" 
} 
postgres { 
driver = "org.postgresql.Driver" 
url = "jdbc:postgresql://localhost:5432/testdb" 
user = "root" 
password = "123" 
poolFactoryName = "hikaricp" 
numThreads = 10 
maxConnections = 12 
minConnections = 4 
keepAliveConnection = true 
} 
} 
# scallikejdbc Global settings 
scalikejdbc.global.loggingSQLAndTime.enabled = true 
scalikejdbc.global.loggingSQLAndTime.logLevel = info 
scalikejdbc.global.loggingSQLAndTime.warningEnabled = true 
scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000 
scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn 
scalikejdbc.global.loggingSQLAndTime.singleLineMode = false 
scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false 
scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 
}

CassandraEngine.scala

import com.datastax.driver.core._ 
import scala.concurrent._ 
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture} 
import scala.collection.JavaConverters._ 
import scala.collection.generic.CanBuildFrom 
import scala.concurrent.duration.Duration 
import akka.NotUsed 
import akka.stream.alpakka.cassandra.scaladsl._ 
import akka.stream.scaladsl._ 
object CQLContext { 
// Consistency Levels 
type CONSISTENCY_LEVEL = Int 
val ANY: CONSISTENCY_LEVEL          =                                        0x0000 
val ONE: CONSISTENCY_LEVEL          =                                        0x0001 
val TWO: CONSISTENCY_LEVEL          =                                        0x0002 
val THREE: CONSISTENCY_LEVEL        =                                        0x0003 
val QUORUM : CONSISTENCY_LEVEL      =                                        0x0004 
val ALL: CONSISTENCY_LEVEL          =                                        0x0005 
val LOCAL_QUORUM: CONSISTENCY_LEVEL =                                        0x0006 
val EACH_QUORUM: CONSISTENCY_LEVEL  =                                        0x0007 
val LOCAL_ONE: CONSISTENCY_LEVEL    =                                      0x000A 
val LOCAL_SERIAL: CONSISTENCY_LEVEL =                                     0x000B 
val SERIAL: CONSISTENCY_LEVEL       =                                      0x000C 
def apply(): CQLContext = CQLContext(statements = Nil) 
def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => { 
consistency match { 
case ALL => ConsistencyLevel.ALL 
case ONE => ConsistencyLevel.ONE 
case TWO => ConsistencyLevel.TWO 
case THREE => ConsistencyLevel.THREE 
case ANY => ConsistencyLevel.ANY 
case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM 
case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE 
case QUORUM => ConsistencyLevel.QUORUM 
case SERIAL => ConsistencyLevel.SERIAL 
case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL 
} 
} 
} 
case class CQLQueryContext[M]( 
statement: String, 
extractor: Row => M, 
parameter: Seq[Object] = Nil, 
consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None, 
fetchSize: Int = 100 
) { ctx => 
def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext[M] = 
ctx.copy(consistency = Some(_consistency)) 
def setFetchSize(pageSize: Int): CQLQueryContext[M] = 
ctx.copy(fetchSize = pageSize) 
} 
object CQLQueryContext { 
def apply[M](stmt: String, converter: Row => M): CQLQueryContext[M] = 
new CQLQueryContext[M](statement = stmt, extractor = converter) 
} 
case class CQLContext( 
statements: Seq[String], 
parameters: Seq[Seq[Object]] = Nil, 
consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None 
) { ctx => 
def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext = 
ctx.copy(consistency = Some(_consistency)) 
def setCommand(_statement: String, _parameters: Object*): CQLContext = 
ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters)) 
def appendCommand(_statement: String, _parameters: Object*): CQLContext = 
ctx.copy(statements = ctx.statements :+ _statement, 
parameters = ctx.parameters ++ Seq(_parameters)) 
} 
object CQLEngine { 
import CQLContext._ 
import CQLHelpers._ 
def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A], pageSize: Int = 100)( 
implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= { 
val prepStmt = session.prepare(ctx.statement) 
var boundStmt =  prepStmt.bind() 
if (ctx.parameter != Nil) { 
val params = processParameters(ctx.parameter) 
boundStmt = prepStmt.bind(params:_*) 
} 
ctx.consistency.foreach {consistency => 
boundStmt.setConsistencyLevel(consistencyLevel(consistency))} 
val resultSet = session.execute(boundStmt.setFetchSize(pageSize)) 
(resultSet,(resultSet.asScala.view.map(ctx.extractor)).to[C]) 
} 
def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)( 
extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) = 
if (resultSet.isFullyFetched) { 
(resultSet, None) 
} else { 
try { 
val result = Await.result(resultSet.fetchMoreResults(), timeOut) 
(result, Some((result.asScala.view.map(extractor)).to[C])) 
} catch { case e: Throwable => (resultSet, None) } 
} 
def cqlExecute(ctx: CQLContext)( 
implicit session: Session, ec: ExecutionContext): Future[Boolean] = { 
if (ctx.statements.size == 1) 
cqlSingleUpdate(ctx) 
else 
cqlMultiUpdate(ctx) 
} 
def cqlSingleUpdate(ctx: CQLContext)( 
implicit session: Session, ec: ExecutionContext): Future[Boolean] = { 
val prepStmt = session.prepare(ctx.statements.head) 
var boundStmt =  prepStmt.bind() 
if (ctx.parameters != Nil) { 
val params = processParameters(ctx.parameters.head) 
boundStmt = prepStmt.bind(params:_*) 
} 
ctx.consistency.foreach {consistency => 
boundStmt.setConsistencyLevel(consistencyLevel(consistency))} 
session.executeAsync(boundStmt).map(_.wasApplied()) 
} 
def cqlMultiUpdate(ctx: CQLContext)( 
implicit session: Session, ec: ExecutionContext): Future[Boolean] = { 
val commands: Seq[(String,Seq[Object])] = ctx.statements zip ctx.parameters 
var batch = new BatchStatement() 
commands.foreach { case (stm, params) => 
val prepStmt = session.prepare(stm) 
if (params == Nil) 
batch.add(prepStmt.bind()) 
else { 
val p = processParameters(params) 
batch.add(prepStmt.bind(p: _*)) 
} 
} 
ctx.consistency.foreach {consistency => 
batch.setConsistencyLevel(consistencyLevel(consistency))} 
session.executeAsync(batch).map(_.wasApplied()) 
} 
def cassandraStream[A](ctx: CQLQueryContext[A]) 
(implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = { 
val prepStmt = session.prepare(ctx.statement) 
var boundStmt =  prepStmt.bind() 
val params = processParameters(ctx.parameter) 
boundStmt = prepStmt.bind(params:_*) 
ctx.consistency.foreach {consistency => 
boundStmt.setConsistencyLevel(consistencyLevel(consistency))} 
CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(ctx.extractor) 
} 
case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true, 
statement: String, prepareParams: R => Seq[Object], 
consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ cas => 
def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism=parLevel) 
def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered) 
def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] = 
cas.copy(consistency = Some(_consistency)) 
private def perform(r: R)(implicit session: Session, ec: ExecutionContext) = { 
val prepStmt = session.prepare(statement) 
var boundStmt =  prepStmt.bind() 
val params = processParameters(prepareParams(r)) 
boundStmt = prepStmt.bind(params:_*) 
consistency.foreach { cons => 
boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons)) 
} 
session.executeAsync(boundStmt).map(_ => r) 
} 
def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = 
if (processInOrder) 
Flow[R].mapAsync(parallelism)(perform) 
else 
Flow[R].mapAsyncUnordered(parallelism)(perform) 
} 
object CassandraActionStream { 
def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] = 
new CassandraActionStream[R]( statement=_statement, prepareParams = params) 
} 
} 
object CQLHelpers { 
import java.nio.ByteBuffer 
import java.io._ 
import java.nio.file._ 
import com.datastax.driver.core.LocalDate 
import com.datastax.driver.extras.codecs.jdk8.InstantCodec 
import java.time.Instant 
import akka.stream.scaladsl._ 
import akka.stream._ 
implicit def listenableFutureToFuture[T]( 
listenableFuture: ListenableFuture[T]): Future[T] = { 
val promise = Promise[T]() 
Futures.addCallback(listenableFuture, new FutureCallback[T] { 
def onFailure(error: Throwable): Unit = { 
promise.failure(error) 
() 
} 
def onSuccess(result: T): Unit = { 
promise.success(result) 
() 
} 
}) 
promise.future 
} 
case class CQLDate(year: Int, month: Int, day: Int) 
case object CQLTodayDate 
case class CQLDateTime(year: Int, Month: Int, 
day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0) 
case object CQLDateTimeNow 
def processParameters(params: Seq[Object]): Seq[Object] = { 
import java.time.{Clock,ZoneId} 
params.map { obj => 
obj match { 
case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd) 
case CQLTodayDate => 
val today = java.time.LocalDate.now() 
LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth) 
case CQLDateTimeNow => Instant.now(Clock.system(ZoneId.of("EST", ZoneId.SHORT_IDS))) 
case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) => 
Instant.parse(f"$yy%4d-$mm%2d-$dd%2dT$hr%2d:$ms%2d:$sc%2d$mi%3d") 
case p@_ => p 
} 
} 
} 
class ByteBufferInputStream(buf: ByteBuffer) extends InputStream { 
override def read: Int = { 
if (!buf.hasRemaining) return -1 
buf.get 
} 
override def read(bytes: Array[Byte], off: Int, len: Int): Int = { 
val length: Int = Math.min(len, buf.remaining) 
buf.get(bytes, off, length) 
length 
} 
} 
object ByteBufferInputStream { 
def apply(buf: ByteBuffer): ByteBufferInputStream = { 
new ByteBufferInputStream(buf) 
} 
} 
class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream { 
override def write(b: Int): Unit = { 
buf.put(b.toByte) 
} 
override def write(bytes: Array[Byte], off: Int, len: Int): Unit = { 
buf.put(bytes, off, len) 
} 
} 
object FixsizedByteBufferOutputStream { 
def apply(buf: ByteBuffer) = new FixsizedByteBufferOutputStream(buf) 
} 
class ExpandingByteBufferOutputStream(var buf: ByteBuffer, onHeap: Boolean) extends OutputStream { 
private val increasing = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR 
override def write(b: Array[Byte], off: Int, len: Int): Unit = { 
val position = buf.position 
val limit = buf.limit 
val newTotal: Long = position + len 
if(newTotal > limit){ 
var capacity = (buf.capacity * increasing) 
while(capacity <= newTotal){ 
capacity = (capacity*increasing) 
} 
increase(capacity.toInt) 
} 
buf.put(b, 0, len) 
} 
override def write(b: Int): Unit= { 
if (!buf.hasRemaining) increase((buf.capacity * increasing).toInt) 
buf.put(b.toByte) 
} 
protected def increase(newCapacity: Int): Unit = { 
buf.limit(buf.position) 
buf.rewind 
val newBuffer = 
if (onHeap) ByteBuffer.allocate(newCapacity) 
else  ByteBuffer.allocateDirect(newCapacity) 
newBuffer.put(buf) 
buf.clear 
buf = newBuffer 
} 
def size: Long = buf.position 
def capacity: Long = buf.capacity 
def byteBuffer: ByteBuffer = buf 
} 
object ExpandingByteBufferOutputStream { 
val DEFAULT_INCREASING_FACTOR = 1.5f 
def apply(size: Int, increasingBy: Float, onHeap: Boolean) = { 
if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0") 
val buffer: ByteBuffer = 
if (onHeap) ByteBuffer.allocate(size) 
else ByteBuffer.allocateDirect(size) 
new ExpandingByteBufferOutputStream(buffer,onHeap) 
} 
def apply(size: Int): ExpandingByteBufferOutputStream = { 
apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false) 
} 
def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = { 
apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap) 
} 
def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = { 
apply(size, increasingBy, false) 
} 
} 
def cqlFileToBytes(fileName: String): ByteBuffer = { 
val fis = new FileInputStream(fileName) 
val b = new Array[Byte](fis.available + 1) 
val length = b.length 
fis.read(b) 
ByteBuffer.wrap(b) 
} 
def cqlBytesToFile(bytes: ByteBuffer, fileName: String)( 
implicit mat: Materializer): Future[IOResult] = { 
val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes)) 
source.runWith(FileIO.toPath(Paths.get(fileName))) 
} 
def cqlDateTimeString(date: java.util.Date, fmt: String): String = { 
val outputFormat = new java.text.SimpleDateFormat(fmt) 
outputFormat.format(date) 
} 
def useJava8DateTime(cluster: Cluster) = { 
//for jdk8 datetime format 
    cluster.getConfiguration().getCodecRegistry() 
.register(InstantCodec.instance) 
} 
}

JDBCEngine.scala

package jdbccontext 
import java.sql.PreparedStatement 
import scala.collection.generic.CanBuildFrom 
import akka.stream.scaladsl._ 
import scalikejdbc._ 
import scalikejdbc.streams._ 
import akka.NotUsed 
import scala.util._ 
import scalikejdbc.TxBoundary.Try._ 
import scala.concurrent.ExecutionContextExecutor 
object JDBCContext { 
type SQLTYPE = Int 
val SQL_EXEDDL= 1 
val SQL_UPDATE = 2 
val RETURN_GENERATED_KEYVALUE = true 
val RETURN_UPDATED_COUNT = false 
} 
case class JDBCQueryContext[M]( 
dbName: Symbol, 
statement: String, 
parameters: Seq[Any] = Nil, 
fetchSize: Int = 100, 
autoCommit: Boolean = false, 
queryTimeout: Option[Int] = None, 
extractor: WrappedResultSet => M) 
case class JDBCContext( 
dbName: Symbol, 
statements: Seq[String] = Nil, 
parameters: Seq[Seq[Any]] = Nil, 
fetchSize: Int = 100, 
queryTimeout: Option[Int] = None, 
queryTags: Seq[String] = Nil, 
sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE, 
batch: Boolean = false, 
returnGeneratedKey: Seq[Option[Any]] = Nil, 
// no return: None, return by index: Some(1), by name: Some("id") 
preAction: Option[PreparedStatement => Unit] = None, 
postAction: Option[PreparedStatement => Unit] = None) { 
ctx => 
//helper functions 
 
def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag) 
def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags) 
def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size) 
def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time) 
def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = { 
if (ctx.sqlType == JDBCContext.SQL_UPDATE && 
!ctx.batch && ctx.statements.size == 1) 
ctx.copy(preAction = action) 
else 
throw new IllegalStateException("JDBCContex setting error: preAction not supported!") 
} 
def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = { 
if (ctx.sqlType == JDBCContext.SQL_UPDATE && 
!ctx.batch && ctx.statements.size == 1) 
ctx.copy(postAction = action) 
else 
throw new IllegalStateException("JDBCContex setting error: preAction not supported!") 
} 
def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = { 
if (ctx.sqlType == JDBCContext.SQL_EXEDDL) { 
ctx.copy( 
statements = ctx.statements ++ Seq(_statement), 
parameters = ctx.parameters ++ Seq(Seq(_parameters)) 
) 
} else 
throw new IllegalStateException("JDBCContex setting error: option not supported!") 
} 
def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = { 
if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) { 
ctx.copy( 
statements = ctx.statements ++ Seq(_statement), 
parameters = ctx.parameters ++ Seq(_parameters), 
returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None)) 
) 
} else 
throw new IllegalStateException("JDBCContex setting error: option not supported!") 
} 
def appendBatchParameters(_parameters: Any*): JDBCContext = { 
if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) 
throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!") 
var matchParams = true 
if (ctx.parameters != Nil) 
if (ctx.parameters.head.size != _parameters.size) 
matchParams = false 
if (matchParams) { 
ctx.copy( 
parameters = ctx.parameters ++ Seq(_parameters) 
) 
} else 
throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!") 
} 
def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = { 
if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) 
throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!") 
ctx.copy( 
returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil 
) 
} 
def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = { 
ctx.copy( 
statements = Seq(_statement), 
parameters = Seq(_parameters), 
sqlType = JDBCContext.SQL_EXEDDL, 
batch = false 
) 
} 
def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = { 
ctx.copy( 
statements = Seq(_statement), 
parameters = Seq(_parameters), 
returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None), 
sqlType = JDBCContext.SQL_UPDATE, 
batch = false 
) 
} 
def setBatchCommand(_statement: String): JDBCContext = { 
ctx.copy ( 
statements = Seq(_statement), 
sqlType = JDBCContext.SQL_UPDATE, 
batch = true 
) 
} 
} 
object JDBCEngine { 
import JDBCContext._ 
private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) => 
throw new IllegalStateException(message) 
} 
def jdbcAkkaStream[A](ctx: JDBCQueryContext[A]) 
(implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = { 
val publisher: DatabasePublisher[A] = NamedDB('h2) readOnlyStream { 
val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor("")) 
ctx.queryTimeout.foreach(rawSql.queryTimeout(_)) 
val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor) 
sql.iterator 
.withDBSessionForceAdjuster(session => { 
session.connection.setAutoCommit(ctx.autoCommit) 
session.fetchSize(ctx.fetchSize) 
}) 
} 
Source.fromPublisher[A](publisher) 
} 
def jdbcQueryResult[C[_] <: TraversableOnce[_], A]( 
ctx: JDBCQueryContext[A])( 
implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = { 
val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor("")) 
ctx.queryTimeout.foreach(rawSql.queryTimeout(_)) 
rawSql.fetchSize(ctx.fetchSize) 
implicit val session = NamedAutoSession(ctx.dbName) 
val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor) 
sql.collection.apply[C]() 
} 
def jdbcExcuteDDL(ctx: JDBCContext): Try[String] = { 
if (ctx.sqlType != SQL_EXEDDL) { 
Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!")) 
} 
else { 
NamedDB(ctx.dbName) localTx { implicit session => 
Try { 
ctx.statements.foreach { stm => 
val ddl = new SQLExecution(statement = stm, parameters = Nil)( 
before = WrappedResultSet => {})( 
after = WrappedResultSet => {}) 
ddl.apply() 
} 
"SQL_EXEDDL executed succesfully." 
} 
} 
} 
} 
def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
if (ctx.statements == Nil) 
throw new IllegalStateException("JDBCContex setting error: statements empty!") 
if (ctx.sqlType != SQL_UPDATE) { 
Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")) 
} 
else { 
if (ctx.batch) { 
if (noReturnKey(ctx)) { 
val usql = SQL(ctx.statements.head) 
.tags(ctx.queryTags: _*) 
.batch(ctx.parameters: _*) 
Try { 
NamedDB(ctx.dbName) localTx { implicit session => 
ctx.queryTimeout.foreach(session.queryTimeout(_)) 
usql.apply[Seq]() 
Seq.empty[Long].to[C] 
} 
} 
} else { 
val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None) 
Try { 
NamedDB(ctx.dbName) localTx { implicit session => 
ctx.queryTimeout.foreach(session.queryTimeout(_)) 
usql.apply[C]() 
} 
} 
} 
} else { 
Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !")) 
} 
} 
} 
private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
val Some(key) :: xs = ctx.returnGeneratedKey 
val params: Seq[Any] = ctx.parameters match { 
case Nil => Nil 
case p@_ => p.head 
} 
val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key) 
Try { 
NamedDB(ctx.dbName) localTx { implicit session => 
session.fetchSize(ctx.fetchSize) 
ctx.queryTimeout.foreach(session.queryTimeout(_)) 
val result = usql.apply() 
Seq(result).to[C] 
} 
} 
} 
private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
val params: Seq[Any] = ctx.parameters match { 
case Nil => Nil 
case p@_ => p.head 
} 
val before = ctx.preAction match { 
case None => pstm: PreparedStatement => {} 
case Some(f) => f 
} 
val after = ctx.postAction match { 
case None => pstm: PreparedStatement => {} 
case Some(f) => f 
} 
val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after) 
Try { 
NamedDB(ctx.dbName) localTx {implicit session => 
session.fetchSize(ctx.fetchSize) 
ctx.queryTimeout.foreach(session.queryTimeout(_)) 
val result = usql.apply() 
Seq(result.toLong).to[C] 
} 
} 
} 
private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
if (noReturnKey(ctx)) 
singleTxUpdateNoReturnKey(ctx) 
else 
singleTxUpdateWithReturnKey(ctx) 
} 
private def noReturnKey(ctx: JDBCContext): Boolean = { 
if (ctx.returnGeneratedKey != Nil) { 
val k :: xs = ctx.returnGeneratedKey 
k match { 
case None => true 
case Some(k) => false 
} 
} else true 
} 
def noActon: PreparedStatement=>Unit = pstm => {} 
def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
Try { 
NamedDB(ctx.dbName) localTx { implicit session => 
session.fetchSize(ctx.fetchSize) 
ctx.queryTimeout.foreach(session.queryTimeout(_)) 
val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match { 
case Nil => Seq.fill(ctx.statements.size)(None) 
case k@_ => k 
} 
val sqlcmd = ctx.statements zip ctx.parameters zip keys 
val results = sqlcmd.map { case ((stm, param), key) => 
key match { 
case None => 
new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong 
case Some(k) => 
new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong 
} 
} 
results.to[C] 
} 
} 
} 
def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
if (ctx.statements == Nil) 
throw new IllegalStateException("JDBCContex setting error: statements empty!") 
if (ctx.sqlType != SQL_UPDATE) { 
Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")) 
} 
else { 
if (!ctx.batch) { 
if (ctx.statements.size == 1) 
singleTxUpdate(ctx) 
else 
multiTxUpdates(ctx) 
} else 
Failure(new IllegalStateException("JDBCContex setting error: must set batch = false !")) 
} 
} 
case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true, 
statement: String, prepareParams: R => Seq[Any]) {jas => 
def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName=db) 
def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism=parLevel) 
def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered) 
private def perform(r: R) = { 
import scala.concurrent._ 
val params = prepareParams(r) 
NamedDB(dbName) autoCommit { session => 
session.execute(statement,params: _*) 
} 
Future.successful(r) 
} 
def performOnRow(implicit session: DBSession): Flow[R, R, NotUsed] = 
if (processInOrder) 
Flow[R].mapAsync(parallelism)(perform) 
else 
Flow[R].mapAsyncUnordered(parallelism)(perform) 
} 
object JDBCActionStream { 
def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] = 
new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params) 
} 
}

HikariConfig.scala

package configdbs 
import scala.collection.mutable 
import scala.concurrent.duration.Duration 
import scala.language.implicitConversions 
import com.typesafe.config._ 
import java.util.concurrent.TimeUnit 
import java.util.Properties 
import scalikejdbc.config._ 
import com.typesafe.config.Config 
import com.zaxxer.hikari._ 
import scalikejdbc.ConnectionPoolFactoryRepository 
/** Extension methods to make Typesafe Config easier to use */ 
class ConfigExtensionMethods(val c: Config) extends AnyVal { 
import scala.collection.JavaConverters._ 
def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default 
def getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else default 
def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default 
def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default 
def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default 
def getDurationOr(path: String, default: => Duration = Duration.Zero) = 
if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default 
def getPropertiesOr(path: String, default: => Properties = null): Properties = 
if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default 
def toProperties: Properties = { 
def toProps(m: mutable.Map[String, ConfigValue]): Properties = { 
val props = new Properties(null) 
m.foreach { case (k, cv) => 
val v = 
if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala) 
else if(cv.unwrapped eq null) null 
else cv.unwrapped.toString 
if(v ne null) props.put(k, v) 
} 
props 
} 
toProps(c.root.asScala) 
} 
def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None 
def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None 
def getStringOpt(path: String) = Option(getStringOr(path)) 
def getPropertiesOpt(path: String) = Option(getPropertiesOr(path)) 
} 
object ConfigExtensionMethods { 
@inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c) 
} 
trait HikariConfigReader extends TypesafeConfigReader { 
self: TypesafeConfig =>      // with TypesafeConfigReader => //NoEnvPrefix => 
 
import ConfigExtensionMethods.configExtensionMethods 
def getFactoryName(dbName: Symbol): String = { 
val c: Config = config.getConfig(envPrefix + "db." + dbName.name) 
c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP) 
} 
def hikariCPConfig(dbName: Symbol): HikariConfig = { 
val hconf = new HikariConfig() 
val c: Config = config.getConfig(envPrefix + "db." + dbName.name) 
// Connection settings 
if (c.hasPath("dataSourceClass")) { 
hconf.setDataSourceClassName(c.getString("dataSourceClass")) 
} else { 
Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).map(hconf.setDriverClassName _) 
} 
hconf.setJdbcUrl(c.getStringOr("url", null)) 
c.getStringOpt("user").foreach(hconf.setUsername) 
c.getStringOpt("password").foreach(hconf.setPassword) 
c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties) 
// Pool configuration 
hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000)) 
hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000)) 
hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000)) 
hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000)) 
hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0)) 
hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false)) 
c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery) 
c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql) 
val numThreads = c.getIntOr("numThreads", 20) 
hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5)) 
hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads)) 
hconf.setPoolName(c.getStringOr("poolName", dbName.name)) 
hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false)) 
// Equivalent of ConnectionPreparer 
hconf.setReadOnly(c.getBooleanOr("readOnly", false)) 
c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation) 
hconf.setCatalog(c.getStringOr("catalog", null)) 
hconf 
} 
} 
import scalikejdbc._ 
trait ConfigDBs { 
self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader => 
def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = { 
getFactoryName(dbName) match { 
case "hikaricp" => { 
val hconf = hikariCPConfig(dbName) 
val hikariCPSource = new HikariDataSource(hconf) 
if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) { 
Class.forName(hconf.getDriverClassName) 
} 
ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource)) 
} 
case _ => { 
val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName) 
val cpSettings = readConnectionPoolSettings(dbName) 
if (driver != null && driver.trim.nonEmpty) { 
Class.forName(driver) 
} 
ConnectionPool.add(dbName, url, user, password, cpSettings) 
} 
} 
} 
def setupAll(): Unit = { 
loadGlobalSettings() 
dbNames.foreach { dbName => setup(Symbol(dbName)) } 
} 
def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = { 
ConnectionPool.close(dbName) 
} 
def closeAll(): Unit = { 
ConnectionPool.closeAll 
} 
} 
object ConfigDBs extends ConfigDBs 
with TypesafeConfigReader 
with StandardTypesafeConfig 
with HikariConfigReader 
case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs 
with TypesafeConfigReader 
with StandardTypesafeConfig 
with HikariConfigReader 
with EnvPrefix { 
override val env = Option(envValue) 
}

StreamDemo.scala

import com.datastax.driver.core._ 
import akka._ 
import CQLEngine._ 
import CQLHelpers._ 
import akka.actor._ 
import akka.stream.scaladsl._ 
import akka.stream._ 
import scalikejdbc._ 
import configdbs._ 
import jdbccontext._ 
import JDBCEngine._ 
import scala.util._ 
object cassandraStreamDemo extends App { 
//#init-mat 
implicit val cqlsys = ActorSystem("cqlSystem") 
implicit val mat = ActorMaterializer() 
implicit val ec = cqlsys.dispatcher 
val cluster = new Cluster 
.Builder() 
.addContactPoints("localhost") 
.withPort(9042) 
.build() 
useJava8DateTime(cluster) 
implicit val session = cluster.connect() 
//config jdbc drivers 
ConfigDBsWithEnv("dev").setup('h2) 
ConfigDBsWithEnv("dev").loadGlobalSettings() 
val cqlCreate = 
""" 
      |create table testdb.AQMRPT( 
|ROWID BIGINT PRIMARY KEY, 
|MEASUREID BIGINT, 
|STATENAME TEXT, 
|COUNTYNAME TEXT, 
|REPORTYEAR INT, 
|VALUE INT, 
|CREATED TIMESTAMP) 
""".stripMargin 
val ctxCreate = CQLContext().setCommand(cqlCreate) 
cqlExecute(ctxCreate).onComplete{ 
case Success(s) => println("schema created successfully!") 
case Failure(e) => println(e.getMessage) 
} 
scala.io.StdIn.readLine() 
case class DataRow ( 
rowid: Long, 
measureid: Long, 
state: String, 
county: String, 
year: Int, 
value: Int 
) 
val toRow: WrappedResultSet => DataRow = rs => DataRow( 
rowid = rs.long("ROWID"), 
measureid = rs.long("MEASUREID"), 
state = rs.string("STATENAME"), 
county = rs.string("COUNTYNAME"), 
year = rs.int("REPORTYEAR"), 
value = rs.int("VALUE") 
) 
//construct the context 
val ctx = JDBCQueryContext[DataRow]( 
dbName = 'h2, 
statement = "select * from AQMRPT", 
extractor = toRow 
) 
//source from h2 database 
val jdbcSource = jdbcAkkaStream(ctx) 
//insert into cassandra database 
val cqlInsert = "insert into testdb.AQMRPT(ROWID,MEASUREID,STATENAME," + 
"COUNTYNAME,REPORTYEAR,VALUE,CREATED) VALUES(?,?,?,?,?,?,?)" 
val toPparams: DataRow => Seq[Object] = r => { 
Seq[Object](r.rowid.asInstanceOf[Object], 
r.measureid.asInstanceOf[Object], 
r.state, 
r.county, 
r.year.asInstanceOf[Object], 
r.value.asInstanceOf[Object], 
CQLDateTimeNow 
) 
} 
val actionStream = CassandraActionStream(cqlInsert,toPparams).setParallelism(2) 
.setProcessOrder(false) 
val actionFlow: Flow[DataRow,DataRow,NotUsed] = actionStream.performOnRow 
val sink = Sink.foreach[DataRow]{ r => 
println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}") 
} 
val sts = jdbcSource.take(100).via(actionFlow).to(sink).run() 
case class Model ( 
rowid: Long, 
measureid: Long, 
state: String, 
county: String, 
year: Int, 
value: Int, 
createdAt: java.util.Date 
) 
//data row converter 
val toModel = (rs: Row) => Model( 
rowid = rs.getLong("ROWID"), 
measureid = rs.getLong("MEASUREID"), 
state = rs.getString("STATENAME"), 
county = rs.getString("COUNTYNAME"), 
year = rs.getInt("REPORTYEAR"), 
value = rs.getInt("VALUE"), 
createdAt = rs.getTimestamp("CREATED") 
) 
//setup context 
val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt",toModel) 
//construct source 
val src = cassandraStream(cqlCtx) 
//a display sink 
val snk = Sink.foreach[Model]{ r => 
println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}") 
} 
//run on source 
  src.to(snk).run() 
val params: Model => Seq[Any] = row => { 
Seq((row.value * 10), row.rowid) } 
val jdbcActionStream = JDBCActionStream('h2, "update AQMRPT set total = ? where rowid = ?",params) 
.setParallelism(2).setProcessOrder(false) 
val jdbcActionFlow = jdbcActionStream.performOnRow 
//update rows in h2 database from data in cassandra database 
  src.via(jdbcActionFlow).to(snk).run() 
scala.io.StdIn.readLine() 
session.close() 
cluster.close() 
cqlsys.terminate() 
}

 

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/12810.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论