在上一篇博文里我们介绍了通过gRPC实现JDBC数据库的streaming,这篇我们介绍关于cassandra的streaming实现方式。如果我们需要从一个未部署cassandra的节点或终端上读取cassandra数据,可以用gRPC来搭建一个数据桥梁来连接这两端。这时cassandra这端就是gRPC-Server端,由它提供cassandra的数据服务。
在前面sdp系列讨论里我们已经实现了Cassandra-Engine。它的运作原理还是通过某种Context把指令提交给cassandra去执行。我们先设计一个创建库表的例子。CQL语句和Cassandra-Engine程序代码如下,这是客户端部分:
// Removes testdb.AQMRPT when it exists.
val dropCQL = "DROP TABLE IF EXISTS testdb.AQMRPT"
// Creates testdb.AQMRPT with rowid as the primary key.
val createCQL ="""
CREATE TABLE testdb.AQMRPT (
rowid bigint primary key,
measureid bigint,
statename text,
countyname text,
reportyear int,
value int,
created timestamp
)"""
// One CQLUpdate message carrying both statements, drop first and create second.
val cqlddl = CQLUpdate(statements = Seq(dropCQL,createCQL))
/** Client-side call: pushes the single `cqlddl` request through the `runDDL`
  * stub and exposes the reply as a stream of `CQLResult`.
  */
def createTbl: Source[CQLResult,NotUsed] = {
  log.info(s"running createTbl ...")
  val request = Source.single(cqlddl)
  request.via(stub.runDDL)
}
首先,我们在CQLUpdate这个protobuf对应Context里传入两条指令dropCQL和createCQL,可以预计这会是一种批次型batch方式。然后一如既往,我们使用了streaming编程模式。在.proto文件里用DDL来对应Context和Service:
// Request message for update/DDL commands sent to the Cassandra service.
message CQLUpdate {
// CQL statements to run; the client example sends a DROP followed by a CREATE.
repeated string statements = 1;
// Statement parameters as opaque bytes.
bytes parameters = 2;
// Optional wrapper values; the runDDL implementation shown in this article does not read them.
google.protobuf.Int32Value consistency = 3;
google.protobuf.BoolValue batch = 4;
}
service CQLServices {
// Unary call: one CQLUpdate in, one CQLResult out.
rpc runDDL(CQLUpdate) returns (CQLResult) {}
}
服务函数runDDL程序实现如下:
/** Service side of `runDDL`: every incoming `CQLUpdate` is turned into a
  * `CQLContext`, executed through `cqlExecute`, and the outcome is marshalled
  * into a `CQLResult`.
  */
override def runDDL: Flow[CQLUpdate, CQLResult, NotUsed] =
  Flow[CQLUpdate].flatMapConcat { update =>
    // Only the statements are carried over into the context; the remaining
    // CQLUpdate fields are not read here.
    val ctx = CQLContext(update.statements)
    log.info(s"**** CQLContext => ${ctx} ***")
    val outcome = cqlExecute(ctx)
    Source.fromFuture(outcome).map(r => CQLResult(marshal(r)))
  }
这里我们调用了Cassandra-Engine的cqlExecute(ctx)函数:
/**
 * Runs the statements held by a `CQLContext` and reports completion as a `Future[Boolean]`.
 *
 * Dispatch rules:
 *  - batch requested and at least two parameter sets supplied: `cqlBatchUpdate`
 *  - exactly one statement: `cqlSingleUpdate` with the first parameter set (or none)
 *  - anything else: every statement is paired with its parameter set and run in
 *    order through `cqlExecuteSync` inside a single `Future`
 *
 * @param ctx     statements, parameters, consistency and batch flag
 * @param session implicit Cassandra session
 * @param ec      implicit execution context the work is scheduled on
 */
def cqlExecute(ctx: CQLContext)(
  implicit session: Session, ec: ExecutionContext): Future[Boolean] = {

  // A batch needs at least two parameter sets; otherwise it is downgraded below.
  val invalidBat = ctx.batch && ctx.parameters.size < 2

  if (ctx.batch && !invalidBat)
    cqlBatchUpdate(ctx)
  else {
    if (invalidBat)
      log.warn(s"cqlExecute> batch update must have at least 2 sets of parameters! change to single-command.")

    if (ctx.statements.size == 1) {
      val param: Seq[Object] = ctx.parameters.headOption.getOrElse(Nil)
      log.info(s"cqlExecute> single-command: statement: ${ctx.statements.head} parameters: ${param}")
      cqlSingleUpdate(ctx.consistency, ctx.statements.head, param)
    }
    else {
      // One parameter set per statement; missing sets are filled with Nil.
      val params: Seq[Seq[Object]] = {
        val missing = ctx.statements.size - ctx.parameters.size
        if (ctx.parameters == Nil)
          Seq.fill(ctx.statements.length)(Nil)
        else if (missing > 0) {
          log.warn(s"cqlExecute> fewer parameters than statements! pad with 'Nil'.")
          ctx.parameters ++ Seq.fill(missing)(Nil)
        }
        else
          ctx.parameters
      }
      val commands: Seq[(String,Seq[Object])] = ctx.statements zip params
      log.info(s"cqlExecute> multi-commands: ${commands}")

      /* Alternative 1 (not used): cats `sequence`
         flips List[Future[Boolean]] => Future[List[Boolean]];
         only valid when no command relies on the effect of a previous one.

         val lstCmds: List[Future[Boolean]] = commands.map { case (stmt,param) =>
           cqlSingleUpdate(ctx.consistency, stmt, param)
         }.toList
         val futList = lstCmds.sequence.map(_ => true) //must map to execute
      */
      /* Alternative 2 (not used): `Future.traverse`
         gives some degree of parallelism = max(runtimes);
         only valid when no command relies on the effect of a previous one.

         val futList = Future.traverse(commands) { case (stmt,param) =>
           cqlSingleUpdate(ctx.consistency, stmt, param)
         }.map(_ => true)
         Await.result(futList, 3 seconds)
         Future.successful(true)
      */

      // Chosen approach: run the commands synchronously, one after another.
      Future {
        for ((stm, pars) <- commands)
          cqlExecuteSync(ctx.consistency, stm, pars)
        true
      }
    }
  }
}
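特别展示这个函数的代码,是因为一个批次里的多条指令可能会涉及到non-blocking和并行运算。可参考上面代码里被注释掉的两个段落,了解如何用函数式方法(cats)的sequence、traverse实现对一串Future的运算。需要注意的是,这两种方式都不保证指令的执行顺序,只适用于相互独立的指令;而本例中DROP TABLE必须先于CREATE TABLE完成,所以最终采用了按顺序逐条同步执行的方式。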
下一个例子是用流方式把JDBC数据库数据并入cassandra数据库里。.proto DDL内容如下:
// Calendar date split into numeric fields.
message ProtoDate {
int32 yyyy = 1;
int32 mm = 2;
int32 dd = 3;
}
// Time of day split into numeric fields.
message ProtoTime {
int32 hh = 1;
int32 mm = 2;
int32 ss = 3;
// The query service fills this from the nano-of-second part of the timestamp.
int32 nnn = 4;
}
// Date and time combined.
message ProtoDateTime {
ProtoDate date = 1;
ProtoTime time = 2;
}
// One row of testdb.AQMRPT as it travels over gRPC.
message AQMRPTRow {
int64 rowid = 1;
string countyname = 2;
string statename = 3;
int64 measureid = 4;
int32 reportyear = 5;
int32 value = 6;
ProtoDateTime created = 7;
}
// Reply message; result holds a marshalled value produced by the service.
message CQLResult {
bytes result = 1;
}
// Request message for update/DDL commands.
message CQLUpdate {
repeated string statements = 1;
bytes parameters = 2;
google.protobuf.Int32Value consistency = 3;
google.protobuf.BoolValue batch = 4;
}
service CQLServices {
// Bidirectional streaming: rows in, one CQLResult per row out.
rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {}
// Unary call: one CQLUpdate in, one CQLResult out.
rpc runDDL(CQLUpdate) returns (CQLResult) {}
}
下面是服务函数的实现:
/** Converts a transferred row into the positional parameters of `cqlInsert`.
  * Numeric fields are boxed explicitly because the parameters are `Seq[Object]`.
  * The last parameter is `CQLDateTimeNow`; the row's own `created` field is not used.
  */
val toParams: AQMRPTRow => Seq[Object] = { row =>
  val rowid: Object      = Long.box(row.rowid)
  val measureid: Object  = Long.box(row.measureid)
  val reportyear: Object = Int.box(row.reportyear)
  val value: Object      = Int.box(row.value)
  Seq[Object](
    rowid,
    measureid,
    row.statename,
    row.countyname,
    reportyear,
    value,
    CQLDateTimeNow
  )
}
// Parameterized insert; the seven placeholders are in the same order as the
// values produced by toParams.
val cqlInsert ="""
|insert into testdb.AQMRPT(
| rowid,
| measureid,
| statename,
| countyname,
| reportyear,
| value,
| created)
| values(?,?,?,?,?,?,?)
""".stripMargin
// Action stream bound to cqlInsert/toParams, configured with parallelism 2 and
// process-order off.
// NOTE(review): presumably setProcessOrder(false) allows unordered completion — confirm in CassandraActionStream.
val cqlActionStream = CassandraActionStream(cqlInsert,toParams).setParallelism(2)
.setProcessOrder(false)
/*
val cqlActionFlow: Flow[AQMRPTRow,AQMRPTRow,NotUsed] =
Flow[AQMRPTRow]
.via(cqlActionStream.performOnRow)
*/
/** For every incoming row: if its rowid is already stored, answer with a
  * marshalled 0 and skip the insert; otherwise run the insert and answer
  * with a marshalled 1.
  */
val cqlActionFlow: Flow[AQMRPTRow,CQLResult,NotUsed] =
  Flow[AQMRPTRow].mapAsync(cqlActionStream.parallelism) { row =>
    val alreadyStored = IfExists(row.rowid)
    if (!alreadyStored)
      cqlActionStream.perform(row).map(_ => CQLResult(marshal(1)))
    else
      Future.successful(CQLResult(marshal(0)))
  }
/** Service side of `transferRows`: delegates each row to `cqlActionFlow`. */
override def transferRows: Flow[AQMRPTRow, CQLResult, NotUsed] =
  Flow[AQMRPTRow].via(cqlActionFlow)
/** Checks whether a row with the given rowid is already in testdb.AQMRPT.
  * Runs a query stream, takes its first element (if any) and blocks for up
  * to three seconds waiting for it.
  *
  * @param rowid key to look for
  * @return true when the query produced at least one row
  */
private def IfExists(rowid: Long): Boolean = {
  val toRowId: Row => Long = _.getLong("rowid")
  val ctx = CQLQueryContext(
    "SELECT * FROM testdb.AQMRPT WHERE ROWID = ? ALLOW FILTERING",
    Seq(rowid.asInstanceOf[Object]))
  val firstHit = cassandraStream(ctx,toRowId).runWith(Sink.headOption)
  val result = Await.result(firstHit, 3.seconds)
  log.info(s"checking existence: ${result}")
  result.isDefined
}
在上面的代码里我们调用了Cassandra-Engine的CassandraActionStream类型的流处理方法。值得注意的是,这里我们尝试在一个stream Flow的运算过程中运行另一个流,如:IfExists函数里通过运算一个Source来确定rowid是否存在。不要在意这个函数的实际应用,它只是一个人为的例子。另外,rowid:Long这样的类型定义是必须的:cassandra对数据类型的匹配要求非常严格,不提供任何自动类型转换。所以,Int <> Long会被视为类型错误,而且无法catch到任何明确的错误信息。
这项服务的客户端调用如下:
// gRPC client stub for the CQL services, created on the given channel.
val stub = CqlGrpcAkkaStream.stub(channel)
// JDBC query against the 'h2 database that selects the rows to be transferred.
val jdbcRows2transfer = JDBCQueryContext[AQMRPTRow](
dbName = 'h2,
statement = "select * from AQMRPT where statename='Arkansas'"
)
/** Maps one JDBC result-set row to the protobuf `AQMRPTRow`.
  * `created` is always set to the same fixed date-time (1990-08-12 23:56:23).
  */
def toAQMRPTRow: WrappedResultSet => AQMRPTRow = { rs =>
  val fixedDate = ProtoDate(1990, 8, 12)
  val fixedTime = ProtoTime(23, 56, 23, 0)
  AQMRPTRow(
    rowid = rs.long("ROWID"),
    measureid = rs.long("MEASUREID"),
    statename = rs.string("STATENAME"),
    countyname = rs.string("COUNTYNAME"),
    reportyear = rs.int("REPORTYEAR"),
    value = rs.int("VALUE"),
    created = Some(ProtoDateTime(Some(fixedDate), Some(fixedTime)))
  )
}
import scala.concurrent.duration._
/** Client-side call: streams the rows selected by `jdbcRows2transfer` from the
  * local JDBC database into the remote `transferRows` service.
  */
def transferRows: Source[CQLResult, NotUsed] = {
  log.info(s"**** calling transferRows ****")
  val localRows = jdbcAkkaStream(jdbcRows2transfer, toAQMRPTRow)
  // optional rate limit, currently switched off:
  //   localRows.throttle(1, 500.millis, 1, ThrottleMode.shaping)
  localRows.via(stub.transferRows)
}
注意:JDBC在客户端本地,cassandra是远程服务。
最后我们示范一下cassandra Query。.proto DDL 定义:
// Request message for a Cassandra query.
message CQLQuery {
// CQL text, with '?' placeholders for the parameters.
string statement = 1;
// Marshalled parameter values; the service unmarshals them when the field is not empty.
bytes parameters = 2;
// Optional wrapper values; the runQuery implementation shown in this article does not read them.
google.protobuf.Int32Value consistency = 3;
google.protobuf.Int32Value fetchSize = 4;
}
service CQLServices {
// Bidirectional streaming: rows in, one CQLResult per row out.
rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {}
// Server streaming: one query in, a stream of rows out.
rpc runQuery(CQLQuery) returns (stream AQMRPTRow) {}
// Unary call: one CQLUpdate in, one CQLResult out.
rpc runDDL(CQLUpdate) returns (CQLResult) {}
}
服务函数代码如下:
/** Reads the CREATED column of a Cassandra row as an optional `ProtoDateTime`.
  *
  * A null column yields `None`. A failure while reading or converting the
  * value also yields `None` (best effort), but is logged as a warning instead
  * of being dropped silently; fatal errors and interrupts are not caught.
  *
  * @param rs Cassandra row that contains a CREATED timestamp column
  * @return the timestamp split into date and time parts, or `None`
  */
def toCQLTimestamp(rs: Row): Option[ProtoDateTime] =
  try {
    // Option(...) turns a null timestamp into None
    Option(rs.getTimestamp("CREATED")).map { tm =>
      val localdt = cqlGetTimestamp(tm)
      ProtoDateTime(
        Some(ProtoDate(localdt.getYear, localdt.getMonthValue, localdt.getDayOfMonth)),
        Some(ProtoTime(localdt.getHour, localdt.getMinute, localdt.getSecond, localdt.getNano)))
    }
  }
  catch {
    case scala.util.control.NonFatal(e) =>
      log.warn(s"toCQLTimestamp> cannot read CREATED timestamp: ${e}")
      None
  }
/** Maps one Cassandra row to the protobuf `AQMRPTRow`; the CREATED column is
  * converted by `toCQLTimestamp`.
  */
val toAQMRow: Row => AQMRPTRow = { rs =>
  AQMRPTRow(
    rowid      = rs.getLong("ROWID"),
    measureid  = rs.getLong("MEASUREID"),
    statename  = rs.getString("STATENAME"),
    countyname = rs.getString("COUNTYNAME"),
    reportyear = rs.getInt("REPORTYEAR"),
    value      = rs.getInt("VALUE"),
    created    = toCQLTimestamp(rs)
  )
}
/** Service side of `runQuery`: each `CQLQuery` is unpacked into a
  * `CQLQueryContext` and answered with the rows streamed back from Cassandra.
  */
override def runQuery: Flow[CQLQuery, AQMRPTRow, NotUsed] = {
  log.info("**** runQuery called on service side ***")
  Flow[CQLQuery].flatMapConcat { q =>
    // unpack CQLQuery and construct the context;
    // an empty parameters field means "no parameters"
    val params: Seq[Object] =
      if (q.parameters == _root_.com.google.protobuf.ByteString.EMPTY) Nil
      else unmarshal[Seq[Object]](q.parameters)
    log.info(s"**** query parameters: ${params} ****")
    CQLEngine.cassandraStream(CQLQueryContext(q.statement,params),toAQMRow)
  }
}
这里值得看看的一是日期转换,二是对于cassandra parameter Seq[Object]的marshal和unmarshal。客户端代码:
// Query used by queryRows: Arkansas rows with VALUE > 0; parameters are
// marshalled in placeholder order.
val query = CQLQuery(
statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE > ? ALLOW FILTERING;",
parameters = marshal(Seq("Arkansas", 0.toInt))
)
// Further sample queries (equality on STATENAME and VALUE); not referenced by
// the code shown in this article.
val query2 = CQLQuery (
statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
parameters = marshal(Seq("Colorado", 3.toInt))
)
val query3= CQLQuery (
statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
parameters = marshal(Seq("Arkansas", 8.toInt))
)
/** Client-side call: sends `query` through the `runQuery` stub and returns
  * the streamed rows.
  */
def queryRows: Source[AQMRPTRow,NotUsed] = {
  log.info(s"running queryRows ...")
  val request = Source.single(query)
  request.via(stub.runQuery)
}
这段相对直白。
下面就是本次讨论涉及的完整源代码:
project/scalapb.sbt
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18") resolvers += Resolver.bintrayRepo("beyondthelines", "maven") libraryDependencies ++= Seq( "com.thesamet.scalapb" %% "compilerplugin" % "0.7.4", "beyondthelines" %% "grpcakkastreamgenerator" % "0.0.5" )
build.sbt
import scalapb.compiler.Version.scalapbVersion import scalapb.compiler.Version.grpcJavaVersion name := "gRPCCassandra" version := "0.1" scalaVersion := "2.12.6" resolvers += Resolver.bintrayRepo("beyondthelines", "maven") scalacOptions += "-Ypartial-unification" libraryDependencies := Seq( "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf", "io.grpc" % "grpc-netty" % grpcJavaVersion, "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion, "io.monix" %% "monix" % "2.3.0", // for GRPC Akkastream "beyondthelines" %% "grpcakkastreamruntime" % "0.0.5", // for scalikejdbc "org.scalikejdbc" %% "scalikejdbc" % "3.2.1", "org.scalikejdbc" %% "scalikejdbc-test" % "3.2.1" % "test", "org.scalikejdbc" %% "scalikejdbc-config" % "3.2.1", "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1", "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1", "com.h2database" % "h2" % "1.4.196", "mysql" % "mysql-connector-java" % "6.0.6", "org.postgresql" % "postgresql" % "42.2.0", "commons-dbcp" % "commons-dbcp" % "1.4", "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2", "com.zaxxer" % "HikariCP" % "2.7.4", "com.jolbox" % "bonecp" % "0.8.0.RELEASE", "com.typesafe.slick" %% "slick" % "3.2.1", //for cassandra 340 "com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0", "com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0", "com.typesafe.akka" %% "akka-stream" % "2.5.13", "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.19", "ch.qos.logback" % "logback-classic" % "1.2.3", "org.typelevel" %% "cats-core" % "1.1.0" ) PB.targets in Compile := Seq( scalapb.gen() -> (sourceManaged in Compile).value, // generate the akka stream files grpc.akkastreams.generators.GrpcAkkaStreamGenerator() -> (sourceManaged in Compile).value )
main/resources/application.conf
# JDBC settings test { db { h2 { driver = "org.h2.Driver" url = "jdbc:h2:tcp://localhost/~/slickdemo" user = "" password = "" poolInitialSize = 5 poolMaxSize = 7 poolConnectionTimeoutMillis = 1000 poolValidationQuery = "select 1 as one" poolFactoryName = "commons-dbcp2" } } db.mysql.driver = "com.mysql.cj.jdbc.Driver" db.mysql.url = "jdbc:mysql://localhost:3306/testdb" db.mysql.user = "root" db.mysql.password = "123" db.mysql.poolInitialSize = 5 db.mysql.poolMaxSize = 7 db.mysql.poolConnectionTimeoutMillis = 1000 db.mysql.poolValidationQuery = "select 1 as one" db.mysql.poolFactoryName = "bonecp" # scallikejdbc Global settings scalikejdbc.global.loggingSQLAndTime.enabled = true scalikejdbc.global.loggingSQLAndTime.logLevel = info scalikejdbc.global.loggingSQLAndTime.warningEnabled = true scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000 scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn scalikejdbc.global.loggingSQLAndTime.singleLineMode = false scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 } dev { db { h2 { driver = "org.h2.Driver" url = "jdbc:h2:tcp://localhost/~/slickdemo" user = "" password = "" poolFactoryName = "hikaricp" numThreads = 10 maxConnections = 12 minConnections = 4 keepAliveConnection = true } mysql { driver = "com.mysql.cj.jdbc.Driver" url = "jdbc:mysql://localhost:3306/testdb" user = "root" password = "123" poolInitialSize = 5 poolMaxSize = 7 poolConnectionTimeoutMillis = 1000 poolValidationQuery = "select 1 as one" poolFactoryName = "bonecp" } postgres { driver = "org.postgresql.Driver" url = "jdbc:postgresql://localhost:5432/testdb" user = "root" password = "123" poolFactoryName = "hikaricp" numThreads = 10 maxConnections = 12 minConnections = 4 keepAliveConnection = true } } # scallikejdbc Global settings scalikejdbc.global.loggingSQLAndTime.enabled = true scalikejdbc.global.loggingSQLAndTime.logLevel = info 
scalikejdbc.global.loggingSQLAndTime.warningEnabled = true scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000 scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn scalikejdbc.global.loggingSQLAndTime.singleLineMode = false scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 }
main/resources/logback.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <layout class="ch.qos.logback.classic.PatternLayout"> <Pattern> %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n </Pattern> </layout> </appender> <logger name="sdp.cql" level="info" additivity="false"> <appender-ref ref="STDOUT" /> </logger> <logger name="demo.sdp.grpc.cql" level="info" additivity="false"> <appender-ref ref="STDOUT" /> </logger> <root level="error"> <appender-ref ref="STDOUT" /> </root> </configuration>
main/protobuf/cql.proto
syntax = "proto3"; import "google/protobuf/wrappers.proto"; import "google/protobuf/any.proto"; import "scalapb/scalapb.proto"; option (scalapb.options) = { // use a custom Scala package name // package_name: "io.ontherocks.introgrpc.demo" // don't append file name to package flat_package: true // generate one Scala file for all messages (services still get their own file) single_file: true // add imports to generated file // useful when extending traits or using custom types // import: "io.ontherocks.hellogrpc.RockingMessage" // code to put at the top of generated file // works only with `single_file: true` //preamble: "sealed trait SomeSealedTrait" }; /* * Demoes various customization options provided by ScalaPBs. */ package sdp.grpc.services; message ProtoDate { int32 yyyy = 1; int32 mm = 2; int32 dd = 3; } message ProtoTime { int32 hh = 1; int32 mm = 2; int32 ss = 3; int32 nnn = 4; } message ProtoDateTime { ProtoDate date = 1; ProtoTime time = 2; } message AQMRPTRow { int64 rowid = 1; string countyname = 2; string statename = 3; int64 measureid = 4; int32 reportyear = 5; int32 value = 6; ProtoDateTime created = 7; } message CQLResult { bytes result = 1; } message CQLQuery { string statement = 1; bytes parameters = 2; google.protobuf.Int32Value consistency = 3; google.protobuf.Int32Value fetchSize = 4; } message CQLUpdate { repeated string statements = 1; bytes parameters = 2; google.protobuf.Int32Value consistency = 3; google.protobuf.BoolValue batch = 4; } message HelloMsg { string hello = 1; } service CQLServices { rpc clientStreaming(stream HelloMsg) returns (stream HelloMsg) {} rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {} rpc runQuery(CQLQuery) returns (stream AQMRPTRow) {} rpc runDDL(CQLUpdate) returns (CQLResult) {} }
logging/log.scala
package sdp.logging

import org.slf4j.Logger

/**
 * Logger which just wraps org.slf4j.Logger internally.
 *
 * Every message is passed by name, so it is only built when the level is
 * enabled both on this wrapper and on the underlying logger.
 *
 * @param logger logger
 */
class Log(logger: Logger) {

  // use var consciously to enable squeezing later
  var isDebugEnabled: Boolean = logger.isDebugEnabled
  var isInfoEnabled: Boolean = logger.isInfoEnabled
  var isWarnEnabled: Boolean = logger.isWarnEnabled
  var isErrorEnabled: Boolean = logger.isErrorEnabled

  /**
   * Logs `msg` at the level named by `level` ('debug, 'info, 'warn, 'error,
   * lower or upper case). Unknown levels are ignored.
   *
   * @param level level name as a Symbol
   * @param msg   message, evaluated lazily
   * @param e     optional cause; when non-null it is forwarded to the logger
   */
  def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
    // the cause used to be dropped; forward it whenever one was supplied
    val hasCause = e != null
    level match {
      case 'debug | 'DEBUG => if (hasCause) debug(msg, e) else debug(msg)
      case 'info | 'INFO => if (hasCause) info(msg, e) else info(msg)
      case 'warn | 'WARN => if (hasCause) warn(msg, e) else warn(msg)
      case 'error | 'ERROR => if (hasCause) error(msg, e) else error(msg)
      case _ => // nothing to do
    }
  }

  def debug(msg: => String): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {
      logger.debug(msg)
    }
  }

  def debug(msg: => String, e: Throwable): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {
      logger.debug(msg, e)
    }
  }

  def info(msg: => String): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {
      logger.info(msg)
    }
  }

  def info(msg: => String, e: Throwable): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {
      logger.info(msg, e)
    }
  }

  def warn(msg: => String): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {
      logger.warn(msg)
    }
  }

  def warn(msg: => String, e: Throwable): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {
      logger.warn(msg, e)
    }
  }

  def error(msg: => String): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {
      logger.error(msg)
    }
  }

  def error(msg: => String, e: Throwable): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {
      logger.error(msg, e)
    }
  }

}
logging/LogSupport.scala
package sdp.logging

import org.slf4j.LoggerFactory

/** Mix-in that provides a `log` member named after the concrete class. */
trait LogSupport {

  /**
   * Logger
   */
  protected val log = {
    val underlying = LoggerFactory.getLogger(getClass)
    new Log(underlying)
  }

}
filestreaming/FileStreaming.scala
package sdp.file

import java.io.{ByteArrayInputStream, InputStream}
import java.nio.ByteBuffer
import java.nio.file.Paths

import akka.stream.Materializer
import akka.stream.scaladsl.{FileIO, StreamConverters}
import akka.util._

import scala.concurrent.Await
import scala.concurrent.duration._

/** Helpers that move whole files to and from in-memory representations.
  * The `FileTo*` functions block until the file is read or `timeOut` elapses.
  */
object Streaming {

  /** Reads the complete file into one ByteString, waiting at most `timeOut`. */
  private def readFully(fileName: String, timeOut: FiniteDuration)(
    implicit mat: Materializer): ByteString = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) {
      case (hd, bs) => hd ++ bs
    }
    Await.result(fut, timeOut)
  }

  /** Loads the file content as a ByteBuffer. */
  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer):ByteBuffer =
    readFully(fileName, timeOut).toByteBuffer

  /** Loads the file content as a byte array. */
  def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer): Array[Byte] =
    readFully(fileName, timeOut).toArray

  /** Loads the file content and exposes it as an in-memory InputStream. */
  def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer): InputStream =
    new ByteArrayInputStream(readFully(fileName, timeOut).toArray)

  /** Writes the remaining bytes of `byteBuf` to the file.
    * Note: reading the bytes advances the position of `byteBuf`.
    */
  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
    implicit mat: Materializer) = {
    val ba = new Array[Byte](byteBuf.remaining())
    byteBuf.get(ba,0,ba.length)
    val baInput = new ByteArrayInputStream(ba)
    val source = StreamConverters.fromInputStream(() => baInput)
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

  /** Writes the byte array to the file. */
  def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
    implicit mat: Materializer) = {
    val baInput = new ByteArrayInputStream(bytes)
    val source = StreamConverters.fromInputStream(() => baInput)
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

  /** Copies everything readable from `is` into the file. */
  def InputStreamToFile(is: InputStream, fileName: String)(
    implicit mat: Materializer) = {
    val source = StreamConverters.fromInputStream(() => is)
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

}
jdbc/JDBCConfig.scala
package sdp.jdbc.config

import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
import com.typesafe.config._
import java.util.concurrent.TimeUnit
import java.util.Properties
import scalikejdbc.config._
import com.typesafe.config.Config
import com.zaxxer.hikari._
import scalikejdbc.ConnectionPoolFactoryRepository

/** Extension methods to make Typesafe Config easier to use.
  * The `xxxOr` readers return the configured value when the path exists and
  * the supplied default otherwise; the `xxxOpt` readers return an Option.
  */
class ConfigExtensionMethods(val c: Config) extends AnyVal {
  import scala.collection.JavaConverters._

  def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default
  def getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else default
  def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default
  def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default

  def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default
  def getDurationOr(path: String, default: => Duration = Duration.Zero) =
    if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default

  def getPropertiesOr(path: String, default: => Properties = null): Properties =
    if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default

  /** Converts the whole config tree to (nested) java.util.Properties;
    * entries whose unwrapped value is null are left out.
    */
  def toProperties: Properties = {
    def toProps(m: mutable.Map[String, ConfigValue]): Properties = {
      val props = new Properties(null)
      m.foreach { case (k, cv) =>
        val v =
          if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala)
          else if(cv.unwrapped eq null) null
          else cv.unwrapped.toString
        if(v ne null) props.put(k, v)
      }
      props
    }
    toProps(c.root.asScala)
  }

  def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None
  def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None
  def getStringOpt(path: String) = Option(getStringOr(path))
  def getPropertiesOpt(path: String) = Option(getPropertiesOr(path))
}

object ConfigExtensionMethods {
  @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c)
}

/** Reads HikariCP settings for a named database from `<envPrefix>db.<dbName>`. */
trait HikariConfigReader extends TypesafeConfigReader {
  self: TypesafeConfig => // with TypesafeConfigReader => //NoEnvPrefix =>

  import ConfigExtensionMethods.configExtensionMethods

  /** Pool factory name configured for the database; commons-dbcp when not set. */
  def getFactoryName(dbName: Symbol): String = {
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
    c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)
  }

  /** Builds a HikariConfig from the database's config section. */
  def hikariCPConfig(dbName: Symbol): HikariConfig = {

    val hconf = new HikariConfig()
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)

    // Connection settings
    if (c.hasPath("dataSourceClass")) {
      hconf.setDataSourceClassName(c.getString("dataSourceClass"))
    } else {
      // "driverClassName" wins over "driver"; foreach because the setter is
      // called only for its side effect
      Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).foreach(hconf.setDriverClassName)
    }
    hconf.setJdbcUrl(c.getStringOr("url", null))
    c.getStringOpt("user").foreach(hconf.setUsername)
    c.getStringOpt("password").foreach(hconf.setPassword)
    c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties)

    // Pool configuration
    hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000))
    hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000))
    hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000))
    hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000))
    hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0))
    hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false))
    c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery)
    c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql)
    val numThreads = c.getIntOr("numThreads", 20)
    hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5))
    hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))
    hconf.setPoolName(c.getStringOr("poolName", dbName.name))
    hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false))

    // Equivalent of ConnectionPreparer
    hconf.setReadOnly(c.getBooleanOr("readOnly", false))
    c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation)
    hconf.setCatalog(c.getStringOr("catalog", null))

    hconf

  }
}

import scalikejdbc._

/** Registers scalikejdbc connection pools from configuration. Databases whose
  * poolFactoryName is "hikaricp" get a HikariCP data source; all others go
  * through the standard scalikejdbc settings.
  */
trait ConfigDBs {
  self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>

  def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    getFactoryName(dbName) match {
      case "hikaricp" => {
        val hconf = hikariCPConfig(dbName)
        val hikariCPSource = new HikariDataSource(hconf)
        // closes the Hikari data source when the pool is closed
        case class HikariDataSourceCloser(src: HikariDataSource) extends DataSourceCloser {
          override def close(): Unit = src.close()
        }
        if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) {
          Class.forName(hconf.getDriverClassName)
        }
        ConnectionPool.add(dbName, new DataSourceConnectionPool(dataSource = hikariCPSource,settings = DataSourceConnectionPoolSettings(),
          closer = HikariDataSourceCloser(hikariCPSource)))
      }
      case _ => {
        val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
        val cpSettings = readConnectionPoolSettings(dbName)
        if (driver != null && driver.trim.nonEmpty) {
          Class.forName(driver)
        }
        ConnectionPool.add(dbName, url, user, password, cpSettings)
      }
    }
  }

  def setupAll(): Unit = {
    loadGlobalSettings()
    dbNames.foreach { dbName => setup(Symbol(dbName)) }
  }

  def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    ConnectionPool.close(dbName)
  }

  def closeAll(): Unit = {
    ConnectionPool.closeAll
  }

}

object ConfigDBs extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader

case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader
  with EnvPrefix {

  override val env = Option(envValue)
}
jdbc/JDBCEngine.scala
package sdp.jdbc.engine

import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import akka.stream.scaladsl._
import scalikejdbc._
import scalikejdbc.streams._
import akka.NotUsed
import akka.stream._
import java.time._
import scala.concurrent.duration._
import scala.concurrent._
import sdp.file.Streaming._
import scalikejdbc.TxBoundary.Try._
import scala.concurrent.ExecutionContextExecutor
import java.io.InputStream
import sdp.logging.LogSupport

/** Constants shared by [[JDBCContext]] instances and the engine. */
object JDBCContext {
  type SQLTYPE = Int
  val SQL_EXEDDL= 1
  val SQL_UPDATE = 2
  val RETURN_GENERATED_KEYVALUE = true
  val RETURN_UPDATED_COUNT = false
}

/** Describes a single query: target database, statement text, positional parameters and fetch options. */
case class JDBCQueryContext[M](
                                dbName: Symbol,
                                statement: String,
                                parameters: Seq[Any] = Nil,
                                fetchSize: Int = 100,
                                autoCommit: Boolean = false,
                                queryTimeout: Option[Int] = None)

/**
  * Describes one or more DDL/update commands to be run against `dbName`.
  *
  * `statements`, `parameters` and `returnGeneratedKey` are parallel sequences: entry i of each
  * belongs to command i. In batch mode there is a single statement and `parameters` holds one
  * parameter row per batch entry. All helper methods return a modified copy.
  */
case class JDBCContext (
                         dbName: Symbol,
                         statements: Seq[String] = Nil,
                         parameters: Seq[Seq[Any]] = Nil,
                         fetchSize: Int = 100,
                         queryTimeout: Option[Int] = None,
                         queryTags: Seq[String] = Nil,
                         sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
                         batch: Boolean = false,
                         returnGeneratedKey: Seq[Option[Any]] = Nil,
                         // no return: None, return by index: Some(1), by name: Some("id")
                         preAction: Option[PreparedStatement => Unit] = None,
                         postAction: Option[PreparedStatement => Unit] = None)
  extends LogSupport {

  ctx =>

  //helper functions

  def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)

  def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)

  def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)

  def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)

  /** Sets the pre-execution hook; only valid for a non-batch SQL_UPDATE context holding exactly one statement. */
  def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1) {
      val nc = ctx.copy(preAction = action)
      log.info("setPreAction> set")
      nc
    }
    else {
      log.info("setPreAction> JDBCContex setting error: preAction not supported!")
      throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
    }
  }

  /** Sets the post-execution hook; only valid for a non-batch SQL_UPDATE context holding exactly one statement. */
  def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1) {
      val nc = ctx.copy(postAction = action)
      log.info("setPostAction> set")
      nc
    }
    else {
      // fixed: this message used to be logged under the "setPreAction>" tag
      log.info("setPostAction> JDBCContex setting error: postAction not supported!")
      throw new IllegalStateException("JDBCContex setting error: postAction not supported!")
    }
  }

  /** Appends one more DDL command; the context must already be of type SQL_EXEDDL. */
  def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
      log.info(s"appendDDLCommand> appending: statement: ${_statement}, parameters: ${_parameters}")
      val nc = ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        // fixed: was Seq(Seq(_parameters)), which nested the row one level too deep
        // compared with setDDLCommand / appendUpdateCommand
        parameters = ctx.parameters ++ Seq(_parameters)
      )
      log.info(s"appendDDLCommand> appended: statement: ${nc.statements}, parameters: ${nc.parameters}")
      nc
    } else {
      log.info(s"appendDDLCommand> JDBCContex setting error: option not supported!")
      throw new IllegalStateException("JDBCContex setting error: option not supported!")
    }
  }

  /** Appends one more update command; the context must be a non-batch SQL_UPDATE. */
  def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
      log.info(s"appendUpdateCommand> appending: returnGeneratedKey: ${_returnGeneratedKey}, statement: ${_statement}, parameters: ${_parameters}")
      val nc = ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        parameters = ctx.parameters ++ Seq(_parameters),
        returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
      )
      log.info(s"appendUpdateCommand> appended: statement: ${nc.statements}, parameters: ${nc.parameters}")
      nc
    } else {
      log.info(s"appendUpdateCommand> JDBCContex setting error: option not supported!")
      throw new IllegalStateException("JDBCContex setting error: option not supported!")
    }
  }

  /** Appends one parameter row to a batch; the row must have as many values as the first row already present. */
  def appendBatchParameters(_parameters: Any*): JDBCContext = {
    log.info(s"appendBatchParameters> appending: parameters: ${_parameters}")
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) {
      log.info(s"appendBatchParameters> JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
      throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
    }

    // an empty parameter list accepts any row; otherwise the arity must match the first row
    val matchParams = ctx.parameters.headOption.forall(_.size == _parameters.size)
    if (matchParams) {
      val nc = ctx.copy(
        parameters = ctx.parameters ++ Seq(_parameters)
      )
      log.info(s"appendBatchParameters> appended: statement: ${nc.statements}, parameters: ${nc.parameters}")
      nc
    } else {
      log.info(s"appendBatchParameters> JDBCContex setting error: batch command parameters not match!")
      throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
    }
  }

  def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
      throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
    ctx.copy(
      returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
    )
  }

  /** Replaces the content of this context with a single DDL command. */
  def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
    log.info(s"setDDLCommand> setting: statement: ${_statement}, parameters: ${_parameters}")
    val nc = ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      sqlType = JDBCContext.SQL_EXEDDL,
      batch = false
    )
    log.info(s"setDDLCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}")
    nc
  }

  /** Replaces the content of this context with a single, non-batch update command. */
  def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = {
    log.info(s"setUpdateCommand> setting: returnGeneratedKey: ${_returnGeneratedKey}, statement: ${_statement}, parameters: ${_parameters}")
    val nc = ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = false
    )
    log.info(s"setUpdateCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}")
    nc
  }

  /** Switches this context to batch mode with the given statement; existing parameter rows are kept. */
  def setBatchCommand(_statement: String): JDBCContext = {
    log.info(s"setBatchCommand> appending: statement: ${_statement}")
    val nc = ctx.copy (
      statements = Seq(_statement),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = true
    )
    log.info(s"setBatchCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}")
    nc
  }

}

/** Executes [[JDBCQueryContext]] / [[JDBCContext]] descriptions through scalikejdbc. */
object JDBCEngine extends LogSupport {
  import JDBCContext._

  type JDBCDate = LocalDate
  type JDBCDateTime = LocalDateTime
  type JDBCTime = LocalTime

  def jdbcSetDate(yyyy: Int, mm: Int, dd: Int): JDBCDate = LocalDate.of(yyyy,mm,dd)
  def jdbcSetTime(hh: Int, mm: Int, ss: Int, nn: Int): JDBCTime = LocalTime.of(hh,mm,ss,nn)
  def jdbcSetDateTime(date: JDBCDate, time: JDBCTime): JDBCDateTime = LocalDateTime.of(date,time)
  def jdbcSetNow: JDBCDateTime = LocalDateTime.now()

  def jdbcGetDate(sqlDate: java.sql.Date): java.time.LocalDate = sqlDate.toLocalDate
  def jdbcGetTime(sqlTime: java.sql.Time): java.time.LocalTime = sqlTime.toLocalTime
  def jdbcGetTimestamp(sqlTimestamp: java.sql.Timestamp): java.time.LocalDateTime =
    sqlTimestamp.toLocalDateTime

  type JDBCBlob = InputStream

  // `60.seconds` instead of the postfix form `60 seconds` (same value, no postfixOps needed)
  def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer) = FileToInputStream(fileName,timeOut)

  def jdbcBlobToFile(blob: JDBCBlob, fileName: String)(
    implicit mat: Materializer) = InputStreamToFile(blob,fileName)

  // placeholder extractor for a raw SQL object; it is replaced through `.map(extractor)` before use
  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
    throw new IllegalStateException(message)
  }

  /** Runs the query as a read-only stream and exposes the extracted rows as an akka-streams Source. */
  def jdbcAkkaStream[A](ctx: JDBCQueryContext[A],extractor: WrappedResultSet => A)
                       (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = {
    val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream {
      val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
      ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
      val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
      sql.iterator
        .withDBSessionForceAdjuster(session => {
          session.connection.setAutoCommit(ctx.autoCommit)
          session.fetchSize(ctx.fetchSize)
        })
    }
    log.info(s"jdbcAkkaStream> Source: db: ${ctx.dbName}, statement: ${ctx.statement}, parameters: ${ctx.parameters}")
    Source.fromPublisher[A](publisher)
  }

  /**
    * Runs the query synchronously and collects all extracted rows into a `C[A]`.
    *
    * @throws RuntimeException wrapping any exception raised while running the query
    */
  def jdbcQueryResult[C[_] <: TraversableOnce[_], A](ctx: JDBCQueryContext[A],
                                                     extractor: WrappedResultSet => A)(
                                                      implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
    val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
    ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
    rawSql.fetchSize(ctx.fetchSize)
    try {
      implicit val session = NamedAutoSession(ctx.dbName)
      log.info(s"jdbcQueryResult> Source: db: ${ctx.dbName}, statement: ${ctx.statement}, parameters: ${ctx.parameters}")
      val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
      sql.collection.apply[C]()
    } catch {
      case e: Exception =>
        log.error(s"jdbcQueryResult> runtime error: ${e.getMessage}")
        // keep the original exception as cause so the stack trace is not lost
        throw new RuntimeException(s"jdbcQueryResult> Error: ${e.getMessage}", e)
    }
  }

  /** Runs every statement of a SQL_EXEDDL context inside one local transaction; statement parameters are not used. */
  def jdbcExecuteDDL(ctx: JDBCContext)(implicit ec: ExecutionContextExecutor): Future[String] = {
    if (ctx.sqlType != SQL_EXEDDL) {
      log.info(s"jdbcExecuteDDL> JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!")
      Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
    }
    else {
      log.info(s"jdbcExecuteDDL> Source: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
      Future {
        NamedDB(ctx.dbName) localTx { implicit session =>
          ctx.statements.foreach { stm =>
            val ddl = new SQLExecution(statement = stm, parameters = Nil)(
              before = noActon)(
              after = noActon)
            ddl.apply()
          }
          "SQL_EXEDDL executed succesfully."
        }
      }
    }
  }

  /**
    * Runs a batch update: one statement, one execution per parameter row, in one local transaction.
    *
    * Returns an empty collection when no generated key was requested, otherwise the generated keys.
    * Invalid contexts (no statement, wrong sqlType, batch = false) yield a failed Future.
    */
  def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    // fixed: the "statements empty" failure used to be built and then thrown away,
    // so execution continued and crashed on `statements.head`
    if (ctx.statements == Nil) {
      log.info(s"jdbcBatchUpdate> JDBCContex setting error: statements empty!")
      Future.failed(new IllegalStateException("JDBCContex setting error: statements empty!"))
    }
    else if (ctx.sqlType != SQL_UPDATE) {
      log.info(s"jdbcBatchUpdate> JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")
      Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
    }
    else if (!ctx.batch) {
      log.info(s"jdbcBatchUpdate> JDBCContex setting error: must set batch = true !")
      Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
    }
    else if (noReturnKey(ctx)) {
      log.info(s"jdbcBatchUpdate> batch updating no return: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
      val usql = SQL(ctx.statements.head)
        .tags(ctx.queryTags: _*)
        .batch(ctx.parameters: _*)
      Future {
        NamedDB(ctx.dbName) localTx { implicit session =>
          ctx.queryTimeout.foreach(session.queryTimeout(_))
          usql.apply[Seq]()
          Seq.empty[Long].to[C]
        }
      }
    }
    else {
      log.info(s"jdbcBatchUpdate> batch updating return genkey: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
      val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
      Future {
        NamedDB(ctx.dbName) localTx { implicit session =>
          ctx.queryTimeout.foreach(session.queryTimeout(_))
          usql.apply[C]()
        }
      }
    }
  }

  // single statement, generated key requested: returns a one-element collection holding the key
  private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    // headOption instead of the `Some(key) :: xs` pattern, which throws MatchError for any non-List Seq
    ctx.returnGeneratedKey.headOption.flatten match {
      case None =>
        Future.failed(new IllegalStateException("JDBCContex setting error: no generated key requested!"))
      case Some(key) =>
        val params: Seq[Any] = ctx.parameters.headOption.getOrElse(Nil)
        log.info(s"singleTxUpdateWithReturnKey> updating: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
        val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
        Future {
          NamedDB(ctx.dbName) localTx { implicit session =>
            session.fetchSize(ctx.fetchSize)
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            val result = usql.apply()
            Seq(result).to[C]
          }
        }
    }
  }

  // single statement, no generated key: returns a one-element collection holding the update count
  private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    val params: Seq[Any] = ctx.parameters.headOption.getOrElse(Nil)
    val before: PreparedStatement => Unit = ctx.preAction.getOrElse(noActon)
    val after: PreparedStatement => Unit = ctx.postAction.getOrElse(noActon)
    log.info(s"singleTxUpdateNoReturnKey> updating: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
    val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after)
    Future {
      NamedDB(ctx.dbName) localTx {implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val result = usql.apply()
        Seq(result.toLong).to[C]
      }
    }
  }

  private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    if (noReturnKey(ctx))
      singleTxUpdateNoReturnKey(ctx)
    else
      singleTxUpdateWithReturnKey(ctx)
  }

  // true when the first command asks for no generated key (empty list or a leading None)
  private def noReturnKey(ctx: JDBCContext): Boolean =
    ctx.returnGeneratedKey.headOption.flatten.isEmpty

  /** A PreparedStatement hook that does nothing. */
  def noActon: PreparedStatement=>Unit = pstm => {}

  /**
    * Runs several update statements in one local transaction and returns, per statement,
    * either the update count or the generated key.
    */
  def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    val stmtCount = ctx.statements.size
    // fixed: `zip` truncates to the shortest list, so statements without a matching parameter row
    // or key entry were silently skipped; pad with "no parameters" / "no key" instead
    val params: Seq[Seq[Any]] = ctx.parameters.padTo(stmtCount, Nil)
    val keys: Seq[Option[Any]] = ctx.returnGeneratedKey.padTo(stmtCount, None)
    val sqlcmd = ctx.statements zip params zip keys
    log.info(s"multiTxUpdates> updating: db: ${ctx.dbName}, SQL Commands: ${sqlcmd}")
    Future {
      NamedDB(ctx.dbName) localTx { implicit session =>
        session.fetchSize(ctx.fetchSize)
        ctx.queryTimeout.foreach(session.queryTimeout(_))
        val results = sqlcmd.map { case ((stm, param), key) =>
          key match {
            case None => new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong
            case Some(k) => new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
          }
        }
        results.to[C]
      }
    }
  }

  /**
    * Runs the non-batch update command(s) of `ctx` in one local transaction.
    *
    * Invalid contexts (no statement, wrong sqlType, batch = true) yield a failed Future.
    */
  def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit ec: ExecutionContextExecutor,
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
    // fixed: the "statements empty" failure used to be discarded (see jdbcBatchUpdate)
    if (ctx.statements == Nil) {
      log.info(s"jdbcTxUpdates> JDBCContex setting error: statements empty!")
      Future.failed(new IllegalStateException("JDBCContex setting error: statements empty!"))
    }
    else if (ctx.sqlType != SQL_UPDATE) {
      log.info(s"jdbcTxUpdates> JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")
      Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
    }
    else if (ctx.batch) {
      log.info(s"jdbcTxUpdates> JDBCContex setting error: must set batch = false !")
      Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = false !"))
    }
    else if (ctx.statements.size == 1)
      singleTxUpdate(ctx)
    else
      multiTxUpdates(ctx)
  }

  /**
    * A stream stage description: for every element `R` the statement is executed (auto-commit)
    * with the parameters produced by `prepareParams`, and the element is passed downstream.
    */
  case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,
                                 statement: String, prepareParams: R => Seq[Any]) extends LogSupport {
    jas =>

    def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)
    def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)
    def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)

    private def perform(r: R)(implicit ec: ExecutionContextExecutor) = {
      import scala.concurrent._
      val params = prepareParams(r)
      log.info(s"JDBCActionStream.perform> db: ${dbName}, statement: ${statement}, parameters: ${params}")
      Future {
        NamedDB(dbName) autoCommit { session =>
          session.execute(statement, params: _*)
        }
        r
      }
    }

    /** The flow running `perform` per element, ordered or unordered according to `processInOrder`. */
    def performOnRow(implicit ec: ExecutionContextExecutor): Flow[R, R, NotUsed] =
      if (processInOrder)
        Flow[R].mapAsync(parallelism)(perform)
      else
        Flow[R].mapAsyncUnordered(parallelism)(perform)

  }

  object JDBCActionStream {
    def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
      new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params)
  }

}
cql/CassandraEngine.scala
package sdp.cql.engine

import akka.NotUsed
import akka.stream.alpakka.cassandra.scaladsl._
import akka.stream.scaladsl._
import com.datastax.driver.core._
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
import scala.concurrent._
import scala.concurrent.duration.Duration
import sdp.logging.LogSupport

/** Integer codes for consistency levels and their mapping to the driver's `ConsistencyLevel`. */
object CQLContext {
  // Consistency Levels
  type CONSISTENCY_LEVEL = Int
  val ANY: CONSISTENCY_LEVEL          = 0x0000
  val ONE: CONSISTENCY_LEVEL          = 0x0001
  val TWO: CONSISTENCY_LEVEL          = 0x0002
  val THREE: CONSISTENCY_LEVEL        = 0x0003
  val QUORUM : CONSISTENCY_LEVEL      = 0x0004
  val ALL: CONSISTENCY_LEVEL          = 0x0005
  val LOCAL_QUORUM: CONSISTENCY_LEVEL = 0x0006
  val EACH_QUORUM: CONSISTENCY_LEVEL  = 0x0007
  val LOCAL_ONE: CONSISTENCY_LEVEL    = 0x000A
  val LOCAL_SERIAL: CONSISTENCY_LEVEL = 0x000B
  val SERIAL: CONSISTENCY_LEVEL       = 0x000C

  def apply(): CQLContext = CQLContext(statements = Nil)

  /** Translates one of the codes above into the driver enum; any other value fails with MatchError. */
  def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
    consistency match {
      case ALL => ConsistencyLevel.ALL
      case ONE => ConsistencyLevel.ONE
      case TWO => ConsistencyLevel.TWO
      case THREE => ConsistencyLevel.THREE
      case ANY => ConsistencyLevel.ANY
      case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
      case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
      case QUORUM => ConsistencyLevel.QUORUM
      // fixed: LOCAL_QUORUM is declared above but had no case here, so using it threw MatchError
      case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM
      case SERIAL => ConsistencyLevel.SERIAL
      case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
    }
  }

}

/** Describes a single CQL query: statement, positional parameters, optional consistency and page size. */
case class CQLQueryContext(
                            statement: String,
                            parameter: Seq[Object] = Nil,
                            consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
                            fetchSize: Int = 100
                          ) { ctx =>
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext =
    ctx.copy(consistency = Some(_consistency))
  def setFetchSize(pageSize: Int): CQLQueryContext =
    ctx.copy(fetchSize = pageSize)
  def setParameters(param: Seq[Object]): CQLQueryContext =
    ctx.copy(parameter = param)
}

object CQLQueryContext {
  def apply[M](stmt: String, param: Seq[Object]): CQLQueryContext = new CQLQueryContext(statement = stmt, parameter = param)
}

/**
  * Describes one or more CQL update commands. `statements` and `parameters` are parallel
  * sequences; in batch mode the first statement is executed once per parameter row.
  */
case class CQLContext(
                       statements: Seq[String],
                       parameters: Seq[Seq[Object]] = Nil,
                       consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
                       batch: Boolean = false
                     ) extends LogSupport { ctx =>
  def setBatch(bat: Boolean) = ctx.copy(batch = bat)
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
    ctx.copy(consistency = Some(_consistency))

  /** Replaces the content of this context with a single command. */
  def setCommand(_statement: String, _parameters: Object*): CQLContext = {
    log.info(s"setCommand> setting: statement: ${_statement}, parameters: ${_parameters}")
    val nc = ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters))
    log.info(s"setCommand> set: statements: ${nc.statements}, parameters: ${nc.parameters}")
    nc
  }

  /** Appends one more command and its parameter row. */
  def appendCommand(_statement: String, _parameters: Object*): CQLContext = {
    log.info(s"appendCommand> appending: statement: ${_statement}, parameters: ${_parameters}")
    val nc = ctx.copy(statements = ctx.statements :+ _statement,
      parameters = ctx.parameters ++ Seq(_parameters))
    log.info(s"appendCommand> appended: statements: ${nc.statements}, parameters: ${nc.parameters}")
    nc
  }
}

/** Executes [[CQLQueryContext]] / [[CQLContext]] descriptions through the DataStax driver session. */
object CQLEngine extends LogSupport {
  import CQLContext._
  import CQLHelpers._

  import cats._, cats.data._, cats.implicits._
  import scala.concurrent.{Await, Future}
  import scala.concurrent.duration._

  /**
    * Executes the query and returns the driver ResultSet together with its rows
    * (extracted through `extractor`) collected into a `C[A]`.
    * The page size comes from the `pageSize` argument, not from `ctx.fetchSize`.
    */
  def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext, pageSize: Int = 100
                                                    ,extractor: Row => A)(
                                                     implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= {

    val prepStmt = session.prepare(ctx.statement)

    val params: Seq[Object] = if (ctx.parameter != Nil) processParameters(ctx.parameter) else Nil
    val boundStmt = prepStmt.bind(params:_*)
    log.info(s"fetchResultPage>  statement: ${prepStmt.getQueryString}, parameters: ${params}")

    ctx.consistency.foreach {consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))}

    val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
    (resultSet,(resultSet.asScala.view.map(extractor)).to[C])
  }

  /**
    * Fetches more results for `resultSet`, waiting at most `timeOut`.
    * Returns `None` as second element when the result set is already fully fetched
    * or when fetching fails or times out.
    */
  def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
    extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) =
    if (resultSet.isFullyFetched) {
      (resultSet, None)
    } else {
      try {
        val result = Await.result(resultSet.fetchMoreResults(), timeOut)
        (result, Some((result.asScala.view.map(extractor)).to[C]))
      } catch {
        // still best-effort, but fatal errors now propagate and the failure is logged
        case scala.util.control.NonFatal(e) =>
          log.warn(s"fetchMorePages> failed to fetch more results: ${e.getMessage}")
          (resultSet, None)
      }
    }

  /**
    * Executes the command(s) of `ctx`:
    *  - batch mode with at least two parameter rows: one driver batch (see `cqlBatchUpdate`);
    *  - exactly one statement: asynchronous single update;
    *  - otherwise: all statements synchronously, in order, inside one Future.
    * A batch context with fewer than two parameter rows is downgraded to the non-batch paths.
    */
  def cqlExecute(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {

    val invalidBat = ctx.batch && ctx.parameters.size < 2
    if (!ctx.batch || invalidBat) {
      if(invalidBat)
        log.warn(s"cqlExecute> batch update must have at least 2 sets of parameters! change to single-command.")

      if (ctx.statements.size == 1) {
        val param: Seq[Object] = ctx.parameters.headOption.getOrElse(Nil)
        log.info(s"cqlExecute>  single-command: statement: ${ctx.statements.head} parameters: ${param}")
        cqlSingleUpdate(ctx.consistency, ctx.statements.head, param)
      }
      else {
        val params: Seq[Seq[Object]] =
          if (ctx.parameters == Nil)
            Seq.fill(ctx.statements.length)(Nil)
          else if (ctx.statements.size > ctx.parameters.size) {
            log.warn(s"cqlExecute> fewer parameters than statements! pad with 'Nil'.")
            ctx.parameters ++ Seq.fill(ctx.statements.size - ctx.parameters.size)(Nil)
          }
          else
            ctx.parameters

        val commands: Seq[(String,Seq[Object])] = ctx.statements zip params
        log.info(s"cqlExecute>  multi-commands: ${commands}")
        /*
                //using sequence to flip List[Future[Boolean]] => Future[List[Boolean]]
                //therefore, make sure no command replies on prev command effect
                val lstCmds: List[Future[Boolean]] = commands.map { case (stmt,param) =>
                  cqlSingleUpdate(ctx.consistency, stmt, param)
                }.toList

                val futList = lstCmds.sequence.map(_ => true)   //must map to execute
        */
        /*
                //using traverse to have some degree of parallelism = max(runtimes)
                //therefore, make sure no command replies on prev command effect
                val futList = Future.traverse(commands) { case (stmt,param)  =>
                  cqlSingleUpdate(ctx.consistency, stmt, param)
                }.map(_ => true)

                Await.result(futList, 3 seconds)
                Future.successful(true)
        */
        // run sync directly
        Future {
          commands.foreach { case (stm, pars) =>
            cqlExecuteSync(ctx.consistency, stm, pars)
          }
          true
        }
      }
    }
    else
      cqlBatchUpdate(ctx)
  }

  // Prepares `stmt`, binds the processed parameters, applies the optional consistency level
  // and logs the result under `tag`.
  private def bindStatement(tag: String, cons: Option[CQLContext.CONSISTENCY_LEVEL],
                            stmt: String, params: Seq[Object])(
                             implicit session: Session): BoundStatement = {
    val prepStmt = session.prepare(stmt)
    val pars: Seq[Object] = if (params != Nil) processParameters(params) else Nil
    val boundStmt = prepStmt.bind(pars: _*)
    log.info(s"${tag}>  statement: ${prepStmt.getQueryString}, parameters: ${pars}")
    cons.foreach { consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))
    }
    boundStmt
  }

  /** Executes one statement asynchronously; the result is the driver's `wasApplied` flag. */
  def cqlSingleUpdate(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Object])(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    val boundStmt = bindStatement("cqlSingleUpdate", cons, stmt, params)
    session.executeAsync(boundStmt).map(_.wasApplied())
  }

  /** Executes one statement synchronously; the result is the driver's `wasApplied` flag. */
  def cqlExecuteSync(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Object])(
    implicit session: Session, ec: ExecutionContext): Boolean = {
    val boundStmt = bindStatement("cqlExecuteSync", cons, stmt, params)
    session.execute(boundStmt).wasApplied()
  }

  /** Executes the first statement of `ctx` once per parameter row as one driver BatchStatement. */
  def cqlBatchUpdate(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    val params: Seq[Seq[Object]] =
      if (ctx.parameters == Nil) Seq.fill(ctx.statements.length)(Nil)
      else ctx.parameters
    log.info(s"cqlBatchUpdate>  statement: ${ctx.statements.head}, parameters: ${params}")

    val prepStmt = session.prepare(ctx.statements.head)

    val batch = new BatchStatement()
    params.foreach { p =>
      log.info(s"cqlBatchUpdate>  batch with raw parameter: ${p}")
      val pars = processParameters(p)
      // fixed: this line used to be logged under the "cqlMultiUpdate>" tag
      log.info(s"cqlBatchUpdate>  batch with cooked parameters: ${pars}")
      batch.add(prepStmt.bind(pars: _*))
    }
    ctx.consistency.foreach { consistency =>
      batch.setConsistencyLevel(consistencyLevel(consistency))
    }
    session.executeAsync(batch).map(_.wasApplied())
  }

  /** Runs the query as an akka-streams Source of extracted rows, paging with `ctx.fetchSize`. */
  def cassandraStream[A](ctx: CQLQueryContext,extractor: Row => A)
                        (implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = {

    val prepStmt = session.prepare(ctx.statement)
    val params = processParameters(ctx.parameter)
    val boundStmt = prepStmt.bind(params:_*)
    ctx.consistency.foreach {consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))}

    log.info(s"cassandraStream>  statement: ${prepStmt.getQueryString}, parameters: ${params}")
    CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(extractor)
  }

  /**
    * A stream stage description: for every element `R` the statement is executed with the
    * parameters produced by `prepareParams`, and the element is passed downstream.
    */
  case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true,
                                      statement: String, prepareParams: R => Seq[Object],
                                      consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ cas =>
    def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism=parLevel)
    def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered)
    def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
      cas.copy(consistency = Some(_consistency))

    /** Executes the statement for one element asynchronously and yields the element itself. */
    def perform(r: R)(implicit session: Session, ec: ExecutionContext) = {
      val prepStmt = session.prepare(statement)
      val params = processParameters(prepareParams(r))
      val boundStmt = prepStmt.bind(params: _*)
      consistency.foreach { cons =>
        boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons))
      }
      log.info(s"CassandraActionStream.perform>  statement: ${prepStmt.getQueryString}, parameters: ${params}")
      session.executeAsync(boundStmt).map(_ => r)
    }

    /** The flow running `perform` per element, ordered or unordered according to `processInOrder`. */
    def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] =
      if (processInOrder)
        Flow[R].mapAsync(parallelism)(perform)
      else
        Flow[R].mapAsyncUnordered(parallelism)(perform)

    /** Delegates to alpakka's `CassandraFlow.createUnloggedBatchWithPassThrough` for this statement. */
    def unloggedBatch[K](statementBinder: (
      R, PreparedStatement) => BoundStatement,partitionKey: R => K)(
                          implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = {
      val preparedStatement = session.prepare(statement)
      log.info(s"CassandraActionStream.unloggedBatch>  statement: ${preparedStatement.getQueryString}")
      CassandraFlow.createUnloggedBatchWithPassThrough[R, K](
        parallelism,
        preparedStatement,
        statementBinder,
        partitionKey)
    }

  }

  object CassandraActionStream {
    def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] =
      new CassandraActionStream[R]( statement=_statement, prepareParams = params)
  }

}

/** Parameter conversion, date/time helpers and ByteBuffer-backed streams used by the engine. */
object CQLHelpers extends LogSupport {
  import java.nio.ByteBuffer
  import java.io._
  import java.nio.file._
  import com.datastax.driver.core.LocalDate
  import com.datastax.driver.extras.codecs.jdk8.InstantCodec
  import java.time.Instant
  import akka.stream.scaladsl._
  import akka.stream._

  /** Adapts a Guava ListenableFuture to a Scala Future completed by the same outcome. */
  implicit def listenableFutureToFuture[T](
                                            listenableFuture: ListenableFuture[T]): Future[T] = {
    val promise = Promise[T]()
    Futures.addCallback(listenableFuture, new FutureCallback[T] {
      def onFailure(error: Throwable): Unit = {
        promise.failure(error)
        ()
      }
      def onSuccess(result: T): Unit = {
        promise.success(result)
        ()
      }
    })
    promise.future
  }

  // Marker values understood by processParameters
  case class CQLDate(year: Int, month: Int, day: Int)
  case object CQLTodayDate
  case class CQLDateTime(year: Int, Month: Int,
                         day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
  case object CQLDateTimeNow

  def cqlGetDate(dateToConvert: java.util.Date): java.time.LocalDate =
    dateToConvert.toInstant()
      .atZone(java.time.ZoneId.systemDefault())
      .toLocalDate()

  def cqlGetTime(dateToConvert: java.util.Date): java.time.LocalTime =
    dateToConvert.toInstant()
      .atZone(java.time.ZoneId.systemDefault())
      .toLocalTime()

  def cqlGetTimestamp(dateToConvert: java.util.Date): java.time.LocalDateTime=
    new java.sql.Timestamp(
      dateToConvert.getTime()
    ).toLocalDateTime()

  /**
    * Replaces the marker values (CQLDate, CQLTodayDate, CQLDateTime, CQLDateTimeNow) in `params`
    * by driver `LocalDate` / `java.time.Instant` objects; all other values pass through unchanged.
    */
  def processParameters(params: Seq[Object]): Seq[Object] = {
    import java.time.{Clock,ZoneId}
    log.info(s"[processParameters] input: ${params}")
    val outParams: Seq[Object] = params.map {
      case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
      case CQLTodayDate =>
        val today = java.time.LocalDate.now()
        LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
      case CQLDateTimeNow => Instant.now(Clock.system(ZoneId.of("EST", ZoneId.SHORT_IDS)))
      case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) =>
        // fixed: the old format was space-padded and lacked the '.' and 'Z' that Instant.parse
        // requires, so every CQLDateTime failed to parse. The fields are read as UTC, which is
        // the only form Instant.parse accepts.
        // NOTE(review): confirm callers expect UTC here, not local time.
        Instant.parse(f"$yy%04d-$mm%02d-$dd%02dT$hr%02d:$ms%02d:$sc%02d.$mi%03dZ")
      case p@_ => p
    }
    // fixed: used to log the input again instead of the converted values
    log.info(s"[processParameters] output: ${outParams}")
    outParams
  }

  /** InputStream view over the remaining bytes of a ByteBuffer (reading advances the buffer). */
  class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
    override def read: Int = {
      // fixed: mask to 0..255; the signed byte 0xFF used to be returned as -1 (= end of stream)
      if (buf.hasRemaining) buf.get & 0xFF else -1
    }

    override def read(bytes: Array[Byte], off: Int, len: Int): Int = {
      if (len == 0) 0
      // fixed: signal end of stream with -1; returning 0 forever kept stream readers from completing
      else if (!buf.hasRemaining) -1
      else {
        val length: Int = Math.min(len, buf.remaining)
        buf.get(bytes, off, length)
        length
      }
    }
  }
  object ByteBufferInputStream {
    def apply(buf: ByteBuffer): ByteBufferInputStream = {
      new ByteBufferInputStream(buf)
    }
  }

  /** OutputStream writing into a fixed ByteBuffer; overflowing the buffer raises the buffer's own exception. */
  class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream {

    override def write(b: Int): Unit = {
      buf.put(b.toByte)
    }

    override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
      buf.put(bytes, off, len)
    }
  }
  object FixsizedByteBufferOutputStream {
    def apply(buf: ByteBuffer) = new FixsizedByteBufferOutputStream(buf)
  }

  /**
    * OutputStream writing into a ByteBuffer that is re-allocated (heap or direct, per `onHeap`)
    * with a capacity multiplied by `increasingBy` whenever it runs out of room.
    */
  class ExpandingByteBufferOutputStream(var buf: ByteBuffer, onHeap: Boolean,
                                        increasingBy: Float = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR)
    extends OutputStream {

    private val increasing = increasingBy

    override def write(b: Array[Byte], off: Int, len: Int): Unit = {
      val newTotal: Long = buf.position.toLong + len
      if(newTotal > buf.limit){
        // start from at least 1 so that a zero-capacity buffer can still grow
        var capacity = (math.max(buf.capacity, 1) * increasing)
        while(capacity <= newTotal){
          capacity = (capacity*increasing)
        }
        increase(capacity.toInt)
      }

      // fixed: the source offset `off` was ignored (always copied from index 0)
      buf.put(b, off, len)
    }

    override def write(b: Int): Unit= {
      if (!buf.hasRemaining)
        // grow by at least one byte: capacity * factor truncates back to the same size for capacity 0 or 1
        increase(math.max(buf.capacity + 1, (buf.capacity * increasing).toInt))
      buf.put(b.toByte)
    }
    protected def increase(newCapacity: Int): Unit = {
      // make the written part [0, position) readable, then copy it into the bigger buffer
      buf.flip()
      val newBuffer =
        if (onHeap) ByteBuffer.allocate(newCapacity)
        else  ByteBuffer.allocateDirect(newCapacity)
      newBuffer.put(buf)
      buf.clear
      buf = newBuffer
    }
    def size: Long = buf.position
    def capacity: Long = buf.capacity
    def byteBuffer: ByteBuffer = buf
  }
  object ExpandingByteBufferOutputStream {
    val DEFAULT_INCREASING_FACTOR = 1.5f
    def apply(size: Int, increasingBy: Float, onHeap: Boolean) = {
      if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0")
      val buffer: ByteBuffer =
        if (onHeap) ByteBuffer.allocate(size)
        else ByteBuffer.allocateDirect(size)
      // fixed: `increasingBy` was validated but never handed to the stream
      new ExpandingByteBufferOutputStream(buffer,onHeap,increasingBy)
    }
    def apply(size: Int): ExpandingByteBufferOutputStream = {
      apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false)
    }

    def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = {
      apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap)
    }

    def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = {
      apply(size, increasingBy, false)
    }

  }

  /** Reads the whole file into a ByteBuffer wrapping exactly the file's bytes. */
  def cqlFileToBytes(fileName: String): ByteBuffer = {
    val fis = new FileInputStream(fileName)
    // fixed: the buffer used to be `available + 1` bytes long (a spurious trailing zero byte),
    // a single read() was assumed to deliver everything, and the stream was never closed
    try {
      val out = new ByteArrayOutputStream(math.max(fis.available, 32))
      val chunk = new Array[Byte](8192)
      Iterator.continually(fis.read(chunk))
        .takeWhile(_ != -1)
        .foreach(n => out.write(chunk, 0, n))
      ByteBuffer.wrap(out.toByteArray)
    } finally fis.close()
  }

  /** Streams the remaining bytes of `bytes` into the file `fileName`. */
  def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
    implicit mat: Materializer): Future[IOResult] = {
    val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes))
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

  def cqlDateTimeString(date: java.util.Date, fmt: String): String = {
    val outputFormat = new java.text.SimpleDateFormat(fmt)
    outputFormat.format(date)
  }

  def useJava8DateTime(cluster: Cluster) = {
    //for jdk8 datetime format
    cluster.getConfiguration().getCodecRegistry()
      .register(InstantCodec.instance)
  }

}
BytesConverter.scala
package protobuf.bytes

import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
import com.google.protobuf.ByteString

/**
 * Converts arbitrary values to and from protobuf `ByteString`s using Java
 * object serialization.
 *
 * SECURITY NOTE(review): `unmarshal` runs `ObjectInputStream.readObject` on
 * whatever bytes it is given. Java deserialization of data from an untrusted
 * peer can execute attacker-chosen code paths; confirm that every caller only
 * passes bytes from a trusted source, or replace this with a schema-based
 * encoding.
 */
object Converter {

  /**
   * Serializes `value` with an `ObjectOutputStream`.
   *
   * @param value the object to serialize
   * @return the serialized bytes
   */
  def marshal(value: Any): ByteString = {
    val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    // Close in finally so the stream is released even when writeObject throws.
    try oos.writeObject(value)
    finally oos.close()
    ByteString.copyFrom(stream.toByteArray())
  }

  /**
   * Deserializes `bytes` and casts the result to `A`.
   *
   * The cast is unchecked: always pass `A` explicitly. When `A` is left to
   * inference it may become `Nothing`, which fails at the call site.
   *
   * @tparam A the type the caller expects
   * @param bytes bytes previously produced by [[marshal]]
   */
  def unmarshal[A](bytes: ByteString): A = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val value =
      try ois.readObject()
      finally ois.close()
    value.asInstanceOf[A]
  }
}
CQLServices.scala
package demo.sdp.grpc.cql.server

import akka.NotUsed
import akka.stream.scaladsl._
import protobuf.bytes.Converter._
import com.datastax.driver.core._
import scala.concurrent.ExecutionContextExecutor
import sdp.grpc.services._
import sdp.cql.engine._
import CQLEngine._
import CQLHelpers._
import sdp.logging.LogSupport
import scala.concurrent._
import scala.concurrent.duration._
import akka.stream.ActorMaterializer

/**
 * Server-side implementation of the `CQLServices` gRPC service: row transfer
 * into `testdb.AQMRPT`, an echo stream, DDL execution and parameterized queries.
 */
class CQLStreamingServices(implicit ec: ExecutionContextExecutor, mat: ActorMaterializer, session: Session)
  extends CqlGrpcAkkaStream.CQLServices with LogSupport {

  /** Maps a row to the positional parameters of `cqlInsert`. */
  val toParams: AQMRPTRow => Seq[Object] = row => Seq[Object](
    row.rowid.asInstanceOf[Object],
    row.measureid.asInstanceOf[Object],
    row.statename,
    row.countyname,
    row.reportyear.asInstanceOf[Object],
    row.value.asInstanceOf[Object],
    CQLDateTimeNow
  )

  val cqlInsert ="""
                   |insert into testdb.AQMRPT(
                   | rowid,
                   | measureid,
                   | statename,
                   | countyname,
                   | reportyear,
                   | value,
                   | created)
                   | values(?,?,?,?,?,?,?)
                 """.stripMargin

  val cqlActionStream = CassandraActionStream(cqlInsert,toParams).setParallelism(2)
    .setProcessOrder(false)

  /*
  val cqlActionFlow: Flow[AQMRPTRow,AQMRPTRow,NotUsed] =
    Flow[AQMRPTRow]
      .via(cqlActionStream.performOnRow)
  */

  /**
   * Inserts each incoming row unless a row with the same `rowid` already
   * exists. Emits a marshalled `1` for an insert and `0` for a skipped row.
   */
  val cqlActionFlow: Flow[AQMRPTRow,CQLResult,NotUsed] = {
    Flow[AQMRPTRow]
      .mapAsync(cqlActionStream.parallelism) { row =>
        // Chain the existence check asynchronously instead of blocking a
        // dispatcher thread inside mapAsync.
        rowExists(row.rowid).flatMap { exists =>
          if (exists) Future.successful(CQLResult(marshal(0)))
          else cqlActionStream.perform(row).map { _ => CQLResult(marshal(1)) }
        }
      }
  }

  override def transferRows: Flow[AQMRPTRow, CQLResult, NotUsed] = {
    Flow[AQMRPTRow]
      .via(cqlActionFlow)
  }

  /**
   * Checks whether `testdb.AQMRPT` already contains `rowid`.
   *
   * The lookup stream is bounded by a 3 second completion timeout; on timeout
   * the returned future fails.
   */
  private def rowExists(rowid: Long): Future[Boolean] = {
    val cql = "SELECT * FROM testdb.AQMRPT WHERE ROWID = ? ALLOW FILTERING"
    val param = Seq(rowid.asInstanceOf[Object])
    val toRowId: Row => Long = r => r.getLong("rowid")
    val ctx = CQLQueryContext(cql,param)
    val src: Source[Long,NotUsed] = cassandraStream(ctx,toRowId)
    src
      .completionTimeout(3.seconds)
      .toMat(Sink.headOption)(Keep.right)
      .run()
      .map { result =>
        log.info(s"checking existence: ${result}")
        result.isDefined
      }
  }

  /** Prints every received message and echoes it back. */
  override def clientStreaming: Flow[HelloMsg, HelloMsg, NotUsed] = {
    Flow[HelloMsg]
      .map {r => println(r) ; r}
  }

  /** Runs the statements of each `CQLUpdate` through `cqlExecute` and emits the marshalled result. */
  override def runDDL: Flow[CQLUpdate, CQLResult, NotUsed] = {
    Flow[CQLUpdate]
      .flatMapConcat { context =>
        //unpack CQLUpdate and construct the context
        val ctx = CQLContext(context.statements)
        log.info(s"**** CQLContext => ${ctx} ***")

        Source
          .fromFuture(cqlExecute(ctx))
          .map { r => CQLResult(marshal(r)) }
      }
  }

  /**
   * Reads the `CREATED` column as a `ProtoDateTime`.
   *
   * Returns `None` when the column is null or cannot be read; a read failure
   * is logged instead of being dropped silently.
   */
  def toCQLTimestamp(rs: Row) = {
    try {
      Option(rs.getTimestamp("CREATED")).map { tm =>
        val localdt = cqlGetTimestamp(tm)
        ProtoDateTime(Some(ProtoDate(localdt.getYear, localdt.getMonthValue, localdt.getDayOfMonth)),
          Some(ProtoTime(localdt.getHour, localdt.getMinute, localdt.getSecond, localdt.getNano)))
      }
    }
    catch {
      case e: Exception =>
        log.warn(s"toCQLTimestamp> cannot read CREATED: ${e.getMessage}")
        None
    }
  }

  /** Converts a Cassandra `Row` into an `AQMRPTRow` message. */
  val toAQMRow: Row => AQMRPTRow = rs=> AQMRPTRow(
    rowid = rs.getLong("ROWID"),
    measureid = rs.getLong("MEASUREID"),
    statename = rs.getString("STATENAME"),
    countyname = rs.getString("COUNTYNAME"),
    reportyear = rs.getInt("REPORTYEAR"),
    value = rs.getInt("VALUE"),
    created = toCQLTimestamp(rs)
  )

  /** Executes each incoming `CQLQuery` and streams the resulting rows. */
  override def runQuery: Flow[CQLQuery, AQMRPTRow, NotUsed] = {
    log.info("**** runQuery called on service side ***")
    Flow[CQLQuery]
      .flatMapConcat { q =>
        //unpack CQLQuery and construct the context
        val params: Seq[Object] =
          if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
            unmarshal[Seq[Object]](q.parameters)
          else Nil
        log.info(s"**** query parameters: ${params} ****")
        val ctx = CQLQueryContext(q.statement,params)
        CQLEngine.cassandraStream(ctx,toAQMRow)
      }
  }
}
CQLServer.scala
package demo.sdp.grpc.cql.server

import java.util.logging.Logger
import com.datastax.driver.core._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import io.grpc.Server
import io.grpc.ServerBuilder
import sdp.grpc.services._
import sdp.cql.engine._
import CQLHelpers._

/**
 * Thin lifecycle wrapper around an `io.grpc.Server`.
 *
 * @param server the already-built gRPC server to manage
 */
class gRPCServer(server: Server) {

  val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName)

  /** Starts the server and registers a JVM shutdown hook that stops it. */
  def start(): Unit = {
    server.start()
    logger.info(s"Server started, listening on ${server.getPort}")
    sys.addShutdownHook {
      // Use stderr here since the logger may have been reset by its JVM shutdown hook.
      System.err.println("*** shutting down gRPC server since JVM is shutting down")
      stop()
      System.err.println("*** server shut down")
    }
    ()
  }

  /** Initiates shutdown of the server. */
  def stop(): Unit = {
    server.shutdown()
  }

  /**
    * Await termination on the main thread since the grpc library uses daemon threads.
    */
  def blockUntilShutdown(): Unit = {
    server.awaitTermination()
  }
}

/**
 * Entry point: connects to Cassandra on localhost:9042, serves `CQLServices`
 * on port 50051 and runs until a line is read from standard input.
 */
object CQLServer extends App {
  implicit val cqlsys = ActorSystem("cqlSystem")
  implicit val mat = ActorMaterializer()
  implicit val ec = cqlsys.dispatcher

  val cluster = new Cluster
  .Builder()
    .addContactPoints("localhost")
    .withPort(9042)
    .build()

  useJava8DateTime(cluster)
  implicit val session = cluster.connect()

  val server = new gRPCServer(
    ServerBuilder
      .forPort(50051)
      .addService(
        CqlGrpcAkkaStream.bindService(
          new CQLStreamingServices
        )
      ).build()
  )
  server.start()
  //  server.blockUntilShutdown()
  scala.io.StdIn.readLine()

  // Stop accepting calls before the resources they depend on are closed.
  server.stop()
  session.close()
  cluster.close()
  mat.shutdown()
  cqlsys.terminate()
}
CQLClient.scala
package demo.sdp.grpc.cql.client

import sdp.grpc.services._
import protobuf.bytes.Converter._
import akka.stream.scaladsl._
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import io.grpc._
import sdp.logging.LogSupport
import sdp.jdbc.engine._
import JDBCEngine._
import scalikejdbc.WrappedResultSet
import sdp.cql.engine.CQLHelpers.CQLDateTimeNow
import scala.util._
import scala.concurrent.ExecutionContextExecutor

/**
 * Client for the `CQLServices` gRPC service.
 *
 * @param host server host name
 * @param port server port
 */
class CQLStreamClient(host: String, port: Int)(
  implicit ec: ExecutionContextExecutor) extends LogSupport {

  val channel = ManagedChannelBuilder
    .forAddress(host, port)
    .usePlaintext(true)
    .build()

  val stub = CqlGrpcAkkaStream.stub(channel)

  /** Initiates shutdown of the underlying gRPC channel; call once the client is no longer needed. */
  def shutdown(): Unit = {
    channel.shutdown()
    ()
  }

  val jdbcRows2transfer = JDBCQueryContext[AQMRPTRow](
    dbName = 'h2,
    statement = "select * from AQMRPT where statename='Arkansas'"
  )

  /** Converts a JDBC result row into an `AQMRPTRow`; `created` is a fixed placeholder timestamp. */
  def toAQMRPTRow: WrappedResultSet => AQMRPTRow = rs => AQMRPTRow(
    rowid = rs.long("ROWID"),
    measureid = rs.long("MEASUREID"),
    statename = rs.string("STATENAME"),
    countyname = rs.string("COUNTYNAME"),
    reportyear = rs.int("REPORTYEAR"),
    value = rs.int("VALUE"),
    created = Some(ProtoDateTime(Some(ProtoDate(1990, 8, 12)), Some(ProtoTime(23, 56, 23, 0))))
  )

  import scala.concurrent.duration._

  /** Streams the rows selected by `jdbcRows2transfer` to the server's `transferRows` service. */
  def transferRows: Source[CQLResult, NotUsed] = {
    log.info(s"**** calling transferRows ****")
    jdbcAkkaStream(jdbcRows2transfer, toAQMRPTRow)
      //      .throttle(1, 500.millis, 1, ThrottleMode.shaping)
      .via(stub.transferRows)
  }

  /** Sends 100 copies of a greeting through the `clientStreaming` service. */
  def echoHello: Source[HelloMsg,NotUsed] = {
    val row = HelloMsg("hello world!")
    val rows = List.fill[HelloMsg](100)(row)
    Source
      .fromIterator(() => rows.iterator)
      .via(stub.clientStreaming)
  }

  val query0 = CQLQuery(
    statement = "select * from testdb.AQMRPT"
  )

  val query = CQLQuery(
    statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE > ? ALLOW FILTERING;",
    parameters = marshal(Seq("Arkansas", 0.toInt))
  )

  val query2 = CQLQuery (
    statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Colorado", 3.toInt))
  )

  val query3= CQLQuery (
    statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Arkansas", 8.toInt))
  )

  /** Runs `query` through the server's `runQuery` service. */
  def queryRows: Source[AQMRPTRow,NotUsed] = {
    log.info(s"running queryRows ...")
    Source
      .single(query)
      .via(stub.runQuery)
  }

  val dropCQL = "DROP TABLE IF EXISTS testdb.AQMRPT"

  val createCQL ="""
  CREATE TABLE testdb.AQMRPT (
     rowid bigint primary key,
     measureid bigint,
     statename text,
     countyname text,
     reportyear int,
     value int,
     created timestamp
  )"""

  val cqlddl = CQLUpdate(statements = Seq(dropCQL,createCQL))

  /** Sends the drop/create statements to the server's `runDDL` service. */
  def createTbl: Source[CQLResult,NotUsed] = {
    log.info(s"running createTbl ...")
    Source
      .single(cqlddl)
      .via(stub.runDDL)
  }
}

/** Demo: echoes messages through the server and prints the replies. */
object EchoHelloClient extends App {
  implicit val system = ActorSystem("EchoNumsClient")
  implicit val mat = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher
  val client = new CQLStreamClient("localhost", 50051)

  client.echoHello.runForeach(println)

  scala.io.StdIn.readLine()
  client.shutdown()
  mat.shutdown()
  system.terminate()
}

/** Demo: copies rows from the h2 database to Cassandra and prints the number inserted. */
object TransferRows extends App {
  import sdp.jdbc.config._

  implicit val system = ActorSystem("JDBCServer")
  implicit val mat = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher

  ConfigDBsWithEnv("dev").setup('h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()

  val client = new CQLStreamClient("localhost", 50051)
  val fut = client.transferRows.runFold(0){(a,b) => a + unmarshal[Int](b.result)}
  fut.onComplete {
    case scala.util.Success(cnt) => println(s"done transfer ${cnt} rows.")
    case Failure(e) => println(s"!!!!!streaming error: ${e.getMessage}")
  }

  scala.io.StdIn.readLine()
  client.shutdown()
  ConfigDBsWithEnv("dev").close('h2)
  mat.shutdown()
  system.terminate()
}

/** Demo: runs the parameterized query and prints every returned row. */
object QueryRows extends App {
  implicit val system = ActorSystem("QueryRows")
  implicit val mat = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher

  val client = new CQLStreamClient("localhost", 50051)

  val fut = client.queryRows.runForeach { r => println(r) }
  fut.onComplete {
    case scala.util.Success(d) => println(s"done querying.")
    case Failure(e) => println(s"!!!!!query error: ${e.getMessage}")
  }

  scala.io.StdIn.readLine()
  client.shutdown()
  mat.shutdown()
  system.terminate()
}

/** Demo: recreates the `testdb.AQMRPT` table and prints the server's result. */
object RunDDL extends App {
  implicit val system = ActorSystem("RunDDL")
  implicit val mat = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher

  val client = new CQLStreamClient("localhost", 50051)

  client.createTbl.runForeach { r =>
    // The server marshals the Boolean returned by cqlExecute. Name the type
    // explicitly: left to inference it becomes Nothing and the call fails
    // with a ClassCastException.
    println(unmarshal[Boolean](r.result))
  }

  scala.io.StdIn.readLine()
  client.shutdown()
  mat.shutdown()
  system.terminate()
}
原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/12795.html