val createCQL ="""
rowid bigint primary key,
measureid bigint,
statename text,
countyname text,
reportyear int,
value int,
created timestamp
val cqlddl = CQLUpdate(statements = Seq(dropCQL,createCQL))
def createTbl: Source[CQLResult,NotUsed] = {
log.info(s"running createTbl ...")
message CQLUpdate {
repeated string statements = 1;
bytes parameters = 2;
google.protobuf.Int32Value consistency = 3;
google.protobuf.BoolValue batch = 4;
service CQLServices {
rpc runDDL(CQLUpdate) returns (CQLResult) {}
override def runDDL: Flow[CQLUpdate, CQLResult, NotUsed] = {
.flatMapConcat { context =>
//unpack CQLUpdate and construct the context
val ctx = CQLContext(context.statements)
log.info(s"**** CQLContext => ${ctx} ***")
.map { r => CQLResult(marshal(r)) }
def cqlExecute(ctx: CQLContext)(
implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
var invalidBat = false
if ( ctx.batch ) {
if (ctx.parameters == Nil)
invalidBat = true
else if (ctx.parameters.size < 2)
invalidBat = true;
if (!ctx.batch || invalidBat) {
log.warn(s"cqlExecute> batch update must have at least 2 sets of parameters! change to single-command.")
if (ctx.statements.size == 1) {
var param: Seq[Object] = Nil
if (ctx.parameters != Nil) param = ctx.parameters.head
log.info(s"cqlExecute> single-command: statement: ${ctx.statements.head} parameters: ${param}")
cqlSingleUpdate(ctx.consistency, ctx.statements.head, param)
else {
var params: Seq[Seq[Object]] = Nil
if (ctx.parameters == Nil)
params = Seq.fill(ctx.statements.length)(Nil)
else {
if (ctx.statements.size > ctx.parameters.size) {
log.warn(s"cqlExecute> fewer parameters than statements! pad with 'Nil'.")
val nils = Seq.fill(ctx.statements.size - ctx.parameters.size)(Nil)
params = ctx.parameters ++ nils
params = ctx.parameters
val commands: Seq[(String,Seq[Object])] = ctx.statements zip params
log.info(s"cqlExecute> multi-commands: ${commands}")
//using sequence to flip List[Future[Boolean]] => Future[List[Boolean]]
//therefore, make sure no command relies on a previous command's effect
val lstCmds: List[Future[Boolean]] = commands.map { case (stmt,param) =>
cqlSingleUpdate(ctx.consistency, stmt, param)
val futList = lstCmds.sequence.map(_ => true) //must map to execute
//using traverse to have some degree of parallelism = max(runtimes)
//therefore, make sure no command relies on a previous command's effect
val futList = Future.traverse(commands) { case (stmt,param) =>
cqlSingleUpdate(ctx.consistency, stmt, param)
}.map(_ => true)
Await.result(futList, 3 seconds)
// run sync directly
Future {
commands.foreach { case (stm, pars) =>
cqlExecuteSync(ctx.consistency, stm, pars)
下一个例子是用流方式把JDBC数据库数据并入cassandra数据库里。.proto DDL内容如下:
message ProtoDate {
int32 yyyy = 1;
int32 mm = 2;
int32 dd = 3;
message ProtoTime {
int32 hh = 1;
int32 mm = 2;
int32 ss = 3;
int32 nnn = 4;
message ProtoDateTime {
ProtoDate date = 1;
ProtoTime time = 2;
message AQMRPTRow {
int64 rowid = 1;
string countyname = 2;
string statename = 3;
int64 measureid = 4;
int32 reportyear = 5;
int32 value = 6;
ProtoDateTime created = 7;
message CQLResult {
bytes result = 1;
message CQLUpdate {
repeated string statements = 1;
bytes parameters = 2;
google.protobuf.Int32Value consistency = 3;
google.protobuf.BoolValue batch = 4;
service CQLServices {
rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {}
rpc runDDL(CQLUpdate) returns (CQLResult) {}
val toParams: AQMRPTRow => Seq[Object] = row => Seq[Object](
val cqlInsert ="""
|insert into testdb.AQMRPT(
| rowid,
| measureid,
| statename,
| countyname,
| reportyear,
| value,
| created)
| values(?,?,?,?,?,?,?)
val cqlActionStream = CassandraActionStream(cqlInsert,toParams).setParallelism(2)
val cqlActionFlow: Flow[AQMRPTRow,AQMRPTRow,NotUsed] =
val cqlActionFlow: Flow[AQMRPTRow,CQLResult,NotUsed] = {
.mapAsync(cqlActionStream.parallelism){ row =>
if (IfExists(row.rowid))
cqlActionStream.perform(row).map {_ => CQLResult(marshal(1))}
override def transferRows: Flow[AQMRPTRow, CQLResult, NotUsed] = {
private def IfExists(rowid: Long): Boolean = {
val param = Seq(rowid.asInstanceOf[Object])
val toRowId: Row => Long = r => r.getLong("rowid")
val ctx = CQLQueryContext(cql,param)
val src: Source[Long,NotUsed] = cassandraStream(ctx,toRowId)
val fut = src.toMat(Sink.headOption)(Keep.right).run()
val result = Await.result(fut,3 seconds)
log.info(s"checking existence: ${result}")
result match {
case Some(x) => true
case None => false
在上面的代码里我们调用了Cassandra-Engine的CassandraActionStream类型的流处理方法。值得注意的是这里我们尝试在stream Flow里运算另一个流,如:IfExists函数里运算一个Source来确定rowid是否存在。不要在意这个函数的实际应用,它只是一个人为的例子。另外,rowid:Long这样的定义是硬性规定的。cassandra对数据类型的匹配要求非常严格,不提供任何隐式类型转换。所以,Int <> Long被视为类型错误,而且不会返回任何明确的错误信息。
val stub = CqlGrpcAkkaStream.stub(channel)
val jdbcRows2transfer = JDBCQueryContext[AQMRPTRow](
dbName = 'h2,
statement = "select * from AQMRPT where statename='Arkansas'"
def toAQMRPTRow: WrappedResultSet => AQMRPTRow = rs => AQMRPTRow(
rowid = rs.long("ROWID"),
measureid = rs.long("MEASUREID"),
statename = rs.string("STATENAME"),
countyname = rs.string("COUNTYNAME"),
reportyear = rs.int("REPORTYEAR"),
value = rs.int("VALUE"),
created = Some(ProtoDateTime(Some(ProtoDate(1990, 8, 12)), Some(ProtoTime(23, 56, 23, 0))))
import scala.concurrent.duration._
def transferRows: Source[CQLResult, NotUsed] = {
log.info(s"**** calling transferRows ****")
jdbcAkkaStream(jdbcRows2transfer, toAQMRPTRow)
// .throttle(1, 500.millis, 1, ThrottleMode.shaping)
最后我们示范一下cassandra Query。.proto DDL 定义:
message CQLQuery {
string statement = 1;
bytes parameters = 2;
google.protobuf.Int32Value consistency = 3;
google.protobuf.Int32Value fetchSize = 4;
service CQLServices {
rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {}
rpc runQuery(CQLQuery) returns (stream AQMRPTRow) {}
rpc runDDL(CQLUpdate) returns (CQLResult) {}
def toCQLTimestamp(rs: Row) = {
try {
val tm = rs.getTimestamp("CREATED")
if (tm == null) None
else {
val localdt = cqlGetTimestamp(tm)
Some(ProtoDateTime(Some(ProtoDate(localdt.getYear, localdt.getMonthValue, localdt.getDayOfMonth)),
Some(ProtoTime(localdt.getHour, localdt.getMinute, localdt.getSecond, localdt.getNano))))
catch {
case e: Exception => None
val toAQMRow: Row => AQMRPTRow = rs=> AQMRPTRow(
rowid = rs.getLong("ROWID"),
measureid = rs.getLong("MEASUREID"),
statename = rs.getString("STATENAME"),
countyname = rs.getString("COUNTYNAME"),
reportyear = rs.getInt("REPORTYEAR"),
value = rs.getInt("VALUE"),
created = toCQLTimestamp(rs)
override def runQuery: Flow[CQLQuery, AQMRPTRow, NotUsed] = {
log.info("**** runQuery called on service side ***")
.flatMapConcat { q =>
//unpack CQLQuery and construct the context
var params: Seq[Object] = Nil
if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
params = unmarshal[Seq[Object]](q.parameters)
log.info(s"**** query parameters: ${params} ****")
val ctx = CQLQueryContext(q.statement,params)
这里值得看看的一是日期转换,二是对于cassandra parameter Seq[Object]的marshal和unmarshal。客户端代码:
val query = CQLQuery(
statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE > ? ALLOW FILTERING;",
parameters = marshal(Seq("Arkansas", 0.toInt))
val query2 = CQLQuery (
statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
parameters = marshal(Seq("Colorado", 3.toInt))
val query3= CQLQuery (
statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
parameters = marshal(Seq("Arkansas", 8.toInt))
def queryRows: Source[AQMRPTRow,NotUsed] = {
log.info(s"running queryRows ...")
// sbt plugin definitions: protobuf compilation plus the code generators it runs.

// sbt-protoc drives protoc from sbt.
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")

// Resolver for the "beyondthelines" artifacts listed below.
// NOTE(review): this is a Bintray-hosted repository - confirm it is still reachable.
resolvers += Resolver.bintrayRepo("beyondthelines", "maven")

// Generators made available to the build definition itself (used by PB.targets).
libraryDependencies ++= Seq(
  // ScalaPB compiler plugin
  "com.thesamet.scalapb" %% "compilerplugin" % "0.7.4",
  // gRPC akka-stream generator
  "beyondthelines" %% "grpcakkastreamgenerator" % "0.0.5"
)
// Build definition for the gRPCCassandra demo project.
import scalapb.compiler.Version.scalapbVersion
import scalapb.compiler.Version.grpcJavaVersion

name := "gRPCCassandra"

version := "0.1"

scalaVersion := "2.12.6"

// Resolver for the "beyondthelines" gRPC akka-stream runtime.
// NOTE(review): this is a Bintray-hosted repository - confirm it is still reachable.
resolvers += Resolver.bintrayRepo("beyondthelines", "maven")

scalacOptions += "-Ypartial-unification"

// NOTE(review): `:=` replaces the setting instead of appending to it; if any plugin
// contributes library dependencies they would be dropped - confirm `++=` is not intended.
libraryDependencies := Seq(
  // ScalaPB runtime + gRPC transport
  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
  "io.grpc" % "grpc-netty" % grpcJavaVersion,
  "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
  "io.monix" %% "monix" % "2.3.0",
  // for GRPC Akkastream
  "beyondthelines" %% "grpcakkastreamruntime" % "0.0.5",
  // for scalikejdbc
  "org.scalikejdbc" %% "scalikejdbc" % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-test" % "3.2.1" % "test",
  "org.scalikejdbc" %% "scalikejdbc-config" % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
  // JDBC drivers
  "com.h2database" % "h2" % "1.4.196",
  "mysql" % "mysql-connector-java" % "6.0.6",
  "org.postgresql" % "postgresql" % "42.2.0",
  // connection pool implementations
  "commons-dbcp" % "commons-dbcp" % "1.4",
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
  "com.zaxxer" % "HikariCP" % "2.7.4",
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
  "com.typesafe.slick" %% "slick" % "3.2.1",
  //for cassandra 340
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0",
  "com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0",
  "com.typesafe.akka" %% "akka-stream" % "2.5.13",
  "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.19",
  "ch.qos.logback" % "logback-classic" % "1.2.3",
  "org.typelevel" %% "cats-core" % "1.1.0"
)

// Code generators run over the .proto files; both write into the managed source directory.
PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value,
  // generate the akka stream files
  grpc.akkastreams.generators.GrpcAkkaStreamGenerator() -> (sourceManaged in Compile).value
)
# JDBC settings
# Two environment sections ("test" and "dev"), each with its own db.<name> entries and
# scalikejdbc logging settings.
# NOTE(review): credentials are stored in plain text here - acceptable for a local demo only.
test {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      # poolFactoryName is what ConfigDBs.getFactoryName reads to choose the pool implementation
      poolFactoryName = "commons-dbcp2"
    }
  }

  # Same db.<name> structure as above, written with dotted path keys.
  db.mysql.driver = "com.mysql.cj.jdbc.Driver"
  db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
  db.mysql.user = "root"
  db.mysql.password = "123"
  db.mysql.poolInitialSize = 5
  db.mysql.poolMaxSize = 7
  db.mysql.poolConnectionTimeoutMillis = 1000
  db.mysql.poolValidationQuery = "select 1 as one"
  db.mysql.poolFactoryName = "bonecp"

  # scalikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}
dev {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      # "hikaricp" selects the HikariCP branch in ConfigDBs.setup
      poolFactoryName = "hikaricp"
      # numThreads / maxConnections / minConnections are read by hikariCPConfig
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      # NOTE(review): hikariCPConfig does not read keepAliveConnection - confirm it is used elsewhere
      keepAliveConnection = true
    }
    mysql {
      driver = "com.mysql.cj.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/testdb"
      user = "root"
      password = "123"
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "bonecp"
    }
    postgres {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://localhost:5432/testdb"
      user = "root"
      password = "123"
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
  }
  # scalikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Logback configuration: one console appender shared by two package loggers and the root logger. -->
<configuration>
    <!-- Console appender: timestamp, thread, level, logger name (abbreviated to 36 chars), message. -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <layout class="ch.qos.logback.classic.PatternLayout">
            <Pattern> %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n </Pattern>
        </layout>
    </appender>
    <!-- additivity="false" keeps these events from also being passed to the root logger's appenders. -->
    <logger name="sdp.cql" level="info" additivity="false">
        <appender-ref ref="STDOUT" />
    </logger>
    <logger name="demo.sdp.grpc.cql" level="info" additivity="false">
        <appender-ref ref="STDOUT" />
    </logger>
    <!-- Everything else is logged at error level only. -->
    <root level="error">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>
// Message and service definitions for the gRPC Cassandra demo service.
syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
  // use a custom Scala package name
  // package_name: "io.ontherocks.introgrpc.demo"

  // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};

/*
 * Demoes various customization options provided by ScalaPBs.
 */

package sdp.grpc.services;

// Calendar date as three integer components (year, month, day).
message ProtoDate {
  int32 yyyy = 1;
  int32 mm = 2;
  int32 dd = 3;
}

// Time of day as integer components.
message ProtoTime {
  int32 hh = 1;
  int32 mm = 2;
  int32 ss = 3;
  // sub-second part; the service-side row converter fills it from LocalDateTime.getNano
  int32 nnn = 4;
}

// Date plus time of day.
message ProtoDateTime {
  ProtoDate date = 1;
  ProtoTime time = 2;
}

// One row of the AQMRPT table.
message AQMRPTRow {
  int64 rowid = 1;
  string countyname = 2;
  string statename = 3;
  int64 measureid = 4;
  int32 reportyear = 5;
  int32 value = 6;
  ProtoDateTime created = 7;
}

// Result of an update/DDL call; the payload is an opaque marshalled value.
message CQLResult {
  bytes result = 1;
}

// A single CQL query request.
message CQLQuery {
  string statement = 1;
  // marshalled statement parameters (the service unmarshals them into a Seq[Object])
  bytes parameters = 2;
  // wrapper types, so "not set" can be told apart from 0
  google.protobuf.Int32Value consistency = 3;
  google.protobuf.Int32Value fetchSize = 4;
}

// One or more CQL update/DDL statements.
message CQLUpdate {
  repeated string statements = 1;
  // NOTE(review): presumably marshalled like CQLQuery.parameters - confirm against the service code
  bytes parameters = 2;
  google.protobuf.Int32Value consistency = 3;
  google.protobuf.BoolValue batch = 4;
}

message HelloMsg {
  string hello = 1;
}

service CQLServices {
  // streams in both directions (both request and response are declared `stream`)
  rpc clientStreaming(stream HelloMsg) returns (stream HelloMsg) {}
  // streams rows in, streams one result per processed row back
  rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {}
  // single query in, stream of rows out
  rpc runQuery(CQLQuery) returns (stream AQMRPTRow) {}
  // unary call: one update request, one result
  rpc runDDL(CQLUpdate) returns (CQLResult) {}
}
package sdp.logging

import org.slf4j.Logger

/**
 * Logger which just wraps org.slf4j.Logger internally.
 *
 * Every logging method takes its message by name, so the message string is only
 * built when the corresponding level is enabled both on this wrapper and on the
 * underlying logger.
 *
 * @param logger the slf4j logger all calls are forwarded to
 */
class Log(logger: Logger) {

  // use var consciously to enable squeezing later
  var isDebugEnabled: Boolean = logger.isDebugEnabled
  var isInfoEnabled: Boolean = logger.isInfoEnabled
  var isWarnEnabled: Boolean = logger.isWarnEnabled
  var isErrorEnabled: Boolean = logger.isErrorEnabled

  /**
   * Logs `msg` at the level named by `level`.
   *
   * @param level one of 'debug, 'info, 'warn, 'error (or the upper-case variants);
   *              any other symbol is silently ignored
   * @param msg   message, evaluated only if the level is enabled
   * @param e     optional cause; when non-null it is forwarded to the underlying
   *              logger together with the message
   */
  def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
    // The Throwable used to be dropped here; forward it whenever one was supplied.
    val hasCause = e != null
    level match {
      case 'debug | 'DEBUG => if (hasCause) debug(msg, e) else debug(msg)
      case 'info | 'INFO => if (hasCause) info(msg, e) else info(msg)
      case 'warn | 'WARN => if (hasCause) warn(msg, e) else warn(msg)
      case 'error | 'ERROR => if (hasCause) error(msg, e) else error(msg)
      case _ => // nothing to do
    }
  }

  /** Logs `msg` at debug level if enabled. */
  def debug(msg: => String): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {
      logger.debug(msg)
    }
  }

  /** Logs `msg` and the cause `e` at debug level if enabled. */
  def debug(msg: => String, e: Throwable): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {
      logger.debug(msg, e)
    }
  }

  /** Logs `msg` at info level if enabled. */
  def info(msg: => String): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {
      logger.info(msg)
    }
  }

  /** Logs `msg` and the cause `e` at info level if enabled. */
  def info(msg: => String, e: Throwable): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {
      logger.info(msg, e)
    }
  }

  /** Logs `msg` at warn level if enabled. */
  def warn(msg: => String): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {
      logger.warn(msg)
    }
  }

  /** Logs `msg` and the cause `e` at warn level if enabled. */
  def warn(msg: => String, e: Throwable): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {
      logger.warn(msg, e)
    }
  }

  /** Logs `msg` at error level if enabled. */
  def error(msg: => String): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {
      logger.error(msg)
    }
  }

  /** Logs `msg` and the cause `e` at error level if enabled. */
  def error(msg: => String, e: Throwable): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {
      logger.error(msg, e)
    }
  }

}
package sdp.logging

import org.slf4j.LoggerFactory

/**
 * Mix-in that gives the implementing class a `log` member.
 */
trait LogSupport {

  /**
   * Logger named after the runtime class of the instance this trait is mixed into.
   */
  protected val log = {
    val underlying = LoggerFactory.getLogger(getClass)
    new Log(underlying)
  }

}
package sdp.file

import java.io.{ByteArrayInputStream, InputStream}
import java.nio.ByteBuffer
import java.nio.file.Paths

import akka.stream.Materializer
import akka.stream.scaladsl.{FileIO, StreamConverters}
import akka.util._

import scala.concurrent.Await
import scala.concurrent.duration._

/**
 * Helpers that move whole files to and from in-memory representations using
 * akka-stream file IO.
 *
 * The `FileTo*` readers block the calling thread (via `Await.result`) until the file
 * has been read completely or `timeOut` elapses. The `*ToFile` writers start the
 * write and return the materialized value of `FileIO.toPath` without waiting.
 */
object Streaming {

  /** Reads the whole file into a single ByteString, blocking for at most `timeOut`. */
  private def readAll(fileName: String, timeOut: FiniteDuration)(
    implicit mat: Materializer): ByteString = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString.empty)(_ ++ _)
    Await.result(fut, timeOut)
  }

  /**
   * Reads `fileName` fully and returns its content as a ByteBuffer.
   *
   * @param fileName path of the file to read
   * @param timeOut  maximum time to wait for the read to finish
   */
  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer): ByteBuffer =
    readAll(fileName, timeOut).toByteBuffer

  /**
   * Reads `fileName` fully and returns its content as a byte array.
   *
   * @param fileName path of the file to read
   * @param timeOut  maximum time to wait for the read to finish
   */
  def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer): Array[Byte] =
    readAll(fileName, timeOut).toArray

  /**
   * Reads `fileName` fully and returns an in-memory InputStream over its content.
   *
   * @param fileName path of the file to read
   * @param timeOut  maximum time to wait for the read to finish
   */
  def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer): InputStream =
    new ByteArrayInputStream(readAll(fileName, timeOut).toArray)

  /**
   * Writes the remaining bytes of `byteBuf` to `fileName`.
   *
   * Note: the bytes are consumed with a relative `get`, so the buffer's position is
   * advanced to its limit as a side effect.
   */
  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
    implicit mat: Materializer) = {
    val ba = new Array[Byte](byteBuf.remaining())
    byteBuf.get(ba, 0, ba.length)
    ByteArrayToFile(ba, fileName)
  }

  /** Writes `bytes` to `fileName`. */
  def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
    implicit mat: Materializer) =
    InputStreamToFile(new ByteArrayInputStream(bytes), fileName)

  /** Streams everything readable from `is` into `fileName`. */
  def InputStreamToFile(is: InputStream, fileName: String)(
    implicit mat: Materializer) = {
    val source = StreamConverters.fromInputStream(() => is)
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

}
package sdp.jdbc.config

import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
import com.typesafe.config._
import java.util.concurrent.TimeUnit
import java.util.Properties
import scalikejdbc.config._
import com.typesafe.config.Config
import com.zaxxer.hikari._
import scalikejdbc.ConnectionPoolFactoryRepository

/**
 * Extension methods to make Typesafe Config easier to use.
 *
 * The `get*Or` accessors return the configured value when `path` exists and the
 * (lazily evaluated) `default` otherwise; the `get*Opt` accessors return an Option.
 */
class ConfigExtensionMethods(val c: Config) extends AnyVal {
  import scala.collection.JavaConverters._

  def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default
  def getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else default
  def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default
  def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default
  def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default
  def getDurationOr(path: String, default: => Duration = Duration.Zero) =
    if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default

  def getPropertiesOr(path: String, default: => Properties = null): Properties =
    if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default

  /**
   * Converts this config into java.util.Properties. Nested config objects become
   * nested Properties values; all other non-null values are stored as strings.
   */
  def toProperties: Properties = {
    def toProps(m: mutable.Map[String, ConfigValue]): Properties = {
      val props = new Properties(null)
      m.foreach { case (k, cv) =>
        val v =
          if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala)
          else if(cv.unwrapped eq null) null
          else cv.unwrapped.toString
        // Properties rejects null values, so null entries are skipped
        if(v ne null) props.put(k, v)
      }
      props
    }
    toProps(c.root.asScala)
  }

  def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None
  def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None
  def getStringOpt(path: String) = Option(getStringOr(path))
  def getPropertiesOpt(path: String) = Option(getPropertiesOr(path))
}

object ConfigExtensionMethods {
  @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c)
}

/**
 * Reads HikariCP settings for a named database from the `<envPrefix>db.<dbName>`
 * section of the Typesafe config.
 */
trait HikariConfigReader extends TypesafeConfigReader {
  self: TypesafeConfig =>      // with TypesafeConfigReader => //NoEnvPrefix =>

  import ConfigExtensionMethods.configExtensionMethods

  /**
   * Returns the `poolFactoryName` configured for `dbName`, falling back to the
   * commons-dbcp factory name when the key is absent.
   */
  def getFactoryName(dbName: Symbol): String = {
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
    c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)
  }

  /**
   * Builds a HikariConfig from the `<envPrefix>db.<dbName>` section.
   *
   * @param dbName name of the database section to read
   * @return a populated (not yet validated) HikariConfig
   */
  def hikariCPConfig(dbName: Symbol): HikariConfig = {

    val hconf = new HikariConfig()
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)

    // Connection settings
    if (c.hasPath("dataSourceClass")) {
      hconf.setDataSourceClassName(c.getString("dataSourceClass"))
    } else {
      // "driverClassName" wins over "driver"; nothing is set when neither is present.
      // foreach (not map): the setter is called purely for its side effect.
      Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).foreach(hconf.setDriverClassName)
    }
    hconf.setJdbcUrl(c.getStringOr("url", null))
    c.getStringOpt("user").foreach(hconf.setUsername)
    c.getStringOpt("password").foreach(hconf.setPassword)
    c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties)

    // Pool configuration
    hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000))
    hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000))
    hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000))
    hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000))
    hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0))
    hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false))
    c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery)
    c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql)
    // pool size defaults are derived from numThreads when not set explicitly
    val numThreads = c.getIntOr("numThreads", 20)
    hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5))
    hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))
    hconf.setPoolName(c.getStringOr("poolName", dbName.name))
    hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false))

    // Equivalent of ConnectionPreparer
    hconf.setReadOnly(c.getBooleanOr("readOnly", false))
    c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation)
    hconf.setCatalog(c.getStringOr("catalog", null))

    hconf

  }
}

