akka在alpakka工具包里提供了对cassandra数据库的streaming功能。简单来讲就是用一个CQL-statement读取cassandra数据并产生akka-stream的Source。这是一个支持reactive-stream协议的流:
object CassandraSource {

  /**
   * Scala API: wraps an already available statement in a completed `Future` and
   * streams the rows produced by a [[CassandraSourceStage]] built from it.
   */
  def apply(stmt: Statement)(implicit session: Session): Source[Row, NotUsed] = {
    val ready: Future[Statement] = Future.successful(stmt)
    Source.fromGraph(new CassandraSourceStage(ready, session))
  }

  /**
   * Scala API: streams the rows produced by a [[CassandraSourceStage]] built from
   * the statement that `futStmt` delivers.
   */
  def fromFuture(futStmt: Future[Statement])(implicit session: Session): Source[Row, NotUsed] = {
    val stage = new CassandraSourceStage(futStmt, session)
    Source.fromGraph(stage)
  }
}
CassandraSource.apply构建Source[Row,NotUsed]。可以直接接通Flow[Row,Row,NotUsed]和Sink来使用。我们是通过CQLQueryContext来构建这个Source的:
/**
 * Builds an akka-stream `Source` that emits one `A` per row returned by the query
 * described by `ctx`.
 *
 * The statement text is prepared on `session`, bound to the context's parameters
 * (after `processParameters` has converted them), given the optional consistency
 * level and the context's fetch size, and handed to `CassandraSource`.
 *
 * @param ctx     query text, parameters, row extractor, consistency and fetch size
 * @param session Cassandra session used to prepare and run the statement
 * @param ec      execution context required by the signature
 * @return a source of extracted rows
 */
def cassandraStream[A](ctx: CQLQueryContext[A])
    (implicit session: Session, ec: ExecutionContextExecutor): Source[A, NotUsed] = {
  val prepStmt = session.prepare(ctx.statement)
  // Bind exactly once: with no parameters the varargs call is the plain `bind()`.
  val boundStmt = prepStmt.bind(processParameters(ctx.parameter): _*)
  ctx.consistency.foreach { consistency =>
    boundStmt.setConsistencyLevel(consistencyLevel(consistency))
  }
  CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(ctx.extractor)
}
CQLQueryContext[A]在上期介绍过:
/**
 * Immutable description of a CQL query.
 *
 * @param statement   CQL text
 * @param extractor   converts one result `Row` into an `M`
 * @param parameter   values to bind to the statement (empty by default)
 * @param consistency optional consistency level code from `CQLContext`
 * @param fetchSize   page size requested from the driver (100 by default)
 */
case class CQLQueryContext[M](
    statement: String,
    extractor: Row => M,
    parameter: Seq[Object] = Nil,
    consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
    fetchSize: Int = 100
) {

  /** Returns a copy of this context that carries the given consistency level. */
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext[M] =
    copy(consistency = Some(_consistency))

  /** Returns a copy of this context that uses `pageSize` as its fetch size. */
  def setFetchSize(pageSize: Int): CQLQueryContext[M] =
    copy(fetchSize = pageSize)
}

object CQLQueryContext {

  /** Shorthand constructor: statement text plus row converter, all other fields defaulted. */
  def apply[M](stmt: String, converter: Row => M): CQLQueryContext[M] =
    new CQLQueryContext[M](stmt, converter)
}
下面是一个流的构建和使用示范:
// Plain data holder for one row of the demo query; the field types match the
// getters used by the `toModel` converter (getLong / getString / getInt / getTimestamp).
case class Model (
rowid: Long,
measureid: Long,
state: String,
county: String,
year: Int,
value: Int,
createdAt: java.util.Date
)
// Row converter: reads each named column of a Cassandra Row into a Model field.
val toModel = (rs: Row) => Model(
rowid = rs.getLong("ROWID"),
measureid = rs.getLong("MEASUREID"),
state = rs.getString("STATENAME"),
county = rs.getString("COUNTYNAME"),
year = rs.getInt("REPORTYEAR"),
value = rs.getInt("VALUE"),
createdAt = rs.getTimestamp("CREATED")
)
// Query context: statement text plus the converter; the other fields keep their defaults.
val ctx = CQLQueryContext("select * from testdb.aqmrpt",toModel)
// Source of Model values built from the context.
// Needs an implicit Session and ExecutionContextExecutor in scope (not shown here).
val src = cassandraStream(ctx)
// Sink that prints a few fields of every element.
val snk = Sink.foreach[Model]{ r =>
println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
}
// Connect source to sink and run the stream.
// NOTE(review): run() presumably needs an implicit Materializer that is set up elsewhere — confirm.
src.to(snk).run()
除了通过读取数据构成stream source之外,我们还可以以流元素为数据进行数据库更新操作,因为我们可以用map来运行execute:
/**
 * Describes a CQL write that is executed once for every element of a stream.
 *
 * @param parallelism    number of writes that may be in flight at once
 * @param processInOrder when true, elements are emitted downstream in input order
 * @param statement      CQL text containing bind markers
 * @param prepareParams  derives the bind values from a stream element
 * @param consistency    optional consistency level code from `CQLContext`
 */
case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true,
                                    statement: String, prepareParams: R => Seq[Object],
                                    consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None) { cas =>

  /** Returns a copy with the given parallelism. */
  def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism = parLevel)

  /** Returns a copy that keeps (`true`) or relaxes (`false`) element ordering. */
  def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered)

  /** Returns a copy that carries the given consistency level. */
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
    cas.copy(consistency = Some(_consistency))

  /** Binds the element's parameters to an already prepared statement and runs it asynchronously. */
  private def bindAndExecute(prepStmt: PreparedStatement, r: R)(
      implicit session: Session, ec: ExecutionContext): Future[R] = {
    val params = processParameters(prepareParams(r))
    val boundStmt = prepStmt.bind(params: _*)
    consistency.foreach { cons =>
      boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons))
    }
    session.executeAsync(boundStmt).map(_ => r)
  }

  /**
   * Prepares the statement, binds the parameters derived from `r`, executes it
   * asynchronously and yields `r` once the write has completed.
   */
  def perform(r: R)(implicit session: Session, ec: ExecutionContext): Future[R] =
    bindAndExecute(session.prepare(statement), r)

  /**
   * Flow that performs the write for every element and passes the element on.
   *
   * The statement is prepared only once per flow, lazily on the first element,
   * instead of once per element; a failing prepare therefore still surfaces
   * inside the running stream.
   */
  def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = {
    lazy val prepStmt = session.prepare(statement)
    val run: R => Future[R] = r => bindAndExecute(prepStmt, r)
    if (processInOrder)
      Flow[R].mapAsync(parallelism)(run)
    else
      Flow[R].mapAsyncUnordered(parallelism)(run)
  }
}

object CassandraActionStream {

