PICE(2): JDBCStreaming – gRPC-JDBC Service

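   In an akka-cluster environment, from the data-access point of view, a JDBC database stands apart from the other nodes in the cluster. This is because a JDBC database is not distributed and offers no node location transparency. A JDBC database server therefore has to expose its data operations as a service. In this scenario the server side is the JDBC service, and every other node, including other JDBC database nodes, is a client of that service. Since we have already decided to use the gRPC service model inside the akka-cluster environment and to drive database operations through akka-stream flow control, this discussion demonstrates how gRPC-JDBC-Streaming is implemented and used.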

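The previous discussion demonstrated the simplest JDBC-Streaming mode, a unary request answered by a stream: the client sends one JDBCQuery to the JDBC service, and the service runs it and returns a stream of data rows. The data and service types are defined in IDL in jdbc.proto as follows: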

message JDBCDataRow { 
 string year = 1; 
 string state = 2; 
 string county = 3; 
 string value = 4; 
} 
 
message JDBCQuery { 
  string dbName = 1; 
  string statement = 2; 
  bytes parameters = 3; 
  google.protobuf.Int32Value fetchSize= 4; 
  google.protobuf.BoolValue autoCommit = 5; 
  google.protobuf.Int32Value queryTimeout = 6; 
} 
 
service JDBCServices { 
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {} 
}

The message types JDBCDataRow and JDBCQuery correspond, respectively, to the stream element type and the JDBCQueryContext of the JDBC-Streaming tool:

  val toRow = (rs: WrappedResultSet) => JDBCDataRow( 
    year = rs.string("REPORTYEAR"), 
    state = rs.string("STATENAME"), 
    county = rs.string("COUNTYNAME"), 
    value = rs.string("VALUE") 
  ) 
 
   val ctx = JDBCQueryContext[JDBCDataRow]( 
     dbName = Symbol(q.dbName), 
     statement = q.statement, 
     parameters = params, 
     fetchSize = q.fetchSize.getOrElse(100), 
     autoCommit = q.autoCommit.getOrElse(false), 
     queryTimeout = q.queryTimeout 
   ) 
   jdbcAkkaStream(ctx, toRow)
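
The getOrElse calls above suggest that the wrapper-typed fields (google.protobuf.Int32Value, google.protobuf.BoolValue) arrive as Option values on the Scala side, so an unset field can be told apart from a zero. The following is a minimal sketch of that mapping, assuming nothing beyond the standard library; QueryMsg, QueryCtx and WrapperDefaults are hypothetical stand-ins for the generated JDBCQuery and for JDBCQueryContext, not the real classes.

```scala
// Hypothetical stand-ins for the generated JDBCQuery message and for
// JDBCQueryContext; only the fields relevant to defaulting are kept.
final case class QueryMsg(
  dbName: String,
  statement: String,
  fetchSize: Option[Int] = None,      // google.protobuf.Int32Value in the proto
  autoCommit: Option[Boolean] = None, // google.protobuf.BoolValue in the proto
  queryTimeout: Option[Int] = None    // google.protobuf.Int32Value in the proto
)

final case class QueryCtx(
  dbName: Symbol,
  statement: String,
  fetchSize: Int,
  autoCommit: Boolean,
  queryTimeout: Option[Int]
)

object WrapperDefaults {
  // Unset wrapper fields fall back to the same defaults the service code uses.
  def toCtx(q: QueryMsg): QueryCtx = QueryCtx(
    dbName = Symbol(q.dbName),
    statement = q.statement,
    fetchSize = q.fetchSize.getOrElse(100),
    autoCommit = q.autoCommit.getOrElse(false),
    queryTimeout = q.queryTimeout
  )
}
```

A client that leaves fetchSize unset therefore gets the default of 100, while an explicit Some(10) is passed through unchanged.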

Compiling with ScalaPB generates the server-side and client-side boilerplate code. What remains for us is to implement the actual JDBC service:

class JDBCStreamingServices(implicit ec: ExecutionContextExecutor) extends JdbcGrpcAkkaStream.JDBCServices { 
  val logger = Logger.getLogger(classOf[JDBCStreamingServices].getName) 
 
  val toRow = (rs: WrappedResultSet) => JDBCDataRow( 
    year = rs.string("REPORTYEAR"), 
    state = rs.string("STATENAME"), 
    county = rs.string("COUNTYNAME"), 
    value = rs.string("VALUE") 
  ) 
  override def runQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = { 
    logger.info("**** runQuery called on service side ***") 
    Flow[JDBCQuery] 
      .flatMapConcat { q => 
        //unpack JDBCQuery and construct the context 
        val params: Seq[Any] = unmarshal[Seq[Any]](q.parameters) 
        logger.info(s"**** query parameters: ${params} ****") 
        val ctx = JDBCQueryContext[JDBCDataRow]( 
          dbName = Symbol(q.dbName), 
          statement = q.statement, 
          parameters = params, 
          fetchSize = q.fetchSize.getOrElse(100), 
          autoCommit = q.autoCommit.getOrElse(false), 
          queryTimeout = q.queryTimeout 
        ) 
        jdbcAkkaStream(ctx, toRow) 
      } 
  } 
}

Below is a demonstration of calling the service from the client:

  val query = JDBCQuery ( 
    dbName = "h2", 
    statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?", 
    parameters = marshal(Seq("Arizona", 5)) 
  ) 
  def queryRows: Source[JDBCDataRow,NotUsed] = { 
    logger.info(s"running queryRows ...") 
    Source 
      .single(query) 
      .via(stub.runQuery) 
  }
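
The parameters field is declared as bytes, so the client has to serialize Seq("Arizona", 5) before sending and the service has to restore it. The marshal and unmarshal helpers used above are not shown in this article; the sketch below is one possible shape for them, assuming plain Java serialization and a byte array as the carrier. MarshalSketch is a hypothetical name, and the real helpers presumably wrap the bytes in a protobuf ByteString.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper object, NOT the Converter used by the article.
object MarshalSketch {
  // Serialize a java.io.Serializable object graph into a byte array.
  def marshal(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  // Restore the value; the cast is unchecked, so the caller must know the type.
  def unmarshal[A](data: Array[Byte]): A = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

Because the cast in unmarshal is unchecked, both sides have to agree on the payload type (here Seq[Any]) out of band.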

The program is run as follows:

object QueryRows extends App { 
  implicit val system = ActorSystem("QueryRows") 
  implicit val mat = ActorMaterializer.create(system) 
 
  val client = new JDBCStreamClient("localhost", 50051) 
 
  client.queryRows.runForeach(println) 
 
  scala.io.StdIn.readLine() 
  mat.shutdown() 
  system.terminate() 
}

What happens if the client sends a series of JDBCQuery messages instead? This is the so-called BiDi-Streaming mode. The service description in jdbc.proto becomes:

service JDBCServices { 
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {} 
  rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {} 
}

Note that the input parameter of batQuery is a stream. The signature of the generated service function batQuery looks like this:

  override def runQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = { ... } 
  override def batQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = runQuery

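runQuery and batQuery have the same signature, which means the server provides the same kind of service for both. In this example each of them answers every JDBCQuery it receives with the corresponding stream of rows. The real difference between the two services lies on the client side. Here is the client-side source code generated by ScalaPB: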

   override def runQuery: Flow[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow, NotUsed] = 
      Flow.fromGraph( 
        new GrpcGraphStage[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow]({ outputObserver => 
          new StreamObserver[grpc.jdbc.services.JDBCQuery] { 
            override def onError(t: Throwable): Unit = () 
            override def onCompleted(): Unit = () 
            override def onNext(request: grpc.jdbc.services.JDBCQuery): Unit = 
              ClientCalls.asyncServerStreamingCall( 
                channel.newCall(METHOD_RUN_QUERY, options), 
                request, 
                outputObserver 
              ) 
          } 
        }) 
      ) 
 ... 
      override def batQuery: Flow[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow, NotUsed] = 
        Flow.fromGraph(new GrpcGraphStage[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow](outputObserver => 
          ClientCalls.asyncBidiStreamingCall( 
            channel.newCall(METHOD_BAT_QUERY, options), 
            outputObserver 
          ) 
        ))

So on the client side we call batQuery:

  def batQueryRows: Source[JDBCDataRow,NotUsed] = { 
    logger.info(s"running batQueryRows ...") 
    Source 
      .fromIterator(() => List(query,query2,query3).toIterator) 
      .via(stub.batQuery) 
  }
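
Since the service Flow is built with flatMapConcat, the rows of the first query are emitted completely before the rows of the second query start. The sketch below illustrates that ordering with a stand-in Flow and no gRPC or JDBC involved; it assumes akka-stream is on the classpath, and BidiOrderSketch and fakeService are hypothetical names.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BidiOrderSketch {
  // Stand-in for the service Flow: each "query" expands into a small row stream.
  val fakeService: Flow[String, String, NotUsed] =
    Flow[String].flatMapConcat(q => Source(List(s"$q-row1", s"$q-row2")))

  // Runs the queries through the stand-in service and collects all rows.
  def run(queries: List[String]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("bidi-order-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(queries).via(fakeService).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

Calling BidiOrderSketch.run(List("q1", "q2")) yields the q1 rows followed by the q2 rows, never interleaved.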

Besides queries, JDBC operations should also cover data updates, including schema DDL and database updates. A JDBC update passes its request through a JDBCContext:

case class JDBCContext( 
                        dbName: Symbol, 
                        statements: Seq[String] = Nil, 
                        parameters: Seq[Seq[Any]] = Nil, 
                        fetchSize: Int = 100, 
                        queryTimeout: Option[Int] = None, 
                        queryTags: Seq[String] = Nil, 
                        sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE, 
                        batch: Boolean = false, 
                        returnGeneratedKey: Seq[Option[Any]] = Nil, 
                        // no return: None, return by index: Some(1), by name: Some("id") 
                        preAction: Option[PreparedStatement => Unit] = None, 
                        postAction: Option[PreparedStatement => Unit] = None) {... }

The protobuf messages corresponding to this class are defined as follows:

message JDBCResult { 
  bytes result = 1; 
} 
 
message JDBCUpdate { 
  string dbName = 1; 
  repeated string statements = 2; 
  bytes parameters = 3; 
  google.protobuf.Int32Value fetchSize= 4; 
  google.protobuf.Int32Value queryTimeout = 5; 
  int32 sqlType = 6; 
  google.protobuf.Int32Value batch = 7; 
  bytes returnGeneratedKey = 8; 
} 
 
service JDBCServices { 
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {} 
  rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {} 
  rpc runDDL(JDBCUpdate) returns (JDBCResult) {} 
}

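The service function runDDL returns a JDBCResult message, which carries the marshalled return value (in general a Seq[Any]) as bytes. The following shows how a JDBCContext is packed into and restored from a protobuf message: the server unpacks the JDBCUpdate, builds a JDBCContext from it, and then calls jdbcExecuteDDL: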

 override def runDDL: Flow[JDBCUpdate, JDBCResult, NotUsed] = { 
    logger.info("**** runDDL called on service side ***") 
    Flow[JDBCUpdate] 
      .flatMapConcat { context => 
        //unpack JDBCUpdate and construct the context 
 
        val ctx = JDBCContext( 
          dbName = Symbol(context.dbName), 
          statements = context.statements, 
          sqlType = JDBCContext.SQL_EXEDDL, 
          queryTimeout = context.queryTimeout 
        ) 
 
        logger.info(s"**** JDBCContext => ${ctx} ***") 
 
        Source 
          .fromFuture(jdbcExecuteDDL(ctx)) 
          .map { r => JDBCResult(marshal(r)) } 
 
      } 
  }

jdbcExecuteDDL returns a Future[String]:

  def jdbcExecuteDDL(ctx: JDBCContext)(implicit ec: ExecutionContextExecutor): Future[String] = { 
    if (ctx.sqlType != SQL_EXEDDL) { 
      Future.failed(new IllegalStateException("JDBCContext setting error: sqlType must be 'SQL_EXEDDL'!")) 
    } 
    else { 
      Future { 
        // all statements run inside one local transaction 
        NamedDB(ctx.dbName) localTx { implicit session => 
          ctx.statements.foreach { stm => 
            // no-op hooks before and after the statement executes 
            val ddl = new SQLExecution(statement = stm, parameters = Nil)( 
              before = _ => {})( 
              after = _ => {}) 
 
            ddl.apply() 
          } 
          "SQL_EXEDDL executed successfully." 
        } 
      } 
    } 
  }

We can use Source.fromFuture(jdbcExecuteDDL(ctx)) to build an akka-stream Source. On the client side we build a JDBCUpdate structure and send it to the server for execution:

  val dropSQL: String =""" 
      drop table members 
    """ 
 
  val createSQL: String =""" 
    create table members ( 
      id serial not null primary key, 
      name varchar(30) not null, 
      description varchar(1000), 
      birthday date, 
      created_at timestamp not null, 
      picture blob 
    )""" 
  val ctx = JDBCUpdate ( 
    dbName = "h2", 
    sqlType = JDBCContext.SQL_EXEDDL, 
    statements = Seq(dropSQL,createSQL) 
  ) 
 
  def createTbl: Source[JDBCResult,NotUsed] = { 
    logger.info(s"running createTbl ...") 
    Source 
      .single(ctx) 
      .via(stub.runDDL) 
  }

Note: statements = Seq(dropSQL,createSQL) contains two independent SQL operations.

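Next we demonstrate sending a data stream (stream MemberRow) from the client and having the server insert the rows into the database. The IDL data types and service function are defined as follows: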

message JDBCDate { 
  int32 yyyy = 1; 
  int32 mm   = 2; 
  int32 dd   = 3; 
} 
 
message JDBCTime { 
  int32 hh   = 1; 
  int32 mm   = 2; 
  int32 ss   = 3; 
  int32 nnn  = 4; 
} 
 
message JDBCDateTime { 
   JDBCDate date = 1; 
   JDBCTime time = 2; 
} 
message MemberRow { 
  string name = 1; 
  JDBCDate birthday = 2; 
  string description = 3; 
  JDBCDateTime created_at = 4; 
  bytes picture = 5; 
} 
 
service JDBCServices { 
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {} 
  rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {} 
  rpc runDDL(JDBCUpdate) returns (JDBCResult) {} 
  rpc insertRows(stream MemberRow) returns(JDBCResult) {} 
}

The insertRows service function is implemented as follows:

 override def insertRows: Flow[MemberRow, JDBCResult, NotUsed] = { 
    logger.info("**** insertRows called on service side ***") 
    val insertSQL = """ 
      insert into members( 
        name, 
        birthday, 
        description, 
        created_at 
      ) values (?, ?, ?, ?) 
    """ 
    Flow[MemberRow] 
      .flatMapConcat { row => 
        val ctx = JDBCContext('h2) 
          .setUpdateCommand(true,insertSQL, 
             row.name, 
             jdbcSetDate(row.birthday.get.yyyy,row.birthday.get.mm,row.birthday.get.dd), 
             row.description, 
             jdbcSetNow 
          ) 
 
        logger.info(s"**** JDBCContext => ${ctx} ***") 
 
        Source 
          .fromFuture(jdbcTxUpdates[Vector](ctx)) 
          .map { r => JDBCResult(marshal(r)) } 
      } 
  }

Likewise, jdbcTxUpdates returns a Future. Its implementation is in the attached JDBCEngine.scala.
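
One detail of the service code above: it calls row.birthday.get, which throws if the client leaves birthday unset. A more defensive conversion could look like the sketch below. It uses only the standard library; DateMsg, MemberMsg and BirthdaySketch are hypothetical mirrors of the generated JDBCDate and MemberRow, not the real classes.

```scala
import java.time.LocalDate
import scala.util.Try

// Hypothetical mirrors of the generated messages; the message-typed field
// birthday is modelled as an Option, as in the service code.
final case class DateMsg(yyyy: Int, mm: Int, dd: Int)
final case class MemberMsg(name: String, birthday: Option[DateMsg])

object BirthdaySketch {
  // None when the field is unset or the numbers do not form a valid date.
  def birthdayOf(row: MemberMsg): Option[LocalDate] =
    row.birthday.flatMap(d => Try(LocalDate.of(d.yyyy, d.mm, d.dd)).toOption)
}
```

The resulting Option[LocalDate] can then be mapped to a SQL NULL or to a rejected row, depending on what the table allows.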

The client builds a stream of MemberRow and sends it to the server through stub.insertRows:

  val p1 = MemberRow( "Peter Chan",Some(JDBCDate(1967,5,17)),"new member1",None,_root_.com.google.protobuf.ByteString.EMPTY) 
  val p2 = MemberRow( "Alanda Wong",Some(JDBCDate(1980,11,10)),"new member2",None,_root_.com.google.protobuf.ByteString.EMPTY) 
  val p3 = MemberRow( "Kate Zhang",Some(JDBCDate(1969,8,13)),"new member3",None,_root_.com.google.protobuf.ByteString.EMPTY) 
  val p4 = MemberRow( "Tiger Chan",Some(JDBCDate(1962,5,1)),"new member4",None,_root_.com.google.protobuf.ByteString.EMPTY) 
 
  def insertRows: Source[JDBCResult,NotUsed] = { 
    logger.info(s"running insertRows ...") 
    Source 
      .fromIterator(() => List(p1,p2,p3,p4).toIterator) 
      .via(stub.insertRows) 
  }

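Finally, we demonstrate the jdbcBatchUpdate function. We read MemberRow elements from the server and send them back to the server to be updated. The IDL is as follows: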

message MemberRow { 
  string name = 1; 
  JDBCDate birthday = 2; 
  string description = 3; 
  JDBCDateTime created_at = 4; 
  bytes picture = 5; 
} 
 
service JDBCServices { 
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {} 
  rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {} 
  rpc runDDL(JDBCUpdate) returns (JDBCResult) {} 
  rpc insertRows(stream MemberRow) returns(JDBCResult) {} 
  rpc updateRows(stream MemberRow) returns(JDBCResult) {} 
  rpc getMembers(JDBCQuery) returns (stream MemberRow) {} 
}

The server-side functions are defined as follows:

 val toMemberRow = (rs: WrappedResultSet) => MemberRow( 
    name = rs.string("name"), 
    description = rs.string("description"), 
    birthday = None, 
    createdAt = None, 
    picture = _root_.com.google.protobuf.ByteString.EMPTY 
  ) 
  override def getMembers: Flow[JDBCQuery, MemberRow, NotUsed] = { 
    logger.info("**** getMembers called on service side ***") 
    Flow[JDBCQuery] 
      .flatMapConcat { q => 
        //unpack JDBCQuery and construct the context 
        var params: Seq[Any] =  Nil 
        if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY) 
           params = unmarshal[Seq[Any]](q.parameters) 
        logger.info(s"**** query parameters: ${params} ****") 
        val ctx = JDBCQueryContext[MemberRow]( 
          dbName = Symbol(q.dbName), 
          statement = q.statement, 
          parameters = params, 
          fetchSize = q.fetchSize.getOrElse(100), 
          autoCommit = q.autoCommit.getOrElse(false), 
          queryTimeout = q.queryTimeout 
        ) 
        jdbcAkkaStream(ctx, toMemberRow) 
      } 
  } 
  override def updateRows: Flow[MemberRow, JDBCResult, NotUsed] = { 
    logger.info("**** updateRows called on service side ***") 
    val updateSQL = "update members set description = ?, created_at = ? where name = ?" 
 
    Flow[MemberRow] 
      .flatMapConcat { row => 
        val ctx = JDBCContext('h2) 
            .setBatchCommand(updateSQL) 
            .appendBatchParameters( 
              row.name + " updated.", 
              jdbcSetNow, 
              row.name 
            ).setBatchReturnGeneratedKeyOption(true) 
 
        logger.info(s"**** JDBCContext => ${ctx} ***") 
 
        Source 
          .fromFuture(jdbcBatchUpdate[Vector](ctx)) 
          .map { r => JDBCResult(marshal(r)) } 
      } 
  }

The source code of jdbcBatchUpdate is in the attached JDBCEngine.scala. The client code is as follows:

  val queryMember = JDBCQuery ( 
    dbName = "h2", 
    statement = "select * from members" 
  ) 
  def updateRows: Source[JDBCResult,NotUsed] = { 
    logger.info(s"running updateRows ...") 
    Source 
      .single(queryMember) 
      .via(stub.getMembers) 
      .via(stub.updateRows) 
  }
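
In the updateRows service shown earlier, every incoming MemberRow produces its own context with a single parameter list, so each JDBC batch holds one row. If larger batches are wanted, several rows could be folded into one context first. The sketch below shows that accumulation together with the arity check that appendBatchParameters performs; BatchCtx and BatchSketch are hypothetical, trimmed-down stand-ins for JDBCContext and use only the standard library.

```scala
// Hypothetical, trimmed-down model of the batch part of JDBCContext.
final case class BatchCtx(statement: String, parameters: Vector[Seq[Any]] = Vector.empty) {
  // Returns a new context; rejects rows whose arity differs from the first row.
  def appendBatchParameters(params: Any*): BatchCtx = {
    require(
      parameters.headOption.forall(_.size == params.size),
      "batch command parameters do not match"
    )
    copy(parameters = parameters :+ params.toList)
  }
}

object BatchSketch {
  // Fold a whole group of rows into ONE context, that is, one JDBC batch.
  def build(sql: String, rows: Seq[Seq[Any]]): BatchCtx =
    rows.foldLeft(BatchCtx(sql))((ctx, row) => ctx.appendBatchParameters(row: _*))
}
```

In a stream, such groups could come from an operator like grouped(n) placed in front of the update step; whether that fits depends on how failures of a partially applied batch should be reported.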

The next example shows how to use JDBCActionStream to process data in bulk. The server-side source code is as follows:

  val params: JDBCDataRow => Seq[Any] = row => { 
    Seq((row.value.toInt * 2), row.state, row.county, row.year) } 
  val sql = "update AQMRPT set total = ? where statename = ? and countyname = ? and reportyear = ?" 
 
  val jdbcActionStream = JDBCActionStream('h2,sql ,params) 
    .setParallelism(4).setProcessOrder(false) 
  val jdbcActionFlow = jdbcActionStream.performOnRow 
 
  override def updateBat: Flow[JDBCDataRow, JDBCDataRow, NotUsed] = { 
    logger.info("**** updateBat called on service side ***") 
    Flow[JDBCDataRow] 
         .via(jdbcActionFlow) 
  }

jdbcActionFlow is a Flow[R,R,_], so we can connect it to the preceding Flow directly with via. Here is the definition of JDBCActionStream:

  case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true, 
                                 statement: String, prepareParams: R => Seq[Any]) { 
    jas => 
    def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db) 
    def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel) 
    def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered) 
 
    private def perform(r: R)(implicit ec: ExecutionContextExecutor) = { 
      import scala.concurrent._ 
      val params = prepareParams(r) 
      Future { 
        NamedDB(dbName) autoCommit { session => 
          session.execute(statement, params: _*) 
        } 
        r 
      } 
    } 
    def performOnRow(implicit ec: ExecutionContextExecutor): Flow[R, R, NotUsed] = 
      if (processInOrder) 
        Flow[R].mapAsync(parallelism)(perform) 
      else 
        Flow[R].mapAsyncUnordered(parallelism)(perform) 
 
  } 
  object JDBCActionStream { 
    def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] = 
      new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params) 
  }

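The function performOnRow is a pass-through stage: it uses mapAsync (or mapAsyncUnordered when ordering is switched off) to run several updates concurrently, bounded by the parallelism setting. The client calls it as follows: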

  def updateBatches: Source[JDBCDataRow,NotUsed] = { 
    logger.info(s"running updateBatches ...") 
      Source 
        .fromIterator(() => List(query,query2,query3).toIterator) 
        .via(stub.batQuery) 
        .via(stub.updateBat) 
 
  }
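
The difference between the two variants chosen by setProcessOrder can be seen in isolation. The sketch below replaces the JDBC update with a Future that merely sleeps, so that later elements finish first; mapAsync still emits results in input order, while mapAsyncUnordered emits them as they complete. It assumes akka-stream is on the classpath, and AsyncOrderSketch and slowEcho are hypothetical names.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AsyncOrderSketch {
  // Stand-in for a JDBC update: larger inputs sleep less and so finish sooner.
  // Blocking a dispatcher thread like this is acceptable only in a sketch.
  private def slowEcho(n: Int)(implicit ec: ExecutionContext): Future[Int] =
    Future { Thread.sleep((4 - n) * 50L); n }

  // Returns (results via mapAsync, results via mapAsyncUnordered).
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("async-order-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher
    try {
      val ordered = Source(1 to 3).mapAsync(4)(slowEcho(_)).runWith(Sink.seq)
      val unordered = Source(1 to 3).mapAsyncUnordered(4)(slowEcho(_)).runWith(Sink.seq)
      (Await.result(ordered, 5.seconds), Await.result(unordered, 5.seconds))
    } finally system.terminate()
  }
}
```

The first sequence always equals the input order; the second contains the same elements, but its order depends on completion timing.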

Here is the complete source code for this demonstration:

jdbc.proto

syntax = "proto3"; 
 
import "google/protobuf/wrappers.proto"; 
import "google/protobuf/any.proto"; 
import "scalapb/scalapb.proto"; 
 
 
package grpc.jdbc.services; 
 
option (scalapb.options) = { 
  // use a custom Scala package name 
  // package_name: "io.ontherocks.introgrpc.demo" 
 
  // don't append file name to package 
  flat_package: true 
 
  // generate one Scala file for all messages (services still get their own file) 
  single_file: true 
 
  // add imports to generated file 
  // useful when extending traits or using custom types 
  // import: "io.ontherocks.hellogrpc.RockingMessage" 
 
  // code to put at the top of generated file 
  // works only with `single_file: true` 
  //preamble: "sealed trait SomeSealedTrait" 
}; 
 
/* 
 * Demoes various customization options provided by ScalaPBs. 
 */ 
 
message JDBCDataRow { 
 string year = 1; 
 string state = 2; 
 string county = 3; 
 string value = 4; 
} 
 
 
message JDBCQuery { 
  string dbName = 1; 
  string statement = 2; 
  bytes parameters = 3; 
  google.protobuf.Int32Value fetchSize= 4; 
  google.protobuf.BoolValue autoCommit = 5; 
  google.protobuf.Int32Value queryTimeout = 6; 
} 
 
message JDBCResult { 
  bytes result = 1; 
} 
 
message JDBCUpdate { 
  string dbName = 1; 
  repeated string statements = 2; 
  bytes parameters = 3; 
  google.protobuf.Int32Value fetchSize= 4; 
  google.protobuf.Int32Value queryTimeout = 5; 
  int32 sqlType = 6; 
  google.protobuf.Int32Value batch = 7; 
  bytes returnGeneratedKey = 8; 
} 
 
message JDBCDate { 
  int32 yyyy = 1; 
  int32 mm   = 2; 
  int32 dd   = 3; 
} 
 
message JDBCTime { 
  int32 hh   = 1; 
  int32 mm   = 2; 
  int32 ss   = 3; 
  int32 nnn  = 4; 
} 
 
message JDBCDateTime { 
   JDBCDate date = 1; 
   JDBCTime time = 2; 
} 
 
message MemberRow { 
  string name = 1; 
  JDBCDate birthday = 2; 
  string description = 3; 
  JDBCDateTime created_at = 4; 
  bytes picture = 5; 
} 
 
service JDBCServices { 
  rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {} 
  rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {} 
  rpc runDDL(JDBCUpdate) returns (JDBCResult) {} 
  rpc insertRows(stream MemberRow) returns(JDBCResult) {} 
  rpc updateRows(stream MemberRow) returns(JDBCResult) {} 
  rpc getMembers(JDBCQuery) returns (stream MemberRow) {} 
}

JDBCEngine.scala

package sdp.jdbc.engine 
import java.sql.PreparedStatement 
import scala.collection.generic.CanBuildFrom 
import akka.stream.scaladsl._ 
import scalikejdbc._ 
import scalikejdbc.streams._ 
import akka.NotUsed 
import akka.stream._ 
import java.time._ 
import scala.concurrent.duration._ 
import scala.concurrent._ 
import sdp.jdbc.FileStreaming._ 
import scalikejdbc.TxBoundary.Try._ 
import scala.concurrent.ExecutionContextExecutor 
import java.io.InputStream 
object JDBCContext { 
  type SQLTYPE = Int 
  val SQL_EXEDDL = 1 
  val SQL_UPDATE = 2 
  val RETURN_GENERATED_KEYVALUE = true 
  val RETURN_UPDATED_COUNT = false 
} 
case class JDBCQueryContext[M]( 
  dbName: Symbol, 
  statement: String, 
  parameters: Seq[Any] = Nil, 
  fetchSize: Int = 100, 
  autoCommit: Boolean = false, 
  queryTimeout: Option[Int] = None) 
case class JDBCContext( 
  dbName: Symbol, 
  statements: Seq[String] = Nil, 
  parameters: Seq[Seq[Any]] = Nil, 
  fetchSize: Int = 100, 
  queryTimeout: Option[Int] = None, 
  queryTags: Seq[String] = Nil, 
  sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE, 
  batch: Boolean = false, 
  returnGeneratedKey: Seq[Option[Any]] = Nil, 
  // no return: None, return by index: Some(1), by name: Some("id") 
  preAction: Option[PreparedStatement => Unit] = None, 
  postAction: Option[PreparedStatement => Unit] = None) { 
  ctx => 
  //helper functions 
 
  def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag) 
  def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags) 
  def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size) 
  def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time) 
  def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = { 
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && 
      !ctx.batch && ctx.statements.size == 1) 
      ctx.copy(preAction = action) 
    else 
      throw new IllegalStateException("JDBCContext setting error: preAction not supported!") 
  } 
  def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = { 
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && 
      !ctx.batch && ctx.statements.size == 1) 
      ctx.copy(postAction = action) 
    else 
      throw new IllegalStateException("JDBCContext setting error: postAction not supported!") 
  } 
  def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = { 
    if (ctx.sqlType == JDBCContext.SQL_EXEDDL) { 
      ctx.copy( 
        statements = ctx.statements ++ Seq(_statement), 
        // one parameter list per statement, as in the other append/set helpers 
        parameters = ctx.parameters ++ Seq(_parameters) 
      ) 
    } else 
      throw new IllegalStateException("JDBCContext setting error: option not supported!") 
  } 
  def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = { 
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) { 
      ctx.copy( 
        statements = ctx.statements ++ Seq(_statement), 
        parameters = ctx.parameters ++ Seq(_parameters), 
        returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None)) 
      ) 
    } else 
      throw new IllegalStateException("JDBCContext setting error: option not supported!") 
  } 
  def appendBatchParameters(_parameters: Any*): JDBCContext = { 
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) 
      throw new IllegalStateException("JDBCContext setting error: batch parameters only supported for SQL_UPDATE and batch = true!") 
    // every parameter list in a batch must have the same arity as the first one 
    var matchParams = true 
    if (ctx.parameters != Nil) 
      if (ctx.parameters.head.size != _parameters.size) 
        matchParams = false 
    if (matchParams) { 
      ctx.copy( 
        parameters = ctx.parameters ++ Seq(_parameters) 
      ) 
    } else 
      throw new IllegalStateException("JDBCContext setting error: batch command parameters not match!") 
  } 
  def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = { 
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) 
      throw new IllegalStateException("JDBCContext setting error: only supported in batch update commands!") 
    ctx.copy( 
      returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil 
    ) 
  } 
  def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = { 
    ctx.copy( 
      statements = Seq(_statement), 
      parameters = Seq(_parameters), 
      sqlType = JDBCContext.SQL_EXEDDL, 
      batch = false 
    ) 
  } 
  def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = { 
    ctx.copy( 
      statements = Seq(_statement), 
      parameters = Seq(_parameters), 
      returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None), 
      sqlType = JDBCContext.SQL_UPDATE, 
      batch = false 
    ) 
  } 
  def setBatchCommand(_statement: String): JDBCContext = { 
    ctx.copy ( 
      statements = Seq(_statement), 
      sqlType = JDBCContext.SQL_UPDATE, 
      batch = true 
    ) 
  } 
} 
object JDBCEngine { 
  import JDBCContext._ 
  type JDBCDate = LocalDate 
  type JDBCDateTime = LocalDateTime 
  type JDBCTime = LocalTime 
  def jdbcSetDate(yyyy: Int, mm: Int, dd: Int) = LocalDate.of(yyyy,mm,dd) 
  def jdbcSetTime(hh: Int, mm: Int, ss: Int, nn: Int) = LocalTime.of(hh,mm,ss,nn) 
  def jdbcSetDateTime(date: JDBCDate, time: JDBCTime) =  LocalDateTime.of(date,time) 
  def jdbcSetNow = LocalDateTime.now() 
  type JDBCBlob = InputStream 
  def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
    implicit mat: Materializer) = FileToInputStream(fileName,timeOut) 
  def jdbcBlobToFile(blob: JDBCBlob, fileName: String)( 
    implicit mat: Materializer) =  InputStreamToFile(blob,fileName) 
  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) => 
    throw new IllegalStateException(message) 
  } 
  def jdbcAkkaStream[A](ctx: JDBCQueryContext[A],extractor: WrappedResultSet => A) 
                       (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = { 
    val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream { 
      val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor("")) 
      ctx.queryTimeout.foreach(rawSql.queryTimeout(_)) 
      val sql: SQL[A, HasExtractor] = rawSql.map(extractor) 
      sql.iterator 
        .withDBSessionForceAdjuster(session => { 
          session.connection.setAutoCommit(ctx.autoCommit) 
          session.fetchSize(ctx.fetchSize) 
        }) 
    } 
    Source.fromPublisher[A](publisher) 
  } 
  def jdbcQueryResult[C[_] <: TraversableOnce[_], A](ctx: JDBCQueryContext[A], 
                                                     extractor: WrappedResultSet => A)( 
    implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = { 
    val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor("")) 
    ctx.queryTimeout.foreach(rawSql.queryTimeout(_)) 
    rawSql.fetchSize(ctx.fetchSize) 
    implicit val session = NamedAutoSession(ctx.dbName) 
    val sql: SQL[A, HasExtractor] = rawSql.map(extractor) 
    sql.collection.apply[C]() 
  } 
  def jdbcExecuteDDL(ctx: JDBCContext)(implicit ec: ExecutionContextExecutor): Future[String] = { 
    if (ctx.sqlType != SQL_EXEDDL) { 
      Future.failed(new IllegalStateException("JDBCContext setting error: sqlType must be 'SQL_EXEDDL'!")) 
    } 
    else { 
      Future { 
        // all statements run inside one local transaction 
        NamedDB(ctx.dbName) localTx { implicit session => 
          ctx.statements.foreach { stm => 
            val ddl = new SQLExecution(statement = stm, parameters = Nil)( 
              before = _ => {})( 
              after = _ => {}) 
            ddl.apply() 
          } 
          "SQL_EXEDDL executed successfully." 
        } 
      } 
    } 
  } 
  def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
    implicit ec: ExecutionContextExecutor, 
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = { 
    // the checks are chained with else so that a failed check is actually returned 
    if (ctx.statements == Nil) 
      Future.failed(new IllegalStateException("JDBCContext setting error: statements empty!")) 
    else if (ctx.sqlType != SQL_UPDATE) { 
      Future.failed(new IllegalStateException("JDBCContext setting error: sqlType must be 'SQL_UPDATE'!")) 
    } 
    else { 
      if (ctx.batch) { 
        if (noReturnKey(ctx)) { 
          val usql = SQL(ctx.statements.head) 
            .tags(ctx.queryTags: _*) 
            .batch(ctx.parameters: _*) 
          Future { 
            NamedDB(ctx.dbName) localTx { implicit session => 
              ctx.queryTimeout.foreach(session.queryTimeout(_)) 
              usql.apply[Seq]() 
              Seq.empty[Long].to[C] 
            } 
          } 
        } else { 
          val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None) 
          Future { 
            NamedDB(ctx.dbName) localTx { implicit session => 
              ctx.queryTimeout.foreach(session.queryTimeout(_)) 
              usql.apply[C]() 
            } 
          } 
        } 
      } else { 
        Future.failed(new IllegalStateException("JDBCContext setting error: must set batch = true !")) 
      } 
    } 
  } 
  private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
    implicit ec: ExecutionContextExecutor, 
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = { 
    val Some(key) :: xs = ctx.returnGeneratedKey 
    val params: Seq[Any] = ctx.parameters match { 
      case Nil => Nil 
      case p@_ => p.head 
    } 
    val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key) 
    Future { 
      NamedDB(ctx.dbName) localTx { implicit session => 
        session.fetchSize(ctx.fetchSize) 
        ctx.queryTimeout.foreach(session.queryTimeout(_)) 
        val result = usql.apply() 
        Seq(result).to[C] 
      } 
    } 
  } 
  private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
    implicit ec: ExecutionContextExecutor, 
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = { 
    val params: Seq[Any] = ctx.parameters match { 
      case Nil => Nil 
      case p@_ => p.head 
    } 
    val before = ctx.preAction match { 
      case None => pstm: PreparedStatement => {} 
      case Some(f) => f 
    } 
    val after = ctx.postAction match { 
      case None => pstm: PreparedStatement => {} 
      case Some(f) => f 
    } 
    val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after) 
    Future { 
      NamedDB(ctx.dbName) localTx {implicit session => 
        session.fetchSize(ctx.fetchSize) 
        ctx.queryTimeout.foreach(session.queryTimeout(_)) 
        val result = usql.apply() 
        Seq(result.toLong).to[C] 
      } 
    } 
  } 
  private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
    implicit ec: ExecutionContextExecutor, 
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = { 
    if (noReturnKey(ctx)) 
      singleTxUpdateNoReturnKey(ctx) 
    else 
      singleTxUpdateWithReturnKey(ctx) 
  } 
  private def noReturnKey(ctx: JDBCContext): Boolean = { 
    if (ctx.returnGeneratedKey != Nil) { 
      val k :: xs = ctx.returnGeneratedKey 
      k match { 
        case None => true 
        case Some(k) => false 
      } 
    } else true 
  } 
  def noAction: PreparedStatement=>Unit = pstm => {} 
  def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
    implicit ec: ExecutionContextExecutor, 
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = { 
    Future { 
      NamedDB(ctx.dbName) localTx { implicit session => 
        session.fetchSize(ctx.fetchSize) 
        ctx.queryTimeout.foreach(session.queryTimeout(_)) 
        val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match { 
          case Nil => Seq.fill(ctx.statements.size)(None) 
          case k@_ => k 
        } 
        // pair each statement with its parameter list and its key option 
        val sqlcmd = ctx.statements zip ctx.parameters zip keys 
        val results = sqlcmd.map { case ((stm, param), key) => 
          key match { 
            case None => 
              new SQLUpdate(stm, param, Nil)(noAction)(noAction).apply().toLong 
            case Some(k) => 
              new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong 
          } 
        } 
        results.to[C] 
      } 
    } 
  } 
  def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
    implicit ec: ExecutionContextExecutor, 
    cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = { 
    // the checks are chained with else so that a failed check is actually returned 
    if (ctx.statements == Nil) 
      Future.failed(new IllegalStateException("JDBCContext setting error: statements empty!")) 
    else if (ctx.sqlType != SQL_UPDATE) { 
      Future.failed(new IllegalStateException("JDBCContext setting error: sqlType must be 'SQL_UPDATE'!")) 
    } 
    else { 
      if (!ctx.batch) { 
        if (ctx.statements.size == 1) 
          singleTxUpdate(ctx) 
        else 
          multiTxUpdates(ctx) 
      } else 
        Future.failed(new IllegalStateException("JDBCContext setting error: must set batch = false !")) 
    } 
  } 
  case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true, 
                                 statement: String, prepareParams: R => Seq[Any]) { 
    jas => 
    def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db) 
    def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel) 
    def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered) 
    private def perform(r: R)(implicit ec: ExecutionContextExecutor) = { 
      import scala.concurrent._ 
      val params = prepareParams(r) 
      Future { 
        NamedDB(dbName) autoCommit { session => 
          session.execute(statement, params: _*) 
        } 
        r 
      } 
    } 
    def performOnRow(implicit ec: ExecutionContextExecutor): Flow[R, R, NotUsed] = 
      if (processInOrder) 
        Flow[R].mapAsync(parallelism)(perform) 
      else 
        Flow[R].mapAsyncUnordered(parallelism)(perform) 
  } 
  object JDBCActionStream { 
    def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] = 
      new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params) 
  } 
}

JDBCService.scala

package demo.grpc.jdbc.services

import akka.NotUsed
import akka.stream.scaladsl.{Source, Flow}
import grpc.jdbc.services._
import java.util.logging.Logger
import protobuf.bytes.Converter._
import sdp.jdbc.engine._
import JDBCEngine._
import scalikejdbc.WrappedResultSet
import scala.concurrent.ExecutionContextExecutor

class JDBCStreamingServices(implicit ec: ExecutionContextExecutor) extends JdbcGrpcAkkaStream.JDBCServices {
  val logger = Logger.getLogger(classOf[JDBCStreamingServices].getName)

  // Maps one result-set row to the protobuf message JDBCDataRow.
  val toRow = (rs: WrappedResultSet) => JDBCDataRow(
    year = rs.string("REPORTYEAR"),
    state = rs.string("STATENAME"),
    county = rs.string("COUNTYNAME"),
    value = rs.string("VALUE")
  )

  override def runQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = {
    logger.info("**** runQuery called on service side ***")
    Flow[JDBCQuery]
      .flatMapConcat { q =>
        // unpack JDBCQuery and construct the context
        val params: Seq[Any] =
          if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
            unmarshal[Seq[Any]](q.parameters)
          else Nil
        logger.info(s"**** query parameters: ${params} ****")
        val ctx = JDBCQueryContext[JDBCDataRow](
          dbName = Symbol(q.dbName),
          statement = q.statement,
          parameters = params,
          fetchSize = q.fetchSize.getOrElse(100),
          autoCommit = q.autoCommit.getOrElse(false),
          queryTimeout = q.queryTimeout
        )
        jdbcAkkaStream(ctx, toRow)
      }
  }

  // batQuery reuses runQuery: every JDBCQuery arriving on the stream is run in turn.
  override def batQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = runQuery
  override def runDDL: Flow[JDBCUpdate, JDBCResult, NotUsed] = {
    logger.info("**** runDDL called on service side ***")
    Flow[JDBCUpdate]
      .flatMapConcat { context =>
        // unpack JDBCUpdate and construct the context
        val ctx = JDBCContext(
          dbName = Symbol(context.dbName),
          statements = context.statements,
          sqlType = JDBCContext.SQL_EXEDDL,
          queryTimeout = context.queryTimeout
        )
        logger.info(s"**** JDBCContext => ${ctx} ***")
        Source
          .fromFuture(jdbcExecuteDDL(ctx))
          .map { r => JDBCResult(marshal(r)) }
      }
  }
  override def insertRows: Flow[MemberRow, JDBCResult, NotUsed] = {
    logger.info("**** insertRows called on service side ***")
    val insertSQL = """
      insert into members(
        name,
        birthday,
        description,
        created_at
      ) values (?, ?, ?, ?)
    """
    Flow[MemberRow]
      .flatMapConcat { row =>
        // Note: row.birthday.get assumes the client always sets birthday; it throws on None.
        val ctx = JDBCContext('h2)
          .setUpdateCommand(true, insertSQL,
            row.name,
            jdbcSetDate(row.birthday.get.yyyy, row.birthday.get.mm, row.birthday.get.dd),
            row.description,
            jdbcSetNow
          )
        logger.info(s"**** JDBCContext => ${ctx} ***")
        Source
          .fromFuture(jdbcTxUpdates[Vector](ctx))
          .map { r => JDBCResult(marshal(r)) }
      }
  }
  override def updateRows: Flow[MemberRow, JDBCResult, NotUsed] = {
    logger.info("**** updateRows called on service side ***")
    val updateSQL = "update members set description = ?, created_at = ? where name = ?"
    Flow[MemberRow]
      .flatMapConcat { row =>
        val ctx = JDBCContext('h2)
          .setBatchCommand(updateSQL)
          .appendBatchParameters(
            row.name + " updated.",
            jdbcSetNow,
            row.name
          ).setBatchReturnGeneratedKeyOption(true)
        logger.info(s"**** JDBCContext => ${ctx} ***")
        Source
          .fromFuture(jdbcBatchUpdate[Vector](ctx))
          .map { r => JDBCResult(marshal(r)) }
      }
  }
  // Maps one row of the members table to MemberRow; only name and description are read.
  val toMemberRow = (rs: WrappedResultSet) => MemberRow(
    name = rs.string("name"),
    description = rs.string("description"),
    birthday = None,
    createdAt = None,
    picture = _root_.com.google.protobuf.ByteString.EMPTY
  )

  override def getMembers: Flow[JDBCQuery, MemberRow, NotUsed] = {
    logger.info("**** getMembers called on service side ***")
    Flow[JDBCQuery]
      .flatMapConcat { q =>
        // unpack JDBCQuery and construct the context
        val params: Seq[Any] =
          if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
            unmarshal[Seq[Any]](q.parameters)
          else Nil
        logger.info(s"**** query parameters: ${params} ****")
        val ctx = JDBCQueryContext[MemberRow](
          dbName = Symbol(q.dbName),
          statement = q.statement,
          parameters = params,
          fetchSize = q.fetchSize.getOrElse(100),
          autoCommit = q.autoCommit.getOrElse(false),
          queryTimeout = q.queryTimeout
        )
        jdbcAkkaStream(ctx, toMemberRow)
      }
  }
}

JDBCServer.scala

package demo.grpc.jdbc.server

import java.util.logging.Logger
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import io.grpc.Server
import demo.grpc.jdbc.services._
import io.grpc.ServerBuilder
import grpc.jdbc.services._

class gRPCServer(server: Server) {
  val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName)

  def start(): Unit = {
    server.start()
    logger.info(s"Server started, listening on ${server.getPort}")
    sys.addShutdownHook {
      // Use stderr here since the logger may have been reset by its JVM shutdown hook.
      System.err.println("*** shutting down gRPC server since JVM is shutting down")
      stop()
      System.err.println("*** server shut down")
    }
    ()
  }

  def stop(): Unit = {
    server.shutdown()
  }

  /**
   * Await termination on the main thread since the grpc library uses daemon threads.
   */
  def blockUntilShutdown(): Unit = {
    server.awaitTermination()
  }
}
object JDBCServer extends App {
  import sdp.jdbc.config._

  implicit val system = ActorSystem("JDBCServer")
  implicit val mat = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher

  // Set up the 'h2 connection pool from the "dev" configuration.
  ConfigDBsWithEnv("dev").setup('h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()

  val server = new gRPCServer(
    ServerBuilder
      .forPort(50051)
      .addService(
        JdbcGrpcAkkaStream.bindService(
          new JDBCStreamingServices
        )
      ).build()
  )
  server.start()
  //  server.blockUntilShutdown()
  // Keep the server running until Enter is pressed, then release all resources.
  scala.io.StdIn.readLine()
  ConfigDBsWithEnv("dev").close('h2)
  mat.shutdown()
  system.terminate()
}

JDBCClient.scala

package demo.grpc.jdbc.client

import grpc.jdbc.services._
import java.util.logging.Logger
import protobuf.bytes.Converter._
import akka.stream.scaladsl._
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import io.grpc._
import sdp.jdbc.engine._

class JDBCStreamClient(host: String, port: Int) {
  val logger: Logger = Logger.getLogger(classOf[JDBCStreamClient].getName)

  // Plaintext channel (no TLS); newer grpc-java releases provide the no-argument usePlaintext() instead.
  val channel = ManagedChannelBuilder
    .forAddress(host, port)
    .usePlaintext(true)
    .build()
  val stub = JdbcGrpcAkkaStream.stub(channel)

  // Query parameters travel as serialized bytes in JDBCQuery.parameters.
  val query = JDBCQuery(
    dbName = "h2",
    statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Arizona", 2))
  )
  val query2 = JDBCQuery(
    dbName = "h2",
    statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Colorado", 3))
  )
  val query3 = JDBCQuery(
    dbName = "h2",
    statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
    parameters = marshal(Seq("Arkansas", 8))
  )

  // Sends a single query and streams back its rows.
  def queryRows: Source[JDBCDataRow, NotUsed] = {
    logger.info(s"running queryRows ...")
    Source
      .single(query)
      .via(stub.runQuery)
  }

  // Sends three queries on one stream; the rows of all three come back on the same stream.
  def batQueryRows: Source[JDBCDataRow, NotUsed] = {
    logger.info(s"running batQueryRows ...")
    Source
      .fromIterator(() => List(query, query2, query3).toIterator)
      .via(stub.batQuery)
  }
  val dropSQL: String = """
      drop table members
    """

  val createSQL: String = """
    create table members (
      id serial not null primary key,
      name varchar(30) not null,
      description varchar(1000),
      birthday date,
      created_at timestamp not null,
      picture blob
    )"""

  // Both DDL statements are sent in one JDBCUpdate and executed by runDDL on the server.
  val ctx = JDBCUpdate(
    dbName = "h2",
    sqlType = JDBCContext.SQL_EXEDDL,
    statements = Seq(dropSQL, createSQL)
  )

  def createTbl: Source[JDBCResult, NotUsed] = {
    logger.info(s"running createTbl ...")
    Source
      .single(ctx)
      .via(stub.runDDL)
  }
  val p1 = MemberRow("Peter Chan", Some(JDBCDate(1967, 5, 17)), "new member1", None, _root_.com.google.protobuf.ByteString.EMPTY)
  val p2 = MemberRow("Alanda Wong", Some(JDBCDate(1980, 11, 10)), "new member2", None, _root_.com.google.protobuf.ByteString.EMPTY)
  val p3 = MemberRow("Kate Zhang", Some(JDBCDate(1969, 8, 13)), "new member3", None, _root_.com.google.protobuf.ByteString.EMPTY)
  val p4 = MemberRow("Tiger Chan", Some(JDBCDate(1962, 5, 1)), "new member4", None, _root_.com.google.protobuf.ByteString.EMPTY)

  // Streams the four rows to the server; one JDBCResult comes back per inserted row.
  def insertRows: Source[JDBCResult, NotUsed] = {
    logger.info(s"running insertRows ...")
    Source
      .fromIterator(() => List(p1, p2, p3, p4).toIterator)
      .via(stub.insertRows)
  }

  val queryMember = JDBCQuery(
    dbName = "h2",
    statement = "select * from members"
  )

  // Chains two services: rows returned by getMembers are fed straight into updateRows.
  def updateRows: Source[JDBCResult, NotUsed] = {
    logger.info(s"running updateRows ...")
    Source
      .single(queryMember)
      .via(stub.getMembers)
      .via(stub.updateRows)
  }

  def updateBatches: Source[JDBCDataRow, NotUsed] = {
    logger.info(s"running updateBatches ...")
    Source
      .fromIterator(() => List(query, query2, query3).toIterator)
      .via(stub.batQuery)
      .via(stub.updateBat)
  }
}
// Checks that a value survives a marshal/unmarshal round trip.
object TestConversion extends App {
  val orgval: Seq[Option[Any]] = Seq(Some(1), Some("id"), None, Some(2))
  println(s"original value: ${orgval}")
  val marval = marshal(orgval)
  println(s"marshal value: ${marval}")
  val unmval = unmarshal[Seq[Option[Any]]](marval)
  println(s"unmarshal value: ${unmval}")

  // Builds a MemberRow and fills in nested fields through the generated update lenses.
  val m1 = MemberRow(name = "Peter")
  val m2 = m1.update(
    _.birthday.yyyy := 1989,
    _.birthday.mm := 10,
    _.birthday.dd := 3,
    _.description := "a new member"
  )
}
// Each App below runs one client call against the server on localhost:50051
// and waits for Enter before shutting down.
object QueryRows extends App {
  implicit val system = ActorSystem("QueryRows")
  implicit val mat = ActorMaterializer.create(system)
  val client = new JDBCStreamClient("localhost", 50051)
  client.queryRows.runForeach { r => println(r) }
  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object BatQueryRows extends App {
  implicit val system = ActorSystem("BatQueryRows")
  implicit val mat = ActorMaterializer.create(system)
  val client = new JDBCStreamClient("localhost", 50051)
  client.batQueryRows.runForeach(println)
  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object RunDDL extends App {
  implicit val system = ActorSystem("RunDDL")
  implicit val mat = ActorMaterializer.create(system)
  val client = new JDBCStreamClient("localhost", 50051)
  client.createTbl.runForeach { r => println(unmarshal[Seq[Any]](r.result)) }
  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object InsertRows extends App {
  implicit val system = ActorSystem("InsertRows")
  implicit val mat = ActorMaterializer.create(system)
  val client = new JDBCStreamClient("localhost", 50051)
  client.insertRows.runForeach { r => println(unmarshal[Vector[Long]](r.result)) }
  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object UpdateRows extends App {
  implicit val system = ActorSystem("UpdateRows")
  implicit val mat = ActorMaterializer.create(system)
  val client = new JDBCStreamClient("localhost", 50051)
  client.updateRows.runForeach { r => println(unmarshal[Vector[Long]](r.result)) }
  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

object BatUpdates extends App {
  implicit val system = ActorSystem("BatUpdates")
  implicit val mat = ActorMaterializer.create(system)
  val client = new JDBCStreamClient("localhost", 50051)
  client.updateBatches.runForeach(println)
  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()
}

ByteConverter.scala

package protobuf.bytes

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import com.google.protobuf.ByteString

object Converter {
  // Serializes any java.io.Serializable value into a protobuf ByteString using Java serialization.
  def marshal(value: Any): ByteString = {
    val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    oos.writeObject(value)
    oos.close()
    ByteString.copyFrom(stream.toByteArray())
  }

  // Deserializes the bytes and casts the result to A; the cast is unchecked.
  def unmarshal[A](bytes: ByteString): A = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val value = ois.readObject()
    ois.close()
    value.asInstanceOf[A]
  }
}
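The converter above relies on plain Java serialization, so the round trip can be tried without a gRPC server. The following is only a minimal sketch of the same technique: `RoundTripSketch`, `marshalBytes`, and `unmarshalBytes` are hypothetical names that are not part of the original project, and `Array[Byte]` stands in for protobuf's `ByteString` so that the example needs nothing beyond the standard library.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for Converter: same Java-serialization approach,
// but Array[Byte] replaces ByteString so no protobuf dependency is needed.
object RoundTripSketch {
  // Serializes a value into a byte array.
  def marshalBytes(value: Any): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(buffer)
    try oos.writeObject(value) finally oos.close()
    buffer.toByteArray
  }

  // Deserializes the bytes and casts to A. The cast is unchecked because of type erasure.
  def unmarshalBytes[A](bytes: Array[Byte]): A = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject().asInstanceOf[A] finally ois.close()
  }

  def main(args: Array[String]): Unit = {
    // Same shape as the query parameters sent by the client above.
    val params: Seq[Any] = Seq("Arizona", 2)
    val restored = unmarshalBytes[Seq[Any]](marshalBytes(params))
    assert(restored == params)
    println(s"restored: $restored")
  }
}
```

Because the cast in the unmarshal step is unchecked, asking for the wrong type (for example `Vector[Long]` when the server actually marshalled something else) does not fail at the call itself; it only surfaces later, when an element is used. The caller therefore has to agree with the service on the type each call returns.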

The rest of the source code and the system configuration can be found in the previous post in this series.

 

Original article by ItWorker. If you republish it, please credit the source: https://blog.ytso.com/12796.html