import scalikejdbc._

/**
 * Registers scalikejdbc connection pools from configuration. Databases whose
 * `poolFactoryName` is "hikaricp" get a HikariCP-backed pool; every other value
 * goes through scalikejdbc's regular settings-based registration.
 */
trait ConfigDBs {
  self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>

  /** Registers the connection pool for `dbName`. */
  def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    getFactoryName(dbName) match {
      case "hikaricp" => {
        // Closes the Hikari data source when scalikejdbc closes the pool.
        case class HikariDataSourceCloser(src: HikariDataSource) extends DataSourceCloser {
          override def close(): Unit = src.close()
        }
        val hconf = hikariCPConfig(dbName)
        // Load the driver before the pool exists, so a missing driver class cannot
        // leave an already-opened HikariDataSource behind.
        if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) {
          Class.forName(hconf.getDriverClassName)
        }
        val hikariCPSource = new HikariDataSource(hconf)
        ConnectionPool.add(dbName, new DataSourceConnectionPool(dataSource = hikariCPSource,settings = DataSourceConnectionPoolSettings(),
          closer = HikariDataSourceCloser(hikariCPSource)))
      }
      case _ => {
        val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
        val cpSettings = readConnectionPoolSettings(dbName)
        if (driver != null && driver.trim.nonEmpty) {
          Class.forName(driver)
        }
        ConnectionPool.add(dbName, url, user, password, cpSettings)
      }
    }
  }

  /** Loads the global settings, then registers a pool for every configured database name. */
  def setupAll(): Unit = {
    loadGlobalSettings()
    dbNames.foreach { dbName => setup(Symbol(dbName)) }
  }

  /** Closes the pool registered under `dbName`. */
  def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    ConnectionPool.close(dbName)
  }

  /** Closes every registered pool. */
  def closeAll(): Unit = {
    ConnectionPool.closeAll
  }

}