  /** Shorthand constructor: statement text plus parameter builder, all other fields defaulted. */
  def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] =
    new CassandraActionStream[R](statement = _statement, prepareParams = params)
}
CassandraActionStream可以用statement,params构建。它的一个函数performOnRow是一个Flow[R,R,NotUsed],可以把每个R转换成一条CQL后通过mapAsync(或mapAsyncUnordered)来运行executeAsync,造成一种批次运算效果。下面是CassandraActionStream的使用示范:
// Build an akka-stream source from a JDBC query context.
// NOTE(review): `ctx` and the `DataRow` type are not defined in this snippet;
// presumably they come from the JDBC streaming example — confirm.
val jdbcSource = jdbcAkkaStream(ctx)
// Insert statement with one bind marker per column.
val cqlInsert = "insert into testdb.AQMRPT(ROWID,MEASUREID,STATENAME," +
"COUNTYNAME,REPORTYEAR,VALUE,CREATED) VALUES(?,?,?,?,?,?,?)"
// Turns one DataRow into the seven bind values, in the column order of cqlInsert.
// Numeric fields are cast to Object; the last value is the CQLDateTimeNow marker.
val toPparams: DataRow => Seq[Object] = r => {
Seq[Object](r.rowid.asInstanceOf[Object],
r.measureid.asInstanceOf[Object],
r.state,
r.county,
r.year.asInstanceOf[Object],
r.value.asInstanceOf[Object],
CQLDateTimeNow
)
}
// Action stream: parallelism 2, element order not preserved.
val actionStream = CassandraActionStream(cqlInsert,toPparams).setParallelism(2)
.setProcessOrder(false)
// Flow that executes the insert for each element and passes the element on.
val actionFlow: Flow[DataRow,DataRow,NotUsed] = actionStream.performOnRow
// Sink that prints a few fields of every element.
val sink = Sink.foreach[DataRow]{ r =>
println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
}
// Only the first 100 source rows are passed through the insert flow.
val sts = jdbcSource.take(100).via(actionFlow).to(sink).run()
下面的例子里我们用CassandraStream的流元素更新h2数据库中的数据,调用了JDBCActionStream:
// Row converter: reads each named column of a Cassandra Row into a Model field.
val toModel = (rs: Row) => Model(
rowid = rs.getLong("ROWID"),
measureid = rs.getLong("MEASUREID"),
state = rs.getString("STATENAME"),
county = rs.getString("COUNTYNAME"),
year = rs.getInt("REPORTYEAR"),
value = rs.getInt("VALUE"),
createdAt = rs.getTimestamp("CREATED")
)
// Query context: statement text plus the converter; the other fields keep their defaults.
val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt",toModel)
// Source of Model values read from Cassandra.
val src = cassandraStream(cqlCtx)
// Sink that prints a few fields of every element.
val snk = Sink.foreach[Model]{ r =>
println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
}
// First run: just print the rows.
src.to(snk).run()
// Bind values for the update below: (value * 10, rowid), matching the order of its two markers.
val params: Model => Seq[Any] = row => {
Seq((row.value * 10), row.rowid) }
// JDBC action stream against the 'h2 database: parallelism 2, element order not preserved.
val jdbcActionStream = JDBCActionStream('h2, "update AQMRPT set total = ? where rowid = ?",params)
.setParallelism(2).setProcessOrder(false)
// Flow that executes the update for each element and passes the element on.
// performOnRow takes an implicit DBSession, which must be in scope here.
val jdbcActionFlow = jdbcActionStream.performOnRow
//update rows in h2 database from data in cassandra database
src.via(jdbcActionFlow).to(snk).run()
下面是本次示范的源代码:
build.sbt
// sbt build definition for the demo project.
name := "learn_cassandra"
version := "0.1"
scalaVersion := "2.12.4"
// NOTE(review): `:=` replaces the whole setting; `++=` is the usual way to add
// dependencies — confirm that overwriting is intended.
libraryDependencies := Seq(
// Cassandra driver and its extra codecs
"com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0",
"com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0",
// akka actors / streams and the Alpakka Cassandra connector
"com.typesafe.akka" %% "akka-actor" % "2.5.4",
"com.typesafe.akka" %% "akka-stream" % "2.5.4",
"com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.16",
// scalikejdbc modules
"org.scalikejdbc" %% "scalikejdbc" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-test" % "3.2.1" % "test",
"org.scalikejdbc" %% "scalikejdbc-config" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
// JDBC drivers
"com.h2database" % "h2" % "1.4.196",
"mysql" % "mysql-connector-java" % "6.0.6",
"org.postgresql" % "postgresql" % "42.2.0",
// connection pool implementations
"commons-dbcp" % "commons-dbcp" % "1.4",
"org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
"com.zaxxer" % "HikariCP" % "2.7.4",
"com.jolbox" % "bonecp" % "0.8.0.RELEASE",
// Slick and logging
"com.typesafe.slick" %% "slick" % "3.2.1",
"ch.qos.logback" % "logback-classic" % "1.2.3")
resources/application.conf
# JDBC settings for the "test" environment
test {
db {
# H2 over TCP, pooled with commons-dbcp2
h2 {
driver = "org.h2.Driver"
url = "jdbc:h2:tcp://localhost/~/slickdemo"
user = ""
password = ""
poolInitialSize = 5
poolMaxSize = 7
poolConnectionTimeoutMillis = 1000
poolValidationQuery = "select 1 as one"
poolFactoryName = "commons-dbcp2"
}
}
# MySQL, pooled with bonecp (same keys as above, written in dotted-path form)
# NOTE(review): plain-text credentials — presumably demo-only; confirm before reuse.
db.mysql.driver = "com.mysql.cj.jdbc.Driver"
db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
db.mysql.user = "root"
db.mysql.password = "123"
db.mysql.poolInitialSize = 5
db.mysql.poolMaxSize = 7
db.mysql.poolConnectionTimeoutMillis = 1000
db.mysql.poolValidationQuery = "select 1 as one"
db.mysql.poolFactoryName = "bonecp"
# scalikejdbc global settings: SQL/time logging and slow-query warnings
scalikejdbc.global.loggingSQLAndTime.enabled = true
scalikejdbc.global.loggingSQLAndTime.logLevel = info
scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}
# JDBC settings for the "dev" environment
dev {
db {
# H2 over TCP, pooled with hikaricp
h2 {
driver = "org.h2.Driver"
url = "jdbc:h2:tcp://localhost/~/slickdemo"
user = ""
password = ""
poolFactoryName = "hikaricp"
numThreads = 10
maxConnections = 12
minConnections = 4
keepAliveConnection = true
}
# MySQL, pooled with bonecp
# NOTE(review): plain-text credentials — presumably demo-only; confirm before reuse.
mysql {
driver = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/testdb"
user = "root"
password = "123"
poolInitialSize = 5
poolMaxSize = 7
poolConnectionTimeoutMillis = 1000
poolValidationQuery = "select 1 as one"
poolFactoryName = "bonecp"
}
# PostgreSQL, pooled with hikaricp
postgres {
driver = "org.postgresql.Driver"
url = "jdbc:postgresql://localhost:5432/testdb"
user = "root"
password = "123"
poolFactoryName = "hikaricp"
numThreads = 10
maxConnections = 12
minConnections = 4
keepAliveConnection = true
}
}
# scalikejdbc global settings: SQL/time logging and slow-query warnings
scalikejdbc.global.loggingSQLAndTime.enabled = true
scalikejdbc.global.loggingSQLAndTime.logLevel = info
scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}
CassandraEngine.scala
import com.datastax.driver.core._
import scala.concurrent._
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
import scala.concurrent.duration.Duration
import akka.NotUsed
import akka.stream.alpakka.cassandra.scaladsl._
import akka.stream.scaladsl._
/**
 * Companion of [[CQLContext]]: integer codes for consistency levels and their
 * translation to the driver's `ConsistencyLevel` values.
 */
object CQLContext {
  // Consistency Levels
  type CONSISTENCY_LEVEL = Int
  val ANY: CONSISTENCY_LEVEL = 0x0000
  val ONE: CONSISTENCY_LEVEL = 0x0001
  val TWO: CONSISTENCY_LEVEL = 0x0002
  val THREE: CONSISTENCY_LEVEL = 0x0003
  val QUORUM: CONSISTENCY_LEVEL = 0x0004
  val ALL: CONSISTENCY_LEVEL = 0x0005
  val LOCAL_QUORUM: CONSISTENCY_LEVEL = 0x0006
  val EACH_QUORUM: CONSISTENCY_LEVEL = 0x0007
  val LOCAL_ONE: CONSISTENCY_LEVEL = 0x000A
  val LOCAL_SERIAL: CONSISTENCY_LEVEL = 0x000B
  val SERIAL: CONSISTENCY_LEVEL = 0x000C

  /** An empty context with no statements. */
  def apply(): CQLContext = CQLContext(statements = Nil)

  /**
   * Translates one of the codes declared above into the driver's `ConsistencyLevel`.
   *
   * Every declared code is covered, including `LOCAL_QUORUM`, which previously
   * fell through the match and raised a `MatchError`.
   *
   * @throws IllegalArgumentException for a value that is not one of the declared codes
   */
  def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = {
    case ANY          => ConsistencyLevel.ANY
    case ONE          => ConsistencyLevel.ONE
    case TWO          => ConsistencyLevel.TWO
    case THREE        => ConsistencyLevel.THREE
    case QUORUM       => ConsistencyLevel.QUORUM
    case ALL          => ConsistencyLevel.ALL
    case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM
    case EACH_QUORUM  => ConsistencyLevel.EACH_QUORUM
    case LOCAL_ONE    => ConsistencyLevel.LOCAL_ONE
    case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
    case SERIAL       => ConsistencyLevel.SERIAL
    case unknown =>
      throw new IllegalArgumentException(s"Unknown consistency level code: $unknown")
  }
}
/**
 * Immutable description of a single CQL query and how to read its rows.
 *
 * @param statement   CQL text
 * @param extractor   converts one result `Row` into an `M`
 * @param parameter   values to bind to the statement (empty by default)
 * @param consistency optional consistency level code from `CQLContext`
 * @param fetchSize   page size requested from the driver (100 by default)
 */
case class CQLQueryContext[M](
    statement: String,
    extractor: Row => M,
    parameter: Seq[Object] = Nil,
    consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
    fetchSize: Int = 100
) {

  /** Copy of this context with the consistency level set. */
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext[M] = {
    val level = Some(_consistency)
    copy(consistency = level)
  }

  /** Copy of this context with the fetch size replaced by `pageSize`. */
  def setFetchSize(pageSize: Int): CQLQueryContext[M] =
    copy(fetchSize = pageSize)
}

object CQLQueryContext {

  /** Builds a context from statement text and row converter; remaining fields use their defaults. */
  def apply[M](stmt: String, converter: Row => M): CQLQueryContext[M] =
    new CQLQueryContext[M](stmt, converter)
}
/**
 * Immutable description of one or more CQL update commands.
 *
 * @param statements  CQL texts, one per command
 * @param parameters  bind values, one sequence per command
 * @param consistency optional consistency level code
 */
case class CQLContext(
    statements: Seq[String],
    parameters: Seq[Seq[Object]] = Nil,
    consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None
) {

  /** Copy of this context with the consistency level set. */
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
    copy(consistency = Some(_consistency))

  /** Copy of this context holding exactly one command, replacing any existing ones. */
  def setCommand(_statement: String, _parameters: Object*): CQLContext =
    copy(statements = Seq(_statement), parameters = Seq(_parameters))

  /** Copy of this context with one more command added after the existing ones. */
  def appendCommand(_statement: String, _parameters: Object*): CQLContext = {
    val moreStatements = statements :+ _statement
    val moreParameters = parameters ++ Seq(_parameters)
    copy(statements = moreStatements, parameters = moreParameters)
  }
}
object CQLEngine {
import CQLContext._
import CQLHelpers._
/**
 * Runs the query described by `ctx` synchronously and converts its rows into a `C[A]`.
 *
 * The fetch size applied to the statement is the `pageSize` argument, not
 * `ctx.fetchSize`.
 * NOTE(review): the rows are collected by iterating the whole `ResultSet`;
 * presumably the driver fetches further pages transparently while iterating, so
 * more than one page may be materialised here — confirm against the driver docs.
 *
 * @param ctx      query text, parameters, extractor and optional consistency
 * @param pageSize fetch size set on the statement (default 100)
 * @return the driver's `ResultSet` together with the extracted rows
 */
def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A], pageSize: Int = 100)(
    implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A]) = {
  val prepStmt = session.prepare(ctx.statement)
  // Bind once, as an expression, instead of binding and then re-binding a `var`.
  val boundStmt =
    if (ctx.parameter.isEmpty) prepStmt.bind()
    else prepStmt.bind(processParameters(ctx.parameter): _*)
  ctx.consistency.foreach { consistency =>
    boundStmt.setConsistencyLevel(consistencyLevel(consistency))
  }
  val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
  (resultSet, resultSet.asScala.view.map(ctx.extractor).to[C])
}
/**
 * Asks the driver for more results of `resultSet` and converts the rows.
 *
 * Returns `(resultSet, None)` when the result set is already fully fetched, or
 * when fetching fails with a non-fatal error (including a timeout). Fatal errors
 * such as `OutOfMemoryError` or `InterruptedException` are no longer swallowed
 * and propagate to the caller.
 *
 * @param resultSet result set returned by an earlier fetch
 * @param timeOut   maximum time to block while waiting for the fetch
 * @param extractor converts one `Row` into an `A`
 * @return the (possibly updated) result set and the extracted rows, if any
 */
