SDP(11):MongoDB-Engine功能实现

  根据上篇关于MongoDB-Engine的功能设计方案,我们将在这篇讨论里进行功能实现和测试。下面是具体的功能实现代码:基本上是直接调用mongo-scala-driver的对应函数,需要注意的是Java类型和Scala类型之间的相互转换:

/** Executes the MongoDB command described by an `MGOContext` through the mongo-scala driver. */
object MGOEngine {
  import MGOContext._
  import MGOCommands._
  import MGOAdmins._

  /**
   * Runs `ctx.action` against collection `ctx.collName` of database `ctx.dbName`.
   *
   * Every driver result is cast, unchecked, to `Future[T]`, so the caller must choose a `T`
   * that matches what the command really produces.
   *
   * @param ctx    database name, collection name and the command to run
   * @param client MongoDB client used to reach the database
   * @tparam T     result type expected by the caller (not verified)
   * @return the driver's result, or a failed Future carrying an IllegalArgumentException when
   *         the command is missing (null), unsupported, or an Insert without documents
   */
  def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
    val db = client.getDatabase(ctx.dbName)
    val coll = db.getCollection(ctx.collName)
    ctx.action match {
        /* count */
      case Count(Some(filter),Some(opt)) =>
        coll.count(filter,opt.asInstanceOf[CountOptions])
          .toFuture().asInstanceOf[Future[T]]
      case Count(Some(filter),None) =>
        coll.count(filter).toFuture()
          .asInstanceOf[Future[T]]
      case Count(None,Some(opt)) =>
        // options without a filter: count everything by passing an empty filter document
        coll.count(new org.bson.BsonDocument(),opt.asInstanceOf[CountOptions])
          .toFuture().asInstanceOf[Future[T]]
      case Count(None,None) =>
        coll.count().toFuture()
          .asInstanceOf[Future[T]]
        /* distinct */
      case Distict(field,Some(filter)) =>
        coll.distinct(field,filter).toFuture()
          .asInstanceOf[Future[T]]
      case Distict(field,None) =>
        coll.distinct((field)).toFuture()
          .asInstanceOf[Future[T]]
        /* find */
      case Find(None,None,optConv,false) =>
        optConv match {
          case None => coll.find().toFuture().asInstanceOf[Future[T]]
          case Some(conv) => coll.find().map(conv).toFuture().asInstanceOf[Future[T]]
        }
      case Find(None,None,optConv,true) =>
        optConv match {
          case None => coll.find().first().head().asInstanceOf[Future[T]]
          case Some(conv) => coll.find().first().map(conv).head().asInstanceOf[Future[T]]
        }
      case Find(Some(filter),None,optConv,false) =>
        optConv match {
          case None => coll.find(filter).toFuture().asInstanceOf[Future[T]]
          case Some(conv) => coll.find(filter).map(conv).toFuture().asInstanceOf[Future[T]]
        }
      case Find(Some(filter),None,optConv,true) =>
        optConv match {
          case None => coll.find(filter).first().head().asInstanceOf[Future[T]]
          case Some(conv) => coll.find(filter).first().map(conv).head().asInstanceOf[Future[T]]
        }
        // NOTE: when `andThen` is supplied, `firstOnly` is ignored and all results are returned
      case Find(None,Some(next),optConv,_) =>
        optConv match {
          case None => next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
          case Some(conv) => next(coll.find[Document]()).map(conv).toFuture().asInstanceOf[Future[T]]
        }
      case Find(Some(filter),Some(next),optConv,_) =>
        optConv match {
          case None => next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
          case Some(conv) => next(coll.find[Document](filter)).map(conv).toFuture().asInstanceOf[Future[T]]
        }
        /* aggregate */
      case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
        /* mapReduce */
      case MapReduce(mf,rf) => coll.mapReduce(mf,rf).toFuture().asInstanceOf[Future[T]]
        /* insert */
      case Insert(docs,_) if docs.isEmpty =>
        // docs.head below would throw synchronously; report the problem through the Future instead
        Future.failed(new IllegalArgumentException("mgoExecute: Insert requires at least one document"))
        // NOTE: the options object must match the call: InsertManyOptions for several documents,
        // InsertOneOptions for a single one
      case Insert(docs,Some(opt)) =>
        if (docs.size > 1) coll.insertMany(docs,opt.asInstanceOf[InsertManyOptions]).toFuture()
          .asInstanceOf[Future[T]]
        else coll.insertOne(docs.head,opt.asInstanceOf[InsertOneOptions]).toFuture()
          .asInstanceOf[Future[T]]
      case Insert(docs,None) =>
        if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
        else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
        /* delete */
      case Delete(filter,None,onlyOne) =>
        if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
        else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
      case Delete(filter,Some(opt),onlyOne) =>
        if (onlyOne) coll.deleteOne(filter,opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
        else coll.deleteMany(filter,opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
        /* replace */
      case Replace(filter,replacement,None) =>
        coll.replaceOne(filter,replacement).toFuture().asInstanceOf[Future[T]]
      case Replace(filter,replacement,Some(opt)) =>
        coll.replaceOne(filter,replacement,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
        /* update */
      case Update(filter,update,None,onlyOne) =>
        if (onlyOne) coll.updateOne(filter,update).toFuture().asInstanceOf[Future[T]]
        else coll.updateMany(filter,update).toFuture().asInstanceOf[Future[T]]
      case Update(filter,update,Some(opt),onlyOne) =>
        if (onlyOne) coll.updateOne(filter,update,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
        else coll.updateMany(filter,update,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
        /* bulkWrite */
      case BulkWrite(commands,None) =>
        coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]
      case BulkWrite(commands,Some(opt)) =>
        coll.bulkWrite(commands,opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]

        /* drop collection */
      case DropCollection(collName) =>
        // the command names its own target, which may differ from ctx.collName
        val target = db.getCollection(collName)
        target.drop().toFuture().asInstanceOf[Future[T]]
        /* create collection */
      case CreateCollection(collName,None) =>
        db.createCollection(collName).toFuture().asInstanceOf[Future[T]]
      case CreateCollection(collName,Some(opt)) =>
        db.createCollection(collName,opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
        /* list collection */
      case ListCollection(dbName) =>
        client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
        /* create view */
      case CreateView(viewName,viewOn,pline,None) =>
        db.createView(viewName,viewOn,pline).toFuture().asInstanceOf[Future[T]]
      case CreateView(viewName,viewOn,pline,Some(opt)) =>
        db.createView(viewName,viewOn,pline,opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
        /* create index */
      case CreateIndex(key,None) =>
        coll.createIndex(key).toFuture().asInstanceOf[Future[T]]
      case CreateIndex(key,Some(opt)) =>
        coll.createIndex(key,opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
        /* drop index */
      case DropIndexByName(indexName, None) =>
        coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]
      case DropIndexByName(indexName, Some(opt)) =>
        coll.dropIndex(indexName,opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
      case DropIndexByKey(key,None) =>
        coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]
      case DropIndexByKey(key,Some(opt)) =>
        coll.dropIndex(key,opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
      case DropAllIndexes(None) =>
        coll.dropIndexes().toFuture().asInstanceOf[Future[T]]
      case DropAllIndexes(Some(opt)) =>
        coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
        /* anything else, including a context whose command was never set (null) */
      case unsupported =>
        Future.failed(new IllegalArgumentException(s"mgoExecute: unsupported or missing command: $unsupported"))
    }

  }

}

注意:以上所有函数都返回Future[T]结果。下面我们来试运行这些函数,不过先关注一些细节:关于MongoDB的Date,Blob,Array等类型在scala中的使用方法:

 type MGODate = java.util.Date
  /**
   * Builds a date at 00:00:00.000 in the default time zone.
   *
   * @param yyyy year
   * @param mm   month as understood by java.util.Calendar, i.e. zero-based (0 = January)
   * @param dd   day of month
   */
  def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
    val ca = Calendar.getInstance()
    // getInstance() starts at "now"; clear it so no time-of-day leaks into the result
    ca.clear()
    ca.set(yyyy,mm,dd)
    ca.getTime()
  }
  /**
   * Builds a date-time with second precision (milliseconds are zero) in the default time zone.
   *
   * @param mm month as understood by java.util.Calendar, i.e. zero-based (0 = January)
   */
  def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
    val ca = Calendar.getInstance()
    // without clear() the millisecond field would keep the value of "now"
    ca.clear()
    ca.set(yyyy,mm,dd,hr,min,sec)
    ca.getTime()
  }
  /** Current date and time, taken from a freshly created Calendar. */
  def mgoDateTimeNow: MGODate =
    Calendar.getInstance().getTime
 
 
  /** Formats `dt` with a SimpleDateFormat built from `formatString`. */
  def mgoDateToString(dt: MGODate, formatString: String): String =
    new SimpleDateFormat(formatString).format(dt)
 
  // Binary field value: alias for the driver's BsonBinary.
  type MGOBlob = BsonBinary 
  // Array field value: alias for the driver's BsonArray.
  type MGOArray = BsonArray 
 
  /** Reads the named file through FileToByteArray, waiting at most `timeOut` for the read. */
  def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer) = {
    FileToByteArray(fileName, timeOut)
  }
 
  /** Hands the blob's bytes to ByteArrayToFile for writing to `fileName`. */
  def mgoBlobToFile(blob: MGOBlob, fileName: String)(
    implicit mat: Materializer) = {
    val bytes = blob.getData
    ByteArrayToFile(bytes, fileName)
  }

然后就是MongoDB数据类型的读取帮助函数:

 /** Reads `fieldName` as a String; None when the field is absent or the getter yields null. */
 def mgoGetStringOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getString(fieldName)) // Option(...) turns a null value into None, not Some(null)
    else None
  }
  /** Reads `fieldName` as an Integer; None when the field is absent or the getter yields null. */
  def mgoGetIntOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getInteger(fieldName)) // Option(...) turns a null value into None, not Some(null)
    else None
  }
  /** Reads `fieldName` as a Long; None when the field is absent or the getter yields null. */
  def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getLong(fieldName)) // Option(...) turns a null value into None, not Some(null)
    else None
  }
  /** Reads `fieldName` as a Double; None when the field is absent or the getter yields null. */
  def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getDouble(fieldName)) // Option(...) turns a null value into None, not Some(null)
    else None
  }
  /** Reads `fieldName` as a Boolean; None when the field is absent or the getter yields null. */
  def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getBoolean(fieldName)) // Option(...) turns a null value into None, not Some(null)
    else None
  }
  /** Reads `fieldName` as a Date; None when the field is absent or the getter yields null. */
  def mgoGetDateOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getDate(fieldName)) // Option(...) turns a null value into None, not Some(null)
    else None
  }
  /**
   * Reads `fieldName` as a binary value.
   *
   * Uses the driver's typed `get`, which checks the stored value's class, instead of an
   * unchecked cast of the whole Option; a field of another BSON type is expected to yield
   * None rather than a value that fails later with a ClassCastException.
   */
  def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
    doc.get[MGOBlob](fieldName)
  }
  /**
   * Reads `fieldName` as a BSON array.
   *
   * Uses the driver's typed `get`, which checks the stored value's class, instead of an
   * unchecked cast of the whole Option; a field of another BSON type is expected to yield
   * None rather than a value that fails later with a ClassCastException.
   */
  def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
    doc.get[MGOArray](fieldName)
  }
 
  /**
   * Views the elements of `arr` as a list of BsonDocument.
   * The cast is unchecked: elements are not verified to be documents here.
   */
  def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
    val elements = arr.getValues.asScala.toList
    elements.asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
  }
 
  // A step that refines a find query (e.g. sort or projection) and returns the refined query.
  type MGOFilterResult = FindObservable[Document] => FindObservable[Document]