/** Default instance, without an environment prefix. */
object ConfigDBs extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader

/** Variant that reads its settings under the given environment prefix (e.g. "dev", "test"). */
case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader
  with EnvPrefix {

  override val env = Option(envValue)
}
package sdp.jdbc.engine import java.sql.PreparedStatement import scala.collection.generic.CanBuildFrom import akka.stream.scaladsl._ import scalikejdbc._ import scalikejdbc.streams._ import akka.NotUsed import akka.stream._ import java.time._ import scala.concurrent.duration._ import scala.concurrent._ import sdp.file.Streaming._ import scalikejdbc.TxBoundary.Try._ import scala.concurrent.ExecutionContextExecutor import java.io.InputStream import sdp.logging.LogSupport object JDBCContext { type SQLTYPE = Int val SQL_EXEDDL= 1 val SQL_UPDATE = 2 val RETURN_GENERATED_KEYVALUE = true val RETURN_UPDATED_COUNT = false } case class JDBCQueryContext[M]( dbName: Symbol, statement: String, parameters: Seq[Any] = Nil, fetchSize: Int = 100, autoCommit: Boolean = false, queryTimeout: Option[Int] = None) case class JDBCContext ( dbName: Symbol, statements: Seq[String] = Nil, parameters: Seq[Seq[Any]] = Nil, fetchSize: Int = 100, queryTimeout: Option[Int] = None, queryTags: Seq[String] = Nil, sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE, batch: Boolean = false, returnGeneratedKey: Seq[Option[Any]] = Nil, // no return: None, return by index: Some(1), by name: Some("id") preAction: Option[PreparedStatement => Unit] = None, postAction: Option[PreparedStatement => Unit] = None) extends LogSupport { ctx => //helper functions def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag) def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags) def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size) def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time) def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = { if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch && ctx.statements.size == 1) { val nc = ctx.copy(preAction = action) log.info("setPreAction> set") nc } else { log.info("setPreAction> JDBCContex setting error: preAction not supported!") throw new 
IllegalStateException("JDBCContex setting error: preAction not supported!") } } def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = { if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch && ctx.statements.size == 1) { val nc = ctx.copy(postAction = action) log.info("setPostAction> set") nc } else { log.info("setPreAction> JDBCContex setting error: postAction not supported!") throw new IllegalStateException("JDBCContex setting error: postAction not supported!") } } def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = { if (ctx.sqlType == JDBCContext.SQL_EXEDDL) { log.info(s"appendDDLCommand> appending: statement: ${_statement}, parameters: ${_parameters}") val nc = ctx.copy( statements = ctx.statements ++ Seq(_statement), parameters = ctx.parameters ++ Seq(Seq(_parameters)) ) log.info(s"appendDDLCommand> appended: statement: ${nc.statements}, parameters: ${nc.parameters}") nc } else { log.info(s"appendDDLCommand> JDBCContex setting error: option not supported!") throw new IllegalStateException("JDBCContex setting error: option not supported!") } } def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = { if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) { log.info(s"appendUpdateCommand> appending: returnGeneratedKey: ${_returnGeneratedKey}, statement: ${_statement}, parameters: ${_parameters}") val nc = ctx.copy( statements = ctx.statements ++ Seq(_statement), parameters = ctx.parameters ++ Seq(_parameters), returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None)) ) log.info(s"appendUpdateCommand> appended: statement: ${nc.statements}, parameters: ${nc.parameters}") nc } else { log.info(s"appendUpdateCommand> JDBCContex setting error: option not supported!") throw new IllegalStateException("JDBCContex setting error: option not supported!") } } def appendBatchParameters(_parameters: Any*): JDBCContext = { 
log.info(s"appendBatchParameters> appending: parameters: ${_parameters}") if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) { log.info(s"appendBatchParameters> JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!") throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!") } var matchParams = true if (ctx.parameters != Nil) if (ctx.parameters.head.size != _parameters.size) matchParams = false if (matchParams) { val nc = ctx.copy( parameters = ctx.parameters ++ Seq(_parameters) ) log.info(s"appendBatchParameters> appended: statement: ${nc.statements}, parameters: ${nc.parameters}") nc } else { log.info(s"appendBatchParameters> JDBCContex setting error: batch command parameters not match!") throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!") } } def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = { if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!") ctx.copy( returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil ) } def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = { log.info(s"setDDLCommand> setting: statement: ${_statement}, parameters: ${_parameters}") val nc = ctx.copy( statements = Seq(_statement), parameters = Seq(_parameters), sqlType = JDBCContext.SQL_EXEDDL, batch = false ) log.info(s"setDDLCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}") nc } def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = { log.info(s"setUpdateCommand> setting: returnGeneratedKey: ${_returnGeneratedKey}, statement: ${_statement}, parameters: ${_parameters}") val nc = ctx.copy( statements = Seq(_statement), parameters = Seq(_parameters), returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else 
Seq(None),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = false
    )
    log.info(s"setUpdateCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}")
    nc
  }

  /** Returns a copy of this context holding a single statement flagged for batch execution. */
  def setBatchCommand(_statement: String): JDBCContext = {
    log.info(s"setBatchCommand> appending: statement: ${_statement}")
    val nc = ctx.copy (
      statements = Seq(_statement),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = true
    )
    log.info(s"setBatchCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}")
    nc
  }
}