def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
    extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) =
  if (resultSet.isFullyFetched) {
    (resultSet, None)
  } else {
    try {
      // Blocks the calling thread for at most `timeOut`.
      val result = Await.result(resultSet.fetchMoreResults(), timeOut)
      (result, Some(result.asScala.view.map(extractor).to[C]))
    } catch {
      // Best effort: a failed fetch is reported as "no more rows".
      case scala.util.control.NonFatal(_) => (resultSet, None)
    }
  }
/**
 * Executes the commands held by `ctx`: a context with exactly one statement goes
 * to `cqlSingleUpdate`, every other statement count goes to `cqlMultiUpdate`.
 */
def cqlExecute(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] =
  ctx.statements.size match {
    case 1 => cqlSingleUpdate(ctx)
    case _ => cqlMultiUpdate(ctx)
  }
/**
 * Prepares, binds and asynchronously executes the first statement of `ctx`.
 *
 * Only the first statement and the first parameter sequence are used.
 *
 * @return a future holding the driver's `wasApplied()` flag for the execution
 */
def cqlSingleUpdate(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
  val prepStmt = session.prepare(ctx.statements.head)
  // No parameter sequence at all is treated like an empty one; bind exactly once.
  val params = ctx.parameters.headOption.map(processParameters).getOrElse(Nil)
  val boundStmt = prepStmt.bind(params: _*)
  ctx.consistency.foreach { consistency =>
    boundStmt.setConsistencyLevel(consistencyLevel(consistency))
  }
  session.executeAsync(boundStmt).map(_.wasApplied())
}
/**
 * Executes all statements of `ctx` as one `BatchStatement`.
 *
 * Statements are paired with parameter sequences by position. A statement without
 * a matching parameter sequence is bound with no values; previously such
 * statements were silently dropped from the batch by `zip`.
 *
 * @return a future holding the driver's `wasApplied()` flag for the batch
 */
def cqlMultiUpdate(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
  // Pad the parameter list so that every statement takes part in the batch.
  val paddedParams: Seq[Seq[Object]] = ctx.parameters.padTo(ctx.statements.size, Nil)
  val commands: Seq[(String, Seq[Object])] = ctx.statements zip paddedParams
  val batch = new BatchStatement()
  commands.foreach { case (stm, params) =>
    val prepStmt = session.prepare(stm)
    // With empty params the varargs call is the plain `bind()`.
    batch.add(prepStmt.bind(processParameters(params): _*))
  }
  ctx.consistency.foreach { consistency =>
    batch.setConsistencyLevel(consistencyLevel(consistency))
  }
  session.executeAsync(batch).map(_.wasApplied())
}
/**
 * Builds an akka-stream `Source` that emits one `A` per row returned by the query
 * described by `ctx`.
 *
 * The statement text is prepared on `session`, bound to the context's parameters
 * (after `processParameters` has converted them), given the optional consistency
 * level and the context's fetch size, and handed to `CassandraSource`.
 *
 * @param ctx     query text, parameters, row extractor, consistency and fetch size
 * @param session Cassandra session used to prepare and run the statement
 * @param ec      execution context required by the signature
 * @return a source of extracted rows
 */
def cassandraStream[A](ctx: CQLQueryContext[A])
    (implicit session: Session, ec: ExecutionContextExecutor): Source[A, NotUsed] = {
  val prepStmt = session.prepare(ctx.statement)
  // Bind exactly once: with no parameters the varargs call is the plain `bind()`.
  val boundStmt = prepStmt.bind(processParameters(ctx.parameter): _*)
  ctx.consistency.foreach { consistency =>
    boundStmt.setConsistencyLevel(consistencyLevel(consistency))
  }
  CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(ctx.extractor)
}
/**
 * Describes a CQL write that is executed once for every element of a stream.
 *
 * @param parallelism    number of writes that may be in flight at once
 * @param processInOrder when true, elements are emitted downstream in input order
 * @param statement      CQL text containing bind markers
 * @param prepareParams  derives the bind values from a stream element
 * @param consistency    optional consistency level code from `CQLContext`
 */