下面我们就开始设置试运行环境,从一个全新的collection开始:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import org.mongodb.scala._ 
import org.mongodb.scala.connection._ 
 
import scala.collection.JavaConverters._ 
import com.mongodb.client.model._ 
 
import scala.util._ 
 
object MongoEngineTest extends App { 
import MGOContext._ 
import MGOEngine._ 
import MGOHelpers._ 
import MGOCommands._ 
import MGOAdmins._ 
 
  // Connection settings for a MongoDB server on localhost:27017.
  val clusterSettings = ClusterSettings.builder()
    .hosts(List(new ServerAddress("localhost:27017")).asJava)
    .build()
  val clientSettings = MongoClientSettings.builder()
    .clusterSettings(clusterSettings)
    .build()
  implicit val client: MongoClient = MongoClient(clientSettings)

  // Akka runtime: the materializer is needed by the file helpers, the dispatcher by Future callbacks.
  implicit val system: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: scala.concurrent.ExecutionContextExecutor = system.dispatcher
 
  // Start from a clean slate: drop the "po" collection of "testdb".
  val ctx = MGOContext("testdb","po").setCommand(
    DropCollection("po"))
  // T must be given explicitly: left to inference it becomes Nothing, and returning the
  // driver's result as Nothing fails with a ClassCastException at runtime.
  println(getResult(mgoExecute[Any](ctx)))

测试运行了DropCollection指令。下面我们试着insert两个document:

  // Picture stored in the "handler" field; fileToMGOBlob returns the file's bytes.
  val pic = fileToMGOBlob("/users/tiger/nobody.png") 
  // First sample purchase order: has remarks, a picture and two detail lines.
  val po1 = Document ( 
    "ponum" -> "po18012301", 
    "vendor" -> "The smartphone compay", 
    // mgoDate forwards the month to java.util.Calendar, whose months are zero-based (5 = June)
    "podate" -> mgoDate(2017,5,13), 
    "remarks" -> "urgent, rush order", 
    "handler" -> pic, 
    "podtl" -> Seq( 
      Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"), 
      Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days") 
    ) 
  ) 
 
  // Second sample purchase order: no remarks and no picture, three detail lines.
  val po2 = Document ( 
    "ponum" -> "po18022002", 
    "vendor" -> "The Samsung compay", 
    // zero-based month again: 11 = December
    "podate" -> mgoDate(2015,11,6), 
    "podtl" -> Seq( 
      Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"), 
      Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"), 
      Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury") 
    ) 
  ) 
 
  // Two documents are inserted, so the engine calls insertMany and needs InsertManyOptions.
  val optInsert = new InsertManyOptions().ordered(true)
  val ctxInsert = ctx.setCommand(
    Insert(Seq(po1,po2),Some(optInsert))
  )
  // println must be its own statement; T is pinned so it is not inferred as Nothing.
  println(getResult(mgoExecute[Any](ctxInsert)))

注意InsertManyOptions的具体设定方式。 为了配合更方便准确的强类型操作,我们需要进行Document类型到具体应用类型之间的对应转换:

  /**
   * Strongly typed view of a purchase-order document.
   * Optional members are the fields that toPO reads through the mgoGet*OrNone helpers.
   */
  case class PO ( 
                  ponum: String, 
                  podate: MGODate, 
                  vendor: String, 
                  remarks: Option[String], 
                  podtl: Option[MGOArray], 
                  handler: Option[MGOBlob] 
                ) 
  /** Converts a purchase-order Document into a PO; remarks, podtl and handler are optional. */
  def toPO(doc: Document): PO = {
    val number = doc.getString("ponum")
    val date = doc.getDate("podate")
    val vendorName = doc.getString("vendor")
    val note = mgoGetStringOrNone(doc,"remarks")
    val details = mgoGetArrayOrNone(doc,"podtl")
    val picture = mgoGetBlobOrNone(doc,"handler")
    PO(number, date, vendorName, note, details, picture)
  }
 
  /**
   * Strongly typed view of one purchase-order detail line.
   * packing and payTerm are optional; toPODTL fills payTerm from the "payterm" field.
   */
  case class PODTL( 
                    item: String, 
                    price: Double, 
                    qty: Int, 
                    packing: Option[String], 
                    payTerm: Option[String] 
                  ) 
  /** Converts one detail-line Document into a PODTL; packing and payterm are optional. */
  def toPODTL(podtl: Document): PODTL = {
    val itemName: String = podtl.getString("item")
    val unitPrice: Double = podtl.getDouble("price")
    val quantity: Int = podtl.getInteger("qty")
    val packingNote = mgoGetStringOrNone(podtl,"packing")
    val paymentTerm = mgoGetStringOrNone(podtl,"payterm")
    PODTL(itemName, unitPrice, quantity, packingNote, paymentTerm)
  }
 