/**
 * JDBC execution helpers built on scalikejdbc: streaming and strict queries,
 * DDL execution, batch updates and transactional (single or multi statement) updates.
 */
object JDBCEngine extends LogSupport {
  import JDBCContext._

  type JDBCDate = LocalDate
  type JDBCDateTime = LocalDateTime
  type JDBCTime = LocalTime

  def jdbcSetDate(yyyy: Int, mm: Int, dd: Int) = LocalDate.of(yyyy,mm,dd)
  def jdbcSetTime(hh: Int, mm: Int, ss: Int, nn: Int) = LocalTime.of(hh,mm,ss,nn)
  def jdbcSetDateTime(date: JDBCDate, time: JDBCTime) = LocalDateTime.of(date,time)
  def jdbcSetNow = LocalDateTime.now()

  def jdbcGetDate(sqlDate: java.sql.Date): java.time.LocalDate = sqlDate.toLocalDate
  def jdbcGetTime(sqlTime: java.sql.Time): java.time.LocalTime = sqlTime.toLocalTime
  def jdbcGetTimestamp(sqlTimestamp: java.sql.Timestamp): java.time.LocalDateTime =
    sqlTimestamp.toLocalDateTime

  type JDBCBlob = InputStream

  def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer) = FileToInputStream(fileName,timeOut)

  def jdbcBlobToFile(blob: JDBCBlob, fileName: String)(
    implicit mat: Materializer) = InputStreamToFile(blob,fileName)

  /** Placeholder extractor for SQL objects that get their real extractor later via `map`. */
  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
    throw new IllegalStateException(message)
  }

  /**
   * Runs the query in `ctx` as a read-only stream and exposes the rows as an akka-stream Source.
   *
   * @param ctx       query context (db name, statement, parameters, fetch size, timeout, auto-commit)
   * @param extractor converts one result-set row into an `A`
   */
  def jdbcAkkaStream[A](ctx: JDBCQueryContext[A],extractor: WrappedResultSet => A)
                       (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = {
    val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream {
      val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
      ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
      val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
      sql.iterator
        .withDBSessionForceAdjuster(session => {
          session.connection.setAutoCommit(ctx.autoCommit)
          session.fetchSize(ctx.fetchSize)
        })
    }
    log.info(s"jdbcAkkaStream> Source: db: ${ctx.dbName}, statement: ${ctx.statement}, parameters: ${ctx.parameters}")
    Source.fromPublisher[A](publisher)
  }

  /**
   * Runs the query in `ctx` and collects every row into a `C[A]`.
   *
   * @throws RuntimeException wrapping (and now carrying as its cause) any exception raised
   *                          while running the query
   */
  def jdbcQueryResult[C[_] <: TraversableOnce[_], A](ctx: JDBCQueryContext[A],
                                                     extractor: WrappedResultSet => A)(
    implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
    val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
    ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
    rawSql.fetchSize(ctx.fetchSize)
    try {
      implicit val session = NamedAutoSession(ctx.dbName)
      log.info(s"jdbcQueryResult> Source: db: ${ctx.dbName}, statement: ${ctx.statement}, parameters: ${ctx.parameters}")
      val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
      sql.collection.apply[C]()
    } catch {
      case e: Exception =>
        log.error(s"jdbcQueryResult> runtime error: ${e.getMessage}")
        // keep the original exception as the cause so the stack trace is not lost
        throw new RuntimeException(s"jdbcQueryResult> Error: ${e.getMessage}", e)
    }
  }

  /**
   * Executes every statement in `ctx` as DDL inside one local transaction.
   * Fails the returned Future when `ctx.sqlType` is not `SQL_EXEDDL`.
   */
  def jdbcExecuteDDL(ctx: JDBCContext)(implicit ec: ExecutionContextExecutor): Future[String] = {
    if (ctx.sqlType != SQL_EXEDDL) {
      log.info(s"jdbcExecuteDDL> JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!")
      Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
    }
    else {
      log.info(s"jdbcExecuteDDL> Source: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
      Future {
        NamedDB(ctx.dbName) localTx { implicit session =>
          ctx.statements.foreach { stm =>
            val ddl = new SQLExecution(statement = stm, parameters = Nil)(
              before = _ => ())(
              after = _ => ())
            ddl.apply()
          }
          "SQL_EXEDDL executed succesfully."
        }
      }
    }
  }

  /**
   * Runs the first statement of `ctx` as a JDBC batch, once per parameter set.
   *
   * The returned Future fails with IllegalStateException when the context has no
   * statements, is not of type `SQL_UPDATE`, or does not have `batch = true`.
   * Returns the generated keys when a return key is configured, otherwise an empty collection.
   */
  def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    // The checks form one chain: previously the "statements empty" failure was built and
    // discarded, and execution fell through to `ctx.statements.head`.
    if (ctx.statements == Nil) {
      log.info(s"jdbcBatchUpdate> JDBCContex setting error: statements empty!")
      Future.failed(new IllegalStateException("JDBCContex setting error: statements empty!"))
    }
    else if (ctx.sqlType != SQL_UPDATE) {
      log.info(s"jdbcBatchUpdate> JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")
      Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
    }
    else if (!ctx.batch) {
      log.info(s"jdbcBatchUpdate> JDBCContex setting error: must set batch = true !")
      Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
    }
    else if (noReturnKey(ctx)) {
      log.info(s"jdbcBatchUpdate> batch updating no return: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
      val usql = SQL(ctx.statements.head)
        .tags(ctx.queryTags: _*)
        .batch(ctx.parameters: _*)
      Future {
        NamedDB(ctx.dbName) localTx { implicit session =>
          ctx.queryTimeout.foreach(session.queryTimeout(_))
          usql.apply[Seq]()
          Seq.empty[Long].to[C]
        }
      }
    }
    else {
      log.info(s"jdbcBatchUpdate> batch updating return genkey: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
      val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
      Future {
        NamedDB(ctx.dbName) localTx { implicit session =>
          ctx.queryTimeout.foreach(session.queryTimeout(_))
          usql.apply[C]()
        }
      }
    }
  }

  /** Runs the single statement of `ctx` in a local transaction and returns its generated key. */
  private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] =
    // headOption/flatten replaces a refutable `Some(key) :: xs` pattern that threw MatchError
    ctx.returnGeneratedKey.headOption.flatten match {
      case None =>
        Future.failed(new IllegalStateException("JDBCContex setting error: returnGeneratedKey not set!"))
      case Some(key) =>
        val params: Seq[Any] = ctx.parameters.headOption.getOrElse(Nil)
        log.info(s"singleTxUpdateWithReturnKey> updating: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
        val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
        Future {
          NamedDB(ctx.dbName) localTx { implicit session =>
            session.fetchSize(ctx.fetchSize)
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            val result = usql.apply()
            Seq(result).to[C]
          }
        }
    }

  /** Runs the single statement of `ctx` in a local transaction and returns the update count. */
  private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    val params: Seq[Any] = ctx.parameters.headOption.getOrElse(Nil)
    val before = ctx.preAction.getOrElse(noActon)
    val after = ctx.postAction.getOrElse(noActon)
    log.info(s"singleTxUpdateNoReturnKey> updating: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
    val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after)
    Future {
      NamedDB(ctx.dbName) localTx {implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val result = usql.apply()
        Seq(result.toLong).to[C]
      }
    }
  }

  private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] =
    if (noReturnKey(ctx)) singleTxUpdateNoReturnKey(ctx)
    else singleTxUpdateWithReturnKey(ctx)

  /** True when no generated key is requested: the key list is empty or its first entry is None. */
  private def noReturnKey(ctx: JDBCContext): Boolean =
    ctx.returnGeneratedKey.headOption.flatten.isEmpty

  /** No-op pre/post action for prepared statements. */
  def noActon: PreparedStatement=>Unit = pstm => {}

  /**
   * Runs every statement of `ctx` in order inside one local transaction.
   *
   * Parameter sets and return keys are matched to statements by position. A statement
   * without a matching parameter set runs with no parameters and one without a matching
   * key returns its update count; before this padding such statements were silently
   * dropped by `zip`.
   */
  def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    val count = ctx.statements.size
    if (ctx.parameters.size < count)
      log.warn(s"multiTxUpdates> fewer parameters than statements! pad with 'Nil'.")
    val params: Seq[Seq[Any]] = ctx.parameters ++ Seq.fill(count - ctx.parameters.size)(Nil)
    val keys: Seq[Option[Any]] =
      ctx.returnGeneratedKey ++ Seq.fill(count - ctx.returnGeneratedKey.size)(None)
    val sqlcmd = ctx.statements zip params zip keys
    log.info(s"multiTxUpdates> updating: db: ${ctx.dbName}, SQL Commands: ${sqlcmd}")
    Future {
      NamedDB(ctx.dbName) localTx { implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val results = sqlcmd.map { case ((stm, param), key) =>
          key match {
            case None => new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong
            case Some(k) => new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
          }
        }
        results.to[C]
      }
    }
  }

  /**
   * Runs the update statement(s) of `ctx` transactionally (non-batch mode).
   *
   * The returned Future fails with IllegalStateException when the context has no
   * statements, is not of type `SQL_UPDATE`, or has `batch = true`.
   */
  def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    // One chain of checks: the "statements empty" failure used to be discarded.
    if (ctx.statements == Nil) {
      log.info(s"jdbcTxUpdates> JDBCContex setting error: statements empty!")
      Future.failed(new IllegalStateException("JDBCContex setting error: statements empty!"))
    }
    else if (ctx.sqlType != SQL_UPDATE) {
      log.info(s"jdbcTxUpdates> JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")
      Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
    }
    else if (ctx.batch) {
      log.info(s"jdbcTxUpdates> JDBCContex setting error: must set batch = false !")
      Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = false !"))
    }
    else if (ctx.statements.size == 1) singleTxUpdate(ctx)
    else multiTxUpdates(ctx)
  }

  /**
   * Stream stage that executes `statement` once per element, with parameters derived
   * from the element, and passes the element through.
   */
  case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,
                                 statement: String, prepareParams: R => Seq[Any]) extends LogSupport {
    jas =>
    def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)
    def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)
    def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)

    private def perform(r: R)(implicit ec: ExecutionContextExecutor) = {
      import scala.concurrent._
      val params = prepareParams(r)
      log.info(s"JDBCActionStream.perform> db: ${dbName}, statement: ${statement}, parameters: ${params}")
      Future {
        NamedDB(dbName) autoCommit { session =>
          session.execute(statement, params: _*)
        }
        r
      }
    }

    def performOnRow(implicit ec: ExecutionContextExecutor): Flow[R, R, NotUsed] =
      if (processInOrder)
        Flow[R].mapAsync(parallelism)(perform)
      else
        Flow[R].mapAsyncUnordered(parallelism)(perform)
  }

  object JDBCActionStream {
    def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
      new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params)
  }
}
package sdp.cql.engine