case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true,
                                    statement: String, prepareParams: R => Seq[Object],
                                    consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None) { cas =>

  /** Returns a copy with the given parallelism. */
  def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism = parLevel)

  /** Returns a copy that keeps (`true`) or relaxes (`false`) element ordering. */
  def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered)

  /** Returns a copy that carries the given consistency level. */
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
    cas.copy(consistency = Some(_consistency))

  /** Binds the element's parameters to an already prepared statement and runs it asynchronously. */
  private def perform(prepStmt: PreparedStatement, r: R)(
      implicit session: Session, ec: ExecutionContext): Future[R] = {
    val params = processParameters(prepareParams(r))
    val boundStmt = prepStmt.bind(params: _*)
    consistency.foreach { cons =>
      boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons))
    }
    session.executeAsync(boundStmt).map(_ => r)
  }

  /**
   * Flow that performs the write for every element and passes the element on.
   *
   * The statement is prepared only once per flow, lazily on the first element,
   * instead of once per element; a failing prepare therefore still surfaces
   * inside the running stream.
   */
  def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = {
    lazy val prepStmt = session.prepare(statement)
    val run: R => Future[R] = r => perform(prepStmt, r)
    if (processInOrder)
      Flow[R].mapAsync(parallelism)(run)
    else
      Flow[R].mapAsyncUnordered(parallelism)(run)
  }
}

object CassandraActionStream {

  /** Shorthand constructor: statement text plus parameter builder, all other fields defaulted. */
  def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] =
    new CassandraActionStream[R](statement = _statement, prepareParams = params)
}
}
object CQLHelpers {
import java.nio.ByteBuffer
import java.io._
import java.nio.file._
import com.datastax.driver.core.LocalDate
import com.datastax.driver.extras.codecs.jdk8.InstantCodec
import java.time.Instant
import akka.stream.scaladsl._
import akka.stream._
/**
 * Implicit bridge from a Guava `ListenableFuture` to a Scala `Future`: a callback
 * registered on the listenable future completes a promise with its result or failure.
 */
implicit def listenableFutureToFuture[T](
    listenableFuture: ListenableFuture[T]): Future[T] = {
  val promise = Promise[T]()
  val bridge = new FutureCallback[T] {
    def onSuccess(result: T): Unit = {
      promise.success(result)
      ()
    }
    def onFailure(error: Throwable): Unit = {
      promise.failure(error)
      ()
    }
  }
  Futures.addCallback(listenableFuture, bridge)
  promise.future
}
// Marker values that `processParameters` replaces with date/time objects before binding.
// A calendar date given as year, month and day.
case class CQLDate(year: Int, month: Int, day: Int)
// Stands for the current date at the time the parameters are processed.
case object CQLTodayDate
// A date and time given by its components; milliseconds default to 0.
case class CQLDateTime(year: Int, Month: Int,
day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
// Stands for the current instant at the time the parameters are processed.
case object CQLDateTimeNow
/**
 * Replaces the date/time marker values in `params` with objects that can be bound
 * to a statement; every other value is passed through unchanged.
 *
 *  - `CQLDate`        becomes a driver `LocalDate`
 *  - `CQLTodayDate`   becomes a driver `LocalDate` for today's date
 *  - `CQLDateTimeNow` becomes `Instant.now`
 *  - `CQLDateTime`    becomes an `Instant`, with its components read as UTC
 *
 * @throws java.time.format.DateTimeParseException if a `CQLDateTime` holds
 *         components that do not form a valid date-time
 */
def processParameters(params: Seq[Object]): Seq[Object] = {
  import java.time.{Clock,ZoneId}
  params.map {
    case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
    case CQLTodayDate =>
      val today = java.time.LocalDate.now()
      LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
    // NOTE(review): an Instant is a point on the time-line, so the clock's zone
    // presumably has no effect on the value produced here — confirm.
    case CQLDateTimeNow => Instant.now(Clock.system(ZoneId.of("EST", ZoneId.SHORT_IDS)))
    case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) =>
      // Instant.parse needs zero-padded fields, a '.' before the millis and a
      // trailing 'Z'. The previous space-padded format without them was rejected
      // for every input.
      Instant.parse(f"$yy%04d-$mm%02d-$dd%02dT$hr%02d:$ms%02d:$sc%02d.$mi%03dZ")
    case other => other
  }
}
/**
 * `InputStream` view over a `ByteBuffer`. Reading advances the buffer's position.
 *
 * Follows the `InputStream` contract: a single read yields a value in 0..255,
 * and both read methods return -1 once the buffer is exhausted.
 */
class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {

  /** Next byte as an unsigned value (0..255), or -1 when nothing remains. */
  override def read(): Int =
    // Mask the signed byte so that 0xFF is not mistaken for end of stream.
    if (buf.hasRemaining) buf.get() & 0xFF else -1

  /**
   * Copies up to `len` bytes into `bytes` starting at `off`.
   *
   * @return the number of bytes copied, 0 when `len` is 0, or -1 at end of stream
   */
  override def read(bytes: Array[Byte], off: Int, len: Int): Int =
    if (len == 0) 0
    else if (!buf.hasRemaining) -1 // signal end of stream so read loops terminate
    else {
      val length = math.min(len, buf.remaining)
      buf.get(bytes, off, length)
      length
    }
}

object ByteBufferInputStream {

  /** Creates a stream reading from `buf`. */
  def apply(buf: ByteBuffer): ByteBufferInputStream =
    new ByteBufferInputStream(buf)
}
/**
 * `OutputStream` that writes straight into a caller-supplied `ByteBuffer`.
 * The buffer is never resized.
 */
class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream {

  /** Appends the low-order byte of `b` at the buffer's current position. */
  override def write(b: Int): Unit = {
    val single = b.toByte
    buf.put(single)
    ()
  }

  /** Appends `len` bytes of `bytes`, starting at `off`, at the buffer's current position. */
  override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
    buf.put(bytes, off, len)
    ()
  }
}

object FixsizedByteBufferOutputStream {

  /** Creates a stream writing into `buf`. */
  def apply(buf: ByteBuffer) = new FixsizedByteBufferOutputStream(buf)
}
/**
 * `OutputStream` backed by a `ByteBuffer` that is replaced by a larger one
 * whenever a write does not fit.
 *
 * @param buf        initial buffer; reassigned when the stream grows
 * @param onHeap     allocate replacement buffers on the heap (`true`) or as direct buffers
 * @param increasing growth factor applied to the capacity, must be greater than 1
 */
class ExpandingByteBufferOutputStream(
    var buf: ByteBuffer,
    onHeap: Boolean,
    increasing: Float = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR)
  extends OutputStream {

  // A factor of 1 or less could never enlarge the buffer.
  require(increasing > 1, "Increasing Factor must be greater than 1.0")

  /** Appends `len` bytes of `b`, starting at `off`, growing the buffer if needed. */
  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    ensureRemaining(len)
    // Honour `off`: the bytes to copy start there, not at index 0.
    buf.put(b, off, len)
    ()
  }

  /** Appends the low-order byte of `b`, growing the buffer if needed. */
  override def write(b: Int): Unit = {
    ensureRemaining(1)
    buf.put(b.toByte)
    ()
  }

  /** Grows the buffer until `extra` more bytes fit after the current position. */
  private def ensureRemaining(extra: Int): Unit = {
    val required: Long = buf.position().toLong + extra
    if (required > buf.limit()) {
      if (required > Int.MaxValue)
        throw new IOException(s"Cannot grow buffer to $required bytes")
      // Start from at least 1 so that a zero-capacity buffer can grow at all.
      var capacity: Double = math.max(buf.capacity(), 1) * increasing.toDouble
      while (capacity <= required) capacity *= increasing
      increase(math.min(capacity, Int.MaxValue.toDouble).toInt)
    }
  }

  /** Replaces the buffer with one of `newCapacity` bytes holding the bytes written so far. */
  protected def increase(newCapacity: Int): Unit = {
    // Flip the old buffer so that exactly the written bytes are copied.
    buf.limit(buf.position())
    buf.rewind()
    val newBuffer =
      if (onHeap) ByteBuffer.allocate(newCapacity)
      else ByteBuffer.allocateDirect(newCapacity)
    newBuffer.put(buf)
    buf.clear()
    buf = newBuffer
  }

  /** Number of bytes written so far (the buffer's position). */
  def size: Long = buf.position

  /** Capacity of the current buffer. */
  def capacity: Long = buf.capacity

  /** The current backing buffer. */
  def byteBuffer: ByteBuffer = buf
}

object ExpandingByteBufferOutputStream {
  val DEFAULT_INCREASING_FACTOR = 1.5f

