SDP(12): MongoDB-Engine – Streaming详解编程语言

   在akka-alpakka工具包里也提供了对MongoDB的stream-connector,能针对MongoDB数据库进行streaming操作。这个MongoDB-connector里包含了MongoSource,MongoFlow,MongoSink。我们只使用MongoSource,其它两个我们直接用mapAsyc来创造。下面是MongoSource的定义:

object MongoSource { 
 
  def apply(query: Observable[Document]): Source[Document, NotUsed] = 
    Source.fromPublisher(ObservableToPublisher(query)) 
 
}

实际上就是把Mongo-scala的Observable[Document]转成Source[Document, NotUsed]。我们还是通过传入context来构建这个Source:

  case class MGOContext( 
                         dbName: String, 
                         collName: String, 
                         action: MGOCommands = null 
                       ) {...} 
   case class DocumentStream(filter: Option[Bson] = None, 
                              andThen: Option[FindObservable[Document] => FindObservable[Document]] = None, 
                             ) extends MGOCommands

Source的具体实现:

    def mongoStream(ctx: MGOContext)( 
      implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = { 
      val db = client.getDatabase(ctx.dbName) 
      val coll = db.getCollection(ctx.collName) 
      ctx.action match { 
        case DocumentStream(None, None) => 
          MongoSource(coll.find()) 
        case DocumentStream(Some(filter), None) => 
          MongoSource(coll.find(filter)) 
        case DocumentStream(None, Some(next)) => 
          MongoSource(next(coll.find())) 
        case DocumentStream(Some(filter), Some(next)) => 
          MongoSource(next(coll.find(filter))) 
      } 
    }

下面是mongoStream的使用示范:

  val clusterSettings = ClusterSettings.builder() 
    .hosts(List(new ServerAddress("localhost:27017")).asJava).build() 
  val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build() 
  implicit val client = MongoClient(clientSettings) 
 
  implicit val system = ActorSystem() 
  implicit val mat = ActorMaterializer() 
  implicit val ec = system.dispatcher 
 
  case class PO ( 
                  ponum: String, 
                  podate: MGODate, 
                  vendor: String, 
                  remarks: Option[String], 
                  podtl: Option[MGOArray] 
                ) 
  def toPO(doc: Document): PO = { 
    PO( 
      ponum = doc.getString("ponum"), 
      podate = doc.getDate("podate"), 
      vendor = doc.getString("vendor"), 
      remarks = mgoGetStringOrNone(doc,"remarks"), 
      podtl = mgoGetArrayOrNone(doc,"podtl") 
    ) 
  } 
  case class PODTL( 
                    item: String, 
                    price: Double, 
                    qty: Int, 
                    packing: Option[String], 
                    payTerm: Option[String] 
                  ) 
  def toPODTL(podtl: Document): PODTL = { 
    PODTL( 
      item = podtl.getString("item"), 
      price = podtl.getDouble("price"), 
      qty = podtl.getInteger("qty"), 
      packing = mgoGetStringOrNone(podtl,"packing"), 
      payTerm = mgoGetStringOrNone(podtl,"payterm") 
    ) 
  } 
 
  def showPO(po: PO) = { 
    println(s"po number: ${po.ponum}") 
    println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}") 
    println(s"vendor: ${po.vendor}") 
    if (po.remarks != None) 
      println(s"remarks: ${po.remarks.get}") 
    po.podtl match { 
      case Some(barr) => 
        mgoArrayToDocumentList(barr) 
          .map { dc => toPODTL(dc)} 
          .foreach { doc: PODTL => 
            print(s"==>Item: ${doc.item} ") 
            print(s"price: ${doc.price} ") 
            print(s"qty: ${doc.qty} ") 
            doc.packing.foreach(pk => print(s"packing: ${pk} ")) 
            doc.payTerm.foreach(pt => print(s"payTerm: ${pt} ")) 
            println("") 
          } 
      case _ => 
    } 
  } 
  import org.mongodb.scala.model.Projections._ 
  import MongoActionStream._ 
  import MGOEngine._ 
  import akka.stream.scaladsl.{Sink, Source} 
 
  val proj: MGOFilterResult = find => find.projection(exclude("handler","_id")) 
  val ctx = MGOContext("testdb","po").setCommand( 
    DocumentStream(filter = None, andThen = Some(proj))) 
 
 
  val stream = mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO)) 
 
  println(getResult(mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO))))

我们看到:使用了许多代码去进行类型转换。不过也没有什么太好的办法,已经是一次性的了。我们也可以通过akka的Flow[A,B]来以stream里的A元素为变量对MongoDB数据进行更新操作:

 object MongoActionStream { 
 
    import MGOContext._ 
 
    case class StreamingInsert[A](dbName: String, 
                                  collName: String, 
                                  converter: A => Document, 
                                  parallelism: Int = 1 
                                 ) extends MGOCommands 
 
    case class StreamingDelete[A](dbName: String, 
                                  collName: String, 
                                  toFilter: A => Bson, 
                                  parallelism: Int = 1, 
                                  justOne: Boolean = false 
                                 ) extends MGOCommands 
 
    case class StreamingUpdate[A](dbName: String, 
                                  collName: String, 
                                  toFilter: A => Bson, 
                                  toUpdate: A => Bson, 
                                  parallelism: Int = 1, 
                                  justOne: Boolean = false 
                                 ) extends MGOCommands 
 
 
    case class InsertAction[A](ctx: StreamingInsert[A])( 
      implicit mongoClient: MongoClient) { 
 
      val database = mongoClient.getDatabase(ctx.dbName) 
      val collection = database.getCollection(ctx.collName) 
 
      def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] = 
        Flow[A].map(ctx.converter) 
          .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc)) 
    } 
 
    case class UpdateAction[A](ctx: StreamingUpdate[A])( 
      implicit mongoClient: MongoClient) { 
 
      val database = mongoClient.getDatabase(ctx.dbName) 
      val collection = database.getCollection(ctx.collName) 
 
      def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] = 
        if (ctx.justOne) { 
          Flow[A] 
            .mapAsync(ctx.parallelism)(a => 
              collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a)) 
        } else 
          Flow[A] 
            .mapAsync(ctx.parallelism)(a => 
              collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a)) 
    } 
 
 
    case class DeleteAction[A](ctx: StreamingDelete[A])( 
      implicit mongoClient: MongoClient) { 
 
      val database = mongoClient.getDatabase(ctx.dbName) 
      val collection = database.getCollection(ctx.collName) 
 
      def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] = 
        if (ctx.justOne) { 
          Flow[A] 
            .mapAsync(ctx.parallelism)(a => 
              collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a)) 
        } else 
          Flow[A] 
            .mapAsync(ctx.parallelism)(a => 
              collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a)) 
    } 
 
  }