  /**
   * Prints a purchase order: header fields, every detail line, and the picture status.
   * When a picture is present it is handed to mgoBlobToFile for writing under /users/tiger.
   */
  def showPO(po: PO) = {
    println(s"po number: ${po.ponum}")
    println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
    println(s"vendor: ${po.vendor}")
    po.remarks.foreach(rm => println(s"remarks: ${rm}"))

    po.podtl.foreach { barr =>
      // convert every line first, then print them
      val lines = mgoArrayToDocumentList(barr).map { dc => toPODTL(dc) }
      lines.foreach { line: PODTL =>
        print(s"==>Item: ${line.item} ")
        print(s"price: ${line.price} ")
        print(s"qty: ${line.qty} ")
        line.packing.foreach(pk => print(s"packing: ${pk} "))
        line.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
        println("")
      }
    }

    po.handler.fold(println("no picture provided")) { blob =>
      val fileName = s"/users/tiger/${po.ponum}.png"
      mgoBlobToFile(blob,fileName)
      println(s"picture saved to ${fileName}")
    }

  }

在上面的代码里我们使用了前面提供的MongoDB数据类型读取帮助函数。下面我们测试对poCollection中的Document进行查询,示范包括projection,sort,filter等:

  import org.mongodb.scala.model.Projections._ 
  import org.mongodb.scala.model.Filters._ 
  import org.mongodb.scala.model.Sorts._ 
  // Query refinements: descending sort on ponum, and a projection keeping three fields without _id.
  val sort: MGOFilterResult = find => find.sort(descending("ponum"))
  // Projections.fields is the combinator for merging projection specs; Filters.and builds a
  // query filter and only happens to render as a usable projection document.
  val proj: MGOFilterResult = find => find.projection(fields(include("ponum","podate"),include("vendor"),excludeId()))
  // All documents, projected; raw Documents are returned because no converter is given.
  val ctxFind = ctx.setCommand(Find(andThen=Some(proj)))
  // First document only, converted to PO.
  val ctxFindFirst = ctx.setCommand(Find(firstOnly=true,converter = Some(toPO _)))
  // Documents having a detail line with qty == 100, converted to PO.
  val ctxFindArrayItem = ctx.setCommand(
    Find(filter = Some(equal("podtl.qty",100)), converter = Some(toPO _))
  )
   
  // Run the three queries one after another; each step prints its outcome as a side effect.
  // Multi-document results are requested as Seq, which is what the driver's toFuture() promises;
  // asking for List relies on an unchecked cast to a more specific type.
  for {
    _ <- mgoExecute[Seq[Document]](ctxFind).andThen {
      case Success(docs) => docs.map(toPO).foreach(showPO)
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }

    _ <- mgoExecute[PO](ctxFindFirst).andThen {
      case Success(doc) => showPO(doc)
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }

    _ <- mgoExecute[Seq[PO]](ctxFindArrayItem).andThen {
      case Success(docs) => docs.foreach(showPO)
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }
  } yield()

因为mgoExecute返回Future结果,所以我们可以用for-comprehension对几个运算进行串联运行。

下面是这次示范的源代码:

build.sbt

name := "learn-mongo"

version := "0.1"

scalaVersion := "2.12.4"

// ++= appends to the dependency list; := would replace whatever was already configured
libraryDependencies ++= Seq(
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1",
  "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.17"
)

MGOHelpers.scala

import org.mongodb.scala._ 
import scala.concurrent._ 
import scala.concurrent.duration._ 
 
/** Blocking convenience helpers around driver Observables and Futures. */
object MGOHelpers {

  /** Blocking helpers for an Observable of Documents; elements are rendered as JSON. */
  implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
    override val converter: (Document) => String = _.toJson
  }

  /** Blocking helpers for any other Observable; elements are rendered with toString. */
  implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
    override val converter: (C) => String = _.toString
  }

  /** Shared implementation: waits up to 10 seconds for the observable to deliver. */
  trait ImplicitObservable[C] {
    val observable: Observable[C]
    val converter: (C) => String

    /** All emitted elements. */
    def results(): Seq[C] = {
      val all = observable.toFuture()
      Await.result(all, 10.seconds)
    }

    /** The first emitted element. */
    def headResult() = {
      val first = observable.head()
      Await.result(first, 10.seconds)
    }

    /** Prints `initial` (when non-empty) followed by one line per element. */
    def printResults(initial: String = ""): Unit = {
      if (initial.nonEmpty) print(initial)
      for (res <- results()) println(converter(res))
    }

    /** Prints `initial` followed by the first element on one line. */
    def printHeadResult(initial: String = ""): Unit =
      println(s"${initial}${converter(headResult())}")
  }

  /** Blocks until `fut` completes or `timeOut` elapses, returning its value. */
  def getResult[T](fut: Future[T], timeOut: Duration = 1.second): T =
    Await.result(fut, timeOut)

  /** Same as getResult, for a Future of a collection. */
  def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1.second): Iterable[T] =
    Await.result(fut, timeOut)

}

FileStreaming.scala

import java.io.{InputStream, ByteArrayInputStream} 
import java.nio.ByteBuffer 
import java.nio.file.Paths 
 
