PICE(3): CassandraStreaming – gRPC-CQL Service

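In the previous post we covered streaming from a JDBC database over gRPC. This post covers the same approach for Cassandra. If we need to read Cassandra data from a node or endpoint where Cassandra is not deployed, gRPC can act as a data bridge between the two ends. In that setup the Cassandra side is the gRPC server, and it provides the Cassandra data service.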

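In the earlier sdp series we already implemented Cassandra-Engine. It works by submitting commands to Cassandra for execution through a Context object. We start with an example that creates a table. The CQL statements and the Cassandra-Engine code are shown below; this is the client part: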

  val dropCQL = "DROP TABLE IF EXISTS testdb.AQMRPT" 
 
  val createCQL =""" 
  CREATE TABLE testdb.AQMRPT ( 
     rowid bigint primary key, 
     measureid bigint, 
     statename text, 
     countyname text, 
     reportyear int, 
     value int, 
     created timestamp 
  )""" 
 
  val cqlddl = CQLUpdate(statements = Seq(dropCQL,createCQL)) 
  def createTbl: Source[CQLResult,NotUsed] = { 
    log.info(s"running createTbl ...") 
    Source 
      .single(cqlddl) 
      .via(stub.runDDL) 
  }

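First, we pass two statements, dropCQL and createCQL, into CQLUpdate, the protobuf message that corresponds to the Context, so we can expect them to be handled as a batch-style group of commands. Then, as usual, we use the streaming programming model. The Context and the Service are defined in the .proto file as follows: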

message CQLUpdate { 
    repeated string statements = 1; 
    bytes parameters = 2; 
    google.protobuf.Int32Value consistency = 3; 
    google.protobuf.BoolValue batch = 4; 
} 
 
service CQLServices { 
  rpc runDDL(CQLUpdate) returns (CQLResult) {} 
}

The service function runDDL is implemented as follows:

 override def runDDL: Flow[CQLUpdate, CQLResult, NotUsed] = { 
    Flow[CQLUpdate] 
      .flatMapConcat { context => 
        //unpack CQLUpdate and construct the context 
        val ctx = CQLContext(context.statements) 
        log.info(s"**** CQLContext => ${ctx} ***") 
 
        Source 
          .fromFuture(cqlExecute(ctx)) 
          .map { r => CQLResult(marshal(r)) } 
      } 
  }

Here we call the cqlExecute(ctx) function of Cassandra-Engine:

  def cqlExecute(ctx: CQLContext)( 
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = { 
 
    var invalidBat = false 
    if ( ctx.batch ) { 
      if (ctx.parameters == Nil) 
        invalidBat = true 
      else if (ctx.parameters.size < 2) 
        invalidBat = true; 
    } 
    if (!ctx.batch || invalidBat) { 
      if(invalidBat) 
       log.warn(s"cqlExecute> batch update must have at least 2 sets of parameters! change to single-command.") 
 
      if (ctx.statements.size == 1) { 
        var param: Seq[Object] = Nil 
        if (ctx.parameters != Nil) param =  ctx.parameters.head 
        log.info(s"cqlExecute>  single-command: statement: ${ctx.statements.head} parameters: ${param}") 
        cqlSingleUpdate(ctx.consistency, ctx.statements.head, param) 
      } 
      else { 
        var params: Seq[Seq[Object]] = Nil 
        if (ctx.parameters == Nil) 
          params = Seq.fill(ctx.statements.length)(Nil) 
        else { 
          if (ctx.statements.size > ctx.parameters.size) { 
            log.warn(s"cqlExecute> fewer parameters than statements! pad with 'Nil'.") 
            val nils = Seq.fill(ctx.statements.size - ctx.parameters.size)(Nil) 
            params = ctx.parameters ++ nils 
 
          } 
          else 
            params = ctx.parameters 
        } 
 
        val commands: Seq[(String,Seq[Object])] = ctx.statements zip params 
        log.info(s"cqlExecute>  multi-commands: ${commands}") 
/* 
        //using sequence to flip List[Future[Boolean]] => Future[List[Boolean]] 
        //therefore, make sure no command relies on the effect of a previous command 
        val lstCmds: List[Future[Boolean]] = commands.map { case (stmt,param) => 
          cqlSingleUpdate(ctx.consistency, stmt, param) 
        }.toList 
 
        val futList = lstCmds.sequence.map(_ => true)   //must map to execute 
        */ 
/* 
        //using traverse to have some degree of parallelism = max(runtimes) 
        //therefore, make sure no command relies on the effect of a previous command 
        val futList = Future.traverse(commands) { case (stmt,param)  => 
          cqlSingleUpdate(ctx.consistency, stmt, param) 
        }.map(_ => true) 
 
        Await.result(futList, 3 seconds) 
        Future.successful(true) 
*/ 
        // run sync directly 
        Future { 
          commands.foreach { case (stm, pars) => 
            cqlExecuteSync(ctx.consistency, stm, pars) 
          } 
          true 
        } 
      } 
    } 
    else 
      cqlBatchUpdate(ctx) 
  }

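The code of this function is shown in full because running several statements in one request can involve non-blocking and parallel computation. The commented-out sections in the code above show how the functional combinators sequence (from cats) and traverse operate on a collection of Futures. Both start the commands concurrently, so they are only safe when no command depends on the effect of an earlier one. That is why the active code runs the commands synchronously, one after another, inside a single Future.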
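To make the difference concrete, here is a minimal sketch that uses only the standard library's `Future.sequence` and `Future.traverse` rather than the cats syntax in the commented-out code. `runCommand` is a hypothetical stand-in for `cqlSingleUpdate`, and `inOrder` is one possible non-blocking alternative to the synchronous loop, not the approach used by Cassandra-Engine.

```scala
import scala.concurrent.{ExecutionContext, Future}

object FutureCombinators {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for cqlSingleUpdate: pretends to run one statement
  // and reports success for any non-empty statement.
  def runCommand(stmt: String): Future[Boolean] = Future(stmt.nonEmpty)

  // sequence: all futures are started while the list is built;
  // sequence only flips List[Future[Boolean]] into Future[List[Boolean]].
  def viaSequence(stmts: List[String]): Future[Boolean] =
    Future.sequence(stmts.map(runCommand)).map(_.forall(identity))

  // traverse: map and sequence in one step, with the same concurrency.
  def viaTraverse(stmts: List[String]): Future[Boolean] =
    Future.traverse(stmts)(runCommand).map(_.forall(identity))

  // Strictly ordered: each command starts only after the previous one
  // has completed, which is what DROP followed by CREATE requires.
  def inOrder(stmts: List[String]): Future[Boolean] =
    stmts.foldLeft(Future.successful(true)) { (acc, stmt) =>
      acc.flatMap(ok => runCommand(stmt).map(_ && ok))
    }
}
```

The `foldLeft` chain keeps the ordering guarantee of the synchronous loop without blocking a thread between commands.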

The next example streams data from a JDBC database into a Cassandra database. The .proto definitions are as follows:

message ProtoDate { 
  int32 yyyy = 1; 
  int32 mm   = 2; 
  int32 dd   = 3; 
} 
 
message ProtoTime { 
  int32 hh   = 1; 
  int32 mm   = 2; 
  int32 ss   = 3; 
  int32 nnn  = 4; 
} 
 
message ProtoDateTime { 
   ProtoDate date = 1; 
   ProtoTime time = 2; 
} 
 
message AQMRPTRow { 
    int64 rowid = 1; 
    string countyname = 2; 
    string statename = 3; 
    int64 measureid = 4; 
    int32 reportyear = 5; 
    int32 value = 6; 
    ProtoDateTime created = 7; 
} 
 
message CQLResult { 
  bytes result = 1; 
} 
 
message CQLUpdate { 
    repeated string statements = 1; 
    bytes parameters = 2; 
    google.protobuf.Int32Value consistency = 3; 
    google.protobuf.BoolValue batch = 4; 
} 
 
 
service CQLServices { 
  rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {} 
  rpc runDDL(CQLUpdate) returns (CQLResult) {} 
}

The service functions are implemented as follows:

 val toParams: AQMRPTRow => Seq[Object] = row => Seq[Object]( 
    row.rowid.asInstanceOf[Object], 
    row.measureid.asInstanceOf[Object], 
    row.statename, 
    row.countyname, 
    row.reportyear.asInstanceOf[Object], 
    row.value.asInstanceOf[Object], 
    CQLDateTimeNow 
  ) 
  val cqlInsert =""" 
                   |insert into testdb.AQMRPT( 
                   | rowid, 
                   | measureid, 
                   | statename, 
                   | countyname, 
                   | reportyear, 
                   | value, 
                   | created) 
                   | values(?,?,?,?,?,?,?) 
                 """.stripMargin 
   
  val cqlActionStream = CassandraActionStream(cqlInsert,toParams).setParallelism(2) 
    .setProcessOrder(false) 
 
/* 
  val cqlActionFlow: Flow[AQMRPTRow,AQMRPTRow,NotUsed] = 
    Flow[AQMRPTRow] 
      .via(cqlActionStream.performOnRow) 
*/ 
 
  val cqlActionFlow: Flow[AQMRPTRow,CQLResult,NotUsed] = { 
    Flow[AQMRPTRow] 
        .mapAsync(cqlActionStream.parallelism){ row => 
          if (IfExists(row.rowid)) 
            Future.successful(CQLResult(marshal(0))) 
          else 
            cqlActionStream.perform(row).map {_ => CQLResult(marshal(1))} 
        } 
  } 
 
  override def transferRows: Flow[AQMRPTRow, CQLResult, NotUsed] = { 
    Flow[AQMRPTRow] 
      .via(cqlActionFlow) 
  } 
 
  private def IfExists(rowid: Long): Boolean = { 
    val cql = "SELECT * FROM testdb.AQMRPT WHERE ROWID = ? ALLOW FILTERING" 
    val param = Seq(rowid.asInstanceOf[Object]) 
    val toRowId: Row => Long = r => r.getLong("rowid") 
    val ctx = CQLQueryContext(cql,param) 
    val src: Source[Long,NotUsed] = cassandraStream(ctx,toRowId) 
    val fut = src.toMat(Sink.headOption)(Keep.right).run() 
 
    val result = Await.result(fut,3 seconds) 
 
    log.info(s"checking existence: ${result}") 
    result match { 
      case Some(x) => true 
      case None => false 
    } 
  }

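In the code above we call the stream-processing methods of Cassandra-Engine's CassandraActionStream type. Note that here we try running another stream inside a stream Flow: the IfExists function runs a Source to determine whether a rowid already exists. Do not read too much into the practical value of this function; it is only a contrived example. Also, declaring rowid as Long is mandatory. Cassandra is unforgiving about data type matching and performs no implicit conversion, so passing an Int where a Long is expected is treated as a type error, and no clear error message can be caught.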
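The type strictness comes down to how the values are boxed before they are placed in a `Seq[Object]`. The following sketch shows what `asInstanceOf[Object]` produces for a Long and an Int. The object and helper names are hypothetical; the only fact relied on is that the DataStax driver maps `bigint` to `java.lang.Long` and `int` to `java.lang.Integer`.

```scala
object BoxedParams {
  // Boxing the way toParams does it: each primitive becomes its Java wrapper.
  val asLong: Object = 42L.asInstanceOf[Object] // java.lang.Long, fits a bigint column
  val asInt: Object  = 42.asInstanceOf[Object]  // java.lang.Integer, fits an int column

  // Hypothetical helper: widen an Int explicitly before binding it to a bigint column.
  def toBigintParam(n: Int): Object = Long.box(n.toLong)

  // Show the runtime class that would be bound for each parameter.
  def describe(params: Seq[Object]): Seq[String] =
    params.map(_.getClass.getSimpleName)
}
```

Calling `BoxedParams.describe` on a parameter list before binding it is a cheap way to spot an Integer where the column expects a Long.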

The client-side call to this service looks like this:

  val stub = CqlGrpcAkkaStream.stub(channel) 
 
  val jdbcRows2transfer = JDBCQueryContext[AQMRPTRow]( 
    dbName = 'h2, 
    statement = "select * from AQMRPT where statename='Arkansas'" 
  ) 
 
  def toAQMRPTRow: WrappedResultSet => AQMRPTRow = rs => AQMRPTRow( 
    rowid = rs.long("ROWID"), 
    measureid = rs.long("MEASUREID"), 
    statename = rs.string("STATENAME"), 
    countyname = rs.string("COUNTYNAME"), 
    reportyear = rs.int("REPORTYEAR"), 
    value = rs.int("VALUE"), 
    created = Some(ProtoDateTime(Some(ProtoDate(1990, 8, 12)), Some(ProtoTime(23, 56, 23, 0)))) 
  ) 
 
  import scala.concurrent.duration._ 
 
  def transferRows: Source[CQLResult, NotUsed] = { 
    log.info(s"**** calling transferRows ****") 
    jdbcAkkaStream(jdbcRows2transfer, toAQMRPTRow) 
      //      .throttle(1, 500.millis, 1, ThrottleMode.shaping) 
      .via(stub.transferRows) 
  }

Note: the JDBC database is local to the client, while Cassandra is the remote service.

Finally, we demonstrate a Cassandra query. The .proto definitions:

message CQLQuery { 
    string statement = 1; 
    bytes parameters = 2; 
    google.protobuf.Int32Value consistency = 3; 
    google.protobuf.Int32Value fetchSize = 4; 
} 
 
service CQLServices { 
  rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {} 
  rpc runQuery(CQLQuery) returns (stream AQMRPTRow) {} 
  rpc runDDL(CQLUpdate) returns (CQLResult) {} 
}

The service function code is as follows:

 def toCQLTimestamp(rs: Row) = { 
    try { 
      val tm = rs.getTimestamp("CREATED") 
      if (tm == null) None 
      else { 
        val localdt = cqlGetTimestamp(tm) 
        Some(ProtoDateTime(Some(ProtoDate(localdt.getYear, localdt.getMonthValue, localdt.getDayOfMonth)), 
          Some(ProtoTime(localdt.getHour, localdt.getMinute, localdt.getSecond, localdt.getNano)))) 
      } 
    } 
    catch { 
      case e: Exception => None 
    } 
  } 
 
  val toAQMRow: Row => AQMRPTRow = rs=> AQMRPTRow( 
    rowid = rs.getLong("ROWID"), 
    measureid = rs.getLong("MEASUREID"), 
    statename = rs.getString("STATENAME"), 
    countyname = rs.getString("COUNTYNAME"), 
    reportyear = rs.getInt("REPORTYEAR"), 
    value = rs.getInt("VALUE"), 
    created = toCQLTimestamp(rs) 
  ) 
  override def runQuery: Flow[CQLQuery, AQMRPTRow, NotUsed] = { 
    log.info("**** runQuery called on service side ***") 
    Flow[CQLQuery] 
      .flatMapConcat { q => 
        //unpack CQLQuery and construct the context 
        var params: Seq[Object] =  Nil 
        if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY) 
          params = unmarshal[Seq[Object]](q.parameters) 
        log.info(s"**** query parameters: ${params} ****") 
        val ctx = CQLQueryContext(q.statement,params) 
        CQLEngine.cassandraStream(ctx,toAQMRow) 
      } 
  }

Two things are worth a look here: the date conversion, and the marshal and unmarshal of the Cassandra parameters of type Seq[Object]. The client code:

  val query = CQLQuery( 
    statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE > ? ALLOW FILTERING;", 
    parameters = marshal(Seq("Arkansas", 0.toInt)) 
  ) 
  val query2 = CQLQuery ( 
    statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?", 
    parameters = marshal(Seq("Colorado", 3.toInt)) 
  ) 
  val query3= CQLQuery ( 
    statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?", 
    parameters = marshal(Seq("Arkansas", 8.toInt)) 
  ) 
  def queryRows: Source[AQMRPTRow,NotUsed] = { 
    log.info(s"running queryRows ...") 
    Source 
      .single(query) 
      .via(stub.runQuery) 
  }
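The marshal and unmarshal helpers are not listed in this part of the post. As a rough illustration of what such a pair could look like, the sketch below uses plain Java serialization and returns an `Array[Byte]`. This is an assumption made for the example: the real helpers may use a different encoding and produce a protobuf `ByteString` for the `bytes` field.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object Marshalling {
  // Assumption: Java serialization stands in for the real marshal function.
  def marshal(value: Any): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value.asInstanceOf[AnyRef]) finally out.close()
    bytes.toByteArray
  }

  // The caller states the expected type; a wrong type only fails
  // later, when the value is actually used.
  def unmarshal[T](data: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

With this approach the boxed type of each parameter survives the round trip, so an Int packed on the client is still a `java.lang.Integer` on the server.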

This part is fairly straightforward.
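The date conversion mentioned earlier maps between a `java.time.LocalDateTime` and the ProtoDateTime message. The sketch below shows both directions. The case classes are hypothetical stand-ins for the ScalaPB-generated classes, and `fromProto` is an illustrative helper that does not appear in the original code.

```scala
import java.time.LocalDateTime

object DateConversion {
  // Hypothetical stand-ins for the ScalaPB-generated message classes.
  final case class ProtoDate(yyyy: Int, mm: Int, dd: Int)
  final case class ProtoTime(hh: Int, mm: Int, ss: Int, nnn: Int)
  final case class ProtoDateTime(date: Option[ProtoDate], time: Option[ProtoTime])

  // Same field mapping as toCQLTimestamp: nnn carries the nano-of-second value.
  def toProto(dt: LocalDateTime): ProtoDateTime = ProtoDateTime(
    Some(ProtoDate(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)),
    Some(ProtoTime(dt.getHour, dt.getMinute, dt.getSecond, dt.getNano)))

  // A missing date or time part yields None rather than a made-up default.
  def fromProto(p: ProtoDateTime): Option[LocalDateTime] =
    for {
      d <- p.date
      t <- p.time
    } yield LocalDateTime.of(d.yyyy, d.mm, d.dd, t.hh, t.mm, t.ss, t.nnn)
}
```

Message-typed fields are optional in proto3, which is why both parts are wrapped in Option, as in the client code that builds `created`.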

Below is the complete source code covered in this discussion:

project/scalapb.sbt

 

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18") 
 
resolvers += Resolver.bintrayRepo("beyondthelines", "maven") 
 
libraryDependencies ++= Seq( 
  "com.thesamet.scalapb" %% "compilerplugin" % "0.7.4", 
  "beyondthelines"         %% "grpcakkastreamgenerator" % "0.0.5" 
)

 

build.sbt

import scalapb.compiler.Version.scalapbVersion 
import scalapb.compiler.Version.grpcJavaVersion 
 
name := "gRPCCassandra" 
 
version := "0.1" 
 
scalaVersion := "2.12.6" 
 
resolvers += Resolver.bintrayRepo("beyondthelines", "maven") 
 
scalacOptions += "-Ypartial-unification" 
 
libraryDependencies ++= Seq( 
  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf", 
  "io.grpc" % "grpc-netty" % grpcJavaVersion, 
  "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion, 
  "io.monix" %% "monix" % "2.3.0", 
  // for GRPC Akkastream 
  "beyondthelines"         %% "grpcakkastreamruntime" % "0.0.5", 
  // for scalikejdbc 
  "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1", 
  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test", 
  "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1", 
  "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1", 
  "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1", 
  "com.h2database"  %  "h2"                % "1.4.196", 
  "mysql" % "mysql-connector-java" % "6.0.6", 
  "org.postgresql" % "postgresql" % "42.2.0", 
  "commons-dbcp" % "commons-dbcp" % "1.4", 
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2", 
  "com.zaxxer" % "HikariCP" % "2.7.4", 
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE", 
  "com.typesafe.slick" %% "slick" % "3.2.1", 
  //for cassandra  340 
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0", 
  "com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0", 
  "com.typesafe.akka" %% "akka-stream" % "2.5.13", 
  "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.19", 
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3", 
  "org.typelevel" %% "cats-core" % "1.1.0" 
) 
 
PB.targets in Compile := Seq( 
  scalapb.gen() -> (sourceManaged in Compile).value, 
  // generate the akka stream files 
  grpc.akkastreams.generators.GrpcAkkaStreamGenerator() -> (sourceManaged in Compile).value 
)

main/resources/application.conf

# JDBC settings 
test { 
  db { 
    h2 { 
      driver = "org.h2.Driver" 
      url = "jdbc:h2:tcp://localhost/~/slickdemo" 
      user = "" 
      password = "" 
      poolInitialSize = 5 
      poolMaxSize = 7 
      poolConnectionTimeoutMillis = 1000 
      poolValidationQuery = "select 1 as one" 
      poolFactoryName = "commons-dbcp2" 
    } 
  } 
 
  db.mysql.driver = "com.mysql.cj.jdbc.Driver" 
  db.mysql.url = "jdbc:mysql://localhost:3306/testdb" 
  db.mysql.user = "root" 
  db.mysql.password = "123" 
  db.mysql.poolInitialSize = 5 
  db.mysql.poolMaxSize = 7 
  db.mysql.poolConnectionTimeoutMillis = 1000 
  db.mysql.poolValidationQuery = "select 1 as one" 
  db.mysql.poolFactoryName = "bonecp" 
 
  # scalikejdbc Global settings 
  scalikejdbc.global.loggingSQLAndTime.enabled = true 
  scalikejdbc.global.loggingSQLAndTime.logLevel = info 
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true 
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000 
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn 
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false 
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false 
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 
} 
dev { 
  db { 
    h2 { 
      driver = "org.h2.Driver" 
      url = "jdbc:h2:tcp://localhost/~/slickdemo" 
      user = "" 
      password = "" 
      poolFactoryName = "hikaricp" 
      numThreads = 10 
      maxConnections = 12 
      minConnections = 4 
      keepAliveConnection = true 
    } 
    mysql { 
      driver = "com.mysql.cj.jdbc.Driver" 
      url = "jdbc:mysql://localhost:3306/testdb" 
      user = "root" 
      password = "123" 
      poolInitialSize = 5 
      poolMaxSize = 7 
      poolConnectionTimeoutMillis = 1000 
      poolValidationQuery = "select 1 as one" 
      poolFactoryName = "bonecp" 
 
    } 
    postgres { 
      driver = "org.postgresql.Driver" 
      url = "jdbc:postgresql://localhost:5432/testdb" 
      user = "root" 
      password = "123" 
      poolFactoryName = "hikaricp" 
      numThreads = 10 
      maxConnections = 12 
      minConnections = 4 
      keepAliveConnection = true 
    } 
  } 
  # scalikejdbc Global settings 
  scalikejdbc.global.loggingSQLAndTime.enabled = true 
  scalikejdbc.global.loggingSQLAndTime.logLevel = info 
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true 
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000 
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn 
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false 
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false 
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 
}

main/resources/logback.xml

<?xml version="1.0" encoding="UTF-8"?> 
<configuration> 
 
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> 
        <layout class="ch.qos.logback.classic.PatternLayout"> 
            <Pattern> 
                %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n 
            </Pattern> 
        </layout> 
    </appender> 
 
    <logger name="sdp.cql" level="info" 
            additivity="false"> 
        <appender-ref ref="STDOUT" /> 
    </logger> 
 
    <logger name="demo.sdp.grpc.cql" level="info" 
            additivity="false"> 
        <appender-ref ref="STDOUT" /> 
    </logger> 
 
    <root level="error"> 
        <appender-ref ref="STDOUT" /> 
    </root> 
 
</configuration>

main/protobuf/cql.proto

syntax = "proto3"; 
 
import "google/protobuf/wrappers.proto"; 
import "google/protobuf/any.proto"; 
import "scalapb/scalapb.proto"; 
 
option (scalapb.options) = { 
  // use a custom Scala package name 
  // package_name: "io.ontherocks.introgrpc.demo" 
 
  // don't append file name to package 
  flat_package: true 
 
  // generate one Scala file for all messages (services still get their own file) 
  single_file: true 
 
  // add imports to generated file 
  // useful when extending traits or using custom types 
  // import: "io.ontherocks.hellogrpc.RockingMessage" 
 
  // code to put at the top of generated file 
  // works only with `single_file: true` 
  //preamble: "sealed trait SomeSealedTrait" 
}; 
 
/* 
 * Demos various customization options provided by ScalaPB. 
 */ 
 
package sdp.grpc.services; 
 
message ProtoDate { 
  int32 yyyy = 1; 
  int32 mm   = 2; 
  int32 dd   = 3; 
} 
 
message ProtoTime { 
  int32 hh   = 1; 
  int32 mm   = 2; 
  int32 ss   = 3; 
  int32 nnn  = 4; 
} 
 
message ProtoDateTime { 
   ProtoDate date = 1; 
   ProtoTime time = 2; 
} 
 
message AQMRPTRow { 
    int64 rowid = 1; 
    string countyname = 2; 
    string statename = 3; 
    int64 measureid = 4; 
    int32 reportyear = 5; 
    int32 value = 6; 
    ProtoDateTime created = 7; 
} 
 
message CQLResult { 
  bytes result = 1; 
} 
 
message CQLQuery { 
    string statement = 1; 
    bytes parameters = 2; 
    google.protobuf.Int32Value consistency = 3; 
    google.protobuf.Int32Value fetchSize = 4; 
} 
 
message CQLUpdate { 
    repeated string statements = 1; 
    bytes parameters = 2; 
    google.protobuf.Int32Value consistency = 3; 
    google.protobuf.BoolValue batch = 4; 
} 
 
message HelloMsg { 
  string hello = 1; 
} 
 
service CQLServices { 
  rpc clientStreaming(stream HelloMsg) returns (stream HelloMsg) {} 
  rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {} 
  rpc runQuery(CQLQuery) returns (stream AQMRPTRow) {} 
  rpc runDDL(CQLUpdate) returns (CQLResult) {} 
}

logging/log.scala

package sdp.logging 
 
import org.slf4j.Logger 
 
/** 
  * Logger which just wraps org.slf4j.Logger internally. 
  * 
  * @param logger logger 
  */ 
class Log(logger: Logger) { 
 
  // use var consciously to enable squeezing later 
  var isDebugEnabled: Boolean = logger.isDebugEnabled 
  var isInfoEnabled: Boolean = logger.isInfoEnabled 
  var isWarnEnabled: Boolean = logger.isWarnEnabled 
  var isErrorEnabled: Boolean = logger.isErrorEnabled 
 
  def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = { 
    level match { 
      case 'debug | 'DEBUG => debug(msg) 
      case 'info | 'INFO => info(msg) 
      case 'warn | 'WARN => warn(msg) 
      case 'error | 'ERROR => error(msg) 
      case _ => // nothing to do 
    } 
  } 
 
  def debug(msg: => String): Unit = { 
    if (isDebugEnabled && logger.isDebugEnabled) { 
      logger.debug(msg) 
    } 
  } 
 
  def debug(msg: => String, e: Throwable): Unit = { 
    if (isDebugEnabled && logger.isDebugEnabled) { 
      logger.debug(msg, e) 
    } 
  } 
 
  def info(msg: => String): Unit = { 
    if (isInfoEnabled && logger.isInfoEnabled) { 
      logger.info(msg) 
    } 
  } 
 
  def info(msg: => String, e: Throwable): Unit = { 
    if (isInfoEnabled && logger.isInfoEnabled) { 
      logger.info(msg, e) 
    } 
  } 
 
  def warn(msg: => String): Unit = { 
    if (isWarnEnabled && logger.isWarnEnabled) { 
      logger.warn(msg) 
    } 
  } 
 
  def warn(msg: => String, e: Throwable): Unit = { 
    if (isWarnEnabled && logger.isWarnEnabled) { 
      logger.warn(msg, e) 
    } 
  } 
 
  def error(msg: => String): Unit = { 
    if (isErrorEnabled && logger.isErrorEnabled) { 
      logger.error(msg) 
    } 
  } 
 
  def error(msg: => String, e: Throwable): Unit = { 
    if (isErrorEnabled && logger.isErrorEnabled) { 
      logger.error(msg, e) 
    } 
  } 
 
}
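The Log class declares its messages as by-name parameters (`msg: => String`), so an interpolated string is only built when the level is enabled. The sketch below illustrates this with a tiny in-memory logger. `MiniLog` and `evaluations` are hypothetical names made up for the example and are not part of the sdp.logging package.

```scala
object ByNameLogging {
  // Minimal logger that, like Log, takes its message by name.
  final class MiniLog(var isInfoEnabled: Boolean) {
    private val buffer = scala.collection.mutable.ArrayBuffer.empty[String]
    def info(msg: => String): Unit = if (isInfoEnabled) { buffer += msg; () }
    def lines: List[String] = buffer.toList
  }

  // Returns how many times the message expression was evaluated.
  def evaluations(enabled: Boolean): Int = {
    var count = 0
    val log = new MiniLog(enabled)
    log.info { count += 1; s"expensive message #$count" }
    count
  }
}
```

When the level is disabled the block passed to `info` never runs, so costly string interpolation in log calls is free on the hot path.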

logging/LogSupport.scala

package sdp.logging 
 
import org.slf4j.LoggerFactory 
 
trait LogSupport { 
 
  /** 
    * Logger 
    */ 
  protected val log = new Log(LoggerFactory.getLogger(this.getClass)) 
 
}

filestreaming/FileStreaming.scala

package sdp.file 
 
import java.io.{ByteArrayInputStream, InputStream} 
import java.nio.ByteBuffer 
import java.nio.file.Paths 
 
import akka.stream.Materializer 
import akka.stream.scaladsl.{FileIO, StreamConverters} 
import akka.util._ 
 
import scala.concurrent.Await 
import scala.concurrent.duration._ 
 
object Streaming { 
  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
    implicit mat: Materializer):ByteBuffer = { 
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => 
      hd ++ bs 
    } 
    (Await.result(fut, timeOut)).toByteBuffer 
  } 
 
  def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
    implicit mat: Materializer): Array[Byte] = { 
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => 
      hd ++ bs 
    } 
    (Await.result(fut, timeOut)).toArray 
  } 
 
  def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
    implicit mat: Materializer): InputStream = { 
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => 
      hd ++ bs 
    } 
    val buf = (Await.result(fut, timeOut)).toArray 
    new ByteArrayInputStream(buf) 
  } 
 
  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)( 
    implicit mat: Materializer) = { 
    val ba = new Array[Byte](byteBuf.remaining()) 
    byteBuf.get(ba,0,ba.length) 
    val baInput = new ByteArrayInputStream(ba) 
    val source = StreamConverters.fromInputStream(() => baInput)  //ByteBufferInputStream(bytes)) 
    source.runWith(FileIO.toPath(Paths.get(fileName))) 
  } 
 
  def ByteArrayToFile(bytes: Array[Byte], fileName: String)( 
    implicit mat: Materializer) = { 
    val bb = ByteBuffer.wrap(bytes) 
    val baInput = new ByteArrayInputStream(bytes) 
    val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes)) 
    source.runWith(FileIO.toPath(Paths.get(fileName))) 
  } 
 
  def InputStreamToFile(is: InputStream, fileName: String)( 
    implicit mat: Materializer) = { 
    val source = StreamConverters.fromInputStream(() => is) 
    source.runWith(FileIO.toPath(Paths.get(fileName))) 
  } 
 
}

jdbc/JDBCConfig.scala

package sdp.jdbc.config 
import scala.collection.mutable 
import scala.concurrent.duration.Duration 
import scala.language.implicitConversions 
import com.typesafe.config._ 
import java.util.concurrent.TimeUnit 
import java.util.Properties 
import scalikejdbc.config._ 
import com.typesafe.config.Config 
import com.zaxxer.hikari._ 
import scalikejdbc.ConnectionPoolFactoryRepository 
 
/** Extension methods to make Typesafe Config easier to use */ 
class ConfigExtensionMethods(val c: Config) extends AnyVal { 
  import scala.collection.JavaConverters._ 
 
  def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default 
  def getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else default 
  def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default 
  def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default 
 
  def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default 
  def getDurationOr(path: String, default: => Duration = Duration.Zero) = 
    if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default 
 
  def getPropertiesOr(path: String, default: => Properties = null): Properties = 
    if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default 
 
  def toProperties: Properties = { 
    def toProps(m: mutable.Map[String, ConfigValue]): Properties = { 
      val props = new Properties(null) 
      m.foreach { case (k, cv) => 
        val v = 
          if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala) 
          else if(cv.unwrapped eq null) null 
          else cv.unwrapped.toString 
        if(v ne null) props.put(k, v) 
      } 
      props 
    } 
    toProps(c.root.asScala) 
  } 
 
  def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None 
  def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None 
  def getStringOpt(path: String) = Option(getStringOr(path)) 
  def getPropertiesOpt(path: String) = Option(getPropertiesOr(path)) 
} 
 
object ConfigExtensionMethods { 
  @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c) 
} 
 
trait HikariConfigReader extends TypesafeConfigReader { 
  self: TypesafeConfig =>      // with TypesafeConfigReader => //NoEnvPrefix => 
 
  import ConfigExtensionMethods.configExtensionMethods 
 
  def getFactoryName(dbName: Symbol): String = { 
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name) 
    c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP) 
  } 
 
  def hikariCPConfig(dbName: Symbol): HikariConfig = { 
 
    val hconf = new HikariConfig() 
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name) 
 
    // Connection settings 
    if (c.hasPath("dataSourceClass")) { 
      hconf.setDataSourceClassName(c.getString("dataSourceClass")) 
    } else { 
      Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).map(hconf.setDriverClassName _) 
    } 
    hconf.setJdbcUrl(c.getStringOr("url", null)) 
    c.getStringOpt("user").foreach(hconf.setUsername) 
    c.getStringOpt("password").foreach(hconf.setPassword) 
    c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties) 
 
    // Pool configuration 
    hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000)) 
    hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000)) 
    hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000)) 
    hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000)) 
    hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0)) 
    hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false)) 
    c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery) 
    c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql) 
    val numThreads = c.getIntOr("numThreads", 20) 
    hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5)) 
    hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads)) 
    hconf.setPoolName(c.getStringOr("poolName", dbName.name)) 
    hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false)) 
 
    // Equivalent of ConnectionPreparer 
    hconf.setReadOnly(c.getBooleanOr("readOnly", false)) 
    c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation) 
    hconf.setCatalog(c.getStringOr("catalog", null)) 
 
    hconf 
 
  } 
} 
 
import scalikejdbc._ 
trait ConfigDBs { 
  self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader => 
 
  def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = { 
    getFactoryName(dbName) match { 
      case "hikaricp" => { 
        val hconf = hikariCPConfig(dbName) 
        val hikariCPSource = new HikariDataSource(hconf) 
        case class HikariDataSourceCloser(src: HikariDataSource) extends DataSourceCloser { 
          var closed = false 
          override def close(): Unit = src.close() 
        } 
        if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) { 
          Class.forName(hconf.getDriverClassName) 
        } 
        ConnectionPool.add(dbName, new DataSourceConnectionPool(dataSource = hikariCPSource,settings = DataSourceConnectionPoolSettings(), 
          closer = HikariDataSourceCloser(hikariCPSource))) 
      } 
      case _ => { 
        val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName) 
        val cpSettings = readConnectionPoolSettings(dbName) 
        if (driver != null && driver.trim.nonEmpty) { 
          Class.forName(driver) 
        } 
        ConnectionPool.add(dbName, url, user, password, cpSettings) 
      } 
    } 
  } 
 
  def setupAll(): Unit = { 
    loadGlobalSettings() 
    dbNames.foreach { dbName => setup(Symbol(dbName)) } 
  } 
 
  def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = { 
    ConnectionPool.close(dbName) 
  } 
 
  def closeAll(): Unit = { 
    ConnectionPool.closeAll 
  } 
 
} 
 
 
object ConfigDBs extends ConfigDBs 
  with TypesafeConfigReader 
  with StandardTypesafeConfig 
  with HikariConfigReader 
 
case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs 
  with TypesafeConfigReader 
  with StandardTypesafeConfig 
  with HikariConfigReader 
  with EnvPrefix { 
 
  override val env = Option(envValue) 
}

jdbc/JDBCEngine.scala

package sdp.jdbc.engine 
import java.sql.PreparedStatement 
import scala.collection.generic.CanBuildFrom 
import akka.stream.scaladsl._ 
import scalikejdbc._ 
import scalikejdbc.streams._ 
import akka.NotUsed 
import akka.stream._ 
import java.time._ 
import scala.concurrent.duration._ 
import scala.concurrent._ 
import sdp.file.Streaming._ 
 
import scalikejdbc.TxBoundary.Try._ 
 
import scala.concurrent.ExecutionContextExecutor 
import java.io.InputStream 
 
import sdp.logging.LogSupport 
 
object JDBCContext { 
  type SQLTYPE = Int 
  val SQL_EXEDDL= 1 
  val SQL_UPDATE = 2 
  val RETURN_GENERATED_KEYVALUE = true 
  val RETURN_UPDATED_COUNT = false 
 
} 
 
case class JDBCQueryContext[M]( 
                                dbName: Symbol, 
                                statement: String, 
                                parameters: Seq[Any] = Nil, 
                                fetchSize: Int = 100, 
                                autoCommit: Boolean = false, 
                                queryTimeout: Option[Int] = None) 
 
 
case class JDBCContext ( 
                        dbName: Symbol, 
                        statements: Seq[String] = Nil, 
                        parameters: Seq[Seq[Any]] = Nil, 
                        fetchSize: Int = 100, 
                        queryTimeout: Option[Int] = None, 
                        queryTags: Seq[String] = Nil, 
                        sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE, 
                        batch: Boolean = false, 
                        returnGeneratedKey: Seq[Option[Any]] = Nil, 
                        // no return: None, return by index: Some(1), by name: Some("id") 
                        preAction: Option[PreparedStatement => Unit] = None, 
                        postAction: Option[PreparedStatement => Unit] = None) 
              extends LogSupport { 
 
  ctx => 
 
  //helper functions 
 
  def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag) 
 
  def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags) 
 
  def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size) 
 
  def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time) 
 
  def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = { 
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && 
      !ctx.batch && ctx.statements.size == 1) { 
      val nc = ctx.copy(preAction = action) 
      log.info("setPreAction> set") 
      nc 
    } 
    else { 
      log.info("setPreAction> JDBCContext setting error: preAction not supported!")
      throw new IllegalStateException("JDBCContext setting error: preAction not supported!")
    } 
  } 
 
  def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = { 
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && 
      !ctx.batch && ctx.statements.size == 1) { 
      val nc = ctx.copy(postAction = action) 
      log.info("setPostAction> set") 
      nc 
    } 
    else { 
      log.info("setPostAction> JDBCContext setting error: postAction not supported!")
      throw new IllegalStateException("JDBCContext setting error: postAction not supported!")
    } 
  } 
 
  def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = { 
    if (ctx.sqlType == JDBCContext.SQL_EXEDDL) { 
      log.info(s"appendDDLCommand> appending: statement: ${_statement}, parameters: ${_parameters}") 
      val nc = ctx.copy( 
        statements = ctx.statements ++ Seq(_statement), 
        parameters = ctx.parameters ++ Seq(_parameters)
      ) 
      log.info(s"appendDDLCommand> appended: statement: ${nc.statements}, parameters: ${nc.parameters}") 
      nc 
    } else { 
      log.info(s"appendDDLCommand> JDBCContext setting error: option not supported!")
      throw new IllegalStateException("JDBCContext setting error: option not supported!")
    } 
  } 
 
  def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = { 
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) { 
      log.info(s"appendUpdateCommand> appending: returnGeneratedKey: ${_returnGeneratedKey}, statement: ${_statement}, parameters: ${_parameters}") 
      val nc = ctx.copy( 
        statements = ctx.statements ++ Seq(_statement), 
        parameters = ctx.parameters ++ Seq(_parameters), 
        returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None)) 
      ) 
      log.info(s"appendUpdateCommand> appended: statement: ${nc.statements}, parameters: ${nc.parameters}") 
      nc 
    } else { 
      log.info(s"appendUpdateCommand> JDBCContext setting error: option not supported!")
      throw new IllegalStateException("JDBCContext setting error: option not supported!")
    } 
  } 
 
  def appendBatchParameters(_parameters: Any*): JDBCContext = { 
    log.info(s"appendBatchParameters> appending:  parameters: ${_parameters}") 
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) { 
      log.info(s"appendBatchParameters> JDBCContext setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
      throw new IllegalStateException("JDBCContext setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
    } 
    var matchParams = true 
    if (ctx.parameters != Nil) 
      if (ctx.parameters.head.size != _parameters.size) 
        matchParams = false 
    if (matchParams) { 
      val nc = ctx.copy( 
        parameters = ctx.parameters ++ Seq(_parameters) 
      ) 
      log.info(s"appendBatchParameters> appended: statement: ${nc.statements}, parameters: ${nc.parameters}") 
      nc 
    } else { 
      log.info(s"appendBatchParameters> JDBCContext setting error: batch command parameters do not match!")
      throw new IllegalStateException("JDBCContext setting error: batch command parameters do not match!")
    } 
  } 
 
 
  def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = { 
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) 
      throw new IllegalStateException("JDBCContext setting error: only supported in batch update commands!")
    ctx.copy( 
      returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil 
    ) 
  } 
 
  def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = { 
    log.info(s"setDDLCommand> setting: statement: ${_statement}, parameters: ${_parameters}") 
    val nc = ctx.copy( 
      statements = Seq(_statement), 
      parameters = Seq(_parameters), 
      sqlType = JDBCContext.SQL_EXEDDL, 
      batch = false 
    ) 
    log.info(s"setDDLCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}") 
    nc 
  } 
 
  def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = { 
    log.info(s"setUpdateCommand> setting: returnGeneratedKey: ${_returnGeneratedKey}, statement: ${_statement}, parameters: ${_parameters}") 
    val nc = ctx.copy( 
      statements = Seq(_statement), 
      parameters = Seq(_parameters), 
      returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None), 
      sqlType = JDBCContext.SQL_UPDATE, 
      batch = false 
    ) 
    log.info(s"setUpdateCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}") 
    nc 
  } 
  def setBatchCommand(_statement: String): JDBCContext = { 
    log.info(s"setBatchCommand> setting: statement: ${_statement}")
    val nc = ctx.copy ( 
      statements = Seq(_statement), 
      sqlType = JDBCContext.SQL_UPDATE, 
      batch = true 
    ) 
    log.info(s"setBatchCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}") 
    nc 
  } 
 
} 
 
object JDBCEngine extends LogSupport { 
  import JDBCContext._ 
 
  type JDBCDate = LocalDate 
  type JDBCDateTime = LocalDateTime 
  type JDBCTime = LocalTime 
 
  def jdbcSetDate(yyyy: Int, mm: Int, dd: Int) = LocalDate.of(yyyy,mm,dd) 
  def jdbcSetTime(hh: Int, mm: Int, ss: Int, nn: Int) = LocalTime.of(hh,mm,ss,nn) 
  def jdbcSetDateTime(date: JDBCDate, time: JDBCTime) =  LocalDateTime.of(date,time) 
  def jdbcSetNow = LocalDateTime.now() 
 
  def jdbcGetDate(sqlDate: java.sql.Date): java.time.LocalDate = sqlDate.toLocalDate 
  def jdbcGetTime(sqlTime: java.sql.Time): java.time.LocalTime = sqlTime.toLocalTime 
  def jdbcGetTimestamp(sqlTimestamp: java.sql.Timestamp): java.time.LocalDateTime = 
                  sqlTimestamp.toLocalDateTime 
 
 
  type JDBCBlob = InputStream 
 
  def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer) = FileToInputStream(fileName,timeOut) 
 
  def jdbcBlobToFile(blob: JDBCBlob, fileName: String)( 
    implicit mat: Materializer) =  InputStreamToFile(blob,fileName) 
 
 
  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) => 
    throw new IllegalStateException(message) 
  } 
 
  def jdbcAkkaStream[A](ctx: JDBCQueryContext[A],extractor: WrappedResultSet => A) 
                       (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = { 
    val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream {
      val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor("")) 
      ctx.queryTimeout.foreach(rawSql.queryTimeout(_)) 
      val sql: SQL[A, HasExtractor] = rawSql.map(extractor) 
      sql.iterator 
        .withDBSessionForceAdjuster(session => { 
          session.connection.setAutoCommit(ctx.autoCommit) 
          session.fetchSize(ctx.fetchSize) 
        }) 
    } 
    log.info(s"jdbcAkkaStream> Source: db: ${ctx.dbName}, statement: ${ctx.statement}, parameters: ${ctx.parameters}") 
    Source.fromPublisher[A](publisher) 
  } 
 
 
  def jdbcQueryResult[C[_] <: TraversableOnce[_], A](ctx: JDBCQueryContext[A], 
                                                     extractor: WrappedResultSet => A)( 
                                                      implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = { 
    val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor("")) 
    ctx.queryTimeout.foreach(rawSql.queryTimeout(_)) 
    rawSql.fetchSize(ctx.fetchSize) 
    try { 
      implicit val session = NamedAutoSession(ctx.dbName) 
      log.info(s"jdbcQueryResult> Source: db: ${ctx.dbName}, statement: ${ctx.statement}, parameters: ${ctx.parameters}") 
      val sql: SQL[A, HasExtractor] = rawSql.map(extractor) 
      sql.collection.apply[C]() 
    } catch { 
      case e: Exception => 
        log.error(s"jdbcQueryResult> runtime error: ${e.getMessage}") 
        // keep the original exception as the cause so the stack trace is not lost
        throw new RuntimeException(s"jdbcQueryResult> Error: ${e.getMessage}", e)
 
    } 
 
  } 
 
  def jdbcExecuteDDL(ctx: JDBCContext)(implicit ec: ExecutionContextExecutor): Future[String] = { 
    if (ctx.sqlType != SQL_EXEDDL) { 
      log.info(s"jdbcExecuteDDL> JDBCContext setting error: sqlType must be 'SQL_EXEDDL'!")
      Future.failed(new IllegalStateException("JDBCContext setting error: sqlType must be 'SQL_EXEDDL'!"))
    } 
    else { 
      log.info(s"jdbcExecuteDDL> Source: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}") 
      Future { 
        NamedDB(ctx.dbName) localTx { implicit session => 
          ctx.statements.foreach { stm => 
            // DDL statements run without bound parameters and without pre/post actions
            val ddl = new SQLExecution(statement = stm, parameters = Nil)(
              before = pstm => ())(
              after = pstm => ())

            ddl.apply()
          }
          "SQL_EXEDDL executed successfully."
        } 
      } 
    } 
  } 
 
  def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
    implicit ec: ExecutionContextExecutor, 
             cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = { 
    if (ctx.statements == Nil) { 
      log.info(s"jdbcBatchUpdate> JDBCContext setting error: statements empty!")
      Future.failed(new IllegalStateException("JDBCContext setting error: statements empty!"))
    }
    // 'else if' so the failed Future above is actually returned instead of being discarded
    else if (ctx.sqlType != SQL_UPDATE) {
      log.info(s"jdbcBatchUpdate> JDBCContext setting error: sqlType must be 'SQL_UPDATE'!")
      Future.failed(new IllegalStateException("JDBCContext setting error: sqlType must be 'SQL_UPDATE'!"))
    } 
    else { 
      if (ctx.batch) { 
        if (noReturnKey(ctx)) { 
          log.info(s"jdbcBatchUpdate> batch updating no return: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}") 
          val usql = SQL(ctx.statements.head) 
            .tags(ctx.queryTags: _*) 
            .batch(ctx.parameters: _*) 
          Future { 
            NamedDB(ctx.dbName) localTx { implicit session => 
              ctx.queryTimeout.foreach(session.queryTimeout(_)) 
              usql.apply[Seq]() 
              Seq.empty[Long].to[C] 
            } 
          } 
        } else { 
          log.info(s"jdbcBatchUpdate> batch updating return genkey: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}") 
          val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None) 
          Future { 
            NamedDB(ctx.dbName) localTx { implicit session => 
              ctx.queryTimeout.foreach(session.queryTimeout(_)) 
              usql.apply[C]() 
            } 
          } 
        } 
 
      } else { 
        log.info(s"jdbcBatchUpdate> JDBCContext setting error: must set batch = true !")
        Future.failed(new IllegalStateException("JDBCContext setting error: must set batch = true !"))
      } 
    } 
  } 
  private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
          implicit ec: ExecutionContextExecutor, 
                    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = { 
    // '+:' matches any Seq; '::' would throw a MatchError if the Seq is not a List
    val Some(key) +: _ = ctx.returnGeneratedKey
    val params: Seq[Any] = ctx.parameters match { 
      case Nil => Nil 
      case p@_ => p.head 
    } 
    log.info(s"singleTxUpdateWithReturnKey> updating: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}") 
    val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key) 
    Future { 
      NamedDB(ctx.dbName) localTx { implicit session => 
        session.fetchSize(ctx.fetchSize) 
        ctx.queryTimeout.foreach(session.queryTimeout(_)) 
        val result = usql.apply() 
        Seq(result).to[C] 
      } 
    } 
  } 
 
  private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
          implicit ec: ExecutionContextExecutor, 
                   cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = { 
    val params: Seq[Any] = ctx.parameters match { 
      case Nil => Nil 
      case p@_ => p.head 
    } 
    val before = ctx.preAction match { 
      case None => pstm: PreparedStatement => {} 
      case Some(f) => f 
    } 
    val after = ctx.postAction match { 
      case None => pstm: PreparedStatement => {} 
      case Some(f) => f 
    } 
    log.info(s"singleTxUpdateNoReturnKey> updating: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}") 
    val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after) 
    Future { 
      NamedDB(ctx.dbName) localTx {implicit session => 
        session.fetchSize(ctx.fetchSize) 
        ctx.queryTimeout.foreach(session.queryTimeout(_)) 
        val result = usql.apply() 
        Seq(result.toLong).to[C] 
      } 
    } 
 
  } 
 
  private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
    implicit ec: ExecutionContextExecutor, 
             cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = { 
    if (noReturnKey(ctx)) 
      singleTxUpdateNoReturnKey(ctx) 
    else 
      singleTxUpdateWithReturnKey(ctx) 
  } 
 
  private def noReturnKey(ctx: JDBCContext): Boolean =
    // only the first entry decides; headOption works for any Seq, not just List
    ctx.returnGeneratedKey.headOption.flatten.isEmpty
 
  def noActon: PreparedStatement=>Unit = pstm => {} 
 
  def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
    implicit ec: ExecutionContextExecutor, 
             cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = { 
 
    val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match { 
      case Nil => Seq.fill(ctx.statements.size)(None) 
      case k@_ => k 
    } 
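    // pad missing parameter lists with Nil, otherwise zip silently drops statements
    val sqlcmd = ctx.statements zip ctx.parameters.padTo(ctx.statements.size, Nil) zip keys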
    log.info(s"multiTxUpdates> updating: db: ${ctx.dbName}, SQL Commands: ${sqlcmd}") 
    Future { 
      NamedDB(ctx.dbName) localTx { implicit session => 
        session.fetchSize(ctx.fetchSize) 
        ctx.queryTimeout.foreach(session.queryTimeout(_)) 
        val results = sqlcmd.map { case ((stm, param), key) => 
          key match { 
            case None => 
              new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong 
            case Some(k) => 
              new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong 
          } 
        } 
        results.to[C] 
      } 
    } 
  } 
 
 
  def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
    implicit ec: ExecutionContextExecutor, 
             cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = { 
    if (ctx.statements == Nil) { 
      log.info(s"jdbcTxUpdates> JDBCContext setting error: statements empty!")
      Future.failed(new IllegalStateException("JDBCContext setting error: statements empty!"))
    }
    // 'else if' so the failed Future above is actually returned instead of being discarded
    else if (ctx.sqlType != SQL_UPDATE) {
      log.info(s"jdbcTxUpdates> JDBCContext setting error: sqlType must be 'SQL_UPDATE'!")
      Future.failed(new IllegalStateException("JDBCContext setting error: sqlType must be 'SQL_UPDATE'!"))
    } 
    else { 
      if (!ctx.batch) { 
        if (ctx.statements.size == 1) 
          singleTxUpdate(ctx) 
        else 
          multiTxUpdates(ctx) 
      } else { 
        log.info(s"jdbcTxUpdates> JDBCContext setting error: must set batch = false !")
        Future.failed(new IllegalStateException("JDBCContext setting error: must set batch = false !"))
      } 
    } 
  } 
 
  case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true, 
                                 statement: String, prepareParams: R => Seq[Any]) extends LogSupport { 
    jas => 
    def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db) 
    def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel) 
    def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered) 
 
    private def perform(r: R)(implicit ec: ExecutionContextExecutor) = { 
      import scala.concurrent._ 
      val params = prepareParams(r) 
      log.info(s"JDBCActionStream.perform>  db: ${dbName}, statement: ${statement}, parameters: ${params}") 
      Future { 
        NamedDB(dbName) autoCommit { session => 
          session.execute(statement, params: _*) 
        } 
        r 
      } 
    } 
    def performOnRow(implicit ec: ExecutionContextExecutor): Flow[R, R, NotUsed] = 
      if (processInOrder) 
        Flow[R].mapAsync(parallelism)(perform) 
      else 
        Flow[R].mapAsyncUnordered(parallelism)(perform) 
 
  } 
 
  object JDBCActionStream { 
    def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] = 
      new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params) 
  } 
 
}

cql/CassandraEngine.scala

package sdp.cql.engine 
 
import akka.NotUsed 
import akka.stream.alpakka.cassandra.scaladsl._ 
import akka.stream.scaladsl._ 
import com.datastax.driver.core._ 
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture} 
 
import scala.collection.JavaConverters._ 
import scala.collection.generic.CanBuildFrom 
import scala.concurrent._ 
import scala.concurrent.duration.Duration 
import sdp.logging.LogSupport 
 
object CQLContext { 
  // Consistency Levels 
  type CONSISTENCY_LEVEL = Int 
  val ANY: CONSISTENCY_LEVEL          =                                        0x0000 
  val ONE: CONSISTENCY_LEVEL          =                                        0x0001 
  val TWO: CONSISTENCY_LEVEL          =                                        0x0002 
  val THREE: CONSISTENCY_LEVEL        =                                        0x0003 
  val QUORUM : CONSISTENCY_LEVEL      =                                        0x0004 
  val ALL: CONSISTENCY_LEVEL          =                                        0x0005 
  val LOCAL_QUORUM: CONSISTENCY_LEVEL =                                        0x0006 
  val EACH_QUORUM: CONSISTENCY_LEVEL  =                                        0x0007 
  val LOCAL_ONE: CONSISTENCY_LEVEL    =                                        0x000A 
  val LOCAL_SERIAL: CONSISTENCY_LEVEL =                                        0x000B 
  val SERIAL: CONSISTENCY_LEVEL       =                                        0x000C 
 
  def apply(): CQLContext = CQLContext(statements = Nil) 
 
  def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => { 
    consistency match { 
      case ALL => ConsistencyLevel.ALL 
      case ONE => ConsistencyLevel.ONE 
      case TWO => ConsistencyLevel.TWO 
      case THREE => ConsistencyLevel.THREE 
      case ANY => ConsistencyLevel.ANY 
      case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM 
      case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE 
      case QUORUM => ConsistencyLevel.QUORUM 
      case SERIAL => ConsistencyLevel.SERIAL 
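      case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
      // LOCAL_QUORUM is declared above; without this case it would throw a MatchError
      case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM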
 
    } 
  } 
 
} 
case class CQLQueryContext( 
                               statement: String, 
                               parameter: Seq[Object] = Nil, 
                               consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None, 
                               fetchSize: Int = 100 
                             ) { ctx => 
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext = 
    ctx.copy(consistency = Some(_consistency)) 
  def setFetchSize(pageSize: Int): CQLQueryContext = 
    ctx.copy(fetchSize = pageSize) 
  def setParameters(param: Seq[Object]): CQLQueryContext = 
    ctx.copy(parameter = param) 
} 
object CQLQueryContext { 
  def apply[M](stmt: String, param: Seq[Object]): CQLQueryContext = new CQLQueryContext(statement = stmt, parameter = param) 
} 
 
case class CQLContext( 
                       statements: Seq[String], 
                       parameters: Seq[Seq[Object]] = Nil, 
                       consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None, 
                       batch: Boolean = false 
                     ) extends LogSupport { ctx => 
  def setBatch(bat: Boolean) = ctx.copy(batch = bat) 
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext = 
    ctx.copy(consistency = Some(_consistency)) 
  def setCommand(_statement: String, _parameters: Object*): CQLContext = { 
    log.info(s"setCommand> setting: statement: ${_statement}, parameters: ${_parameters}") 
    val nc = ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters)) 
    log.info(s"setCommand> set: statements: ${nc.statements}, parameters: ${nc.parameters}") 
    nc 
  } 
  def appendCommand(_statement: String, _parameters: Object*): CQLContext = { 
    log.info(s"appendCommand> appending: statement: ${_statement}, parameters: ${_parameters}") 
    val nc = ctx.copy(statements = ctx.statements :+ _statement, 
      parameters = ctx.parameters ++ Seq(_parameters)) 
    log.info(s"appendCommand> appended: statements: ${nc.statements}, parameters: ${nc.parameters}") 
    nc 
  } 
} 
 
object CQLEngine extends LogSupport { 
  import CQLContext._ 
  import CQLHelpers._ 
 
  import cats._, cats.data._, cats.implicits._ 
  import scala.concurrent.{Await, Future} 
  import scala.concurrent.duration._ 
 
  def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext, pageSize: Int = 100 
                                                   ,extractor: Row => A)( 
    implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= { 
 
    val prepStmt = session.prepare(ctx.statement) 
 
    var boundStmt =  prepStmt.bind() 
    var params: Seq[Object] = Nil 
    if (ctx.parameter != Nil) { 
      params = processParameters(ctx.parameter) 
      boundStmt = prepStmt.bind(params:_*) 
    } 
    log.info(s"fetchResultPage>  statement: ${prepStmt.getQueryString}, parameters: ${params}") 
 
    ctx.consistency.foreach {consistency => 
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))} 
 
    val resultSet = session.execute(boundStmt.setFetchSize(pageSize)) 
    (resultSet,(resultSet.asScala.view.map(extractor)).to[C]) 
  } 
  def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)( 
    extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) = 
    if (resultSet.isFullyFetched) { 
      (resultSet, None) 
    } else { 
      try { 
        val result = Await.result(resultSet.fetchMoreResults(), timeOut) 
        (result, Some((result.asScala.view.map(extractor)).to[C])) 
      } catch { case e: Throwable => (resultSet, None) } 
    } 
 
  def cqlExecute(ctx: CQLContext)( 
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = { 
 
    var invalidBat = false 
    if ( ctx.batch ) { 
      if (ctx.parameters == Nil) 
        invalidBat = true 
      else if (ctx.parameters.size < 2) 
        invalidBat = true; 
    } 
    if (!ctx.batch || invalidBat) { 
      if(invalidBat) 
        log.warn(s"cqlExecute> batch update must have at least 2 sets of parameters! change to single-command.")
 
      if (ctx.statements.size == 1) { 
        var param: Seq[Object] = Nil 
        if (ctx.parameters != Nil) param =  ctx.parameters.head 
        log.info(s"cqlExecute>  single-command: statement: ${ctx.statements.head} parameters: ${param}") 
        cqlSingleUpdate(ctx.consistency, ctx.statements.head, param) 
      } 
      else { 
        var params: Seq[Seq[Object]] = Nil 
        if (ctx.parameters == Nil) 
          params = Seq.fill(ctx.statements.length)(Nil) 
        else { 
          if (ctx.statements.size > ctx.parameters.size) { 
            log.warn(s"cqlExecute> fewer parameters than statements! pad with 'Nil'.") 
            val nils = Seq.fill(ctx.statements.size - ctx.parameters.size)(Nil) 
            params = ctx.parameters ++ nils 
 
          } 
          else 
            params = ctx.parameters 
        } 
 
        val commands: Seq[(String,Seq[Object])] = ctx.statements zip params 
        log.info(s"cqlExecute>  multi-commands: ${commands}") 
/* 
        //using sequence to flip List[Future[Boolean]] => Future[List[Boolean]] 
        //therefore, make sure no command relies on prev command effect
        val lstCmds: List[Future[Boolean]] = commands.map { case (stmt,param) => 
          cqlSingleUpdate(ctx.consistency, stmt, param) 
        }.toList 
 
        val futList = lstCmds.sequence.map(_ => true)   //must map to execute 
        */ 
/* 
        //using traverse to have some degree of parallelism = max(runtimes) 
        //therefore, make sure no command relies on prev command effect
        val futList = Future.traverse(commands) { case (stmt,param)  => 
          cqlSingleUpdate(ctx.consistency, stmt, param) 
        }.map(_ => true) 
 
        Await.result(futList, 3 seconds) 
        Future.successful(true) 
*/ 
        // run sync directly 
        Future { 
          commands.foreach { case (stm, pars) => 
            cqlExecuteSync(ctx.consistency, stm, pars) 
          } 
          true 
        } 
      } 
    } 
    else 
      cqlBatchUpdate(ctx) 
  } 
  def cqlSingleUpdate(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Object])( 
         implicit session: Session, ec: ExecutionContext): Future[Boolean] = { 
 
    val prepStmt = session.prepare(stmt) 
 
    var boundStmt = prepStmt.bind() 
    var pars: Seq[Object] = Nil 
    if (params != Nil) { 
      pars = processParameters(params) 
      boundStmt = prepStmt.bind(pars: _*) 
    } 
    log.info(s"cqlSingleUpdate>  statement: ${prepStmt.getQueryString}, parameters: ${pars}") 
 
    cons.foreach { consistency => 
      boundStmt.setConsistencyLevel(consistencyLevel(consistency)) 
    } 
    session.executeAsync(boundStmt).map(_.wasApplied()) 
  } 
 
  def cqlExecuteSync(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Object])( 
    implicit session: Session, ec: ExecutionContext): Boolean = { 
 
    val prepStmt = session.prepare(stmt) 
 
    var boundStmt = prepStmt.bind() 
    var pars: Seq[Object] = Nil 
    if (params != Nil) { 
      pars = processParameters(params) 
      boundStmt = prepStmt.bind(pars: _*) 
    } 
    log.info(s"cqlExecuteSync>  statement: ${prepStmt.getQueryString}, parameters: ${pars}") 
 
    cons.foreach { consistency => 
      boundStmt.setConsistencyLevel(consistencyLevel(consistency)) 
    } 
    session.execute(boundStmt).wasApplied() 
 
  } 
 
  def cqlBatchUpdate(ctx: CQLContext)( 
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = { 
    var params: Seq[Seq[Object]] = Nil 
    if (ctx.parameters == Nil) 
      params = Seq.fill(ctx.statements.length)(Nil) 
    else 
      params = ctx.parameters 
    log.info(s"cqlBatchUpdate>  statement: ${ctx.statements.head}, parameters: ${params}") 
 
    val prepStmt = session.prepare(ctx.statements.head) 
 
    val batch = new BatchStatement()
    params.foreach { p =>
      log.info(s"cqlBatchUpdate>  batch with raw parameter: ${p}")
      val pars = processParameters(p)
      log.info(s"cqlBatchUpdate>  batch with cooked parameters: ${pars}")
      batch.add(prepStmt.bind(pars: _*)) 
    } 
    ctx.consistency.foreach { consistency => 
      batch.setConsistencyLevel(consistencyLevel(consistency)) 
    } 
    session.executeAsync(batch).map(_.wasApplied()) 
  } 
 
  def cassandraStream[A](ctx: CQLQueryContext,extractor: Row => A) 
                        (implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = { 
 
    val prepStmt = session.prepare(ctx.statement) 
    val params = processParameters(ctx.parameter)
    val boundStmt = prepStmt.bind(params: _*)
    ctx.consistency.foreach {consistency => 
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))} 
 
    log.info(s"cassandraStream>  statement: ${prepStmt.getQueryString}, parameters: ${params}") 
    CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(extractor) 
  } 
 
  case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true, 
                                      statement: String, prepareParams: R => Seq[Object], 
                                      consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ cas => 
    def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism=parLevel) 
    def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered) 
    def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] = 
      cas.copy(consistency = Some(_consistency)) 
 
    def perform(r: R)(implicit session: Session, ec: ExecutionContext) = { 
      val prepStmt = session.prepare(statement)
      val params = processParameters(prepareParams(r))
      val boundStmt = prepStmt.bind(params: _*)
      consistency.foreach { cons => 
        boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons)) 
      } 
      log.info(s"CassandraActionStream.perform>  statement: ${prepStmt.getQueryString}, parameters: ${params}") 
      session.executeAsync(boundStmt).map(_ => r) 
    } 
 
    def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = 
      if (processInOrder) 
        Flow[R].mapAsync(parallelism)(perform) 
      else 
        Flow[R].mapAsyncUnordered(parallelism)(perform) 
 
    def unloggedBatch[K](statementBinder: ( 
      R, PreparedStatement) => BoundStatement,partitionKey: R => K)( 
      implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = { 
      val preparedStatement = session.prepare(statement) 
      log.info(s"CassandraActionStream.unloggedBatch>  statement: ${preparedStatement.getQueryString}") 
      CassandraFlow.createUnloggedBatchWithPassThrough[R, K]( 
        parallelism, 
        preparedStatement, 
        statementBinder, 
        partitionKey) 
    } 
 
  } 
  object CassandraActionStream { 
    def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] = 
      new CassandraActionStream[R]( statement=_statement, prepareParams = params) 
  } 
 
 
} 
 
object CQLHelpers extends LogSupport { 
  import java.nio.ByteBuffer 
  import java.io._ 
  import java.nio.file._ 
  import com.datastax.driver.core.LocalDate 
  import com.datastax.driver.extras.codecs.jdk8.InstantCodec 
  import java.time.Instant 
  import akka.stream.scaladsl._ 
  import akka.stream._ 
 
  implicit def listenableFutureToFuture[T]( 
                                            listenableFuture: ListenableFuture[T]): Future[T] = { 
    val promise = Promise[T]() 
    Futures.addCallback(listenableFuture, new FutureCallback[T] { 
      def onFailure(error: Throwable): Unit = { 
        promise.failure(error) 
        () 
      } 
      def onSuccess(result: T): Unit = { 
        promise.success(result) 
        () 
      } 
    }) 
    promise.future 
  } 
 
  case class CQLDate(year: Int, month: Int, day: Int) 
  case object CQLTodayDate 
  case class CQLDateTime(year: Int, month: Int,
                         day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0) 
  case object CQLDateTimeNow 
 
  def cqlGetDate(dateToConvert: java.util.Date): java.time.LocalDate = 
          dateToConvert.toInstant() 
           .atZone(java.time.ZoneId.systemDefault()) 
           .toLocalDate() 
 
  def cqlGetTime(dateToConvert: java.util.Date): java.time.LocalTime = 
      dateToConvert.toInstant() 
        .atZone(java.time.ZoneId.systemDefault()) 
        .toLocalTime() 
 
  def cqlGetTimestamp(dateToConvert: java.util.Date): java.time.LocalDateTime= 
      new java.sql.Timestamp( 
           dateToConvert.getTime() 
           ).toLocalDateTime() 
 
  def processParameters(params: Seq[Object]): Seq[Object] = { 
    import java.time.{Clock,ZoneId} 
    log.info(s"[processParameters] input: ${params}") 
    val outParams = params.map { obj => 
      obj match { 
        case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd) 
        case CQLTodayDate => 
          val today = java.time.LocalDate.now() 
          LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth) 
        case CQLDateTimeNow => Instant.now(Clock.system(ZoneId.of("EST", ZoneId.SHORT_IDS))) 
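        case CQLDateTime(yy, mm, dd, hr, mn, sc, ms) =>
          // Instant.parse needs a zero-padded ISO-8601 string with a zone designator;
          // assumption: the fields are interpreted as UTC
          Instant.parse(f"$yy%04d-$mm%02d-$dd%02dT$hr%02d:$mn%02d:$sc%02d.$ms%03dZ")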
        case p@_ => p 
      } 
    } 
    log.info(s"[processParameters] output: ${outParams}")
    outParams 
  } 
  class ByteBufferInputStream(buf: ByteBuffer) extends InputStream { 
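    override def read: Int = {
      if (!buf.hasRemaining) return -1
      buf.get() & 0xFF  // InputStream.read must return 0..255; -1 is reserved for end of stream
    }

    override def read(bytes: Array[Byte], off: Int, len: Int): Int = {
      if (len == 0) return 0
      if (!buf.hasRemaining) return -1  // signal end of stream instead of returning 0 forever
      val length: Int = Math.min(len, buf.remaining)
      buf.get(bytes, off, length)
      length
    }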
  } 
  object ByteBufferInputStream { 
    def apply(buf: ByteBuffer): ByteBufferInputStream = { 
      new ByteBufferInputStream(buf) 
    } 
  } 
  class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream { 
 
    override def write(b: Int): Unit = { 
      buf.put(b.toByte) 
    } 
 
    override def write(bytes: Array[Byte], off: Int, len: Int): Unit = { 
      buf.put(bytes, off, len) 
    } 
  } 
  object FixsizedByteBufferOutputStream { 
    def apply(buf: ByteBuffer) = new FixsizedByteBufferOutputStream(buf) 
  } 
  class ExpandingByteBufferOutputStream(var buf: ByteBuffer, onHeap: Boolean) extends OutputStream { 
 
    private val increasing = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR 
 
    override def write(b: Array[Byte], off: Int, len: Int): Unit = { 
      val position = buf.position 
      val limit = buf.limit 
      val newTotal: Long = position + len 
      if(newTotal > limit){ 
        var capacity = (buf.capacity * increasing) 
        while(capacity <= newTotal){ 
          capacity = (capacity*increasing) 
        } 
        increase(capacity.toInt) 
      } 
 
      buf.put(b, off, len)  // honor the caller's offset
    } 
 
    override def write(b: Int): Unit= { 
      if (!buf.hasRemaining) increase((buf.capacity * increasing).toInt) 
      buf.put(b.toByte) 
    } 
    protected def increase(newCapacity: Int): Unit = { 
      buf.limit(buf.position) 
      buf.rewind 
      val newBuffer = 
        if (onHeap) ByteBuffer.allocate(newCapacity) 
        else  ByteBuffer.allocateDirect(newCapacity) 
      newBuffer.put(buf) 
      buf.clear 
      buf = newBuffer 
    } 
    def size: Long = buf.position 
    def capacity: Long = buf.capacity 
    def byteBuffer: ByteBuffer = buf 
  } 
  object ExpandingByteBufferOutputStream { 
    val DEFAULT_INCREASING_FACTOR = 1.5f 
    def apply(size: Int, increasingBy: Float, onHeap: Boolean) = { 
      if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0") 
      val buffer: ByteBuffer = 
        if (onHeap) ByteBuffer.allocate(size) 
        else ByteBuffer.allocateDirect(size) 
      new ExpandingByteBufferOutputStream(buffer,onHeap) 
    } 
    def apply(size: Int): ExpandingByteBufferOutputStream = { 
      apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false) 
    } 
 
    def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = { 
      apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap) 
    } 
 
    def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = { 
      apply(size, increasingBy, false) 
    } 
 
  } 
  def cqlFileToBytes(fileName: String): ByteBuffer = {
    // read the whole file in one call; Files.readAllBytes opens and closes the file itself,
    // so no stream is leaked and no extra trailing byte is allocated
    ByteBuffer.wrap(Files.readAllBytes(Paths.get(fileName)))
  }
  def cqlBytesToFile(bytes: ByteBuffer, fileName: String)( 
    implicit mat: Materializer): Future[IOResult] = { 
    val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes)) 
    source.runWith(FileIO.toPath(Paths.get(fileName))) 
  } 
  def cqlDateTimeString(date: java.util.Date, fmt: String): String = { 
    val outputFormat = new java.text.SimpleDateFormat(fmt) 
    outputFormat.format(date) 
  } 
  def useJava8DateTime(cluster: Cluster) = { 
    //for jdk8 datetime format 
    cluster.getConfiguration().getCodecRegistry() 
      .register(InstantCodec.instance) 
  } 
}
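
The `ByteBufferInputStream` helper above feeds `cqlBytesToFile`, so it matters that it follows the `java.io.InputStream` contract: `read()` returns an unsigned value in 0..255, and both `read` variants return -1 once the data is exhausted. Below is a minimal stand-alone sketch of that contract using only the JDK. The names `BufferBackedInputStream` and `BufferBackedInputStreamDemo` and the sample bytes are illustrative and not part of the original code.

```scala
import java.io.{ByteArrayOutputStream, InputStream}
import java.nio.ByteBuffer

// Hypothetical stand-alone variant of a ByteBuffer-backed InputStream.
class BufferBackedInputStream(buf: ByteBuffer) extends InputStream {
  // Single-byte read: mask to 0..255 so the byte 0xFF is not mistaken for EOF (-1).
  override def read(): Int =
    if (!buf.hasRemaining) -1 else buf.get() & 0xFF

  // Bulk read: report -1 at end of data instead of 0.
  override def read(bytes: Array[Byte], off: Int, len: Int): Int =
    if (len == 0) 0
    else if (!buf.hasRemaining) -1
    else {
      val n = math.min(len, buf.remaining)
      buf.get(bytes, off, n)
      n
    }
}

object BufferBackedInputStreamDemo {
  // Drain the stream in small chunks until it reports end of stream.
  def readAll(in: InputStream): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val chunk = new Array[Byte](4)
    var n = in.read(chunk, 0, chunk.length)
    while (n != -1) {
      out.write(chunk, 0, n)
      n = in.read(chunk, 0, chunk.length)
    }
    out.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val data = Array[Byte](1, 2, -1, 127, -128) // -1 is the byte 0xFF
    val copy = readAll(new BufferBackedInputStream(ByteBuffer.wrap(data)))
    println(copy.sameElements(data)) // prints true
  }
}
```

A consumer that loops until `read` returns -1, as `readAll` does here, would never terminate if the bulk `read` kept returning 0 on an empty buffer.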

BytesConverter.scala

package protobuf.bytes 
import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream} 
import com.google.protobuf.ByteString 
object Converter { 
 
  def marshal(value: Any): ByteString = { 
    val stream: ByteArrayOutputStream = new ByteArrayOutputStream() 
    val oos = new ObjectOutputStream(stream) 
    oos.writeObject(value) 
    oos.close() 
    ByteString.copyFrom(stream.toByteArray()) 
  } 
 
  def unmarshal[A](bytes: ByteString): A = { 
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) 
    val value = ois.readObject() 
    ois.close() 
    value.asInstanceOf[A] 
  } 
 
}
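
`marshal` and `unmarshal` rely on Java serialization, so any value passed through them has to be `Serializable`, and the caller has to name the expected type when unmarshalling. A minimal round-trip sketch follows. To stay runnable without the protobuf dependency it carries the payload as a plain `Array[Byte]` instead of `ByteString`; that substitution and the object name `RoundTripSketch` are assumptions made for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object RoundTripSketch {
  // Same idea as Converter.marshal, but returning Array[Byte]
  // (an assumption made to keep the sketch self-contained).
  def marshal(value: Any): Array[Byte] = {
    val stream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    try oos.writeObject(value) finally oos.close()
    stream.toByteArray
  }

  // The caller chooses A; a wrong choice only fails when the value is used.
  def unmarshal[A](bytes: Array[Byte]): A = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject().asInstanceOf[A] finally ois.close()
  }

  def main(args: Array[String]): Unit = {
    val params: List[Any] = List("Arkansas", 0)
    val restored = unmarshal[List[Any]](marshal(params))
    println(restored == params) // prints true
  }
}
```

Where no expected type is available, for example when the call sits directly inside `println`, the compiler may infer `Nothing` for the type parameter, and the call then fails with a `ClassCastException` at run time. Writing the type argument explicitly avoids that.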

CQLServices.scala

package demo.sdp.grpc.cql.server 
 
import akka.NotUsed 
import akka.stream.scaladsl._ 
 
import protobuf.bytes.Converter._ 
import com.datastax.driver.core._ 
 
import scala.concurrent.ExecutionContextExecutor 
import sdp.grpc.services._ 
import sdp.cql.engine._ 
import CQLEngine._ 
import CQLHelpers._ 
import sdp.logging.LogSupport 
import scala.concurrent._ 
import scala.concurrent.duration._ 
import akka.stream.ActorMaterializer 
 
 
class CQLStreamingServices(implicit ec: ExecutionContextExecutor, 
                           mat: ActorMaterializer,  session: Session) 
  extends CqlGrpcAkkaStream.CQLServices with LogSupport{ 
 
  val toParams: AQMRPTRow => Seq[Object] = row => Seq[Object]( 
    row.rowid.asInstanceOf[Object], 
    row.measureid.asInstanceOf[Object], 
    row.statename, 
    row.countyname, 
    row.reportyear.asInstanceOf[Object], 
    row.value.asInstanceOf[Object], 
    CQLDateTimeNow 
  ) 
  val cqlInsert =""" 
                   |insert into testdb.AQMRPT( 
                   | rowid, 
                   | measureid, 
                   | statename, 
                   | countyname, 
                   | reportyear, 
                   | value, 
                   | created) 
                   | values(?,?,?,?,?,?,?) 
                 """.stripMargin 
 
  val cqlActionStream = CassandraActionStream(cqlInsert,toParams).setParallelism(2) 
    .setProcessOrder(false) 
 
/* 
  val cqlActionFlow: Flow[AQMRPTRow,AQMRPTRow,NotUsed] = 
    Flow[AQMRPTRow] 
      .via(cqlActionStream.performOnRow) 
*/ 
 
  val cqlActionFlow: Flow[AQMRPTRow,CQLResult,NotUsed] = { 
    Flow[AQMRPTRow] 
        .mapAsync(cqlActionStream.parallelism){ row => 
          if (IfExists(row.rowid)) 
            Future.successful(CQLResult(marshal(0))) 
          else 
            cqlActionStream.perform(row).map {_ => CQLResult(marshal(1))} 
        } 
  } 
 
  override def transferRows: Flow[AQMRPTRow, CQLResult, NotUsed] = { 
    Flow[AQMRPTRow] 
      .via(cqlActionFlow) 
  } 
 
 
  private def IfExists(rowid: Long): Boolean = { 
 
    val cql = "SELECT * FROM testdb.AQMRPT WHERE ROWID = ? ALLOW FILTERING" 
    val param = Seq(rowid.asInstanceOf[Object]) 
    val toRowId: Row => Long = r => r.getLong("rowid") 
    val ctx = CQLQueryContext(cql,param) 
    val src: Source[Long,NotUsed] = cassandraStream(ctx,toRowId) 
    val fut = src.toMat(Sink.headOption)(Keep.right).run() 
 
    val result = Await.result(fut, 3.seconds)

    log.info(s"checking existence: ${result}")
    result.isDefined
 
  } 
 
  override def clientStreaming: Flow[HelloMsg, HelloMsg, NotUsed] = { 
    Flow[HelloMsg] 
      .map {r => println(r) ; r} 
  } 
 
  override def runDDL: Flow[CQLUpdate, CQLResult, NotUsed] = { 
    Flow[CQLUpdate] 
      .flatMapConcat { context => 
        //unpack CQLUpdate and construct the context 
        val ctx = CQLContext(context.statements) 
        log.info(s"**** CQLContext => ${ctx} ***") 
 
        Source 
          .fromFuture(cqlExecute(ctx)) 
          .map { r => CQLResult(marshal(r)) } 
      } 
  } 
 
  def toCQLTimestamp(rs: Row) = { 
    try { 
      val tm = rs.getTimestamp("CREATED") 
      if (tm == null) None 
      else { 
        val localdt = cqlGetTimestamp(tm) 
        Some(ProtoDateTime(Some(ProtoDate(localdt.getYear, localdt.getMonthValue, localdt.getDayOfMonth)), 
          Some(ProtoTime(localdt.getHour, localdt.getMinute, localdt.getSecond, localdt.getNano)))) 
      } 
    } 
    catch { 
      case e: Exception => None 
    } 
  } 
 
  val toAQMRow: Row => AQMRPTRow = rs=> AQMRPTRow( 
    rowid = rs.getLong("ROWID"), 
    measureid = rs.getLong("MEASUREID"), 
    statename = rs.getString("STATENAME"), 
    countyname = rs.getString("COUNTYNAME"), 
    reportyear = rs.getInt("REPORTYEAR"), 
    value = rs.getInt("VALUE"), 
    created = toCQLTimestamp(rs) 
  ) 
  override def runQuery: Flow[CQLQuery, AQMRPTRow, NotUsed] = { 
    log.info("**** runQuery called on service side ***") 
    Flow[CQLQuery] 
      .flatMapConcat { q => 
        //unpack CQLQuery and construct the context
        var params: Seq[Object] =  Nil 
        if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY) 
          params = unmarshal[Seq[Object]](q.parameters) 
        log.info(s"**** query parameters: ${params} ****") 
        val ctx = CQLQueryContext(q.statement,params) 
        CQLEngine.cassandraStream(ctx,toAQMRow) 
 
      } 
  } 
   
}

CQLServer.scala

package demo.sdp.grpc.cql.server 
 
import java.util.logging.Logger 
import com.datastax.driver.core._ 
import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import io.grpc.Server 
import io.grpc.ServerBuilder 
import sdp.grpc.services._ 
import sdp.cql.engine._ 
import CQLHelpers._ 
 
class gRPCServer(server: Server) { 
 
  val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName) 
 
  def start(): Unit = { 
    server.start() 
    logger.info(s"Server started, listening on ${server.getPort}") 
    sys.addShutdownHook { 
      // Use stderr here since the logger may have been reset by its JVM shutdown hook.
      System.err.println("*** shutting down gRPC server since JVM is shutting down") 
      stop() 
      System.err.println("*** server shut down") 
    } 
    () 
  } 
 
  def stop(): Unit = { 
    server.shutdown() 
  } 
 
  /** 
    * Await termination on the main thread since the grpc library uses daemon threads. 
    */ 
  def blockUntilShutdown(): Unit = { 
    server.awaitTermination() 
  } 
} 
 
object CQLServer extends App { 
  implicit val cqlsys = ActorSystem("cqlSystem") 
  implicit val mat = ActorMaterializer() 
  implicit val ec = cqlsys.dispatcher 
 
  val cluster = new Cluster.Builder()
    .addContactPoints("localhost")
    .withPort(9042)
    .build()
 
  useJava8DateTime(cluster) 
  implicit val session = cluster.connect() 
 
  val server = new gRPCServer( 
    ServerBuilder 
      .forPort(50051) 
      .addService( 
        CqlGrpcAkkaStream.bindService( 
          new CQLStreamingServices 
        ) 
      ).build() 
  ) 
  server.start() 
  //  server.blockUntilShutdown() 
  scala.io.StdIn.readLine() 
  session.close() 
  cluster.close() 
  mat.shutdown() 
  cqlsys.terminate() 
}

CQLClient.scala

package demo.sdp.grpc.cql.client 
 
import sdp.grpc.services._ 
import protobuf.bytes.Converter._ 
import akka.stream.scaladsl._ 
import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.stream.{ActorMaterializer, ThrottleMode} 
import io.grpc._ 
import sdp.logging.LogSupport 
import sdp.jdbc.engine._ 
import JDBCEngine._ 
import scalikejdbc.WrappedResultSet 
import sdp.cql.engine.CQLHelpers.CQLDateTimeNow 
import scala.util._ 
import scala.concurrent.ExecutionContextExecutor 
 
class CQLStreamClient(host: String, port: Int)( 
  implicit ec: ExecutionContextExecutor) extends LogSupport { 
 
  val channel = ManagedChannelBuilder 
    .forAddress(host, port) 
    .usePlaintext(true) 
    .build() 
 
 
  val stub = CqlGrpcAkkaStream.stub(channel) 
 
  val jdbcRows2transfer = JDBCQueryContext[AQMRPTRow]( 
    dbName = 'h2, 
    statement = "select * from AQMRPT where statename='Arkansas'" 
  ) 
 
 
  def toAQMRPTRow: WrappedResultSet => AQMRPTRow = rs => AQMRPTRow( 
    rowid = rs.long("ROWID"), 
    measureid = rs.long("MEASUREID"), 
    statename = rs.string("STATENAME"), 
    countyname = rs.string("COUNTYNAME"), 
    reportyear = rs.int("REPORTYEAR"), 
    value = rs.int("VALUE"), 
    created = Some(ProtoDateTime(Some(ProtoDate(1990, 8, 12)), Some(ProtoTime(23, 56, 23, 0)))) 
  ) 
 
 
  import scala.concurrent.duration._ 
 
  def transferRows: Source[CQLResult, NotUsed] = { 
    log.info(s"**** calling transferRows ****") 
    jdbcAkkaStream(jdbcRows2transfer, toAQMRPTRow) 
      //      .throttle(1, 500.millis, 1, ThrottleMode.shaping) 
      .via(stub.transferRows) 
  } 
 
  def echoHello: Source[HelloMsg,NotUsed] = { 
    val row = HelloMsg("hello world!") 
    val rows = List.fill[HelloMsg](100)(row) 
    Source 
      .fromIterator(() => rows.iterator) 
      .via(stub.clientStreaming) 
  } 
  val query0 = CQLQuery( 
    statement = "select * from testdb.AQMRPT" 
  ) 
 
  val query = CQLQuery( 
    statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE > ? ALLOW FILTERING;", 
    parameters = marshal(Seq("Arkansas", 0.toInt)) 
  ) 
  val query2 = CQLQuery ( 
    statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?", 
    parameters = marshal(Seq("Colorado", 3.toInt)) 
  ) 
  val query3= CQLQuery ( 
    statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?", 
    parameters = marshal(Seq("Arkansas", 8.toInt)) 
  ) 
 
  def queryRows: Source[AQMRPTRow,NotUsed] = { 
    log.info(s"running queryRows ...") 
    Source 
      .single(query) 
      .via(stub.runQuery) 
  } 
 
  val dropCQL = "DROP TABLE IF EXISTS testdb.AQMRPT" 
 
  val createCQL =""" 
  CREATE TABLE testdb.AQMRPT ( 
     rowid bigint primary key, 
     measureid bigint, 
     statename text, 
     countyname text, 
     reportyear int, 
     value int, 
     created timestamp 
  )""" 
 
  val cqlddl = CQLUpdate(statements = Seq(dropCQL,createCQL)) 
  def createTbl: Source[CQLResult,NotUsed] = { 
    log.info(s"running createTbl ...") 
    Source 
      .single(cqlddl) 
      .via(stub.runDDL) 
  } 
   
} 
 
 
object EchoHelloClient extends App { 
  implicit val system = ActorSystem("EchoNumsClient") 
  implicit val mat = ActorMaterializer.create(system) 
  implicit val ec = system.dispatcher 
  val client = new CQLStreamClient("localhost", 50051) 
 
  client.echoHello.runForeach(println) 
 
  scala.io.StdIn.readLine() 
  mat.shutdown() 
  system.terminate() 
 
} 
 
 
object TransferRows extends App { 
 
  import sdp.jdbc.config._ 
 
  implicit val system = ActorSystem("JDBCServer") 
  implicit val mat = ActorMaterializer.create(system) 
  implicit val ec = system.dispatcher 
 
  ConfigDBsWithEnv("dev").setup('h2) 
  ConfigDBsWithEnv("dev").loadGlobalSettings() 
 
 
  val client = new CQLStreamClient("localhost", 50051) 
  val fut = client.transferRows.runFold(0){(a,b) => a + unmarshal[Int](b.result)} 
  fut.onComplete { 
    case scala.util.Success(cnt) => println(s"done transfer ${cnt} rows.") 
    case Failure(e) => println(s"!!!!!streaming error: ${e.getMessage}") 
  } 
 
  scala.io.StdIn.readLine() 
  ConfigDBsWithEnv("dev").close('h2) 
  mat.shutdown() 
  system.terminate() 
 
 
} 
 
object QueryRows extends App { 
  implicit val system = ActorSystem("QueryRows") 
  implicit val mat = ActorMaterializer.create(system) 
  implicit val ec = system.dispatcher 
 
 
  val client = new CQLStreamClient("localhost", 50051) 
 
  val fut = client.queryRows.runForeach { r => println(r) } 
  fut.onComplete { 
    case scala.util.Success(d) => println(s"done querying.") 
    case Failure(e) => println(s"!!!!!query error: ${e.getMessage}") 
  } 
 
  scala.io.StdIn.readLine() 
  mat.shutdown() 
  system.terminate() 
 
} 
 
object RunDDL extends App { 
  implicit val system = ActorSystem("RunDDL") 
  implicit val mat = ActorMaterializer.create(system) 
  implicit val ec = system.dispatcher 
 
  val client = new CQLStreamClient("localhost", 50051) 
 
  // runDDL marshals the Boolean returned by cqlExecute, so name the type explicitly
  client.createTbl.runForeach { r => println(unmarshal[Boolean](r.result)) }
 
 
  scala.io.StdIn.readLine() 
  mat.shutdown() 
  system.terminate() 
 
}

 

Original article by 奋斗. If you repost it, please credit the source: https://blog.ytso.com/industrynews/12795.html