下面是insert, update及delete操作的示范。在这个示范里我们同时调用了JDBCEngine,CassandraEngine和MongoDBEngine:

  import jdbcengine._ 
  import JDBCEngine._ 
  import scalikejdbc._ 
 
  case class DataRow ( 
                       rowid: Long, 
                       measureid: Long, 
                       state: String, 
                       county: String, 
                       year: Int, 
                       value: Int 
                     ) 
  val toRow: WrappedResultSet => DataRow = rs => DataRow( 
    rowid = rs.long("ROWID"), 
    measureid = rs.long("MEASUREID"), 
    state = rs.string("STATENAME"), 
    county = rs.string("COUNTYNAME"), 
    year = rs.int("REPORTYEAR"), 
    value = rs.int("VALUE") 
  ) 
 
  //construct the context 
  val h2ctx = JDBCQueryContext[DataRow]( 
    dbName = 'h2, 
    statement = "select * from AQMRPT", 
    extractor = toRow 
  ) 
 
  //source from h2 database 
  val jdbcSource = jdbcAkkaStream(h2ctx) 
 
  //document converter 
  def rowToDoc: DataRow => Document = row => Document ( 
    "rowid" -> row.rowid, 
    "measureid" ->  row.measureid, 
    "state" ->  row.state, 
    "county" ->  row.county, 
    "year" ->  row.year, 
    "value" ->  row.value 
  ) 
  def docToRow: Document => DataRow = doc => DataRow ( 
    rowid = doc.getLong("rowid"), 
    measureid = doc.getLong("measureid"), 
    state = doc.getString("state"), 
    county = doc.getString("county"), 
    year = doc.getInteger("year"), 
    value = doc.getInteger("value") 
  ) 
  //setup context 
  val mgoctx = StreamingInsert("testdb","members",rowToDoc) 
  val mgoActionStream = new MongoActionStream.InsertAction[DataRow](mgoctx) 
  val mgoActionFlow = mgoActionStream.performOnRow.map(docToRow) 
  val sink = Sink.foreach[DataRow]{ r => 
    println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}") 
  } 
 
  //config jdbc drivers 
  ConfigDBsWithEnv("dev").setup('h2) 
  ConfigDBsWithEnv("dev").loadGlobalSettings() 
 
  val sts = jdbcSource.take(100).via(mgoActionFlow).to(sink).run() 
 
  val mgoCtxShow = MGOContext("testdb","members").setCommand( 
    DocumentStream(filter = None)) 
 
  mongoStream(mgoCtxShow).map(docToRow).to(sink).run() 
 
 
 
  import com.datastax.driver.core._ 
  import cassandraengine._ 
  import CQLEngine._ 
  import org.mongodb.scala.model.Filters._ 
 
  //data row converter 
  val cqlToDataRow = (rs: Row) => DataRow( 
    rowid = rs.getLong("ROWID"), 
    measureid = rs.getLong("MEASUREID"), 
    state = rs.getString("STATENAME"), 
    county = rs.getString("COUNTYNAME"), 
    year = rs.getInt("REPORTYEAR"), 
    value = rs.getInt("VALUE") 
  ) 
 
 import org.bson.conversions._ 
  import org.mongodb.scala.model.Updates._ 
 
  //#init-session 
  implicit val session = Cluster.builder 
    .addContactPoint("127.0.0.1") 
    .withPort(9042) 
    .build 
    .connect() 
 
 
  //setup context 
  val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt",cqlToDataRow) 
  //construct source 
  val cqlSource = cassandraStream(cqlCtx) 
 
  def toFilter: DataRow => Bson = row => { 
    and(equal("rowid",row.rowid), lt("value",10)) 
  } 
  def toUpdate: DataRow => Bson = row => { 
    set("value" , row.value * 10) 
  } 
  val mgoCtxUpdate = StreamingUpdate("testdb","members",toFilter,toUpdate) 
  val mgoUpdateFlow = new MongoActionStream.UpdateAction[DataRow](mgoCtxUpdate) 
  val sts = cqlSource.via(mgoUpdateFlow.performOnRow).to(sink).run() 
 
 
  import org.bson.conversions._ 
  import org.mongodb.scala.model.Filters._ 
  def toDelFilter: DataRow => Bson = row => and(equal("rowid",row.rowid),equal("value",10)) 
 
  val mgoCtxDel = StreamingDelete[DataRow]("testdb","members",toDelFilter) 
  val mgoDelFlow = new DeleteAction[DataRow](mgoCtxDel) 
  val mgoCtxSrc = MGOContext("testdb","members").setCommand( 
    DocumentStream(filter = None)) 
  mongoStream(mgoCtxSrc).map(docToRow).via(mgoDelFlow.performOnRow).to(Sink.ignore).run() 
 
  import org.mongodb.scala.model.Sorts._ 
  val sortDsc: MGOFilterResult = find => find.sort(descending("rowid")) 
  val mgoCtxShow = MGOContext("testdb","members").setCommand( 
    DocumentStream(filter = None, andThen = Some(sortDsc))) 
 
  mongoStream(mgoCtxShow).map(docToRow).to(sink).run()

下面是本次示范的全部源代码:

build.sbt

name := "learn-mongo" 
 
version := "0.1" 
 
scalaVersion := "2.12.4" 
 
libraryDependencies := Seq( 
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0", 
  "com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0", 
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1", 
  "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.17", 
  "com.typesafe.akka" %% "akka-actor" % "2.5.4", 
  "com.typesafe.akka" %% "akka-stream" % "2.5.4", 
  "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.16", 
  "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1", 
  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test", 
  "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1", 
  "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1", 
  "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1", 
  "com.h2database"  %  "h2"                % "1.4.196", 
  "mysql" % "mysql-connector-java" % "6.0.6", 
  "org.postgresql" % "postgresql" % "42.2.0", 
  "commons-dbcp" % "commons-dbcp" % "1.4", 
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2", 
  "com.zaxxer" % "HikariCP" % "2.7.4", 
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE", 
  "com.typesafe.slick" %% "slick" % "3.2.1", 
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3" 
)

resources/application.conf

# JDBC settings 
test { 
  db { 
    h2 { 
      driver = "org.h2.Driver" 
      url = "jdbc:h2:tcp://localhost/~/slickdemo" 
      user = "" 
      password = "" 
      poolInitialSize = 5 
      poolMaxSize = 7 
      poolConnectionTimeoutMillis = 1000 
      poolValidationQuery = "select 1 as one" 
      poolFactoryName = "commons-dbcp2" 
    } 
  } 
 
  db.mysql.driver = "com.mysql.cj.jdbc.Driver" 
  db.mysql.url = "jdbc:mysql://localhost:3306/testdb" 
  db.mysql.user = "root" 
  db.mysql.password = "123" 
  db.mysql.poolInitialSize = 5 
  db.mysql.poolMaxSize = 7 
  db.mysql.poolConnectionTimeoutMillis = 1000 
  db.mysql.poolValidationQuery = "select 1 as one" 
  db.mysql.poolFactoryName = "bonecp" 
 
  # scallikejdbc Global settings 
  scalikejdbc.global.loggingSQLAndTime.enabled = true 
  scalikejdbc.global.loggingSQLAndTime.logLevel = info 
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true 
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000 
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn 
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false 
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false 
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 
} 
dev { 
  db { 
    h2 { 
      driver = "org.h2.Driver" 
      url = "jdbc:h2:tcp://localhost/~/slickdemo" 
      user = "" 
      password = "" 
      poolFactoryName = "hikaricp" 
      numThreads = 10 
      maxConnections = 12 
      minConnections = 4 
      keepAliveConnection = true 
    } 
    mysql { 
      driver = "com.mysql.cj.jdbc.Driver" 
      url = "jdbc:mysql://localhost:3306/testdb" 
      user = "root" 
      password = "123" 
      poolInitialSize = 5 
      poolMaxSize = 7 
      poolConnectionTimeoutMillis = 1000 
      poolValidationQuery = "select 1 as one" 
      poolFactoryName = "bonecp" 
 
    } 
    postgres { 
      driver = "org.postgresql.Driver" 
      url = "jdbc:postgresql://localhost:5432/testdb" 
      user = "root" 
      password = "123" 
      poolFactoryName = "hikaricp" 
      numThreads = 10 
      maxConnections = 12 
      minConnections = 4 
      keepAliveConnection = true 
    } 
  } 
  # scallikejdbc Global settings 
  scalikejdbc.global.loggingSQLAndTime.enabled = true 
  scalikejdbc.global.loggingSQLAndTime.logLevel = info 
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true 
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000 
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn 
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false 
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false 
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 
}

JDBCEngine.scala

package jdbcengine 
import java.sql.PreparedStatement 
 
import scala.collection.generic.CanBuildFrom 
import akka.stream.scaladsl._ 
import scalikejdbc._ 
import scalikejdbc.streams._ 
import akka.NotUsed 
import akka.stream._ 
import scala.util._ 
import java.time._ 
import scala.concurrent.duration._ 
import filestreaming.FileStreaming._ 
 
import scalikejdbc.TxBoundary.Try._ 
 
import scala.concurrent.ExecutionContextExecutor 
import java.io.InputStream 
 
object JDBCContext { 
  type SQLTYPE = Int 
  val SQL_EXEDDL= 1 
  val SQL_UPDATE = 2 
  val RETURN_GENERATED_KEYVALUE = true 
  val RETURN_UPDATED_COUNT = false 
 
} 
 
case class JDBCQueryContext[M]( 
                                dbName: Symbol, 
                                statement: String, 
                                parameters: Seq[Any] = Nil, 
                                fetchSize: Int = 100, 
                                autoCommit: Boolean = false, 
                                queryTimeout: Option[Int] = None, 
                                extractor: WrappedResultSet => M) 
 
 
case class JDBCContext( 
                        dbName: Symbol, 
                        statements: Seq[String] = Nil, 
                        parameters: Seq[Seq[Any]] = Nil, 
                        fetchSize: Int = 100, 
                        queryTimeout: Option[Int] = None, 
                        queryTags: Seq[String] = Nil, 
                        sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE, 
                        batch: Boolean = false, 
                        returnGeneratedKey: Seq[Option[Any]] = Nil, 
                        // no return: None, return by index: Some(1), by name: Some("id") 
                        preAction: Option[PreparedStatement => Unit] = None, 
                        postAction: Option[PreparedStatement => Unit] = None) { 
 
  ctx => 
 
  //helper functions 
 
  def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag) 
 
  def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags) 
 
  def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size) 
 
  def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time) 
 
  def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = { 
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && 
      !ctx.batch && ctx.statements.size == 1) 
      ctx.copy(preAction = action) 
    else 
      throw new IllegalStateException("JDBCContex setting error: preAction not supported!") 
  } 
 
  def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = { 
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && 
      !ctx.batch && ctx.statements.size == 1) 
      ctx.copy(postAction = action) 
    else 
      throw new IllegalStateException("JDBCContex setting error: preAction not supported!") 
  } 
 
  def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = { 
    if (ctx.sqlType == JDBCContext.SQL_EXEDDL) { 
      ctx.copy( 
        statements = ctx.statements ++ Seq(_statement), 
        parameters = ctx.parameters ++ Seq(Seq(_parameters)) 
      ) 
    } else 
      throw new IllegalStateException("JDBCContex setting error: option not supported!") 
  } 
 
  def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = { 
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) { 
      ctx.copy( 
        statements = ctx.statements ++ Seq(_statement), 
        parameters = ctx.parameters ++ Seq(_parameters), 
        returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None)) 
      ) 
    } else 
      throw new IllegalStateException("JDBCContex setting error: option not supported!") 
  } 
 
  def appendBatchParameters(_parameters: Any*): JDBCContext = { 
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) 
      throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!") 
 
    var matchParams = true 
    if (ctx.parameters != Nil) 
      if (ctx.parameters.head.size != _parameters.size) 
        matchParams = false 
    if (matchParams) { 
      ctx.copy( 
        parameters = ctx.parameters ++ Seq(_parameters) 
      ) 
    } else 
      throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!") 
  } 
 
  def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = { 
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) 
      throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!") 
    ctx.copy( 
      returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil 
    ) 
  } 
 
  def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = { 
    ctx.copy( 
      statements = Seq(_statement), 
      parameters = Seq(_parameters), 
      sqlType = JDBCContext.SQL_EXEDDL, 
      batch = false 
    ) 
  } 
 
  def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = { 
    ctx.copy( 
      statements = Seq(_statement), 
      parameters = Seq(_parameters), 
      returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None), 
      sqlType = JDBCContext.SQL_UPDATE, 
      batch = false 
    ) 
  } 
  def setBatchCommand(_statement: String): JDBCContext = { 
    ctx.copy ( 
      statements = Seq(_statement), 
      sqlType = JDBCContext.SQL_UPDATE, 
      batch = true 
    ) 
  } 
 
  type JDBCDate = LocalDate 
  type JDBCDateTime = LocalDateTime 
 
  def jdbcSetDate(yyyy: Int, mm: Int, dd: Int) = LocalDate.of(yyyy,mm,dd) 
  def jdbcSetNow = LocalDateTime.now() 
 
  type JDBCBlob = InputStream 
 
  def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
    implicit mat: Materializer) = FileToInputStream(fileName,timeOut) 
 
  def jdbcBlobToFile(blob: JDBCBlob, fileName: String)( 
    implicit mat: Materializer) =  InputStreamToFile(blob,fileName) 
 
 
} 
 