import akka.stream.{Materializer} 
import akka.stream.scaladsl.{FileIO, StreamConverters} 
 
import scala.concurrent.{Await} 
import akka.util._ 
import scala.concurrent.duration._ 
 
/** Helpers that move whole files in and out of memory using Akka Streams FileIO. */
object FileStreaming {

  /** Reads the complete file into one ByteString, blocking for at most `timeOut`. */
  private def readAll(fileName: String, timeOut: FiniteDuration)(
    implicit mat: Materializer): ByteString = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString.empty)(_ ++ _)
    Await.result(fut, timeOut)
  }

  /** Reads the file and returns its content as a ByteBuffer (blocking). */
  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer):ByteBuffer =
    readAll(fileName, timeOut).toByteBuffer

  /** Reads the file and returns its content as a byte array (blocking). */
  def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer): Array[Byte] =
    readAll(fileName, timeOut).toArray

  /** Reads the file fully into memory and exposes it as an InputStream (blocking). */
  def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer): InputStream =
    new ByteArrayInputStream(readAll(fileName, timeOut).toArray)

  /**
   * Writes the remaining bytes of `byteBuf` to `fileName`.
   * The bytes are read with a relative get, so the buffer's position is advanced.
   * Returns the value of the stream run; the write is not awaited here.
   */
  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
    implicit mat: Materializer) = {
    val ba = new Array[Byte](byteBuf.remaining())
    byteBuf.get(ba,0,ba.length)
    InputStreamToFile(new ByteArrayInputStream(ba), fileName)
  }

  /** Writes `bytes` to `fileName`; returns the value of the stream run without awaiting it. */
  def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
    implicit mat: Materializer) =
    InputStreamToFile(new ByteArrayInputStream(bytes), fileName)

  /** Streams `is` into `fileName`; returns the value of the stream run without awaiting it. */
  def InputStreamToFile(is: InputStream, fileName: String)(
    implicit mat: Materializer) = {
    val source = StreamConverters.fromInputStream(() => is)
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }

}

MongoEngine.scala

import java.text.SimpleDateFormat 
 
import org.bson.conversions.Bson 
import org.mongodb.scala._ 
import org.mongodb.scala.model._ 
import java.util.Calendar 
import scala.collection.JavaConverters._ 
import FileStreaming._ 
import akka.stream.Materializer 
import org.mongodb.scala.bson.{BsonArray, BsonBinary} 
 
import scala.concurrent._ 
import scala.concurrent.duration._ 
 
/** Command model, execution context and BSON helper functions for the MongoDB engine. */
object MGOContext {

  /** Marker for everything mgoExecute can be asked to run. */
  trait MGOCommands

  /** Data commands. `options`, where present, must be the driver options class for that call. */
  object MGOCommands {

    case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands

    case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands

  /*  org.mongodb.scala.FindObservable
  import com.mongodb.async.client.FindIterable
  val resultDocType = FindIterable[Document]
  val resultOption = FindObservable(resultDocType)
    .maxScan(...)
  .limit(...)
  .sort(...)
  .project(...) */
    /**
     * @param filter    optional query filter
     * @param andThen   optional refinement of the query (sort, projection, ...)
     * @param converter optional mapping applied to every returned Document
     * @param firstOnly ask for the first document only
     */
    case class Find[M](filter: Option[Bson] = None,
                    andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
                    converter: Option[Document => M] = None,
                    firstOnly: Boolean = false) extends MGOCommands

    case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands

    case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands

    case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands

    case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

    case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands

    case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

    case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands

  }

  /** Administrative commands (collections, views, indexes). */
  object MGOAdmins {

    case class DropCollection(collName: String) extends MGOCommands

    case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands

    case class ListCollection(dbName: String) extends MGOCommands

    case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands

    case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands

    case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands

    case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands

    case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands

  }