import akka.NotUsed
import akka.stream.alpakka.cassandra.scaladsl._
import akka.stream.scaladsl._
import com.datastax.driver.core._
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
import scala.concurrent._
import scala.concurrent.duration.Duration
import sdp.logging.LogSupport

object CQLContext {
  // Consistency Levels
  type CONSISTENCY_LEVEL = Int
  val ANY: CONSISTENCY_LEVEL = 0x0000
  val ONE: CONSISTENCY_LEVEL = 0x0001
  val TWO: CONSISTENCY_LEVEL = 0x0002
  val THREE: CONSISTENCY_LEVEL = 0x0003
  val QUORUM : CONSISTENCY_LEVEL = 0x0004
  val ALL: CONSISTENCY_LEVEL = 0x0005
  val LOCAL_QUORUM: CONSISTENCY_LEVEL = 0x0006
  val EACH_QUORUM: CONSISTENCY_LEVEL = 0x0007
  val LOCAL_ONE: CONSISTENCY_LEVEL = 0x000A
  val LOCAL_SERIAL: CONSISTENCY_LEVEL = 0x000B
  val SERIAL: CONSISTENCY_LEVEL = 0x000C

  def apply(): CQLContext = CQLContext(statements = Nil)

  /**
   * Maps the integer codes above to the driver's ConsistencyLevel.
   * A code that is not one of the constants above still raises MatchError.
   */
  def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
    consistency match {
      case ALL => ConsistencyLevel.ALL
      case ONE => ConsistencyLevel.ONE
      case TWO => ConsistencyLevel.TWO
      case THREE => ConsistencyLevel.THREE
      case ANY => ConsistencyLevel.ANY
      case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
      case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
      case QUORUM => ConsistencyLevel.QUORUM
      // LOCAL_QUORUM is declared above but had no case here, so using it threw MatchError
      case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM
      case SERIAL => ConsistencyLevel.SERIAL
      case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
    }
  }
}

/** Describes one CQL query: statement, bind parameters, optional consistency level, fetch size. */
case class CQLQueryContext(
                            statement: String,
                            parameter: Seq[Object] = Nil,
                            consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
                            fetchSize: Int = 100
                          ) { ctx =>
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext =
    ctx.copy(consistency = Some(_consistency))
  def setFetchSize(pageSize: Int): CQLQueryContext =
    ctx.copy(fetchSize = pageSize)
  def setParameters(param: Seq[Object]): CQLQueryContext =
    ctx.copy(parameter = param)
}

object CQLQueryContext {
  def apply[M](stmt: String, param: Seq[Object]): CQLQueryContext =
    new CQLQueryContext(statement = stmt, parameter = param)
}

/**
 * Describes one or more CQL update commands.
 * `parameters(i)` is the parameter set for `statements(i)`; in batch mode every
 * parameter set is bound to the first statement.
 */
case class CQLContext(
                       statements: Seq[String],
                       parameters: Seq[Seq[Object]] = Nil,
                       consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
                       batch: Boolean = false
                     ) extends LogSupport { ctx =>
  def setBatch(bat: Boolean) = ctx.copy(batch = bat)
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
    ctx.copy(consistency = Some(_consistency))
  def setCommand(_statement: String, _parameters: Object*): CQLContext = {
    log.info(s"setCommand> setting: statement: ${_statement}, parameters: ${_parameters}")
    val nc = ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters))
    log.info(s"setCommand> set: statements: ${nc.statements}, parameters: ${nc.parameters}")
    nc
  }
  def appendCommand(_statement: String, _parameters: Object*): CQLContext = {
    log.info(s"appendCommand> appending: statement: ${_statement}, parameters: ${_parameters}")
    val nc = ctx.copy(statements = ctx.statements :+ _statement,
      parameters = ctx.parameters ++ Seq(_parameters))
    log.info(s"appendCommand> appended: statements: ${nc.statements}, parameters: ${nc.parameters}")
    nc
  }
}

object CQLEngine extends LogSupport {
  import CQLContext._
  import CQLHelpers._

  import cats._, cats.data._, cats.implicits._
  import scala.concurrent.{Await, Future}
  import scala.concurrent.duration._

  /**
   * Prepares `stmt`, binds `params` (after `processParameters`), applies the optional
   * consistency level and logs the statement under the `caller` tag.
   * Shared by every execution path so they cannot drift apart.
   */
  private def bindStatement(caller: String,
                            cons: Option[CQLContext.CONSISTENCY_LEVEL],
                            stmt: String,
                            params: Seq[Object])(implicit session: Session): BoundStatement = {
    val prepStmt = session.prepare(stmt)
    val pars: Seq[Object] = if (params.isEmpty) Nil else processParameters(params)
    val boundStmt = prepStmt.bind(pars: _*)
    log.info(s"${caller}> statement: ${prepStmt.getQueryString}, parameters: ${pars}")
    cons.foreach { consistency => boundStmt.setConsistencyLevel(consistencyLevel(consistency)) }
    boundStmt
  }