object JDBCEngine { 
 
  import JDBCContext._ 
 
  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) => 
    throw new IllegalStateException(message) 
  } 
 
  def jdbcAkkaStream[A](ctx: JDBCQueryContext[A]) 
                       (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = { 
 
    val publisher: DatabasePublisher[A] = NamedDB('h2) readOnlyStream { 
      val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor("")) 
      ctx.queryTimeout.foreach(rawSql.queryTimeout(_)) 
      val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor) 
 
      sql.iterator 
        .withDBSessionForceAdjuster(session => { 
          session.connection.setAutoCommit(ctx.autoCommit) 
          session.fetchSize(ctx.fetchSize) 
        }) 
    } 
    Source.fromPublisher[A](publisher) 
  } 
 
 
  def jdbcQueryResult[C[_] <: TraversableOnce[_], A]( 
                                                      ctx: JDBCQueryContext[A])( 
                                                      implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = { 
 
    val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor("")) 
    ctx.queryTimeout.foreach(rawSql.queryTimeout(_)) 
    rawSql.fetchSize(ctx.fetchSize) 
    implicit val session = NamedAutoSession(ctx.dbName) 
    val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor) 
    sql.collection.apply[C]() 
 
  } 
 
  def jdbcExcuteDDL(ctx: JDBCContext): Try[String] = { 
    if (ctx.sqlType != SQL_EXEDDL) { 
      Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!")) 
    } 
    else { 
      NamedDB(ctx.dbName) localTx { implicit session => 
        Try { 
          ctx.statements.foreach { stm => 
            val ddl = new SQLExecution(statement = stm, parameters = Nil)( 
              before = WrappedResultSet => {})( 
              after = WrappedResultSet => {}) 
 
            ddl.apply() 
          } 
          "SQL_EXEDDL executed succesfully." 
        } 
      } 
    } 
  } 
 
  def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
    if (ctx.statements == Nil) 
      throw new IllegalStateException("JDBCContex setting error: statements empty!") 
    if (ctx.sqlType != SQL_UPDATE) { 
      Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")) 
    } 
    else { 
      if (ctx.batch) { 
        if (noReturnKey(ctx)) { 
          val usql = SQL(ctx.statements.head) 
            .tags(ctx.queryTags: _*) 
            .batch(ctx.parameters: _*) 
          Try { 
            NamedDB(ctx.dbName) localTx { implicit session => 
              ctx.queryTimeout.foreach(session.queryTimeout(_)) 
              usql.apply[Seq]() 
              Seq.empty[Long].to[C] 
            } 
          } 
        } else { 
          val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None) 
          Try { 
            NamedDB(ctx.dbName) localTx { implicit session => 
              ctx.queryTimeout.foreach(session.queryTimeout(_)) 
              usql.apply[C]() 
            } 
          } 
        } 
 
      } else { 
        Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !")) 
      } 
    } 
  } 
  private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
    val Some(key) :: xs = ctx.returnGeneratedKey 
    val params: Seq[Any] = ctx.parameters match { 
      case Nil => Nil 
      case p@_ => p.head 
    } 
    val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key) 
    Try { 
      NamedDB(ctx.dbName) localTx { implicit session => 
        session.fetchSize(ctx.fetchSize) 
        ctx.queryTimeout.foreach(session.queryTimeout(_)) 
        val result = usql.apply() 
        Seq(result).to[C] 
      } 
    } 
  } 
 
  private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
    val params: Seq[Any] = ctx.parameters match { 
      case Nil => Nil 
      case p@_ => p.head 
    } 
    val before = ctx.preAction match { 
      case None => pstm: PreparedStatement => {} 
      case Some(f) => f 
    } 
    val after = ctx.postAction match { 
      case None => pstm: PreparedStatement => {} 
      case Some(f) => f 
    } 
    val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after) 
    Try { 
      NamedDB(ctx.dbName) localTx {implicit session => 
        session.fetchSize(ctx.fetchSize) 
        ctx.queryTimeout.foreach(session.queryTimeout(_)) 
        val result = usql.apply() 
        Seq(result.toLong).to[C] 
      } 
    } 
 
  } 
 
  private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
    if (noReturnKey(ctx)) 
      singleTxUpdateNoReturnKey(ctx) 
    else 
      singleTxUpdateWithReturnKey(ctx) 
  } 
 
  private def noReturnKey(ctx: JDBCContext): Boolean = { 
    if (ctx.returnGeneratedKey != Nil) { 
      val k :: xs = ctx.returnGeneratedKey 
      k match { 
        case None => true 
        case Some(k) => false 
      } 
    } else true 
  } 
 
  def noActon: PreparedStatement=>Unit = pstm => {} 
 
  def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
    Try { 
      NamedDB(ctx.dbName) localTx { implicit session => 
        session.fetchSize(ctx.fetchSize) 
        ctx.queryTimeout.foreach(session.queryTimeout(_)) 
        val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match { 
          case Nil => Seq.fill(ctx.statements.size)(None) 
          case k@_ => k 
        } 
        val sqlcmd = ctx.statements zip ctx.parameters zip keys 
        val results = sqlcmd.map { case ((stm, param), key) => 
          key match { 
            case None => 
              new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong 
            case Some(k) => 
              new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong 
          } 
        } 
        results.to[C] 
      } 
    } 
  } 
 
 
  def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( 
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { 
    if (ctx.statements == Nil) 
      throw new IllegalStateException("JDBCContex setting error: statements empty!") 
    if (ctx.sqlType != SQL_UPDATE) { 
      Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")) 
    } 
    else { 
      if (!ctx.batch) { 
        if (ctx.statements.size == 1) 
          singleTxUpdate(ctx) 
        else 
          multiTxUpdates(ctx) 
      } else 
        Failure(new IllegalStateException("JDBCContex setting error: must set batch = false !")) 
 
    } 
  } 
 
  case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true, 
                                 statement: String, prepareParams: R => Seq[Any]) { 
    jas => 
    def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db) 
    def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel) 
    def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered) 
 
    private def perform(r: R) = { 
      import scala.concurrent._ 
      val params = prepareParams(r) 
      NamedDB(dbName) autoCommit { session => 
        session.execute(statement,params: _*) 
      } 
      Future.successful(r) 
    } 
    def performOnRow(implicit session: DBSession): Flow[R, R, NotUsed] = 
      if (processInOrder) 
        Flow[R].mapAsync(parallelism)(perform) 
      else 
        Flow[R].mapAsyncUnordered(parallelism)(perform) 
 
  } 
  object JDBCActionStream { 
    def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] = 
      new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params) 
  } 
 
 
}