  /**
   * Where and what to execute. `action` stays null until a command is set.
   * The setters return modified copies.
   */
  case class MGOContext(
                         dbName: String,
                         collName: String,
                         action: MGOCommands = null
                       ) {
    ctx =>
    def setDbName(name: String): MGOContext = ctx.copy(dbName = name)

    def setCollName(name: String): MGOContext = ctx.copy(collName = name)

    def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = cmd)
  }

  object MGOContext {
    def apply(db: String, coll: String) = new MGOContext(db, coll)

    def apply(db: String, coll: String, command: MGOCommands) =
      new MGOContext(db, coll, command)

  }

  type MGODate = java.util.Date

  /**
   * Builds a date at 00:00:00.000 in the default time zone.
   * @param mm month as understood by java.util.Calendar, i.e. zero-based (0 = January)
   */
  def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
    val ca = Calendar.getInstance()
    // getInstance() starts at "now"; clear it so no time-of-day leaks into the result
    ca.clear()
    ca.set(yyyy,mm,dd)
    ca.getTime()
  }

  /**
   * Builds a date-time with second precision (milliseconds are zero) in the default time zone.
   * @param mm month as understood by java.util.Calendar, i.e. zero-based (0 = January)
   */
  def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
    val ca = Calendar.getInstance()
    // without clear() the millisecond field would keep the value of "now"
    ca.clear()
    ca.set(yyyy,mm,dd,hr,min,sec)
    ca.getTime()
  }

  /** Current date and time. */
  def mgoDateTimeNow: MGODate = {
    val ca = Calendar.getInstance()
    ca.getTime
  }


  /** Formats `dt` with a SimpleDateFormat built from `formatString`. */
  def mgoDateToString(dt: MGODate, formatString: String): String = {
    val fmt= new SimpleDateFormat(formatString)
    fmt.format(dt)
  }

  type MGOBlob = BsonBinary
  type MGOArray = BsonArray

  /** Reads the named file through FileToByteArray, waiting at most `timeOut`. */
  def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60.seconds)(
    implicit mat: Materializer) = FileToByteArray(fileName,timeOut)

  /** Hands the blob's bytes to ByteArrayToFile for writing to `fileName`. */
  def mgoBlobToFile(blob: MGOBlob, fileName: String)(
    implicit mat: Materializer) =  ByteArrayToFile(blob.getData,fileName)

  // Field readers: None when the field is absent. Option(...) additionally turns a null
  // returned by the getter into None instead of Some(null).
  def mgoGetStringOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getString(fieldName))
    else None
  }
  def mgoGetIntOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getInteger(fieldName))
    else None
  }
  def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getLong(fieldName))
    else None
  }
  def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getDouble(fieldName))
    else None
  }
  def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getBoolean(fieldName))
    else None
  }
  def mgoGetDateOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
      Option(doc.getDate(fieldName))
    else None
  }

  // Binary and array readers use the driver's typed get, which checks the stored value's class,
  // instead of an unchecked cast of the whole Option.
  def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
    doc.get[MGOBlob](fieldName)
  }
  def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
    doc.get[MGOArray](fieldName)
  }

  /**
   * Views the elements of `arr` as a list of BsonDocument.
   * The cast is unchecked: elements are not verified to be documents here.
   */
  def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
   (arr.getValues.asScala.toList)
      .asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
  }

  /** A step that refines a find query and returns the refined query. */
  type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
}
/** Executes the MongoDB command described by an `MGOContext` through the mongo-scala driver. */
object MGOEngine {
  import MGOContext._
  import MGOCommands._
  import MGOAdmins._