  /**
   * Executes the query in `ctx` with the given page size and returns the driver
   * ResultSet together with the extracted rows.
   */
  def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext, pageSize: Int = 100
                                                    ,extractor: Row => A)(
    implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= {
    val boundStmt = bindStatement("fetchResultPage", ctx.consistency, ctx.statement, ctx.parameter)
    val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
    // NOTE(review): iterating the ResultSet may pull further pages from the server, so this
    // can return more than one page of rows - confirm against the driver paging docs.
    (resultSet,(resultSet.asScala.view.map(extractor)).to[C])
  }

  /**
   * Fetches further results for `resultSet`, waiting at most `timeOut`.
   * Returns `None` when the result set is already fully fetched or the fetch fails.
   */
  def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
    extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) =
    if (resultSet.isFullyFetched) {
      (resultSet, None)
    } else {
      try {
        val result = Await.result(resultSet.fetchMoreResults(), timeOut)
        (result, Some((result.asScala.view.map(extractor)).to[C]))
      } catch {
        // only non-fatal errors are absorbed, and they are logged instead of vanishing
        case scala.util.control.NonFatal(e) =>
          log.warn(s"fetchMorePages> fetch failed: ${e.getMessage}")
          (resultSet, None)
      }
    }

  /**
   * Executes the update command(s) in `ctx`.
   *
   *  - batch with at least 2 parameter sets: one batch of the first statement;
   *  - exactly one statement: asynchronous single update;
   *  - otherwise: all statements run sequentially, in order, inside one Future.
   */
  def cqlExecute(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    val invalidBat = ctx.batch && ctx.parameters.size < 2
    if (ctx.batch && !invalidBat) cqlBatchUpdate(ctx)
    else {
      if (invalidBat)
        log.warn(s"cqlExecute> batch update must have at least 2 sets of parameters! change to single-command.")

      if (ctx.statements.size == 1) {
        val param: Seq[Object] = ctx.parameters.headOption.getOrElse(Nil)
        log.info(s"cqlExecute> single-command: statement: ${ctx.statements.head} parameters: ${param}")
        cqlSingleUpdate(ctx.consistency, ctx.statements.head, param)
      }
      else {
        val missing = ctx.statements.size - ctx.parameters.size
        if (ctx.parameters.nonEmpty && missing > 0)
          log.warn(s"cqlExecute> fewer parameters than statements! pad with 'Nil'.")
        // Seq.fill with a non-positive count yields an empty Seq, so this only pads
        val params: Seq[Seq[Object]] = ctx.parameters ++ Seq.fill(missing)(Nil)

        val commands: Seq[(String,Seq[Object])] = ctx.statements zip params
        log.info(s"cqlExecute> multi-commands: ${commands}")
        // Commands run synchronously and in order so that a later command may rely on the
        // effect of an earlier one (e.g. DROP then CREATE); Future.sequence or
        // Future.traverse would run them concurrently.
        Future {
          blocking {
            commands.foreach { case (stm, pars) =>
              cqlExecuteSync(ctx.consistency, stm, pars)
            }
          }
          true
        }
      }
    }
  }

  /** Executes one statement asynchronously; the result is the driver's `wasApplied`. */
  def cqlSingleUpdate(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Object])(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    val boundStmt = bindStatement("cqlSingleUpdate", cons, stmt, params)
    session.executeAsync(boundStmt).map(_.wasApplied())
  }

  /** Executes one statement on the calling thread; the result is the driver's `wasApplied`. */
  def cqlExecuteSync(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Object])(
    implicit session: Session, ec: ExecutionContext): Boolean = {
    val boundStmt = bindStatement("cqlExecuteSync", cons, stmt, params)
    session.execute(boundStmt).wasApplied()
  }

  /**
   * Binds every parameter set of `ctx` to its first statement and executes them as one batch.
   * The returned Future fails with IllegalArgumentException when `ctx` has no statement.
   */
  def cqlBatchUpdate(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] =
    ctx.statements.headOption match {
      case None =>
        Future.failed(new IllegalArgumentException("cqlBatchUpdate> no statement to execute!"))
      case Some(statement) =>
        val params: Seq[Seq[Object]] =
          if (ctx.parameters == Nil) Seq.fill(ctx.statements.length)(Nil)
          else ctx.parameters
        log.info(s"cqlBatchUpdate> statement: ${statement}, parameters: ${params}")

        val prepStmt = session.prepare(statement)
        val batch = new BatchStatement()
        params.foreach { p =>
          log.info(s"cqlBatchUpdate> batch with raw parameter: ${p}")
          val pars = processParameters(p)
          log.info(s"cqlBatchUpdate> batch with cooked parameters: ${pars}")
          batch.add(prepStmt.bind(pars: _*))
        }
        ctx.consistency.foreach { consistency =>
          batch.setConsistencyLevel(consistencyLevel(consistency))
        }
        session.executeAsync(batch).map(_.wasApplied())
    }

  /** Runs the query in `ctx` as an akka-stream Source of extracted rows. */
  def cassandraStream[A](ctx: CQLQueryContext,extractor: Row => A)
                        (implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = {
    val boundStmt = bindStatement("cassandraStream", ctx.consistency, ctx.statement, ctx.parameter)
    CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(extractor)
  }

  /**
   * Stream stage that executes `statement` once per element, with parameters derived
   * from the element, and passes the element through.
   */
  case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true,
                                      statement: String, prepareParams: R => Seq[Object],
                                      consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ cas =>
    def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism=parLevel)
    def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered)
    def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
      cas.copy(consistency = Some(_consistency))

    def perform(r: R)(implicit session: Session, ec: ExecutionContext) = {
      val boundStmt =
        bindStatement("CassandraActionStream.perform", consistency, statement, prepareParams(r))
      session.executeAsync(boundStmt).map(_ => r)
    }

    def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] =
      if (processInOrder)
        Flow[R].mapAsync(parallelism)(perform)
      else
        Flow[R].mapAsyncUnordered(parallelism)(perform)

    def unloggedBatch[K](statementBinder: (
      R, PreparedStatement) => BoundStatement,partitionKey: R => K)(
      implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = {
      val preparedStatement = session.prepare(statement)
      log.info(s"CassandraActionStream.unloggedBatch> statement: ${preparedStatement.getQueryString}")
      CassandraFlow.createUnloggedBatchWithPassThrough[R, K](
        parallelism,
        preparedStatement,
        statementBinder,
        partitionKey)
    }
  }

  object CassandraActionStream {
    def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] =
      new CassandraActionStream[R]( statement=_statement, prepareParams = params)
  }
}

object CQLHelpers extends LogSupport {
  import java.nio.ByteBuffer
  import java.io._
  import java.nio.file._
  import com.datastax.driver.core.LocalDate
  import com.datastax.driver.extras.codecs.jdk8.InstantCodec
  import java.time.Instant
  import akka.stream.scaladsl._
  import akka.stream._

  /** Adapts a Guava ListenableFuture to a Scala Future. */
  implicit def listenableFutureToFuture[T](
                                            listenableFuture: ListenableFuture[T]): Future[T] = {
    val promise = Promise[T]()
    Futures.addCallback(listenableFuture, new FutureCallback[T] {
      def onFailure(error: Throwable): Unit = {
        promise.failure(error)
        ()
      }
      def onSuccess(result: T): Unit = {
        promise.success(result)
        ()
      }
    })
    promise.future
  }

  // Parameter markers: processParameters replaces them with driver-level values.
  case class CQLDate(year: Int, month: Int, day: Int)
  case object CQLTodayDate
  case class CQLDateTime(year: Int, Month: Int,
                         day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
  case object CQLDateTimeNow

  def cqlGetDate(dateToConvert: java.util.Date): java.time.LocalDate =
    dateToConvert.toInstant()
      .atZone(java.time.ZoneId.systemDefault())
      .toLocalDate()

  def cqlGetTime(dateToConvert: java.util.Date): java.time.LocalTime =
    dateToConvert.toInstant()
      .atZone(java.time.ZoneId.systemDefault())
      .toLocalTime()

  def cqlGetTimestamp(dateToConvert: java.util.Date): java.time.LocalDateTime=
    new java.sql.Timestamp(
      dateToConvert.getTime()
    ).toLocalDateTime()

  /**
   * Replaces the date/time markers in `params` with the values that get bound:
   *
   *  - CQLDate(y, m, d)  => driver LocalDate
   *  - CQLTodayDate      => driver LocalDate for the current system-default date
   *  - CQLDateTimeNow    => the current Instant
   *  - CQLDateTime(...)  => Instant, with the fields interpreted as UTC
   *
   * Every other element is passed through unchanged.
   */
  def processParameters(params: Seq[Object]): Seq[Object] = {
    log.info(s"[processParameters] input: ${params}")
    val outParams = params.map {
      case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
      case CQLTodayDate =>
        val today = java.time.LocalDate.now()
        LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
      // an Instant has no zone, so the clock's zone never affected the value
      case CQLDateTimeNow => Instant.now()
      case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) =>
        // The previous code built a space-padded string with no '.' or 'Z', which
        // Instant.parse always rejected. Build the value from the fields directly;
        // UTC matches the only zone Instant.parse would have accepted.
        java.time.LocalDateTime.of(yy, mm, dd, hr, ms, sc, mi * 1000000)
          .toInstant(java.time.ZoneOffset.UTC)
      case other => other
    }
    log.info(s"[processParameters] output: ${outParams}")
    outParams
  }

  /** InputStream view over the remaining bytes of a ByteBuffer (advances its position). */
  class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
    // InputStream.read() must return 0..255; the raw signed byte made 0xFF look like EOF
    override def read(): Int =
      if (buf.hasRemaining) buf.get & 0xFF else -1