CassandraEngine.scala

package cassandraengine 
import com.datastax.driver.core._ 
 
import scala.concurrent._ 
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture} 
 
import scala.collection.JavaConverters._ 
import scala.collection.generic.CanBuildFrom 
import scala.concurrent.duration.Duration 
import akka.NotUsed 
import akka.stream.alpakka.cassandra.scaladsl._ 
import akka.stream.scaladsl._ 
import filestreaming.FileStreaming._ 
 
object CQLContext { 
  // Consistency Levels 
  type CONSISTENCY_LEVEL = Int 
  val ANY: CONSISTENCY_LEVEL          =                                        0x0000 
  val ONE: CONSISTENCY_LEVEL          =                                        0x0001 
  val TWO: CONSISTENCY_LEVEL          =                                        0x0002 
  val THREE: CONSISTENCY_LEVEL        =                                        0x0003 
  val QUORUM : CONSISTENCY_LEVEL      =                                        0x0004 
  val ALL: CONSISTENCY_LEVEL          =                                        0x0005 
  val LOCAL_QUORUM: CONSISTENCY_LEVEL =                                        0x0006 
  val EACH_QUORUM: CONSISTENCY_LEVEL  =                                        0x0007 
  val LOCAL_ONE: CONSISTENCY_LEVEL    =                                      0x000A 
  val LOCAL_SERIAL: CONSISTENCY_LEVEL =                                     0x000B 
  val SERIAL: CONSISTENCY_LEVEL       =                                      0x000C 
 
  def apply(): CQLContext = CQLContext(statements = Nil) 
 
  def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => { 
    consistency match { 
      case ALL => ConsistencyLevel.ALL 
      case ONE => ConsistencyLevel.ONE 
      case TWO => ConsistencyLevel.TWO 
      case THREE => ConsistencyLevel.THREE 
      case ANY => ConsistencyLevel.ANY 
      case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM 
      case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE 
      case QUORUM => ConsistencyLevel.QUORUM 
      case SERIAL => ConsistencyLevel.SERIAL 
      case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL 
 
    } 
  } 
 
} 
case class CQLQueryContext[M]( 
                               statement: String, 
                               extractor: Row => M, 
                               parameter: Seq[Object] = Nil, 
                               consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None, 
                               fetchSize: Int = 100 
                             ) { ctx => 
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext[M] = 
    ctx.copy(consistency = Some(_consistency)) 
  def setFetchSize(pageSize: Int): CQLQueryContext[M] = 
    ctx.copy(fetchSize = pageSize) 
} 
object CQLQueryContext { 
  def apply[M](stmt: String, converter: Row => M): CQLQueryContext[M] = 
    new CQLQueryContext[M](statement = stmt, extractor = converter) 
} 
 
case class CQLContext( 
                       statements: Seq[String], 
                       parameters: Seq[Seq[Object]] = Nil, 
                       consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None 
                     ) { ctx => 
 
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext = 
    ctx.copy(consistency = Some(_consistency)) 
  def setCommand(_statement: String, _parameters: Object*): CQLContext = 
    ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters)) 
  def appendCommand(_statement: String, _parameters: Object*): CQLContext = 
    ctx.copy(statements = ctx.statements :+ _statement, 
      parameters = ctx.parameters ++ Seq(_parameters)) 
} 
 
object CQLEngine { 
  import CQLContext._ 
  import CQLHelpers._ 
 
  def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A], pageSize: Int = 100)( 
    implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= { 
 
    val prepStmt = session.prepare(ctx.statement) 
 
    var boundStmt =  prepStmt.bind() 
    if (ctx.parameter != Nil) { 
      val params = processParameters(ctx.parameter) 
      boundStmt = prepStmt.bind(params:_*) 
    } 
 
    ctx.consistency.foreach {consistency => 
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))} 
 
    val resultSet = session.execute(boundStmt.setFetchSize(pageSize)) 
    (resultSet,(resultSet.asScala.view.map(ctx.extractor)).to[C]) 
  } 
  def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)( 
    extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) = 
    if (resultSet.isFullyFetched) { 
      (resultSet, None) 
    } else { 
      try { 
        val result = Await.result(resultSet.fetchMoreResults(), timeOut) 
        (result, Some((result.asScala.view.map(extractor)).to[C])) 
      } catch { case e: Throwable => (resultSet, None) } 
    } 
  def cqlExecute(ctx: CQLContext)( 
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = { 
    if (ctx.statements.size == 1) 
      cqlSingleUpdate(ctx) 
    else 
      cqlMultiUpdate(ctx) 
  } 
  def cqlSingleUpdate(ctx: CQLContext)( 
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = { 
 
    val prepStmt = session.prepare(ctx.statements.head) 
 
    var boundStmt =  prepStmt.bind() 
    if (ctx.parameters != Nil) { 
      val params = processParameters(ctx.parameters.head) 
      boundStmt = prepStmt.bind(params:_*) 
    } 
 
    ctx.consistency.foreach {consistency => 
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))} 
    session.executeAsync(boundStmt).map(_.wasApplied()) 
  } 
  def cqlMultiUpdate(ctx: CQLContext)( 
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = { 
    val commands: Seq[(String,Seq[Object])] = ctx.statements zip ctx.parameters 
    var batch = new BatchStatement() 
    commands.foreach { case (stm, params) => 
      val prepStmt = session.prepare(stm) 
      if (params == Nil) 
        batch.add(prepStmt.bind()) 
      else { 
        val p = processParameters(params) 
        batch.add(prepStmt.bind(p: _*)) 
      } 
    } 
    ctx.consistency.foreach {consistency => 
      batch.setConsistencyLevel(consistencyLevel(consistency))} 
    session.executeAsync(batch).map(_.wasApplied()) 
  } 
 
  def cassandraStream[A](ctx: CQLQueryContext[A]) 
                        (implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = { 
 
    val prepStmt = session.prepare(ctx.statement) 
    var boundStmt =  prepStmt.bind() 
    val params = processParameters(ctx.parameter) 
    boundStmt = prepStmt.bind(params:_*) 
    ctx.consistency.foreach {consistency => 
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))} 
 
    CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(ctx.extractor) 
  } 
 
  case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true, 
                                      statement: String, prepareParams: R => Seq[Object], 
                                      consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ cas => 
    def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism=parLevel) 
    def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered) 
    def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] = 
      cas.copy(consistency = Some(_consistency)) 
 
    private def perform(r: R)(implicit session: Session, ec: ExecutionContext) = { 
      val prepStmt = session.prepare(statement) 
      var boundStmt =  prepStmt.bind() 
      val params = processParameters(prepareParams(r)) 
      boundStmt = prepStmt.bind(params:_*) 
      consistency.foreach { cons => 
        boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons)) 
      } 
      session.executeAsync(boundStmt).map(_ => r) 
    } 
    def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = 
      if (processInOrder) 
        Flow[R].mapAsync(parallelism)(perform) 
      else 
        Flow[R].mapAsyncUnordered(parallelism)(perform) 
 
  } 
  object CassandraActionStream { 
    def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] = 
      new CassandraActionStream[R]( statement=_statement, prepareParams = params) 
  } 
 
} 
object CQLHelpers { 
  import java.nio.ByteBuffer 
  import com.datastax.driver.core.LocalDate 
  import java.time.Instant 
  import akka.stream._ 
  import scala.concurrent.duration._ 
 
 
  implicit def listenableFutureToFuture[T]( 
                                            listenableFuture: ListenableFuture[T]): Future[T] = { 
    val promise = Promise[T]() 
    Futures.addCallback(listenableFuture, new FutureCallback[T] { 
      def onFailure(error: Throwable): Unit = { 
        promise.failure(error) 
        () 
      } 
      def onSuccess(result: T): Unit = { 
        promise.success(result) 
        () 
      } 
    }) 
    promise.future 
  } 
 