  /**
   * Creates a stream with an initial buffer of `size` bytes that grows by `increasingBy`.
   *
   * @throws IllegalArgumentException if `increasingBy` is not greater than 1
   */
  def apply(size: Int, increasingBy: Float, onHeap: Boolean): ExpandingByteBufferOutputStream = {
    if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0")
    val buffer: ByteBuffer =
      if (onHeap) ByteBuffer.allocate(size)
      else ByteBuffer.allocateDirect(size)
    // Pass the factor on; previously it was validated and then ignored.
    new ExpandingByteBufferOutputStream(buffer, onHeap, increasingBy)
  }

  /** Direct buffer of `size` bytes with the default growth factor. */
  def apply(size: Int): ExpandingByteBufferOutputStream = {
    apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false)
  }

  /** Buffer of `size` bytes, on heap or direct, with the default growth factor. */
  def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = {
    apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap)
  }

  /** Direct buffer of `size` bytes with the given growth factor. */
  def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = {
    apply(size, increasingBy, false)
  }
}
/**
 * Reads the whole file `fileName` into a `ByteBuffer`.
 *
 * The returned buffer wraps exactly the file's bytes: the earlier version
 * allocated one extra byte, so the result ended with a spurious 0. The stream is
 * read until end of file and is always closed.
 *
 * @throws java.io.FileNotFoundException if the file cannot be opened
 */
def cqlFileToBytes(fileName: String): ByteBuffer = {
  val fis = new FileInputStream(fileName)
  try {
    val collected = new ByteArrayOutputStream(math.max(fis.available, 32))
    val chunk = new Array[Byte](8192)
    // A single read may return fewer bytes than requested, so loop until -1.
    Iterator
      .continually(fis.read(chunk))
      .takeWhile(_ != -1)
      .foreach(count => collected.write(chunk, 0, count))
    ByteBuffer.wrap(collected.toByteArray)
  } finally fis.close()
}
/**
 * Streams the remaining content of `bytes` into the file `fileName`.
 *
 * @return the `IOResult` future produced by running the stream into the file sink
 */
def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
    implicit mat: Materializer): Future[IOResult] = {
  val target = Paths.get(fileName)
  StreamConverters
    .fromInputStream(() => ByteBufferInputStream(bytes))
    .runWith(FileIO.toPath(target))
}
/** Formats `date` with a `SimpleDateFormat` built from the pattern `fmt`. */
def cqlDateTimeString(date: java.util.Date, fmt: String): String =
  new java.text.SimpleDateFormat(fmt).format(date)
/** Registers `InstantCodec.instance` with the codec registry of `cluster`. */
def useJava8DateTime(cluster: Cluster) = {
  //for jdk8 datetime format
  val registry = cluster.getConfiguration().getCodecRegistry()
  registry.register(InstantCodec.instance)
}
}
JDBCEngine.scala
package jdbccontext
import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import akka.stream.scaladsl._
import scalikejdbc._
import scalikejdbc.streams._
import akka.NotUsed
import scala.util._
import scalikejdbc.TxBoundary.Try._
import scala.concurrent.ExecutionContextExecutor
// Constants used to configure a JDBCContext.
object JDBCContext {
// Integer code for the kind of SQL a context carries.
type SQLTYPE = Int
val SQL_EXEDDL= 1
val SQL_UPDATE = 2
// Readable names for the Boolean "return generated key" argument.
val RETURN_GENERATED_KEYVALUE = true
val RETURN_UPDATED_COUNT = false
}
// Immutable description of a JDBC query.
case class JDBCQueryContext[M](
// name of the database the query runs against
dbName: Symbol,
// SQL text
statement: String,
// values for the statement's bind markers
parameters: Seq[Any] = Nil,
// JDBC fetch size
fetchSize: Int = 100,
// auto-commit flag applied to the streaming connection
autoCommit: Boolean = false,
// optional query timeout — NOTE(review): presumably in seconds; confirm
queryTimeout: Option[Int] = None,
// converts one result row into an M
extractor: WrappedResultSet => M)
/**
 * Immutable description of one or more JDBC commands (DDL, update or batch update).
 *
 * All `set...`/`append...` helpers return a modified copy. Helpers that do not
 * apply to the current `sqlType`/`batch` combination throw `IllegalStateException`.
 *
 * @param dbName             name of the target database
 * @param statements         SQL texts, one per command
 * @param parameters         bind values, one sequence per command (or per batch row)
 * @param fetchSize          JDBC fetch size
 * @param queryTimeout       optional query timeout
 * @param queryTags          tags attached to the queries
 * @param sqlType            `JDBCContext.SQL_EXEDDL` or `JDBCContext.SQL_UPDATE`
 * @param batch              true for a batch update
 * @param returnGeneratedKey per-command option for returning a generated key
 * @param preAction          optional hook run on the `PreparedStatement` before execution
 * @param postAction         optional hook run on the `PreparedStatement` after execution
 */
case class JDBCContext(
    dbName: Symbol,
    statements: Seq[String] = Nil,
    parameters: Seq[Seq[Any]] = Nil,
    fetchSize: Int = 100,
    queryTimeout: Option[Int] = None,
    queryTags: Seq[String] = Nil,
    sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
    batch: Boolean = false,
    returnGeneratedKey: Seq[Option[Any]] = Nil,
    // no return: None, return by index: Some(1), by name: Some("id")
    preAction: Option[PreparedStatement => Unit] = None,
    postAction: Option[PreparedStatement => Unit] = None) {
  ctx =>

  //helper functions

  /** Pre/post actions are only supported for a single, non-batch update statement. */
  private def supportsActions: Boolean =
    ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch && ctx.statements.size == 1

  /** True when this context describes a batch update. */
  private def isBatchUpdate: Boolean =
    ctx.sqlType == JDBCContext.SQL_UPDATE && ctx.batch

  def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)

  def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)

  def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)

  def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)

  /** Sets the hook run before execution; only valid for a single, non-batch update. */
  def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext =
    if (supportsActions) ctx.copy(preAction = action)
    else throw new IllegalStateException("JDBCContex setting error: preAction not supported!")

  /** Sets the hook run after execution; only valid for a single, non-batch update. */
  def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext =
    if (supportsActions) ctx.copy(postAction = action)
    // The message used to name preAction (copy-paste slip).
    else throw new IllegalStateException("JDBCContex setting error: postAction not supported!")

  /** Appends a DDL command; only valid when the context already holds DDL. */
  def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext =
    if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
      ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        // One parameter sequence per statement, as in setDDLCommand; the values
        // used to be wrapped in an extra Seq.
        parameters = ctx.parameters ++ Seq(_parameters)
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: option not supported!")

  /** Appends an update command; only valid for a non-batch update context. */
  def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext =
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
      ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        parameters = ctx.parameters ++ Seq(_parameters),
        returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: option not supported!")

  /**
   * Appends one row of batch parameters; only valid for a batch update. The row
   * must have as many values as the first row already present.
   */
  def appendBatchParameters(_parameters: Any*): JDBCContext = {
    if (!isBatchUpdate)
      throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
    val matchParams = ctx.parameters.headOption.forall(_.size == _parameters.size)
    if (matchParams)
      ctx.copy(parameters = ctx.parameters ++ Seq(_parameters))
    else
      throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
  }

  /** Chooses whether a batch update returns generated keys; only valid for a batch update. */
  def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
    if (!isBatchUpdate)
      throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
    ctx.copy(
      returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
    )
  }

  /** Replaces the commands with a single DDL command. */
  def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext =
    ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      sqlType = JDBCContext.SQL_EXEDDL,
      batch = false
    )

  /** Replaces the commands with a single, non-batch update command. */
  def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext =
    ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = false
    )

  /** Replaces the statement with a batch update statement; existing parameters are kept. */
  def setBatchCommand(_statement: String): JDBCContext =
    ctx.copy(
      statements = Seq(_statement),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = true
    )
}
object JDBCEngine {
import JDBCContext._
/** Placeholder extractor that throws an `IllegalStateException` carrying `message` whenever it is applied. */
private def noExtractor(message: String): WrappedResultSet => Nothing =
  _ => throw new IllegalStateException(message)
/**
 * Builds an akka-stream `Source` that emits one `A` per row of the query
 * described by `ctx`.
 *
 * The query runs as a read-only stream against the database named by
 * `ctx.dbName` (it used to be hard-coded to `'h2`, ignoring the context). The
 * context's auto-commit flag and fetch size are applied to the streaming session.
 *
 * @param ctx database name, SQL text, parameters, extractor and tuning options
 * @param ec  execution context used by the database publisher
 */
def jdbcAkkaStream[A](ctx: JDBCQueryContext[A])
    (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = {
  val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream {
    val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
    ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
    val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)
    sql.iterator
      .withDBSessionForceAdjuster(session => {
        session.connection.setAutoCommit(ctx.autoCommit)
        session.fetchSize(ctx.fetchSize)
      })
  }
  Source.fromPublisher[A](publisher)
}
// Runs the query described by `ctx` and returns all extracted rows as a C[A].
// The target collection type C is chosen by the caller through the implicit CanBuildFrom.
def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
ctx: JDBCQueryContext[A])(
implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
// Start from a SQL object whose extractor only throws; the real one is attached by map below.
val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
// NOTE(review): the results of queryTimeout/fetchSize are discarded, so these calls
// presumably configure rawSql in place — confirm against the scalikejdbc SQL API.
ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
rawSql.fetchSize(ctx.fetchSize)
// Session for the database named by the context; picked up implicitly by apply below.
implicit val session = NamedAutoSession(ctx.dbName)
val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)
sql.collection.apply[C]()
}
/**
 * Executes every statement of a DDL context inside one local transaction.
 *
 * The statements are executed without parameters. Returns a `Failure` holding an
 * `IllegalStateException` when the context's `sqlType` is not `SQL_EXEDDL`.
 */