    override def read(bytes: Array[Byte], off: Int, len: Int): Int =
      if (len == 0) 0
      else if (!buf.hasRemaining) -1 // signal EOF; returning 0 makes readers spin forever
      else {
        val length: Int = Math.min(len, buf.remaining)
        buf.get(bytes, off, length)
        length
      }
  }

  object ByteBufferInputStream {
    def apply(buf: ByteBuffer): ByteBufferInputStream = {
      new ByteBufferInputStream(buf)
    }
  }

  /** OutputStream writing into a fixed ByteBuffer; overflow raises BufferOverflowException. */
  class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream {
    override def write(b: Int): Unit = {
      buf.put(b.toByte)
    }

    override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
      buf.put(bytes, off, len)
    }
  }

  object FixsizedByteBufferOutputStream {
    def apply(buf: ByteBuffer) = new FixsizedByteBufferOutputStream(buf)
  }

  /**
   * OutputStream writing into a ByteBuffer that is reallocated when it runs out of room.
   *
   * @param buf          initial buffer
   * @param onHeap       allocate replacement buffers on the heap (true) or as direct buffers
   * @param increasingBy growth factor applied to the capacity on each expansion
   */
  class ExpandingByteBufferOutputStream(var buf: ByteBuffer, onHeap: Boolean,
                                        increasingBy: Float =
                                          ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR)
    extends OutputStream {

    override def write(b: Array[Byte], off: Int, len: Int): Unit = {
      if (len > buf.remaining) increase(grownCapacity(buf.position.toLong + len))
      // honour `off`: the previous code always copied from index 0
      buf.put(b, off, len)
    }

    override def write(b: Int): Unit= {
      if (!buf.hasRemaining) increase(grownCapacity(buf.position.toLong + 1))
      buf.put(b.toByte)
    }

    /**
     * Smallest capacity >= `required` reached by repeatedly applying the growth factor.
     * Starts from at least 1 and always advances, so a zero-capacity buffer cannot loop forever.
     */
    private def grownCapacity(required: Long): Int = {
      var cap: Long = math.max(buf.capacity.toLong, 1L)
      while (cap < required) cap = math.max(cap + 1, (cap * increasingBy.toDouble).toLong)
      math.min(cap, Int.MaxValue.toLong).toInt
    }

    /** Replaces `buf` with a buffer of `newCapacity` holding the bytes written so far. */
    protected def increase(newCapacity: Int): Unit = {
      buf.limit(buf.position)
      buf.rewind
      val newBuffer =
        if (onHeap) ByteBuffer.allocate(newCapacity)
        else ByteBuffer.allocateDirect(newCapacity)
      newBuffer.put(buf)
      buf.clear
      buf = newBuffer
    }

    def size: Long = buf.position
    def capacity: Long = buf.capacity
    def byteBuffer: ByteBuffer = buf
  }

  object ExpandingByteBufferOutputStream {
    val DEFAULT_INCREASING_FACTOR = 1.5f

    def apply(size: Int, increasingBy: Float, onHeap: Boolean) = {
      if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0")
      val buffer: ByteBuffer =
        if (onHeap) ByteBuffer.allocate(size)
        else ByteBuffer.allocateDirect(size)
      // pass the validated factor on; it used to be checked and then ignored
      new ExpandingByteBufferOutputStream(buffer, onHeap, increasingBy)
    }

    def apply(size: Int): ExpandingByteBufferOutputStream = {
      apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false)
    }

    def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = {
      apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap)
    }

    def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = {
      apply(size, increasingBy, false)
    }
  }

  /**
   * Reads the whole file into a ByteBuffer.
   * The previous version sized the array from `available + 1` (appending a stray zero
   * byte), ignored short reads and never closed the stream.
   */
  def cqlFileToBytes(fileName: String): ByteBuffer =
    ByteBuffer.wrap(Files.readAllBytes(Paths.get(fileName)))

  /** Writes the remaining bytes of `bytes` to `fileName`. */
  def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
    implicit mat: Materializer): Future[IOResult] = {
    // duplicate() leaves the caller's buffer position untouched
    val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes.duplicate()))
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

  def cqlDateTimeString(date: java.util.Date, fmt: String): String = {
    val outputFormat = new java.text.SimpleDateFormat(fmt)
    outputFormat.format(date)
  }

  def useJava8DateTime(cluster: Cluster) = {
    //for jdk8 datetime format
    cluster.getConfiguration().getCodecRegistry()
      .register(InstantCodec.instance)
  }
}
package protobuf.bytes

import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
import com.google.protobuf.ByteString

/**
 * Converts arbitrary values to and from protobuf `ByteString`s using Java serialization.
 *
 * NOTE(review): `unmarshal` runs Java deserialization on whatever bytes it is given.
 * If those bytes can come from an untrusted peer this is unsafe - confirm callers.
 */
object Converter {

  /** Serializes `value` with an ObjectOutputStream and wraps the bytes in a ByteString. */
  def marshal(value: Any): ByteString = {
    val sink = new ByteArrayOutputStream()
    val objectOut = new ObjectOutputStream(sink)
    objectOut.writeObject(value)
    objectOut.close()
    val serialized: Array[Byte] = sink.toByteArray()
    ByteString.copyFrom(serialized)
  }

  /**
   * Deserializes the object stored in `bytes` and casts it to `A`.
   * The cast is unchecked: a wrong `A` only fails where the value is first used.
   */
  def unmarshal[A](bytes: ByteString): A = {
    val source = new ByteArrayInputStream(bytes.toByteArray)
    val objectIn = new ObjectInputStream(source)
    val decoded: AnyRef = objectIn.readObject()
    objectIn.close()
    decoded.asInstanceOf[A]
  }
}
package demo.sdp.grpc.cql.server

import akka.NotUsed
import akka.stream.scaladsl._
import protobuf.bytes.Converter._
import com.datastax.driver.core._
import scala.concurrent.ExecutionContextExecutor
import sdp.grpc.services._
import sdp.cql.engine._
import CQLEngine._
import CQLHelpers._
import sdp.logging.LogSupport
import scala.concurrent._
import scala.concurrent.duration._
import akka.stream.ActorMaterializer

/**
 * gRPC (akka-stream) service implementation backed by a Cassandra session:
 * row transfer into testdb.AQMRPT, DDL execution, queries and an echo endpoint.
 */
class CQLStreamingServices(implicit ec: ExecutionContextExecutor,
                           mat: ActorMaterializer,
                           session: Session)
  extends CqlGrpcAkkaStream.CQLServices with LogSupport {

  /** Bind parameters for `cqlInsert`, in column order; `created` is set to "now". */
  val toParams: AQMRPTRow => Seq[Object] = row => Seq[Object](
    row.rowid.asInstanceOf[Object],
    row.measureid.asInstanceOf[Object],
    row.statename,
    row.countyname,
    row.reportyear.asInstanceOf[Object],
    row.value.asInstanceOf[Object],
    CQLDateTimeNow
  )

  val cqlInsert ="""
                   |insert into testdb.AQMRPT(
                   | rowid,
                   | measureid,
                   | statename,
                   | countyname,
                   | reportyear,
                   | value,
                   | created)
                   | values(?,?,?,?,?,?,?)
                 """.stripMargin

  val cqlActionStream = CassandraActionStream(cqlInsert,toParams).setParallelism(2)
    .setProcessOrder(false)

  /**
   * Inserts each incoming row unless a row with the same rowid already exists.
   * Emits a CQLResult carrying 1 for an inserted row and 0 for a skipped one.
   */
  val cqlActionFlow: Flow[AQMRPTRow,CQLResult,NotUsed] =
    Flow[AQMRPTRow]
      .mapAsync(cqlActionStream.parallelism) { row =>
        // The existence check is composed as a Future; it used to block a stream
        // thread with Await.result inside this callback.
        rowExists(row.rowid).flatMap { exists =>
          if (exists) Future.successful(CQLResult(marshal(0)))
          else cqlActionStream.perform(row).map { _ => CQLResult(marshal(1)) }
        }
      }

  override def transferRows: Flow[AQMRPTRow, CQLResult, NotUsed] = {
    Flow[AQMRPTRow]
      .via(cqlActionFlow)
  }

  /** Completes with true when testdb.AQMRPT already holds a row with this rowid. */
  private def rowExists(rowid: Long): Future[Boolean] = {
    val cql = "SELECT * FROM testdb.AQMRPT WHERE ROWID = ? ALLOW FILTERING"
    val param = Seq(rowid.asInstanceOf[Object])
    val toRowId: Row => Long = r => r.getLong("rowid")
    val ctx = CQLQueryContext(cql,param)
    val src: Source[Long,NotUsed] = cassandraStream(ctx,toRowId)
    src.toMat(Sink.headOption)(Keep.right).run().map { result =>
      log.info(s"checking existence: ${result}")
      result.isDefined
    }
  }

  override def clientStreaming: Flow[HelloMsg, HelloMsg, NotUsed] = {
    Flow[HelloMsg]
      .map {r => println(r) ; r}
  }

  /**
   * Executes the statements of each incoming CQLUpdate.
   * Only `statements` is read from the message; its parameters, consistency and
   * batch fields are not applied here.
   */
  override def runDDL: Flow[CQLUpdate, CQLResult, NotUsed] = {
    Flow[CQLUpdate]
      .flatMapConcat { context =>
        //unpack CQLUpdate and construct the context
        val ctx = CQLContext(context.statements)
        log.info(s"**** CQLContext => ${ctx} ***")

        Source
          .fromFuture(cqlExecute(ctx))
          .map { r => CQLResult(marshal(r)) }
      }
  }

  /** Reads the CREATED column as a ProtoDateTime; None when it is null or unreadable. */
  def toCQLTimestamp(rs: Row) = {
    try {
      Option(rs.getTimestamp("CREATED")).map { tm =>
        val localdt = cqlGetTimestamp(tm)
        ProtoDateTime(Some(ProtoDate(localdt.getYear, localdt.getMonthValue, localdt.getDayOfMonth)),
          Some(ProtoTime(localdt.getHour, localdt.getMinute, localdt.getSecond, localdt.getNano)))
      }
    }
    catch {
      case e: Exception =>
        // still best-effort, but the failure is now visible in the log
        log.warn(s"toCQLTimestamp> cannot read CREATED: ${e.getMessage}")
        None
    }
  }

  val toAQMRow: Row => AQMRPTRow = rs=> AQMRPTRow(
    rowid = rs.getLong("ROWID"),
    measureid = rs.getLong("MEASUREID"),
    statename = rs.getString("STATENAME"),
    countyname = rs.getString("COUNTYNAME"),
    reportyear = rs.getInt("REPORTYEAR"),
    value = rs.getInt("VALUE"),
    created = toCQLTimestamp(rs)
  )

  /** Runs each incoming CQLQuery and streams the matching rows back. */
  override def runQuery: Flow[CQLQuery, AQMRPTRow, NotUsed] = {
    log.info("**** runQuery called on service side ***")
    Flow[CQLQuery]
      .flatMapConcat { q =>
        //unpack CQLQuery and construct the context
        val params: Seq[Object] =
          if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
            unmarshal[Seq[Object]](q.parameters)
          else Nil
        log.info(s"**** query parameters: ${params} ****")
        val ctx = CQLQueryContext(q.statement,params)
        CQLEngine.cassandraStream(ctx,toAQMRow)
      }
  }
}
package demo.sdp.grpc.cql.server