  type CQLBlob = ByteBuffer 
  case class CQLDate(year: Int, month: Int, day: Int) 
  case object CQLTodayDate 
  case class CQLDateTime(year: Int, Month: Int, 
                         day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0) 
  case object CQLDateTimeNow 
 
  def processParameters(params: Seq[Object]): Seq[Object] = { 
    import java.time.{Clock,ZoneId} 
    params.map { obj => 
      obj match { 
        case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd) 
        case CQLTodayDate => 
          val today = java.time.LocalDate.now() 
          LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth) 
        case CQLDateTimeNow => Instant.now(Clock.system(ZoneId.of("EST", ZoneId.SHORT_IDS))) 
        case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) => 
          Instant.parse(f"$yy%4d-$mm%2d-$dd%2dT$hr%2d:$ms%2d:$sc%2d$mi%3d") 
        case p@_ => p 
      } 
    } 
  } 
 
  def fileToCQLBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
    implicit mat: Materializer) = FileToByteBuffer(fileName,timeOut) 
 
  def cqlBlobToFile(blob: CQLBlob, fileName: String)( 
    implicit mat: Materializer) =  ByteBufferToFile(blob,fileName) 
 }

MongoEngine.scala

import java.text.SimpleDateFormat 
 
import akka.NotUsed 
import akka.stream.alpakka.mongodb.scaladsl._ 
import akka.stream.scaladsl.{Flow, Sink, Source} 
import org.mongodb.scala.MongoClient 
import org.mongodb.scala.bson.collection.immutable.Document 
import org.bson.conversions.Bson 
import org.mongodb.scala._ 
import org.mongodb.scala.model._ 
import java.util.Calendar 
 
import scala.collection.JavaConverters._ 
import filestreaming.FileStreaming._ 
import akka.stream.Materializer 
import org.mongodb.scala.bson.{BsonArray, BsonBinary} 
 
import scala.concurrent._ 
import scala.concurrent.duration._ 
 
object MGOContext { 
 
  trait MGOCommands 
 
  object MGOCommands { 
 
    case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands 
 
    case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands 
 
  /*  org.mongodb.scala.FindObservable 
  import com.mongodb.async.client.FindIterable 
  val resultDocType = FindIterable[Document] 
  val resultOption = FindObservable(resultDocType) 
    .maxScan(...) 
  .limit(...) 
  .sort(...) 
  .project(...) */ 
    case class Find[M](filter: Option[Bson] = None, 
                    andThen: Option[FindObservable[Document] => FindObservable[Document]]= None, 
                    converter: Option[Document => M] = None, 
                    firstOnly: Boolean = false) extends MGOCommands 
 
    case class DocumentStream(filter: Option[Bson] = None, 
                              andThen: Option[FindObservable[Document] => FindObservable[Document]] = None, 
                             ) extends MGOCommands 
 
    case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands 
 
    case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands 
 
    case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands 
 
    case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands 
 
    case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands 
 
    case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands 
 
 
    case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands 
 
  } 
 
  object MGOAdmins { 
 
    case class DropCollection(collName: String) extends MGOCommands 
 
    case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands 
 
    case class ListCollection(dbName: String) extends MGOCommands 
 
    case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands 
 
    case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands 
 
    case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands 
 
    case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands 
 
    case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands 
 
  } 
 
  case class MGOContext( 
                         dbName: String, 
                         collName: String, 
                         action: MGOCommands = null 
                       ) { 
    ctx => 
    def setDbName(name: String): MGOContext = ctx.copy(dbName = name) 
 
    def setCollName(name: String): MGOContext = ctx.copy(collName = name) 
 
    def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = cmd) 
  } 
 
  object MGOContext { 
    def apply(db: String, coll: String) = new MGOContext(db, coll) 
 
    def apply(db: String, coll: String, command: MGOCommands) = 
      new MGOContext(db, coll, command) 
 
  } 
 
  type MGODate = java.util.Date 
  def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = { 
    val ca = Calendar.getInstance() 
    ca.set(yyyy,mm,dd) 
    ca.getTime() 
  } 
  def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = { 
    val ca = Calendar.getInstance() 
    ca.set(yyyy,mm,dd,hr,min,sec) 
    ca.getTime() 
  } 
  def mgoDateTimeNow: MGODate = { 
    val ca = Calendar.getInstance() 
    ca.getTime 
  } 
 
 
  def mgoDateToString(dt: MGODate, formatString: String): String = { 
    val fmt= new SimpleDateFormat(formatString) 
    fmt.format(dt) 
  } 
 
  type MGOBlob = BsonBinary 
  type MGOArray = BsonArray 
 
  def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
    implicit mat: Materializer) = FileToByteArray(fileName,timeOut) 
 
  def mgoBlobToFile(blob: MGOBlob, fileName: String)( 
    implicit mat: Materializer) =  ByteArrayToFile(blob.getData,fileName) 
 
  def mgoGetStringOrNone(doc: Document, fieldName: String) = { 
    if (doc.keySet.contains(fieldName)) 
      Some(doc.getString(fieldName)) 
    else None 
  } 
  def mgoGetIntOrNone(doc: Document, fieldName: String) = { 
    if (doc.keySet.contains(fieldName)) 
      Some(doc.getInteger(fieldName)) 
    else None 
  } 
  def mgoGetLonggOrNone(doc: Document, fieldName: String) = { 
    if (doc.keySet.contains(fieldName)) 
      Some(doc.getLong(fieldName)) 
    else None 
  } 
  def mgoGetDoubleOrNone(doc: Document, fieldName: String) = { 
    if (doc.keySet.contains(fieldName)) 
      Some(doc.getDouble(fieldName)) 
    else None 
  } 
  def mgoGetBoolOrNone(doc: Document, fieldName: String) = { 
    if (doc.keySet.contains(fieldName)) 
      Some(doc.getBoolean(fieldName)) 
    else None 
  } 
  def mgoGetDateOrNone(doc: Document, fieldName: String) = { 
    if (doc.keySet.contains(fieldName)) 
      Some(doc.getDate(fieldName)) 
    else None 
  } 
  def mgoGetBlobOrNone(doc: Document, fieldName: String) = { 
    if (doc.keySet.contains(fieldName)) 
      doc.get(fieldName).asInstanceOf[Option[MGOBlob]] 
    else None 
  } 
  def mgoGetArrayOrNone(doc: Document, fieldName: String) = { 
    if (doc.keySet.contains(fieldName)) 
      doc.get(fieldName).asInstanceOf[Option[MGOArray]] 
    else None 
  } 
 
  def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = { 
   (arr.getValues.asScala.toList) 
      .asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]] 
  } 
 
  type MGOFilterResult = FindObservable[Document] => FindObservable[Document] 
} 
object MGOEngine { 
 