  /**
   * Runs `ctx.action` against collection `ctx.collName` of database `ctx.dbName`.
   *
   * Every driver result is cast, unchecked, to `Future[T]`, so the caller must choose a `T`
   * that matches what the command really produces.
   *
   * @param ctx    database name, collection name and the command to run
   * @param client MongoDB client used to reach the database
   * @tparam T     result type expected by the caller (not verified)
   * @return the driver's result, or a failed Future carrying an IllegalArgumentException when
   *         the command is missing (null), unsupported, or an Insert without documents
   */
  def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
    val db = client.getDatabase(ctx.dbName)
    val coll = db.getCollection(ctx.collName)
    ctx.action match {
        /* count */
      case Count(Some(filter),Some(opt)) =>
        coll.count(filter,opt.asInstanceOf[CountOptions])
          .toFuture().asInstanceOf[Future[T]]
      case Count(Some(filter),None) =>
        coll.count(filter).toFuture()
          .asInstanceOf[Future[T]]
      case Count(None,Some(opt)) =>
        // options without a filter: count everything by passing an empty filter document
        coll.count(new org.bson.BsonDocument(),opt.asInstanceOf[CountOptions])
          .toFuture().asInstanceOf[Future[T]]
      case Count(None,None) =>
        coll.count().toFuture()
          .asInstanceOf[Future[T]]
        /* distinct */
      case Distict(field,Some(filter)) =>
        coll.distinct(field,filter).toFuture()
          .asInstanceOf[Future[T]]
      case Distict(field,None) =>
        coll.distinct((field)).toFuture()
          .asInstanceOf[Future[T]]
        /* find */
      case Find(None,None,optConv,false) =>
        optConv match {
          case None => coll.find().toFuture().asInstanceOf[Future[T]]
          case Some(conv) => coll.find().map(conv).toFuture().asInstanceOf[Future[T]]
        }
      case Find(None,None,optConv,true) =>
        optConv match {
          case None => coll.find().first().head().asInstanceOf[Future[T]]
          case Some(conv) => coll.find().first().map(conv).head().asInstanceOf[Future[T]]
        }
      case Find(Some(filter),None,optConv,false) =>
        optConv match {
          case None => coll.find(filter).toFuture().asInstanceOf[Future[T]]
          case Some(conv) => coll.find(filter).map(conv).toFuture().asInstanceOf[Future[T]]
        }
      case Find(Some(filter),None,optConv,true) =>
        optConv match {
          case None => coll.find(filter).first().head().asInstanceOf[Future[T]]
          case Some(conv) => coll.find(filter).first().map(conv).head().asInstanceOf[Future[T]]
        }
        // NOTE: when `andThen` is supplied, `firstOnly` is ignored and all results are returned
      case Find(None,Some(next),optConv,_) =>
        optConv match {
          case None => next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
          case Some(conv) => next(coll.find[Document]()).map(conv).toFuture().asInstanceOf[Future[T]]
        }
      case Find(Some(filter),Some(next),optConv,_) =>
        optConv match {
          case None => next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
          case Some(conv) => next(coll.find[Document](filter)).map(conv).toFuture().asInstanceOf[Future[T]]
        }
        /* aggregate */
      case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
        /* mapReduce */
      case MapReduce(mf,rf) => coll.mapReduce(mf,rf).toFuture().asInstanceOf[Future[T]]
        /* insert */
      case Insert(docs,_) if docs.isEmpty =>
        // docs.head below would throw synchronously; report the problem through the Future instead
        Future.failed(new IllegalArgumentException("mgoExecute: Insert requires at least one document"))
        // NOTE: the options object must match the call: InsertManyOptions for several documents,
        // InsertOneOptions for a single one
      case Insert(docs,Some(opt)) =>
        if (docs.size > 1) coll.insertMany(docs,opt.asInstanceOf[InsertManyOptions]).toFuture()
          .asInstanceOf[Future[T]]
        else coll.insertOne(docs.head,opt.asInstanceOf[InsertOneOptions]).toFuture()
          .asInstanceOf[Future[T]]
      case Insert(docs,None) =>
        if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
        else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
        /* delete */
      case Delete(filter,None,onlyOne) =>
        if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
        else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
      case Delete(filter,Some(opt),onlyOne) =>
        if (onlyOne) coll.deleteOne(filter,opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
        else coll.deleteMany(filter,opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
        /* replace */
      case Replace(filter,replacement,None) =>
        coll.replaceOne(filter,replacement).toFuture().asInstanceOf[Future[T]]
      case Replace(filter,replacement,Some(opt)) =>
        coll.replaceOne(filter,replacement,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
        /* update */
      case Update(filter,update,None,onlyOne) =>
        if (onlyOne) coll.updateOne(filter,update).toFuture().asInstanceOf[Future[T]]
        else coll.updateMany(filter,update).toFuture().asInstanceOf[Future[T]]
      case Update(filter,update,Some(opt),onlyOne) =>
        if (onlyOne) coll.updateOne(filter,update,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
        else coll.updateMany(filter,update,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
        /* bulkWrite */
      case BulkWrite(commands,None) =>
        coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]
      case BulkWrite(commands,Some(opt)) =>
        coll.bulkWrite(commands,opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]

        /* drop collection */
      case DropCollection(collName) =>
        // the command names its own target, which may differ from ctx.collName
        val target = db.getCollection(collName)
        target.drop().toFuture().asInstanceOf[Future[T]]
        /* create collection */
      case CreateCollection(collName,None) =>
        db.createCollection(collName).toFuture().asInstanceOf[Future[T]]
      case CreateCollection(collName,Some(opt)) =>
        db.createCollection(collName,opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
        /* list collection */
      case ListCollection(dbName) =>
        client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
        /* create view */
      case CreateView(viewName,viewOn,pline,None) =>
        db.createView(viewName,viewOn,pline).toFuture().asInstanceOf[Future[T]]
      case CreateView(viewName,viewOn,pline,Some(opt)) =>
        db.createView(viewName,viewOn,pline,opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
        /* create index */
      case CreateIndex(key,None) =>
        coll.createIndex(key).toFuture().asInstanceOf[Future[T]]
      case CreateIndex(key,Some(opt)) =>
        coll.createIndex(key,opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
        /* drop index */
      case DropIndexByName(indexName, None) =>
        coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]
      case DropIndexByName(indexName, Some(opt)) =>
        coll.dropIndex(indexName,opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
      case DropIndexByKey(key,None) =>
        coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]
      case DropIndexByKey(key,Some(opt)) =>
        coll.dropIndex(key,opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
      case DropAllIndexes(None) =>
        coll.dropIndexes().toFuture().asInstanceOf[Future[T]]
      case DropAllIndexes(Some(opt)) =>
        coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
        /* anything else, including a context whose command was never set (null) */
      case unsupported =>
        Future.failed(new IllegalArgumentException(s"mgoExecute: unsupported or missing command: $unsupported"))
    }

  }

}

MongoEngineTest.scala

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import org.mongodb.scala._ 
import org.mongodb.scala.connection._ 
 
import scala.collection.JavaConverters._ 
import com.mongodb.client.model._ 
 
import scala.util._ 
 
/** Manual smoke test for `MGOEngine.mgoExecute`.
  *
  * Connects to a MongoDB server on `localhost:27017`, drops the `po`
  * collection of `testdb`, builds two sample purchase-order documents and
  * then runs three `Find` commands, printing whatever they return.
  * The program waits for a line on stdin before releasing the MongoDB client
  * and the actor system, so the asynchronous queries have time to complete.
  *
  * Helper functions such as `getResult`, `fileToMGOBlob`, `mgoDate` and the
  * `mgoGet*OrNone` family are defined elsewhere in the project (presumably in
  * `MGOHelpers`); their exact semantics are not visible here.
  */
object MongoEngineTest extends App {
  import MGOContext._
  import MGOEngine._
  import MGOHelpers._
  import MGOCommands._
  import MGOAdmins._

  // --- connection set-up -----------------------------------------------------
  val clusterSettings = ClusterSettings.builder()
    .hosts(List(new ServerAddress("localhost:27017")).asJava).build()
  val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build()

  // Implicits carry explicit types so that implicit resolution does not depend
  // on inferring the type of the right-hand side.
  implicit val client: MongoClient = MongoClient(clientSettings)

  implicit val system: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: scala.concurrent.ExecutionContextExecutor = system.dispatcher

  // --- start from a clean collection -----------------------------------------
  val ctx = MGOContext("testdb","po").setCommand(
    DropCollection("po"))
  println(getResult(mgoExecute(ctx)))

  // --- sample data -----------------------------------------------------------
  val pic = fileToMGOBlob("/users/tiger/nobody.png")
  val po1 = Document (
    "ponum" -> "po18012301",
    "vendor" -> "The smartphone compay",
    "podate" -> mgoDate(2017,5,13),
    "remarks" -> "urgent, rush order",
    "handler" -> pic,
    "podtl" -> Seq(
      Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
      Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
    )
  )

  val po2 = Document (
    "ponum" -> "po18022002",
    "vendor" -> "The Samsung compay",
    "podate" -> mgoDate(2015,11,6),
    "podtl" -> Seq(
      Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"),
      Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"),
      Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury")
    )
  )

  val optInsert = new InsertManyOptions().ordered(true)
  val ctxInsert = ctx.setCommand(
    Insert(Seq(po1,po2),Some(optInsert))
  )
  // The insert is left disabled, as in the original listing. Because the
  // collection is dropped above, the queries below only see data if this line
  // is re-enabled (or the collection is populated by some other means).
  // println(getResult(mgoExecute(ctxInsert)))

  // --- typed views of the stored documents -----------------------------------

  /** Purchase-order header; `remarks`, `podtl` and `handler` are optional. */
  case class PO (
                  ponum: String,
                  podate: MGODate,
                  vendor: String,
                  remarks: Option[String],
                  podtl: Option[MGOArray],
                  handler: Option[MGOBlob]
                )

  /** Converts a purchase-order document into a [[PO]].
    *
    * `ponum`, `podate` and `vendor` are read directly from the document; the
    * remaining fields go through the `mgoGet*OrNone` helpers.
    */
  def toPO(doc: Document): PO = {
    PO(
      ponum = doc.getString("ponum"),
      podate = doc.getDate("podate"),
      vendor = doc.getString("vendor"),
      remarks = mgoGetStringOrNone(doc,"remarks"),
      podtl = mgoGetArrayOrNone(doc,"podtl"),
      handler = mgoGetBlobOrNone(doc,"handler")
    )
  }

  /** One purchase-order detail line; `packing` and `payTerm` are optional. */
  case class PODTL(
                    item: String,
                    price: Double,
                    qty: Int,
                    packing: Option[String],
                    payTerm: Option[String]
                  )

  /** Converts one element of the `podtl` array into a [[PODTL]].
    *
    * Note that the case-class field `payTerm` is read from the document key
    * `"payterm"` (all lower case).
    */
  def toPODTL(podtl: Document): PODTL = {
    PODTL(
      item = podtl.getString("item"),
      price = podtl.getDouble("price"),
      qty = podtl.getInteger("qty"),
      packing = mgoGetStringOrNone(podtl,"packing"),
      payTerm = mgoGetStringOrNone(podtl,"payterm")
    )
  }

  /** Prints a purchase order to stdout and, when a picture blob is present,
    * writes it to `/users/tiger/<ponum>.png` via `mgoBlobToFile`.
    */
  def showPO(po: PO): Unit = {
    println(s"po number: ${po.ponum}")
    println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
    println(s"vendor: ${po.vendor}")
    po.remarks.foreach(rmk => println(s"remarks: ${rmk}"))

    // Detail lines are printed one per row; nothing is printed when absent.
    po.podtl.foreach { barr =>
      mgoArrayToDocumentList(barr)
        .map(toPODTL)
        .foreach { line =>
          print(s"==>Item: ${line.item} ")
          print(s"price: ${line.price} ")
          print(s"qty: ${line.qty} ")
          line.packing.foreach(pk => print(s"packing: ${pk} "))
          line.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
          println("")
        }
    }

    po.handler match {
      case Some(blob) =>
        val fileName = s"/users/tiger/${po.ponum}.png"
        mgoBlobToFile(blob,fileName)
        println(s"picture saved to ${fileName}")
      case None => println("no picture provided")
    }

  }

  // --- queries ---------------------------------------------------------------
  import org.mongodb.scala.model.Projections._
  import org.mongodb.scala.model.Filters._
  import org.mongodb.scala.model.Sorts._
  val sort: MGOFilterResult = find => find.sort(descending("ponum"))
  // Projections are combined with Projections.fields. The previous `and(...)`
  // resolved to Filters.and, which only happens to work on driver versions
  // that merge the clauses into a single document instead of emitting `$and`.
  val proj: MGOFilterResult = find => find.projection(fields(include("ponum","podate"),include("vendor"),excludeId()))
  val ctxFind = ctx.setCommand(Find(andThen=Some(proj)))
  val ctxFindFirst = ctx.setCommand(Find(firstOnly=true,converter = Some(toPO _)))
  val ctxFindArrayItem = ctx.setCommand(
    Find(filter = Some(equal("podtl.qty",100)), converter = Some(toPO _))
  )

  // The three queries are chained, so each one starts only after the previous
  // one has completed successfully; `andThen` reports the outcome of each step
  // without altering the result that is passed down the chain.
  for {
    _ <- mgoExecute[List[Document]](ctxFind).andThen {
      case Success(docs) =>
        docs.map(toPO).foreach(showPO)
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }

    _ <- mgoExecute[PO](ctxFindFirst).andThen {
      case Success(doc) =>
        showPO(doc)
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }

    _ <- mgoExecute[List[PO]](ctxFindArrayItem).andThen {
      case Success(docs) =>
        docs.foreach(showPO)
        println("-------------------------------")
      case Failure(e) => println(e.getMessage)
    }
  } yield ()


  // Keep the JVM alive until the user presses <enter>, giving the
  // asynchronous queries above time to finish and print.
  scala.io.StdIn.readLine()


  // Release the driver's connections before shutting the actor system down.
  client.close()
  system.terminate()
}

 

 

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/industrynews/12806.html

(0)
上一篇 2021年7月19日 14:55
下一篇 2021年7月19日 14:55

相关推荐

发表回复

登录后才能评论