import java.util.logging.Logger
import com.datastax.driver.core._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import io.grpc.Server
import io.grpc.ServerBuilder
import sdp.grpc.services._
import sdp.cql.engine._
import CQLHelpers._

/** Thin lifecycle wrapper around an io.grpc Server. */
class gRPCServer(server: Server) {

  val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName)

  /** Starts the server and registers a JVM shutdown hook that stops it. */
  def start(): Unit = {
    server.start()
    logger.info(s"Server started, listening on ${server.getPort}")
    sys.addShutdownHook {
      // Use stderr here since the logger may have been reset by its JVM shutdown hook.
      System.err.println("*** shutting down gRPC server since JVM is shutting down")
      stop()
      System.err.println("*** server shut down")
    }
    ()
  }

  /** Initiates shutdown of the server. */
  def stop(): Unit = {
    server.shutdown()
  }

  /**
    * Await termination on the main thread since the grpc library uses daemon threads.
    */
  def blockUntilShutdown(): Unit = {
    server.awaitTermination()
  }
}

/**
 * Entry point: connects to Cassandra on localhost:9042, serves CQLStreamingServices
 * on port 50051 and shuts everything down after a line is read from stdin.
 */
object CQLServer extends App {
  implicit val cqlsys = ActorSystem("cqlSystem")
  implicit val mat = ActorMaterializer()
  implicit val ec = cqlsys.dispatcher

  val cluster = new Cluster
  .Builder()
    .addContactPoints("localhost")
    .withPort(9042)
    .build()

  useJava8DateTime(cluster)
  implicit val session = cluster.connect()

  val server = new gRPCServer(
    ServerBuilder
      .forPort(50051)
      .addService(
        CqlGrpcAkkaStream.bindService(
          new CQLStreamingServices
        )
      ).build()
  )
  server.start()
  // server.blockUntilShutdown()
  scala.io.StdIn.readLine()
  // Stop accepting calls before closing the session the service depends on;
  // previously the server kept running against a closed session until JVM exit.
  server.stop()
  session.close()
  cluster.close()
  mat.shutdown()
  cqlsys.terminate()
}
package demo.sdp.grpc.cql.client

import sdp.grpc.services._
import protobuf.bytes.Converter._
import akka.stream.scaladsl._
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import io.grpc._
import sdp.logging.LogSupport
import sdp.jdbc.engine._
import JDBCEngine._
import scalikejdbc.WrappedResultSet
import sdp.cql.engine.CQLHelpers.CQLDateTimeNow
import scala.util._
import scala.concurrent.ExecutionContextExecutor

/** gRPC streaming client for the CQL demo services.
  *
  * Opens a plaintext channel to `host:port` and exposes each remote call as an
  * Akka Streams `Source`.
  *
  * @param host server host name
  * @param port server port
  */
class CQLStreamClient(host: String, port: Int)(
  implicit ec: ExecutionContextExecutor) extends LogSupport {

  val channel = ManagedChannelBuilder
    .forAddress(host, port)
    .usePlaintext(true)
    .build()

  val stub = CqlGrpcAkkaStream.stub(channel)

  /** JDBC query selecting the rows that `transferRows` sends to the server. */
  val jdbcRows2transfer = JDBCQueryContext[AQMRPTRow](
    dbName = 'h2,
    statement = "select * from AQMRPT where statename='Arkansas'"
  )

  /** Converts a JDBC result-set row into an [[AQMRPTRow]].
    * `created` is filled with a fixed date-time (1990-08-12 23:56:23).
    */
  def toAQMRPTRow: WrappedResultSet => AQMRPTRow = rs => AQMRPTRow(
    rowid = rs.long("ROWID"),
    measureid = rs.long("MEASUREID"),
    statename = rs.string("STATENAME"),
    countyname = rs.string("COUNTYNAME"),
    reportyear = rs.int("REPORTYEAR"),
    value = rs.int("VALUE"),
    created = Some(ProtoDateTime(Some(ProtoDate(1990, 8, 12)), Some(ProtoTime(23, 56, 23, 0))))
  )

  import scala.concurrent.duration._

  /** Streams the rows selected by `jdbcRows2transfer` through the remote
    * `transferRows` call.
    */
  def transferRows: Source[CQLResult, NotUsed] = {
    log.info(s"**** calling transferRows ****")
    jdbcAkkaStream(jdbcRows2transfer, toAQMRPTRow)
      //      .throttle(1, 500.millis, 1, ThrottleMode.shaping)
      .via(stub.transferRows)
  }

  /** Sends 100 copies of a "hello world!" message through the remote
    * `clientStreaming` call.
    */
  def echoHello: Source[HelloMsg,NotUsed] = {
    val row = HelloMsg("hello world!")
    val rows = List.fill[HelloMsg](100)(row)
    Source
      .fromIterator(() => rows.iterator)
      .via(stub.clientStreaming)
  }

  val query0 = CQLQuery(
    statement = "select * from testdb.AQMRPT"
  )

  val query = CQLQuery(
    statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE > ? ALLOW FILTERING;",
    parameters = marshal(Seq("Arkansas", 0.toInt))
  )

  // NOTE(review): query2 and query3 restrict non-key columns without
  // ALLOW FILTERING; Cassandra presumably rejects them as written. Confirm
  // before using either one.
  val query2 = CQLQuery(
    statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Colorado", 3.toInt))
  )

  val query3 = CQLQuery(
    statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Arkansas", 8.toInt))
  )

  /** Runs `query` through the remote `runQuery` call and streams the rows. */
  def queryRows: Source[AQMRPTRow,NotUsed] = {
    log.info(s"running queryRows ...")
    Source
      .single(query)
      .via(stub.runQuery)
  }

  val dropCQL = "DROP TABLE IF EXISTS testdb.AQMRPT"

  val createCQL ="""
  CREATE TABLE testdb.AQMRPT (
     rowid bigint primary key,
     measureid bigint,
     statename text,
     countyname text,
     reportyear int,
     value int,
     created timestamp
  )"""

  val cqlddl = CQLUpdate(statements = Seq(dropCQL,createCQL))

  /** Drops and re-creates testdb.AQMRPT through the remote `runDDL` call. */
  def createTbl: Source[CQLResult,NotUsed] = {
    log.info(s"running createTbl ...")
    Source
      .single(cqlddl)
      .via(stub.runDDL)
  }

}

/** Demo: streams hello messages to the server and prints each reply. */
object EchoHelloClient extends App {
  implicit val system: ActorSystem = ActorSystem("EchoNumsClient")
  implicit val mat: ActorMaterializer = ActorMaterializer.create(system)
  implicit val ec: ExecutionContextExecutor = system.dispatcher

  val client = new CQLStreamClient("localhost", 50051)

  // Report a failed stream instead of dropping the error silently.
  client.echoHello.runForeach(println).failed.foreach { e =>
    println(s"!!!!!echo error: ${e.getMessage}")
  }

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

/** Demo: copies rows from the 'h2 JDBC database to the server and prints the
  * sum of the per-result counts returned by the server.
  */
object TransferRows extends App {

  import sdp.jdbc.config._

  implicit val system: ActorSystem = ActorSystem("JDBCServer")
  implicit val mat: ActorMaterializer = ActorMaterializer.create(system)
  implicit val ec: ExecutionContextExecutor = system.dispatcher

  ConfigDBsWithEnv("dev").setup('h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()

  val client = new CQLStreamClient("localhost", 50051)
  val fut = client.transferRows.runFold(0){(a,b) => a + unmarshal[Int](b.result)}
  fut.onComplete {
    case scala.util.Success(cnt) => println(s"done transfer ${cnt} rows.")
    case Failure(e) => println(s"!!!!!streaming error: ${e.getMessage}")
  }

  scala.io.StdIn.readLine()
  ConfigDBsWithEnv("dev").close('h2)
  mat.shutdown()
  system.terminate()
}

/** Demo: runs the client's parameterised query and prints every row. */
object QueryRows extends App {
  implicit val system: ActorSystem = ActorSystem("QueryRows")
  implicit val mat: ActorMaterializer = ActorMaterializer.create(system)
  implicit val ec: ExecutionContextExecutor = system.dispatcher

  val client = new CQLStreamClient("localhost", 50051)

  val fut = client.queryRows.runForeach { r => println(r) }
  fut.onComplete {
    case scala.util.Success(_) => println(s"done querying.")
    case Failure(e) => println(s"!!!!!query error: ${e.getMessage}")
  }

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

/** Demo: re-creates the AQMRPT table on the server and prints each result. */
object RunDDL extends App {
  implicit val system: ActorSystem = ActorSystem("RunDDL")
  implicit val mat: ActorMaterializer = ActorMaterializer.create(system)
  implicit val ec: ExecutionContextExecutor = system.dispatcher

  val client = new CQLStreamClient("localhost", 50051)

  // Report a failed stream instead of dropping the error silently.
  client.createTbl.runForeach { r => println(unmarshal(r.result)) }.failed.foreach { e =>
    println(s"!!!!!DDL error: ${e.getMessage}")
  }

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}