  import MGOContext._ 
  import MGOCommands._ 
  import MGOAdmins._ 
 
  def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = { 
    val db = client.getDatabase(ctx.dbName) 
    val coll = db.getCollection(ctx.collName) 
    ctx.action match { 
      /* count */ 
      case Count(Some(filter), Some(opt)) => 
        coll.count(filter, opt.asInstanceOf[CountOptions]) 
          .toFuture().asInstanceOf[Future[T]] 
      case Count(Some(filter), None) => 
        coll.count(filter).toFuture() 
          .asInstanceOf[Future[T]] 
      case Count(None, None) => 
        coll.count().toFuture() 
          .asInstanceOf[Future[T]] 
      /* distinct */ 
      case Distict(field, Some(filter)) => 
        coll.distinct(field, filter).toFuture() 
          .asInstanceOf[Future[T]] 
      case Distict(field, None) => 
        coll.distinct((field)).toFuture() 
          .asInstanceOf[Future[T]] 
      /* find */ 
      case Find(None, None, optConv, false) => 
        if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]] 
        else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]] 
      case Find(None, None, optConv, true) => 
        if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]] 
        else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]] 
      case Find(Some(filter), None, optConv, false) => 
        if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]] 
        else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]] 
      case Find(Some(filter), None, optConv, true) => 
        if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]] 
        else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]] 
      case Find(None, Some(next), optConv, _) => 
        if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]] 
        else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]] 
      case Find(Some(filter), Some(next), optConv, _) => 
        if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]] 
        else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]] 
      /* aggregate */ 
      case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]] 
      /* mapReduce */ 
      case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]] 
      /* insert */ 
      case Insert(docs, Some(opt)) => 
        if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture() 
          .asInstanceOf[Future[T]] 
        else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture() 
          .asInstanceOf[Future[T]] 
      case Insert(docs, None) => 
        if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]] 
        else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]] 
      /* delete */ 
      case Delete(filter, None, onlyOne) => 
        if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]] 
        else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]] 
      case Delete(filter, Some(opt), onlyOne) => 
        if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]] 
        else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]] 
      /* replace */ 
      case Replace(filter, replacement, None) => 
        coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]] 
      case Replace(filter, replacement, Some(opt)) => 
        coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]] 
      /* update */ 
      case Update(filter, update, None, onlyOne) => 
        if (onlyOne) coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]] 
        else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]] 
      case Update(filter, update, Some(opt), onlyOne) => 
        if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]] 
        else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]] 
      /* bulkWrite */ 
      case BulkWrite(commands, None) => 
        coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]] 
      case BulkWrite(commands, Some(opt)) => 
        coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]] 
 
      /* drop collection */ 
      case DropCollection(collName) => 
        val coll = db.getCollection(collName) 
        coll.drop().toFuture().asInstanceOf[Future[T]] 
      /* create collection */ 
      case CreateCollection(collName, None) => 
        db.createCollection(collName).toFuture().asInstanceOf[Future[T]] 
      case CreateCollection(collName, Some(opt)) => 
        db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]] 
      /* list collection */ 
      case ListCollection(dbName) => 
        client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]] 
      /* create view */ 
      case CreateView(viewName, viewOn, pline, None) => 
        db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]] 
      case CreateView(viewName, viewOn, pline, Some(opt)) => 
        db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]] 
      /* create index */ 
      case CreateIndex(key, None) => 
        coll.createIndex(key).toFuture().asInstanceOf[Future[T]] 
      case CreateIndex(key, Some(opt)) => 
        coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]] 
      /* drop index */ 
      case DropIndexByName(indexName, None) => 
        coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]] 
      case DropIndexByName(indexName, Some(opt)) => 
        coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]] 
      case DropIndexByKey(key, None) => 
        coll.dropIndex(key).toFuture().asInstanceOf[Future[T]] 
      case DropIndexByKey(key, Some(opt)) => 
        coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]] 
      case DropAllIndexes(None) => 
        coll.dropIndexes().toFuture().asInstanceOf[Future[T]] 
      case DropAllIndexes(Some(opt)) => 
        coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]] 
    } 
  } 
 
  def mongoStream(ctx: MGOContext)( 
    implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = { 
    val db = client.getDatabase(ctx.dbName) 
    val coll = db.getCollection(ctx.collName) 
    ctx.action match { 
      case DocumentStream(None, None) => 
        MongoSource(coll.find()) 
      case DocumentStream(Some(filter), None) => 
        MongoSource(coll.find(filter)) 
      case DocumentStream(None, Some(next)) => 
        MongoSource(next(coll.find())) 
      case DocumentStream(Some(filter), Some(next)) => 
        MongoSource(next(coll.find(filter))) 
    } 
  } 
 
} 
 
  object MongoActionStream { 
    import MGOContext._ 
 
    case class StreamingInsert[A](dbName: String, 
                                  collName: String, 
                                  converter: A => Document, 
                                  parallelism: Int = 1 
                                 ) extends MGOCommands 
 
    case class StreamingDelete[A](dbName: String, 
                                  collName: String, 
                                  toFilter: A => Bson, 
                                  parallelism: Int = 1, 
                                  justOne: Boolean = false 
                                 ) extends MGOCommands 
 
    case class StreamingUpdate[A](dbName: String, 
                                  collName: String, 
                                  toFilter: A => Bson, 
                                  toUpdate: A => Bson, 
                                  parallelism: Int = 1, 
                                  justOne: Boolean = false 
                                 ) extends MGOCommands 
 
 
    case class InsertAction[A](ctx: StreamingInsert[A])( 
      implicit mongoClient: MongoClient) { 
 
      val database = mongoClient.getDatabase(ctx.dbName) 
      val collection = database.getCollection(ctx.collName) 
 
      def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] = 
        Flow[A].map(ctx.converter) 
          .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc)) 
    } 
 
    case class UpdateAction[A](ctx: StreamingUpdate[A])( 
      implicit mongoClient: MongoClient) { 
 
      val database = mongoClient.getDatabase(ctx.dbName) 
      val collection = database.getCollection(ctx.collName) 
 
      def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] = 
        if (ctx.justOne) { 
          Flow[A] 
            .mapAsync(ctx.parallelism)(a => 
              collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a)) 
        } else 
          Flow[A] 
            .mapAsync(ctx.parallelism)(a => 
              collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a)) 
    } 
 
 
    case class DeleteAction[A](ctx: StreamingDelete[A])( 
      implicit mongoClient: MongoClient) { 
 
      val database = mongoClient.getDatabase(ctx.dbName) 
      val collection = database.getCollection(ctx.collName) 
 
      def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] = 
        if (ctx.justOne) { 
          Flow[A] 
            .mapAsync(ctx.parallelism)(a => 
              collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a)) 
        } else 
          Flow[A] 
            .mapAsync(ctx.parallelism)(a => 
              collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a)) 
    } 
 
  }
import org.mongodb.scala._ 
import scala.concurrent._ 
import scala.concurrent.duration._ 
 
object MGOHelpers { 
 
  implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] { 
    override val converter: (Document) => String = (doc) => doc.toJson 
  } 
 
  implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] { 
    override val converter: (C) => String = (doc) => doc.toString 
  } 
 
  trait ImplicitObservable[C] { 
    val observable: Observable[C] 
    val converter: (C) => String 
 
    def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds) 
    def headResult() = Await.result(observable.head(), 10 seconds) 
    def printResults(initial: String = ""): Unit = { 
      if (initial.length > 0) print(initial) 
      results().foreach(res => println(converter(res))) 
    } 
    def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}") 
  } 
 
  def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = { 
      Await.result(fut,timeOut) 
  } 
  def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = { 
    Await.result(fut,timeOut) 
  } 
 
}

FileStreaming.scala

package filestreaming 
import java.io.{InputStream, ByteArrayInputStream} 
import java.nio.ByteBuffer 
import java.nio.file.Paths 
 
import akka.stream.{Materializer} 
import akka.stream.scaladsl.{FileIO, StreamConverters} 
 