def jdbcExcuteDDL(ctx: JDBCContext): Try[String] =
  if (ctx.sqlType == SQL_EXEDDL) {
    NamedDB(ctx.dbName) localTx { implicit session =>
      Try {
        for (stm <- ctx.statements) {
          // No-op hooks before and after the statement runs.
          val ddl = new SQLExecution(statement = stm, parameters = Nil)(
            before = _ => ())(
            after = _ => ())
          ddl.apply()
        }
        "SQL_EXEDDL executed succesfully."
      }
    }
  } else {
    Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
  }
// Executes the first statement of `ctx` as a batch, one execution per parameter
// sequence, inside a local transaction.
// Returns Failure(IllegalStateException) when sqlType is not SQL_UPDATE or batch is false.
// Throws IllegalStateException directly (not wrapped in a Try) when there are no statements.
def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
if (ctx.statements == Nil)
throw new IllegalStateException("JDBCContex setting error: statements empty!")
if (ctx.sqlType != SQL_UPDATE) {
Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
}
else {
if (ctx.batch) {
if (noReturnKey(ctx)) {
// No generated keys requested: run the batch and return an empty collection.
val usql = SQL(ctx.statements.head)
.tags(ctx.queryTags: _*)
.batch(ctx.parameters: _*)
Try {
NamedDB(ctx.dbName) localTx { implicit session =>
ctx.queryTimeout.foreach(session.queryTimeout(_))
// The batch result is discarded on purpose; only completion matters here.
usql.apply[Seq]()
Seq.empty[Long].to[C]
}
}
} else {
// Generated keys requested: the result of the keyed batch is returned as C[Long].
val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
Try {
NamedDB(ctx.dbName) localTx { implicit session =>
ctx.queryTimeout.foreach(session.queryTimeout(_))
usql.apply[C]()
}
}
}
} else {
Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
}
}
}
/**
 * Executes the first statement of `ctx` in a local transaction and returns the
 * generated key as a one-element collection.
 *
 * Expects the first entry of `ctx.returnGeneratedKey` to be defined; otherwise
 * the pattern below fails with a `MatchError`, as before. The `+:` pattern
 * accepts any `Seq`, whereas the former `::` pattern only matched a `List`.
 */
private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
  val Some(key) +: _ = ctx.returnGeneratedKey
  // Only the first parameter sequence belongs to the single statement.
  val params: Seq[Any] = ctx.parameters.headOption.getOrElse(Nil)
  val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
  Try {
    NamedDB(ctx.dbName) localTx { implicit session =>
      session.fetchSize(ctx.fetchSize)
      ctx.queryTimeout.foreach(session.queryTimeout(_))
      val result = usql.apply()
      Seq(result).to[C]
    }
  }
}
/**
 * Executes the first statement of `ctx` in a local transaction and returns the
 * update result as a one-element collection. The context's pre/post actions are
 * used as hooks; a missing action is replaced by a no-op.
 */
private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
  val doNothing: PreparedStatement => Unit = _ => ()
  val before = ctx.preAction.getOrElse(doNothing)
  val after = ctx.postAction.getOrElse(doNothing)
  // Only the first parameter sequence belongs to the single statement.
  val params: Seq[Any] = ctx.parameters.headOption.getOrElse(Nil)
  val usql = new SQLUpdate(ctx.statements.head, params, ctx.queryTags)(before)(after)
  Try {
    NamedDB(ctx.dbName) localTx { implicit session =>
      session.fetchSize(ctx.fetchSize)
      ctx.queryTimeout.foreach(session.queryTimeout(_))
      val updated = usql.apply()
      Seq(updated.toLong).to[C]
    }
  }
}
/** Runs a single update, with or without generated-key return, as decided by `noReturnKey`. */
private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] =
  if (!noReturnKey(ctx)) singleTxUpdateWithReturnKey(ctx)
  else singleTxUpdateNoReturnKey(ctx)
/**
 * True when no generated key is requested: `returnGeneratedKey` is empty or its
 * first entry is `None`.
 *
 * Uses `headOption`, so it works for any `Seq`; the former `::` pattern only
 * matched a `List` and threw a `MatchError` otherwise.
 */
private def noReturnKey(ctx: JDBCContext): Boolean =
  ctx.returnGeneratedKey.headOption.forall(_.isEmpty)
/** A hook that ignores the statement it is given and does nothing. */
def noActon: PreparedStatement => Unit = _ => ()
/**
 * Executes all statements of `ctx` in one local transaction and collects one
 * `Long` per statement: the generated key where one is requested, otherwise the
 * update result.
 *
 * Statements, parameter sequences and key options are paired by position. When
 * no key options are set, every statement runs without generated-key return.
 */
def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] =
  Try {
    NamedDB(ctx.dbName) localTx { implicit session =>
      session.fetchSize(ctx.fetchSize)
      ctx.queryTimeout.foreach(session.queryTimeout(_))
      val keys: Seq[Option[Any]] =
        if (ctx.returnGeneratedKey.isEmpty) Seq.fill(ctx.statements.size)(None)
        else ctx.returnGeneratedKey
      val results = ctx.statements.zip(ctx.parameters).zip(keys).map {
        case ((stm, param), Some(k)) =>
          new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
        case ((stm, param), None) =>
          new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong
      }
      results.to[C]
    }
  }
/**
 * Entry point for non-batch updates: one statement goes to `singleTxUpdate`,
 * several statements go to `multiTxUpdates`.
 *
 * Returns a `Failure` holding an `IllegalStateException` when `sqlType` is not
 * `SQL_UPDATE` or when `batch` is set. Throws `IllegalStateException` directly
 * when the context has no statements.
 */
def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
  if (ctx.statements.isEmpty)
    throw new IllegalStateException("JDBCContex setting error: statements empty!")
  if (ctx.sqlType != SQL_UPDATE)
    Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
  else if (ctx.batch)
    Failure(new IllegalStateException("JDBCContex setting error: must set batch = false !"))
  else if (ctx.statements.size == 1)
    singleTxUpdate(ctx)
  else
    multiTxUpdates(ctx)
}
/**
 * Describes a SQL statement that is executed once for every element of a stream.
 *
 * @param dbName         name of the target database
 * @param parallelism    parallelism passed to `mapAsync` / `mapAsyncUnordered`
 * @param processInOrder when true, `mapAsync` is used, otherwise `mapAsyncUnordered`
 * @param statement      SQL text containing bind markers
 * @param prepareParams  derives the bind values from a stream element
 */