import scala.concurrent.{Await} 
import akka.util._ 
import scala.concurrent.duration._ 
 
object FileStreaming { 
  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
    implicit mat: Materializer):ByteBuffer = { 
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => 
      hd ++ bs 
    } 
    (Await.result(fut, timeOut)).toByteBuffer 
  } 
 
  def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
    implicit mat: Materializer): Array[Byte] = { 
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => 
      hd ++ bs 
    } 
    (Await.result(fut, timeOut)).toArray 
  } 
 
  def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
    implicit mat: Materializer): InputStream = { 
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => 
      hd ++ bs 
    } 
    val buf = (Await.result(fut, timeOut)).toArray 
    new ByteArrayInputStream(buf) 
  } 
 
  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)( 
    implicit mat: Materializer) = { 
    val ba = new Array[Byte](byteBuf.remaining()) 
    byteBuf.get(ba,0,ba.length) 
    val baInput = new ByteArrayInputStream(ba) 
    val source = StreamConverters.fromInputStream(() => baInput)  //ByteBufferInputStream(bytes)) 
    source.runWith(FileIO.toPath(Paths.get(fileName))) 
  } 
 
  def ByteArrayToFile(bytes: Array[Byte], fileName: String)( 
    implicit mat: Materializer) = { 
    val bb = ByteBuffer.wrap(bytes) 
    val baInput = new ByteArrayInputStream(bytes) 
    val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes)) 
    source.runWith(FileIO.toPath(Paths.get(fileName))) 
  } 
 
  def InputStreamToFile(is: InputStream, fileName: String)( 
    implicit mat: Materializer) = { 
    val source = StreamConverters.fromInputStream(() => is) 
    source.runWith(FileIO.toPath(Paths.get(fileName))) 
  } 
 
}

FileStreaming.scala

package filestreaming 
import java.io.{InputStream, ByteArrayInputStream} 
import java.nio.ByteBuffer 
import java.nio.file.Paths 
 
import akka.stream.{Materializer} 
import akka.stream.scaladsl.{FileIO, StreamConverters} 
 
import scala.concurrent.{Await} 
import akka.util._ 
import scala.concurrent.duration._ 
 
object FileStreaming { 
  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
    implicit mat: Materializer):ByteBuffer = { 
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => 
      hd ++ bs 
    } 
    (Await.result(fut, timeOut)).toByteBuffer 
  } 
 
  def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
    implicit mat: Materializer): Array[Byte] = { 
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => 
      hd ++ bs 
    } 
    (Await.result(fut, timeOut)).toArray 
  } 
 
  def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
    implicit mat: Materializer): InputStream = { 
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => 
      hd ++ bs 
    } 
    val buf = (Await.result(fut, timeOut)).toArray 
    new ByteArrayInputStream(buf) 
  } 
 
  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)( 
    implicit mat: Materializer) = { 
    val ba = new Array[Byte](byteBuf.remaining()) 
    byteBuf.get(ba,0,ba.length) 
    val baInput = new ByteArrayInputStream(ba) 
    val source = StreamConverters.fromInputStream(() => baInput)  //ByteBufferInputStream(bytes)) 
    source.runWith(FileIO.toPath(Paths.get(fileName))) 
  } 
 
  def ByteArrayToFile(bytes: Array[Byte], fileName: String)( 
    implicit mat: Materializer) = { 
    val bb = ByteBuffer.wrap(bytes) 
    val baInput = new ByteArrayInputStream(bytes) 
    val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes)) 
    source.runWith(FileIO.toPath(Paths.get(fileName))) 
  } 
 
  def InputStreamToFile(is: InputStream, fileName: String)( 
    implicit mat: Materializer) = { 
    val source = StreamConverters.fromInputStream(() => is) 
    source.runWith(FileIO.toPath(Paths.get(fileName))) 
  } 
 
}

HikariCPool.scala

package jdbcengine 
import scala.collection.mutable 
import scala.concurrent.duration.Duration 
import scala.language.implicitConversions 
import com.typesafe.config._ 
import java.util.concurrent.TimeUnit 
import java.util.Properties 
import scalikejdbc.config._ 
import com.typesafe.config.Config 
import com.zaxxer.hikari._ 
import scalikejdbc.ConnectionPoolFactoryRepository 
 
/** Extension methods to make Typesafe Config easier to use */ 
class ConfigExtensionMethods(val c: Config) extends AnyVal { 
  import scala.collection.JavaConverters._ 
 
  def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default 
  def getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else default 
  def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default 
  def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default 
 
  def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default 
  def getDurationOr(path: String, default: => Duration = Duration.Zero) = 
    if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default 
 
  def getPropertiesOr(path: String, default: => Properties = null): Properties = 
    if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default 
 
  def toProperties: Properties = { 
    def toProps(m: mutable.Map[String, ConfigValue]): Properties = { 
      val props = new Properties(null) 
      m.foreach { case (k, cv) => 
        val v = 
          if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala) 
          else if(cv.unwrapped eq null) null 
          else cv.unwrapped.toString 
        if(v ne null) props.put(k, v) 
      } 
      props 
    } 
    toProps(c.root.asScala) 
  } 
 
  def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None 
  def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None 
  def getStringOpt(path: String) = Option(getStringOr(path)) 
  def getPropertiesOpt(path: String) = Option(getPropertiesOr(path)) 
} 
 
object ConfigExtensionMethods { 
  @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c) 
} 
 
trait HikariConfigReader extends TypesafeConfigReader { 
  self: TypesafeConfig =>      // with TypesafeConfigReader => //NoEnvPrefix => 
 
  import ConfigExtensionMethods.configExtensionMethods 
 
  def getFactoryName(dbName: Symbol): String = { 
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name) 
    c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP) 
  } 
 
  def hikariCPConfig(dbName: Symbol): HikariConfig = { 
 
    val hconf = new HikariConfig() 
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name) 
 
    // Connection settings 
    if (c.hasPath("dataSourceClass")) { 
      hconf.setDataSourceClassName(c.getString("dataSourceClass")) 
    } else { 
      Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).map(hconf.setDriverClassName _) 
    } 
    hconf.setJdbcUrl(c.getStringOr("url", null)) 
    c.getStringOpt("user").foreach(hconf.setUsername) 
    c.getStringOpt("password").foreach(hconf.setPassword) 
    c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties) 
 
    // Pool configuration 
    hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000)) 
    hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000)) 
    hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000)) 
    hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000)) 
    hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0)) 
    hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false)) 
    c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery) 
    c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql) 
    val numThreads = c.getIntOr("numThreads", 20) 
    hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5)) 
    hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads)) 
    hconf.setPoolName(c.getStringOr("poolName", dbName.name)) 
    hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false)) 
 
    // Equivalent of ConnectionPreparer 
    hconf.setReadOnly(c.getBooleanOr("readOnly", false)) 
    c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation) 
    hconf.setCatalog(c.getStringOr("catalog", null)) 
 
    hconf 
 
  } 
} 
 
import scalikejdbc._ 
trait ConfigDBs { 
  self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader => 
 
  def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = { 
    getFactoryName(dbName) match { 
      case "hikaricp" => { 
        val hconf = hikariCPConfig(dbName) 
        val hikariCPSource = new HikariDataSource(hconf) 
        if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) { 
          Class.forName(hconf.getDriverClassName) 
        } 
        ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource)) 
      } 
      case _ => { 
        val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName) 
        val cpSettings = readConnectionPoolSettings(dbName) 
        if (driver != null && driver.trim.nonEmpty) { 
          Class.forName(driver) 
        } 
        ConnectionPool.add(dbName, url, user, password, cpSettings) 
      } 
    } 
  } 
 
  def setupAll(): Unit = { 
    loadGlobalSettings() 
    dbNames.foreach { dbName => setup(Symbol(dbName)) } 
  } 
 
  def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = { 
    ConnectionPool.close(dbName) 
  } 
 
  def closeAll(): Unit = { 
    ConnectionPool.closeAll 
  } 
 
} 
 
 
object ConfigDBs extends ConfigDBs 
  with TypesafeConfigReader 
  with StandardTypesafeConfig 
  with HikariConfigReader 
 
case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs 
  with TypesafeConfigReader 
  with StandardTypesafeConfig 
  with HikariConfigReader 
  with EnvPrefix { 
 
  override val env = Option(envValue)

MongoStreamDemo.scala

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import org.mongodb.scala._ 
import org.mongodb.scala.connection._ 
import scala.collection.JavaConverters._ 
 
object MongoStream extends App { 
  import MGOContext._ 
  import MGOEngine._ 
  import MGOCommands._ 
  import MGOHelpers._ 
 
 
  val clusterSettings = ClusterSettings.builder() 
    .hosts(List(new ServerAddress("localhost:27017")).asJava).build() 
  val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build() 
  implicit val client = MongoClient(clientSettings) 
 
  implicit val system = ActorSystem() 
  implicit val mat = ActorMaterializer() 
  implicit val ec = system.dispatcher 
 
  case class PO ( 
                  ponum: String, 
                  podate: MGODate, 
                  vendor: String, 
                  remarks: Option[String], 
                  podtl: Option[MGOArray] 
                ) 
  def toPO(doc: Document): PO = { 
    PO( 
      ponum = doc.getString("ponum"), 
      podate = doc.getDate("podate"), 
      vendor = doc.getString("vendor"), 
      remarks = mgoGetStringOrNone(doc,"remarks"), 
      podtl = mgoGetArrayOrNone(doc,"podtl") 
    ) 
  } 
  case class PODTL( 
                    item: String, 
                    price: Double, 
                    qty: Int, 
                    packing: Option[String], 
                    payTerm: Option[String] 
                  ) 
  def toPODTL(podtl: Document): PODTL = { 
    PODTL( 
      item = podtl.getString("item"), 
      price = podtl.getDouble("price"), 
      qty = podtl.getInteger("qty"), 
      packing = mgoGetStringOrNone(podtl,"packing"), 
      payTerm = mgoGetStringOrNone(podtl,"payterm") 
    ) 
  } 
 
  def showPO(po: PO) = { 
    println(s"po number: ${po.ponum}") 
    println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}") 
    println(s"vendor: ${po.vendor}") 
    if (po.remarks != None) 
      println(s"remarks: ${po.remarks.get}") 
    po.podtl match { 
      case Some(barr) => 
        mgoArrayToDocumentList(barr) 
          .map { dc => toPODTL(dc)} 
          .foreach { doc: PODTL => 
            print(s"==>Item: ${doc.item} ") 
            print(s"price: ${doc.price} ") 
            print(s"qty: ${doc.qty} ") 
            doc.packing.foreach(pk => print(s"packing: ${pk} ")) 
            doc.payTerm.foreach(pt => print(s"payTerm: ${pt} ")) 
            println("") 
          } 
      case _ => 
    } 
  } 
  import org.mongodb.scala.model.Projections._ 
  import MongoActionStream._ 
  import MGOEngine._ 
  import akka.stream.scaladsl.{Sink, Source} 
 
  val proj: MGOFilterResult = find => find.projection(exclude("handler","_id")) 
  val ctx = MGOContext("testdb","po").setCommand( 
    DocumentStream(filter = None, andThen = Some(proj))) 
 
 
  val stream = mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO)) 
 
  println(getResult(mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO)))) 
 
 
  import jdbcengine._ 
  import JDBCEngine._ 
  import scalikejdbc._ 
 
  case class DataRow ( 
                       rowid: Long, 
                       measureid: Long, 
                       state: String, 
                       county: String, 
                       year: Int, 
                       value: Int 
                     ) 
  val toRow: WrappedResultSet => DataRow = rs => DataRow( 
    rowid = rs.long("ROWID"), 
    measureid = rs.long("MEASUREID"), 
    state = rs.string("STATENAME"), 
    county = rs.string("COUNTYNAME"), 
    year = rs.int("REPORTYEAR"), 
    value = rs.int("VALUE") 
  ) 
 
  //construct the context 
  val h2ctx = JDBCQueryContext[DataRow]( 
    dbName = 'h2, 
    statement = "select * from AQMRPT", 
    extractor = toRow 
  ) 
 
  //source from h2 database 
  val jdbcSource = jdbcAkkaStream(h2ctx) 
 
  //document converter 
  def rowToDoc: DataRow => Document = row => Document ( 
    "rowid" -> row.rowid, 
    "measureid" ->  row.measureid, 
    "state" ->  row.state, 
    "county" ->  row.county, 
    "year" ->  row.year, 
    "value" ->  row.value 
  ) 
  def docToRow: Document => DataRow = doc => DataRow ( 
    rowid = doc.getLong("rowid"), 
    measureid = doc.getLong("measureid"), 
    state = doc.getString("state"), 
    county = doc.getString("county"), 
    year = doc.getInteger("year"), 
    value = doc.getInteger("value") 
  ) 
  //setup context 
  val mgoctx = StreamingInsert("testdb","members",rowToDoc) 
  val mgoActionStream = new MongoActionStream.InsertAction[DataRow](mgoctx) 
  val mgoActionFlow = mgoActionStream.performOnRow.map(docToRow) 
  val sink = Sink.foreach[DataRow]{ r => 
    println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}") 
  } 
 
  //config jdbc drivers 
  ConfigDBsWithEnv("dev").setup('h2) 
  ConfigDBsWithEnv("dev").loadGlobalSettings() 
 
  val sts = jdbcSource.take(100).via(mgoActionFlow).to(sink).run() 
 
  val mgoCtxPrint = MGOContext("testdb","members").setCommand( 
    DocumentStream(filter = None)) 
 
  mongoStream(mgoCtxPrint).map(docToRow).to(sink).run() 
 
 
 
  import com.datastax.driver.core._ 
  import cassandraengine._ 
  import CQLEngine._ 
  import org.mongodb.scala.model.Filters._ 
 
  //data row converter 
  val cqlToDataRow = (rs: Row) => DataRow( 
    rowid = rs.getLong("ROWID"), 
    measureid = rs.getLong("MEASUREID"), 
    state = rs.getString("STATENAME"), 
    county = rs.getString("COUNTYNAME"), 
    year = rs.getInt("REPORTYEAR"), 
    value = rs.getInt("VALUE") 
  ) 
 
 import org.bson.conversions._ 
  import org.mongodb.scala.model.Updates._ 
 
  //#init-session 
  implicit val session = Cluster.builder 
    .addContactPoint("127.0.0.1") 
    .withPort(9042) 
    .build 
    .connect() 
 
 
  //setup context 
  val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt",cqlToDataRow) 
  //construct source 
  val cqlSource = cassandraStream(cqlCtx) 
 
  def toFilter: DataRow => Bson = row => { 
    and(equal("rowid",row.rowid), lt("value",10)) 
  } 
  def toUpdate: DataRow => Bson = row => { 
    set("value" , row.value * 10) 
  } 
  val mgoCtxUpdate = StreamingUpdate("testdb","members",toFilter,toUpdate) 
  val mgoUpdateFlow = new MongoActionStream.UpdateAction[DataRow](mgoCtxUpdate) 
  cqlSource.via(mgoUpdateFlow.performOnRow).to(sink).run() 
 
 
  import org.bson.conversions._ 
  import org.mongodb.scala.model.Filters._ 
  def toDelFilter: DataRow => Bson = row => and(equal("rowid",row.rowid),equal("value",10)) 
 
  val mgoCtxDel = StreamingDelete[DataRow]("testdb","members",toDelFilter) 
  val mgoDelFlow = new DeleteAction[DataRow](mgoCtxDel) 
  val mgoCtxSrc = MGOContext("testdb","members").setCommand( 
    DocumentStream(filter = None)) 
  mongoStream(mgoCtxSrc).map(docToRow).via(mgoDelFlow.performOnRow).to(Sink.ignore).run() 
 
  import org.mongodb.scala.model.Sorts._ 
  val sortDsc: MGOFilterResult = find => find.sort(descending("rowid")) 
  val mgoCtxShow = MGOContext("testdb","members").setCommand( 
    DocumentStream(filter = None, andThen = Some(sortDsc))) 
 
  mongoStream(mgoCtxShow).map(docToRow).to(sink).run() 
 
  scala.io.StdIn.readLine() 
 
  system.terminate() 
 
 
 
}

 

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/industrynews/12805.html

(0)
上一篇 2021年7月19日 14:55
下一篇 2021年7月19日 14:55

相关推荐

发表回复

登录后才能评论