case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,
                               statement: String, prepareParams: R => Seq[Any]) {

  /** Copy with another database name. */
  def setDBName(db: Symbol): JDBCActionStream[R] = copy(dbName = db)

  /** Copy with another parallelism. */
  def setParallelism(parLevel: Int): JDBCActionStream[R] = copy(parallelism = parLevel)

  /** Copy that keeps (`true`) or relaxes (`false`) element ordering. */
  def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = copy(processInOrder = ordered)

  /**
   * Executes the statement for `r` in auto-commit mode and returns `r`.
   * The statement runs on the calling thread before the already completed
   * future is created.
   */
  private def perform(r: R) = {
    val args = prepareParams(r)
    NamedDB(dbName) autoCommit { session =>
      session.execute(statement, args: _*)
    }
    scala.concurrent.Future.successful(r)
  }

  /** Flow that executes the statement for every element and passes the element on. */
  def performOnRow(implicit session: DBSession): Flow[R, R, NotUsed] = {
    val stage = Flow[R]
    if (processInOrder) stage.mapAsync(parallelism)(perform)
    else stage.mapAsyncUnordered(parallelism)(perform)
  }
}

object JDBCActionStream {

  /** Shorthand constructor: database, statement and parameter builder; other fields defaulted. */
  def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
    new JDBCActionStream[R](dbName = _dbName, statement = _statement, prepareParams = params)
}
}
HikariConfig.scala
package configdbs
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
import com.typesafe.config._
import java.util.concurrent.TimeUnit
import java.util.Properties
import scalikejdbc.config._
import com.typesafe.config.Config
import com.zaxxer.hikari._
import scalikejdbc.ConnectionPoolFactoryRepository
/**
 * Extension methods that make Typesafe Config easier to use.
 *
 * The `getXxxOr` accessors return the configured value when `path` is present and
 * the (lazily evaluated) `default` otherwise; the `getXxxOpt` accessors return an
 * `Option` instead.
 */
class ConfigExtensionMethods(val c: Config) extends AnyVal {
  import scala.collection.JavaConverters._

  /** Reads `path` with `read` when the config has it, otherwise yields `None`. */
  private def lookup[A](path: String)(read: String => A): Option[A] =
    if (c.hasPath(path)) Some(read(path)) else None

  def getBooleanOr(path: String, default: => Boolean = false): Boolean =
    lookup[Boolean](path)(c.getBoolean(_)).getOrElse(default)

  def getIntOr(path: String, default: => Int = 0): Int =
    lookup[Int](path)(c.getInt(_)).getOrElse(default)

  def getStringOr(path: String, default: => String = null): String =
    lookup[String](path)(c.getString(_)).getOrElse(default)

  def getConfigOr(path: String, default: => Config = ConfigFactory.empty()): Config =
    lookup[Config](path)(c.getConfig(_)).getOrElse(default)

  def getMillisecondsOr(path: String, default: => Long = 0L): Long =
    lookup[Long](path)(p => c.getDuration(p, TimeUnit.MILLISECONDS)).getOrElse(default)

  def getDurationOr(path: String, default: => Duration = Duration.Zero): Duration =
    lookup[Duration](path) { p =>
      Duration(c.getDuration(p, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
    }.getOrElse(default)

  def getPropertiesOr(path: String, default: => Properties = null): Properties =
    lookup[Properties](path)(p => new ConfigExtensionMethods(c.getConfig(p)).toProperties)
      .getOrElse(default)

  /**
   * Converts the whole config into `java.util.Properties`.
   *
   * Nested config objects become nested `Properties` values, every other non-null
   * value is stored as its `toString`, and null values are left out.
   */
  def toProperties: Properties = {
    def convert(entries: mutable.Map[String, ConfigValue]): Properties = {
      val result = new Properties(null)
      entries.foreach { case (key, value) =>
        val converted: AnyRef = value.valueType() match {
          case ConfigValueType.OBJECT => convert(value.asInstanceOf[ConfigObject].asScala)
          case _                      => Option(value.unwrapped).map(_.toString).orNull
        }
        if (converted ne null) result.put(key, converted)
      }
      result
    }
    convert(c.root.asScala)
  }

  def getBooleanOpt(path: String): Option[Boolean] = lookup[Boolean](path)(c.getBoolean(_))

  def getIntOpt(path: String): Option[Int] = lookup[Int](path)(c.getInt(_))

  def getStringOpt(path: String): Option[String] = Option(getStringOr(path))

  def getPropertiesOpt(path: String): Option[Properties] = Option(getPropertiesOr(path))
}
/** Holds the implicit conversion that puts the extension methods on any `Config`. */
object ConfigExtensionMethods {
  /** Wraps `c` so the `getXxxOr` / `getXxxOpt` helpers can be called on it directly. */
  @inline
  implicit def configExtensionMethods(c: Config): ConfigExtensionMethods =
    new ConfigExtensionMethods(c)
}
/**
 * Reads connection-pool settings for a named database from the Typesafe config.
 *
 * All keys are looked up under `<envPrefix>db.<dbName>`.
 */
trait HikariConfigReader extends TypesafeConfigReader {
  self: TypesafeConfig =>

  import ConfigExtensionMethods.configExtensionMethods

  /** Config subtree that holds the settings of `dbName`. */
  private def dbSection(dbName: Symbol): Config =
    config.getConfig(s"${envPrefix}db.${dbName.name}")

  /**
   * Name of the connection-pool factory configured for `dbName` under
   * `poolFactoryName`; `ConnectionPoolFactoryRepository.COMMONS_DBCP` when unset.
   */
  def getFactoryName(dbName: Symbol): String =
    dbSection(dbName).getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)

  /**
   * Builds a `HikariConfig` from the settings of `dbName`.
   *
   * Keys that are absent fall back to the defaults written next to each setter.
   */
  def hikariCPConfig(dbName: Symbol): HikariConfig = {
    val hikari = new HikariConfig()
    val settings = dbSection(dbName)

    // Connection settings: an explicit data source class wins over a driver class name.
    if (settings.hasPath("dataSourceClass"))
      hikari.setDataSourceClassName(settings.getString("dataSourceClass"))
    else
      settings.getStringOpt("driverClassName")
        .orElse(settings.getStringOpt("driver"))
        .foreach(name => hikari.setDriverClassName(name))
    hikari.setJdbcUrl(settings.getStringOr("url"))
    settings.getStringOpt("user").foreach(hikari.setUsername)
    settings.getStringOpt("password").foreach(hikari.setPassword)
    settings.getPropertiesOpt("properties").foreach(hikari.setDataSourceProperties)

    // Pool configuration
    hikari.setConnectionTimeout(settings.getMillisecondsOr("connectionTimeout", 1000))
    hikari.setValidationTimeout(settings.getMillisecondsOr("validationTimeout", 1000))
    hikari.setIdleTimeout(settings.getMillisecondsOr("idleTimeout", 600000))
    hikari.setMaxLifetime(settings.getMillisecondsOr("maxLifetime", 1800000))
    hikari.setLeakDetectionThreshold(settings.getMillisecondsOr("leakDetectionThreshold", 0))
    hikari.setInitializationFailFast(settings.getBooleanOr("initializationFailFast", false))
    settings.getStringOpt("connectionTestQuery").foreach(hikari.setConnectionTestQuery)
    settings.getStringOpt("connectionInitSql").foreach(hikari.setConnectionInitSql)
    // Pool size defaults are derived from numThreads: max = 5 * numThreads, min idle = numThreads.
    val threadCount = settings.getIntOr("numThreads", 20)
    hikari.setMaximumPoolSize(settings.getIntOr("maxConnections", threadCount * 5))
    hikari.setMinimumIdle(settings.getIntOr("minConnections", threadCount))
    hikari.setPoolName(settings.getStringOr("poolName", dbName.name))
    hikari.setRegisterMbeans(settings.getBooleanOr("registerMbeans", false))

    // Equivalent of ConnectionPreparer
    hikari.setReadOnly(settings.getBooleanOr("readOnly", false))
    settings.getStringOpt("isolation")
      .foreach(level => hikari.setTransactionIsolation("TRANSACTION_" + level))
    hikari.setCatalog(settings.getStringOr("catalog"))

    hikari
  }
}
import scalikejdbc._
/**
 * Sets up and tears down scalikejdbc connection pools from configuration.
 *
 * The pool implementation is chosen per database through `getFactoryName`:
 * `"hikaricp"` builds a HikariCP data source, anything else uses the JDBC
 * settings and connection-pool settings read by `TypesafeConfigReader`.
 */
trait ConfigDBs {
  self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>

  /** Loads the JDBC driver class when a non-blank class name is configured. */
  private def loadDriverIfConfigured(driverClassName: String): Unit =
    if (driverClassName != null && driverClassName.trim.nonEmpty) {
      Class.forName(driverClassName)
    }

  /**
   * Registers a connection pool for `dbName`.
   *
   * @param dbName name of the database section in the configuration and of the
   *               resulting pool
   */
  def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    getFactoryName(dbName) match {
      case "hikaricp" =>
        val hconf = hikariCPConfig(dbName)
        // Load the driver BEFORE the data source is built: if the class cannot be
        // loaded we fail here, instead of after a HikariDataSource has already been
        // created and would be left behind unclosed.
        loadDriverIfConfigured(hconf.getDriverClassName)
        val hikariCPSource = new HikariDataSource(hconf)
        ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource))
      case _ =>
        val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
        val cpSettings = readConnectionPoolSettings(dbName)
        loadDriverIfConfigured(driver)
        ConnectionPool.add(dbName, url, user, password, cpSettings)
    }
  }

  /** Loads the global settings, then calls [[setup]] for every name in `dbNames`. */
  def setupAll(): Unit = {
    loadGlobalSettings()
    dbNames.foreach { dbName => setup(Symbol(dbName)) }
  }

  /** Closes the pool registered under `dbName`. */
  def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    ConnectionPool.close(dbName)
  }

  /** Closes every registered pool. */
  def closeAll(): Unit = {
    ConnectionPool.closeAll
  }
}
/**
 * Ready-to-use `ConfigDBs` instance combining `TypesafeConfigReader`,
 * `StandardTypesafeConfig` and `HikariConfigReader`. It does not mix in
 * `EnvPrefix`; see `ConfigDBsWithEnv` for the environment-prefixed variant.
 */
object ConfigDBs extends ConfigDBs
with TypesafeConfigReader
with StandardTypesafeConfig
with HikariConfigReader
/**
 * `ConfigDBs` variant that mixes in `EnvPrefix`, with `env` set from `envValue`.
 *
 * @param envValue environment name wrapped into `env`; a null value yields `None`
 */
case class ConfigDBsWithEnv(envValue: String)
  extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader
  with EnvPrefix {

  override val env: Option[String] = Option(envValue)
}
StreamDemo.scala
import com.datastax.driver.core._
import akka._
import CQLEngine._
import CQLHelpers._
import akka.actor._
import akka.stream.scaladsl._
import akka.stream._
import scalikejdbc._
import configdbs._
import jdbccontext._
import JDBCEngine._
import scala.util._
// Demo application wiring two streams together:
//   1. rows queried from the 'h2 JDBC database are pushed through a Cassandra
//      insert flow targeting testdb.AQMRPT;
//   2. rows read back from testdb.aqmrpt are pushed through a JDBC update flow
//      targeting the 'h2 database.
// The statements below run in order as the body of an `App`; the two
// `StdIn.readLine()` calls are the only points where the script waits.
object cassandraStreamDemo extends App {
//#init-mat
// Actor system, materializer and execution context used by every stream below.
implicit val cqlsys = ActorSystem("cqlSystem")
implicit val mat = ActorMaterializer()
implicit val ec = cqlsys.dispatcher
// Cassandra cluster reached through localhost:9042.
val cluster = new Cluster
.Builder()
.addContactPoints("localhost")
.withPort(9042)
.build()
// NOTE(review): helper defined outside this section; presumably registers
// Java 8 date/time codecs on the cluster - confirm in CQLHelpers.
useJava8DateTime(cluster)
implicit val session = cluster.connect()
//config jdbc drivers
// Two separate ConfigDBsWithEnv("dev") instances are created here: the first
// registers the 'h2 connection pool, the second loads the global settings.
ConfigDBsWithEnv("dev").setup('h2)
ConfigDBsWithEnv("dev").loadGlobalSettings()
// CQL for the target table of the first stream.
val cqlCreate =
"""
|create table testdb.AQMRPT(
|ROWID BIGINT PRIMARY KEY,
|MEASUREID BIGINT,
|STATENAME TEXT,
|COUNTYNAME TEXT,
|REPORTYEAR INT,
|VALUE INT,
|CREATED TIMESTAMP)
""".stripMargin
val ctxCreate = CQLContext().setCommand(cqlCreate)
// The outcome is only reported on the console; a failure prints its message
// and the script carries on.
cqlExecute(ctxCreate).onComplete{
case Success(s) => println("schema created successfully!")
case Failure(e) => println(e.getMessage)
}
// Waits for a line on stdin before continuing - presumably to give the
// schema creation above time to complete.
scala.io.StdIn.readLine()
// Shape of one row read from the h2 table.
case class DataRow (
rowid: Long,
measureid: Long,
state: String,
county: String,
year: Int,
value: Int
)
// Converts one JDBC result row into a DataRow by column name.
val toRow: WrappedResultSet => DataRow = rs => DataRow(
rowid = rs.long("ROWID"),
measureid = rs.long("MEASUREID"),
state = rs.string("STATENAME"),
county = rs.string("COUNTYNAME"),
year = rs.int("REPORTYEAR"),
value = rs.int("VALUE")
)
//construct the context
val ctx = JDBCQueryContext[DataRow](
dbName = 'h2,
statement = "select * from AQMRPT",
extractor = toRow
)
//source from h2 database
val jdbcSource = jdbcAkkaStream(ctx)
//insert into cassandra database
val cqlInsert = "insert into testdb.AQMRPT(ROWID,MEASUREID,STATENAME," +
"COUNTYNAME,REPORTYEAR,VALUE,CREATED) VALUES(?,?,?,?,?,?,?)"
// Produces the seven bind values of cqlInsert in placeholder order; the
// numeric fields are cast to Object and CQLDateTimeNow fills CREATED.
val toPparams: DataRow => Seq[Object] = r => {
Seq[Object](r.rowid.asInstanceOf[Object],
r.measureid.asInstanceOf[Object],
r.state,
r.county,
r.year.asInstanceOf[Object],
r.value.asInstanceOf[Object],
CQLDateTimeNow
)
}
// Insert action configured with parallelism 2 and unordered processing.
val actionStream = CassandraActionStream(cqlInsert,toPparams).setParallelism(2)
.setProcessOrder(false)
val actionFlow: Flow[DataRow,DataRow,NotUsed] = actionStream.performOnRow
// Prints every row that made it through the insert flow.
val sink = Sink.foreach[DataRow]{ r =>
println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
}
// First stream: at most 100 rows from h2 -> Cassandra insert flow -> console.
// `sts` is not referenced again in this object.
val sts = jdbcSource.take(100).via(actionFlow).to(sink).run()
// Shape of one row read back from Cassandra; adds the CREATED timestamp.
case class Model (
rowid: Long,
measureid: Long,
state: String,
county: String,
year: Int,
value: Int,
createdAt: java.util.Date
)
//data row converter
val toModel = (rs: Row) => Model(
rowid = rs.getLong("ROWID"),
measureid = rs.getLong("MEASUREID"),
state = rs.getString("STATENAME"),
county = rs.getString("COUNTYNAME"),
year = rs.getInt("REPORTYEAR"),
value = rs.getInt("VALUE"),
createdAt = rs.getTimestamp("CREATED")
)
//setup context
val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt",toModel)
//construct source
val src = cassandraStream(cqlCtx)
//a display sink
val snk = Sink.foreach[Model]{ r =>
println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")
}
//run on source
// First materialization of src: print the Cassandra rows only.
src.to(snk).run()
// Bind values for the update below, in placeholder order: value * 10, rowid.
val params: Model => Seq[Any] = row => {
Seq((row.value * 10), row.rowid) }
// NOTE(review): the statement writes a `total` column on the h2 table AQMRPT;
// that column is not shown anywhere in this section - confirm it exists.
val jdbcActionStream = JDBCActionStream('h2, "update AQMRPT set total = ? where rowid = ?",params)
.setParallelism(2).setProcessOrder(false)
val jdbcActionFlow = jdbcActionStream.performOnRow
//update rows in h2 database from data in cassandra database
// Second, independent materialization of src; it is started without waiting
// for the print-only run above.
src.via(jdbcActionFlow).to(snk).run()
// Keeps the application alive until a line is read from stdin, then shuts down
// the Cassandra session, the cluster and the actor system.
scala.io.StdIn.readLine()
session.close()
cluster.close()
cqlsys.terminate()
}
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/12810.html