SDP(11):MongoDB-Engine功能实现详解编程语言

  根据上篇关于MongoDB-Engine的功能设计方案,我们将在这篇讨论里进行功能实现和测试。下面是具体的功能实现代码:基本上是直接调用Mongo-scala的对应函数,需要注意的是java类型和scala类型之间的相互转换:

object MGOEngine { 
import MGOContext._ 
import MGOCommands._ 
import MGOAdmins._ 
def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = { 
val db = client.getDatabase(ctx.dbName) 
val coll = db.getCollection(ctx.collName) 
ctx.action match { 
/* count */ 
case Count(Some(filter),Some(opt)) => 
coll.count(filter,opt.asInstanceOf[CountOptions]) 
.toFuture().asInstanceOf[Future[T]] 
case Count(Some(filter),None) => 
coll.count(filter).toFuture() 
.asInstanceOf[Future[T]] 
case Count(None,None) => 
coll.count().toFuture() 
.asInstanceOf[Future[T]] 
/* distinct */ 
case Distict(field,Some(filter)) => 
coll.distinct(field,filter).toFuture() 
.asInstanceOf[Future[T]] 
case Distict(field,None) => 
coll.distinct((field)).toFuture() 
.asInstanceOf[Future[T]] 
/* find */ 
case Find(None,None,optConv,false) => 
if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]] 
else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]] 
case Find(None,None,optConv,true) => 
if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]] 
else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]] 
case Find(Some(filter),None,optConv,false) => 
if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]] 
else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]] 
case Find(Some(filter),None,optConv,true) => 
if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]] 
else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]] 
case Find(None,Some(next),optConv,_) => 
if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]] 
else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]] 
case Find(Some(filter),Some(next),optConv,_) => 
if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]] 
else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]] 
/* aggregate */ 
case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]] 
/* mapReduce */ 
case MapReduce(mf,rf) => coll.mapReduce(mf,rf).toFuture().asInstanceOf[Future[T]] 
/* insert */ 
case Insert(docs,Some(opt)) => 
if (docs.size > 1) coll.insertMany(docs,opt.asInstanceOf[InsertManyOptions]).toFuture() 
.asInstanceOf[Future[T]] 
else coll.insertOne(docs.head,opt.asInstanceOf[InsertOneOptions]).toFuture() 
.asInstanceOf[Future[T]] 
case Insert(docs,None) => 
if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]] 
else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]] 
/* delete */ 
case Delete(filter,None,onlyOne) => 
if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]] 
else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]] 
case Delete(filter,Some(opt),onlyOne) => 
if (onlyOne) coll.deleteOne(filter,opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]] 
else coll.deleteMany(filter,opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]] 
/* replace */ 
case Replace(filter,replacement,None) => 
coll.replaceOne(filter,replacement).toFuture().asInstanceOf[Future[T]] 
case Replace(filter,replacement,Some(opt)) => 
coll.replaceOne(filter,replacement,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]] 
/* update */ 
case Update(filter,update,None,onlyOne) => 
if (onlyOne) coll.updateOne(filter,update).toFuture().asInstanceOf[Future[T]] 
else coll.updateMany(filter,update).toFuture().asInstanceOf[Future[T]] 
case Update(filter,update,Some(opt),onlyOne) => 
if (onlyOne) coll.updateOne(filter,update,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]] 
else coll.updateMany(filter,update,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]] 
/* bulkWrite */ 
case BulkWrite(commands,None) => 
coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]] 
case BulkWrite(commands,Some(opt)) => 
coll.bulkWrite(commands,opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]] 
/* drop collection */ 
case DropCollection(collName) => 
val coll = db.getCollection(collName) 
coll.drop().toFuture().asInstanceOf[Future[T]] 
/* create collection */ 
case CreateCollection(collName,None) => 
db.createCollection(collName).toFuture().asInstanceOf[Future[T]] 
case CreateCollection(collName,Some(opt)) => 
db.createCollection(collName,opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]] 
/* list collection */ 
case ListCollection(dbName) => 
client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]] 
/* create view */ 
case CreateView(viewName,viewOn,pline,None) => 
db.createView(viewName,viewOn,pline).toFuture().asInstanceOf[Future[T]] 
case CreateView(viewName,viewOn,pline,Some(opt)) => 
db.createView(viewName,viewOn,pline,opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]] 
/* create index */ 
case CreateIndex(key,None) => 
coll.createIndex(key).toFuture().asInstanceOf[Future[T]] 
case CreateIndex(key,Some(opt)) => 
coll.createIndex(key,opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]] 
/* drop index */ 
case DropIndexByName(indexName, None) => 
coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]] 
case DropIndexByName(indexName, Some(opt)) => 
coll.dropIndex(indexName,opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]] 
case DropIndexByKey(key,None) => 
coll.dropIndex(key).toFuture().asInstanceOf[Future[T]] 
case DropIndexByKey(key,Some(opt)) => 
coll.dropIndex(key,opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]] 
case DropAllIndexes(None) => 
coll.dropIndexes().toFuture().asInstanceOf[Future[T]] 
case DropAllIndexes(Some(opt)) => 
coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]] 
} 
} 
}

注意:以上所有函数都返回Future[T]结果。下面我们来试运行这些函数,不过先关注一些细节:关于MongoDB的Date,Blob,Array等类型在scala中的使用方法:

 type MGODate = java.util.Date 
def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = { 
val ca = Calendar.getInstance() 
ca.set(yyyy,mm,dd) 
ca.getTime() 
} 
def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = { 
val ca = Calendar.getInstance() 
ca.set(yyyy,mm,dd,hr,min,sec) 
ca.getTime() 
} 
def mgoDateTimeNow: MGODate = { 
val ca = Calendar.getInstance() 
ca.getTime 
} 
def mgoDateToString(dt: MGODate, formatString: String): String = { 
val fmt= new SimpleDateFormat(formatString) 
fmt.format(dt) 
} 
type MGOBlob = BsonBinary 
type MGOArray = BsonArray 
def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
implicit mat: Materializer) = FileToByteArray(fileName,timeOut) 
def mgoBlobToFile(blob: MGOBlob, fileName: String)( 
implicit mat: Materializer) =  ByteArrayToFile(blob.getData,fileName)

然后就是MongoDB数据类型的读取帮助函数:

 def mgoGetStringOrNone(doc: Document, fieldName: String) = { 
if (doc.keySet.contains(fieldName)) 
Some(doc.getString(fieldName)) 
else None 
} 
def mgoGetIntOrNone(doc: Document, fieldName: String) = { 
if (doc.keySet.contains(fieldName)) 
Some(doc.getInteger(fieldName)) 
else None 
} 
def mgoGetLonggOrNone(doc: Document, fieldName: String) = { 
if (doc.keySet.contains(fieldName)) 
Some(doc.getLong(fieldName)) 
else None 
} 
def mgoGetDoubleOrNone(doc: Document, fieldName: String) = { 
if (doc.keySet.contains(fieldName)) 
Some(doc.getDouble(fieldName)) 
else None 
} 
def mgoGetBoolOrNone(doc: Document, fieldName: String) = { 
if (doc.keySet.contains(fieldName)) 
Some(doc.getBoolean(fieldName)) 
else None 
} 
def mgoGetDateOrNone(doc: Document, fieldName: String) = { 
if (doc.keySet.contains(fieldName)) 
Some(doc.getDate(fieldName)) 
else None 
} 
def mgoGetBlobOrNone(doc: Document, fieldName: String) = { 
if (doc.keySet.contains(fieldName)) 
doc.get(fieldName).asInstanceOf[Option[MGOBlob]] 
else None 
} 
def mgoGetArrayOrNone(doc: Document, fieldName: String) = { 
if (doc.keySet.contains(fieldName)) 
doc.get(fieldName).asInstanceOf[Option[MGOArray]] 
else None 
} 
def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = { 
(arr.getValues.asScala.toList) 
.asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]] 
} 
type MGOFilterResult = FindObservable[Document] => FindObservable[Document]

下面我们就开始设置试运行环境,从一个全新的collection开始:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import org.mongodb.scala._ 
import org.mongodb.scala.connection._ 
import scala.collection.JavaConverters._ 
import com.mongodb.client.model._ 
import scala.util._ 
object MongoEngineTest extends App { 
import MGOContext._ 
import MGOEngine._ 
import MGOHelpers._ 
import MGOCommands._ 
import MGOAdmins._ 
val clusterSettings = ClusterSettings.builder() 
.hosts(List(new ServerAddress("localhost:27017")).asJava).build() 
val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build() 
implicit val client = MongoClient(clientSettings) 
implicit val system = ActorSystem() 
implicit val mat = ActorMaterializer() 
implicit val ec = system.dispatcher 
val ctx = MGOContext("testdb","po").setCommand( 
DropCollection("po")) 
println(getResult(mgoExecute(ctx)))

测试运行了DropCollection指令。下面我们试着insert两个document:

  val pic = fileToMGOBlob("/users/tiger/nobody.png") 
val po1 = Document ( 
"ponum" -> "po18012301", 
"vendor" -> "The smartphone compay", 
"podate" -> mgoDate(2017,5,13), 
"remarks" -> "urgent, rush order", 
"handler" -> pic, 
"podtl" -> Seq( 
Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"), 
Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days") 
) 
) 
val po2 = Document ( 
"ponum" -> "po18022002", 
"vendor" -> "The Samsung compay", 
"podate" -> mgoDate(2015,11,6), 
"podtl" -> Seq( 
Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"), 
Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"), 
Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury") 
) 
) 
val optInsert = new InsertManyOptions().ordered(true) 
val ctxInsert = ctx.setCommand( 
Insert(Seq(po1,po2),Some(optInsert)) 
) println(getResult(mgoExecute(ctxInsert)))

注意InsertManyOptions的具体设定方式。 为了配合更方便准确的强类型操作,我们需要进行Document类型到具体应用类型之间的对应转换:

  case class PO ( 
ponum: String, 
podate: MGODate, 
vendor: String, 
remarks: Option[String], 
podtl: Option[MGOArray], 
handler: Option[MGOBlob] 
) 
def toPO(doc: Document): PO = { 
PO( 
ponum = doc.getString("ponum"), 
podate = doc.getDate("podate"), 
vendor = doc.getString("vendor"), 
remarks = mgoGetStringOrNone(doc,"remarks"), 
podtl = mgoGetArrayOrNone(doc,"podtl"), 
handler = mgoGetBlobOrNone(doc,"handler") 
) 
} 
case class PODTL( 
item: String, 
price: Double, 
qty: Int, 
packing: Option[String], 
payTerm: Option[String] 
) 
def toPODTL(podtl: Document): PODTL = { 
PODTL( 
item = podtl.getString("item"), 
price = podtl.getDouble("price"), 
qty = podtl.getInteger("qty"), 
packing = mgoGetStringOrNone(podtl,"packing"), 
payTerm = mgoGetStringOrNone(podtl,"payterm") 
) 
} 
def showPO(po: PO) = { 
println(s"po number: ${po.ponum}") 
println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}") 
println(s"vendor: ${po.vendor}") 
if (po.remarks != None) 
println(s"remarks: ${po.remarks.get}") 
po.podtl match { 
case Some(barr) => 
mgoArrayToDocumentList(barr) 
.map { dc => toPODTL(dc)} 
.foreach { doc: PODTL => 
print(s"==>Item: ${doc.item} ") 
print(s"price: ${doc.price} ") 
print(s"qty: ${doc.qty} ") 
doc.packing.foreach(pk => print(s"packing: ${pk} ")) 
doc.payTerm.foreach(pt => print(s"payTerm: ${pt} ")) 
println("") 
} 
case _ => 
} 
po.handler match { 
case Some(blob) => 
val fileName = s"/users/tiger/${po.ponum}.png" 
mgoBlobToFile(blob,fileName) 
println(s"picture saved to ${fileName}") 
case None => println("no picture provided") 
} 
}

在上面的代码里我们使用了前面提供的MongoDB数据类型读取帮助函数。下面我们测试对poCollection中的Document进行查询,示范包括projection,sort,filter等:

  import org.mongodb.scala.model.Projections._ 
import org.mongodb.scala.model.Filters._ 
import org.mongodb.scala.model.Sorts._ 
val sort: MGOFilterResult = find => find.sort(descending("ponum")) 
val proj: MGOFilterResult = find => find.projection(and(include("ponum","podate"),include("vendor"),excludeId())) 
val ctxFind = ctx.setCommand(Find(andThen=Some(proj))) 
val ctxFindFirst = ctx.setCommand(Find(firstOnly=true,converter = Some(toPO _))) 
val ctxFindArrayItem = ctx.setCommand( 
Find(filter = Some(equal("podtl.qty",100)), converter = Some(toPO _)) 
) 
for { 
_ <- mgoExecute[List[Document]](ctxFind).andThen { 
case Success(docs) => docs.map(toPO).foreach(showPO) 
println("-------------------------------") 
case Failure(e) => println(e.getMessage) 
} 
_ <- mgoExecute[PO](ctxFindFirst).andThen { 
case Success(doc) => showPO(doc) 
println("-------------------------------") 
case Failure(e) => println(e.getMessage) 
} 
_ <- mgoExecute[List[PO]](ctxFindArrayItem).andThen { 
case Success(docs) => docs.foreach(showPO) 
println("-------------------------------") 
case Failure(e) => println(e.getMessage) 
} 
} yield()

因为mgoExecute返回Future结果,所以我们可以用for-comprehension对几个运算进行串联运行。

下面是这次示范的源代码:

build.sbt

name := "learn-mongo" 
version := "0.1" 
scalaVersion := "2.12.4" 
libraryDependencies := Seq( 
"org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1", 
"com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.17" 
)

MGOHelpers.scala

import org.mongodb.scala._ 
import scala.concurrent._ 
import scala.concurrent.duration._ 
object MGOHelpers { 
implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] { 
override val converter: (Document) => String = (doc) => doc.toJson 
} 
implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] { 
override val converter: (C) => String = (doc) => doc.toString 
} 
trait ImplicitObservable[C] { 
val observable: Observable[C] 
val converter: (C) => String 
def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds) 
def headResult() = Await.result(observable.head(), 10 seconds) 
def printResults(initial: String = ""): Unit = { 
if (initial.length > 0) print(initial) 
results().foreach(res => println(converter(res))) 
} 
def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}") 
} 
def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = { 
Await.result(fut,timeOut) 
} 
def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = { 
Await.result(fut,timeOut) 
} 
}

FileStreaming.scala

import java.io.{InputStream, ByteArrayInputStream} 
import java.nio.ByteBuffer 
import java.nio.file.Paths 
import akka.stream.{Materializer} 
import akka.stream.scaladsl.{FileIO, StreamConverters} 
import scala.concurrent.{Await} 
import akka.util._ 
import scala.concurrent.duration._ 
object FileStreaming { 
def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
implicit mat: Materializer):ByteBuffer = { 
val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => 
hd ++ bs 
} 
(Await.result(fut, timeOut)).toByteBuffer 
} 
def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
implicit mat: Materializer): Array[Byte] = { 
val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => 
hd ++ bs 
} 
(Await.result(fut, timeOut)).toArray 
} 
def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
implicit mat: Materializer): InputStream = { 
val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => 
hd ++ bs 
} 
val buf = (Await.result(fut, timeOut)).toArray 
new ByteArrayInputStream(buf) 
} 
def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)( 
implicit mat: Materializer) = { 
val ba = new Array[Byte](byteBuf.remaining()) 
byteBuf.get(ba,0,ba.length) 
val baInput = new ByteArrayInputStream(ba) 
val source = StreamConverters.fromInputStream(() => baInput)  //ByteBufferInputStream(bytes)) 
source.runWith(FileIO.toPath(Paths.get(fileName))) 
} 
def ByteArrayToFile(bytes: Array[Byte], fileName: String)( 
implicit mat: Materializer) = { 
val bb = ByteBuffer.wrap(bytes) 
val baInput = new ByteArrayInputStream(bytes) 
val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes)) 
source.runWith(FileIO.toPath(Paths.get(fileName))) 
} 
def InputStreamToFile(is: InputStream, fileName: String)( 
implicit mat: Materializer) = { 
val source = StreamConverters.fromInputStream(() => is) 
source.runWith(FileIO.toPath(Paths.get(fileName))) 
} 
}

MongoEngine.scala

import java.text.SimpleDateFormat 
import org.bson.conversions.Bson 
import org.mongodb.scala._ 
import org.mongodb.scala.model._ 
import java.util.Calendar 
import scala.collection.JavaConverters._ 
import FileStreaming._ 
import akka.stream.Materializer 
import org.mongodb.scala.bson.{BsonArray, BsonBinary} 
import scala.concurrent._ 
import scala.concurrent.duration._ 
object MGOContext { 
trait MGOCommands 
object MGOCommands { 
case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands 
case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands 
/*  org.mongodb.scala.FindObservable 
import com.mongodb.async.client.FindIterable 
val resultDocType = FindIterable[Document] 
val resultOption = FindObservable(resultDocType) 
.maxScan(...) 
.limit(...) 
.sort(...) 
.project(...) */ 
case class Find[M](filter: Option[Bson] = None, 
andThen: Option[FindObservable[Document] => FindObservable[Document]]= None, 
converter: Option[Document => M] = None, 
firstOnly: Boolean = false) extends MGOCommands 
case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands 
case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands 
case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands 
case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands 
case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands 
case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands 
case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands 
} 
object MGOAdmins { 
case class DropCollection(collName: String) extends MGOCommands 
case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands 
case class ListCollection(dbName: String) extends MGOCommands 
case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands 
case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands 
case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands 
case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands 
case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands 
} 
case class MGOContext( 
dbName: String, 
collName: String, 
action: MGOCommands = null 
) { 
ctx => 
def setDbName(name: String): MGOContext = ctx.copy(dbName = name) 
def setCollName(name: String): MGOContext = ctx.copy(collName = name) 
def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = cmd) 
} 
object MGOContext { 
def apply(db: String, coll: String) = new MGOContext(db, coll) 
def apply(db: String, coll: String, command: MGOCommands) = 
new MGOContext(db, coll, command) 
} 
type MGODate = java.util.Date 
def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = { 
val ca = Calendar.getInstance() 
ca.set(yyyy,mm,dd) 
ca.getTime() 
} 
def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = { 
val ca = Calendar.getInstance() 
ca.set(yyyy,mm,dd,hr,min,sec) 
ca.getTime() 
} 
def mgoDateTimeNow: MGODate = { 
val ca = Calendar.getInstance() 
ca.getTime 
} 
def mgoDateToString(dt: MGODate, formatString: String): String = { 
val fmt= new SimpleDateFormat(formatString) 
fmt.format(dt) 
} 
type MGOBlob = BsonBinary 
type MGOArray = BsonArray 
def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)( 
implicit mat: Materializer) = FileToByteArray(fileName,timeOut) 
def mgoBlobToFile(blob: MGOBlob, fileName: String)( 
implicit mat: Materializer) =  ByteArrayToFile(blob.getData,fileName) 
def mgoGetStringOrNone(doc: Document, fieldName: String) = { 
if (doc.keySet.contains(fieldName)) 
Some(doc.getString(fieldName)) 
else None 
} 
def mgoGetIntOrNone(doc: Document, fieldName: String) = { 
if (doc.keySet.contains(fieldName)) 
Some(doc.getInteger(fieldName)) 
else None 
} 
def mgoGetLonggOrNone(doc: Document, fieldName: String) = { 
if (doc.keySet.contains(fieldName)) 
Some(doc.getLong(fieldName)) 
else None 
} 
def mgoGetDoubleOrNone(doc: Document, fieldName: String) = { 
if (doc.keySet.contains(fieldName)) 
Some(doc.getDouble(fieldName)) 
else None 
} 
def mgoGetBoolOrNone(doc: Document, fieldName: String) = { 
if (doc.keySet.contains(fieldName)) 
Some(doc.getBoolean(fieldName)) 
else None 
} 
def mgoGetDateOrNone(doc: Document, fieldName: String) = { 
if (doc.keySet.contains(fieldName)) 
Some(doc.getDate(fieldName)) 
else None 
} 
def mgoGetBlobOrNone(doc: Document, fieldName: String) = { 
if (doc.keySet.contains(fieldName)) 
doc.get(fieldName).asInstanceOf[Option[MGOBlob]] 
else None 
} 
def mgoGetArrayOrNone(doc: Document, fieldName: String) = { 
if (doc.keySet.contains(fieldName)) 
doc.get(fieldName).asInstanceOf[Option[MGOArray]] 
else None 
} 
def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = { 
(arr.getValues.asScala.toList) 
.asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]] 
} 
type MGOFilterResult = FindObservable[Document] => FindObservable[Document] 
} 
object MGOEngine { 
import MGOContext._ 
import MGOCommands._ 
import MGOAdmins._ 
def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = { 
val db = client.getDatabase(ctx.dbName) 
val coll = db.getCollection(ctx.collName) 
ctx.action match { 
/* count */ 
case Count(Some(filter),Some(opt)) => 
coll.count(filter,opt.asInstanceOf[CountOptions]) 
.toFuture().asInstanceOf[Future[T]] 
case Count(Some(filter),None) => 
coll.count(filter).toFuture() 
.asInstanceOf[Future[T]] 
case Count(None,None) => 
coll.count().toFuture() 
.asInstanceOf[Future[T]] 
/* distinct */ 
case Distict(field,Some(filter)) => 
coll.distinct(field,filter).toFuture() 
.asInstanceOf[Future[T]] 
case Distict(field,None) => 
coll.distinct((field)).toFuture() 
.asInstanceOf[Future[T]] 
/* find */ 
case Find(None,None,optConv,false) => 
if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]] 
else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]] 
case Find(None,None,optConv,true) => 
if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]] 
else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]] 
case Find(Some(filter),None,optConv,false) => 
if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]] 
else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]] 
case Find(Some(filter),None,optConv,true) => 
if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]] 
else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]] 
case Find(None,Some(next),optConv,_) => 
if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]] 
else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]] 
case Find(Some(filter),Some(next),optConv,_) => 
if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]] 
else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]] 
/* aggregate */ 
case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]] 
/* mapReduce */ 
case MapReduce(mf,rf) => coll.mapReduce(mf,rf).toFuture().asInstanceOf[Future[T]] 
/* insert */ 
case Insert(docs,Some(opt)) => 
if (docs.size > 1) coll.insertMany(docs,opt.asInstanceOf[InsertManyOptions]).toFuture() 
.asInstanceOf[Future[T]] 
else coll.insertOne(docs.head,opt.asInstanceOf[InsertOneOptions]).toFuture() 
.asInstanceOf[Future[T]] 
case Insert(docs,None) => 
if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]] 
else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]] 
/* delete */ 
case Delete(filter,None,onlyOne) => 
if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]] 
else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]] 
case Delete(filter,Some(opt),onlyOne) => 
if (onlyOne) coll.deleteOne(filter,opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]] 
else coll.deleteMany(filter,opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]] 
/* replace */ 
case Replace(filter,replacement,None) => 
coll.replaceOne(filter,replacement).toFuture().asInstanceOf[Future[T]] 
case Replace(filter,replacement,Some(opt)) => 
coll.replaceOne(filter,replacement,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]] 
/* update */ 
case Update(filter,update,None,onlyOne) => 
if (onlyOne) coll.updateOne(filter,update).toFuture().asInstanceOf[Future[T]] 
else coll.updateMany(filter,update).toFuture().asInstanceOf[Future[T]] 
case Update(filter,update,Some(opt),onlyOne) => 
if (onlyOne) coll.updateOne(filter,update,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]] 
else coll.updateMany(filter,update,opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]] 
/* bulkWrite */ 
case BulkWrite(commands,None) => 
coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]] 
case BulkWrite(commands,Some(opt)) => 
coll.bulkWrite(commands,opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]] 
/* drop collection */ 
case DropCollection(collName) => 
val coll = db.getCollection(collName) 
coll.drop().toFuture().asInstanceOf[Future[T]] 
/* create collection */ 
case CreateCollection(collName,None) => 
db.createCollection(collName).toFuture().asInstanceOf[Future[T]] 
case CreateCollection(collName,Some(opt)) => 
db.createCollection(collName,opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]] 
/* list collection */ 
case ListCollection(dbName) => 
client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]] 
/* create view */ 
case CreateView(viewName,viewOn,pline,None) => 
db.createView(viewName,viewOn,pline).toFuture().asInstanceOf[Future[T]] 
case CreateView(viewName,viewOn,pline,Some(opt)) => 
db.createView(viewName,viewOn,pline,opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]] 
/* create index */ 
case CreateIndex(key,None) => 
coll.createIndex(key).toFuture().asInstanceOf[Future[T]] 
case CreateIndex(key,Some(opt)) => 
coll.createIndex(key,opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]] 
/* drop index */ 
case DropIndexByName(indexName, None) => 
coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]] 
case DropIndexByName(indexName, Some(opt)) => 
coll.dropIndex(indexName,opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]] 
case DropIndexByKey(key,None) => 
coll.dropIndex(key).toFuture().asInstanceOf[Future[T]] 
case DropIndexByKey(key,Some(opt)) => 
coll.dropIndex(key,opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]] 
case DropAllIndexes(None) => 
coll.dropIndexes().toFuture().asInstanceOf[Future[T]] 
case DropAllIndexes(Some(opt)) => 
coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]] 
} 
} 
}

MongoEngineTest.scala

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import org.mongodb.scala._ 
import org.mongodb.scala.connection._ 
import scala.collection.JavaConverters._ 
import com.mongodb.client.model._ 
import scala.util._ 
object MongoEngineTest extends App { 
import MGOContext._ 
import MGOEngine._ 
import MGOHelpers._ 
import MGOCommands._ 
import MGOAdmins._ 
val clusterSettings = ClusterSettings.builder() 
.hosts(List(new ServerAddress("localhost:27017")).asJava).build() 
val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build() 
implicit val client = MongoClient(clientSettings) 
implicit val system = ActorSystem() 
implicit val mat = ActorMaterializer() 
implicit val ec = system.dispatcher 
val ctx = MGOContext("testdb","po").setCommand( 
DropCollection("po")) 
println(getResult(mgoExecute(ctx))) 
val pic = fileToMGOBlob("/users/tiger/nobody.png") 
val po1 = Document ( 
"ponum" -> "po18012301", 
"vendor" -> "The smartphone compay", 
"podate" -> mgoDate(2017,5,13), 
"remarks" -> "urgent, rush order", 
"handler" -> pic, 
"podtl" -> Seq( 
Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"), 
Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days") 
) 
) 
val po2 = Document ( 
"ponum" -> "po18022002", 
"vendor" -> "The Samsung compay", 
"podate" -> mgoDate(2015,11,6), 
"podtl" -> Seq( 
Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"), 
Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"), 
Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury") 
) 
) 
val optInsert = new InsertManyOptions().ordered(true) 
val ctxInsert = ctx.setCommand( 
Insert(Seq(po1,po2),Some(optInsert)) 
) 
// println(getResult(mgoExecute(ctxInsert))) 
case class PO ( 
ponum: String, 
podate: MGODate, 
vendor: String, 
remarks: Option[String], 
podtl: Option[MGOArray], 
handler: Option[MGOBlob] 
) 
def toPO(doc: Document): PO = { 
PO( 
ponum = doc.getString("ponum"), 
podate = doc.getDate("podate"), 
vendor = doc.getString("vendor"), 
remarks = mgoGetStringOrNone(doc,"remarks"), 
podtl = mgoGetArrayOrNone(doc,"podtl"), 
handler = mgoGetBlobOrNone(doc,"handler") 
) 
} 
case class PODTL( 
item: String, 
price: Double, 
qty: Int, 
packing: Option[String], 
payTerm: Option[String] 
) 
def toPODTL(podtl: Document): PODTL = { 
PODTL( 
item = podtl.getString("item"), 
price = podtl.getDouble("price"), 
qty = podtl.getInteger("qty"), 
packing = mgoGetStringOrNone(podtl,"packing"), 
payTerm = mgoGetStringOrNone(podtl,"payterm") 
) 
} 
def showPO(po: PO) = { 
println(s"po number: ${po.ponum}") 
println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}") 
println(s"vendor: ${po.vendor}") 
if (po.remarks != None) 
println(s"remarks: ${po.remarks.get}") 
po.podtl match { 
case Some(barr) => 
mgoArrayToDocumentList(barr) 
.map { dc => toPODTL(dc)} 
.foreach { doc: PODTL => 
print(s"==>Item: ${doc.item} ") 
print(s"price: ${doc.price} ") 
print(s"qty: ${doc.qty} ") 
doc.packing.foreach(pk => print(s"packing: ${pk} ")) 
doc.payTerm.foreach(pt => print(s"payTerm: ${pt} ")) 
println("") 
} 
case _ => 
} 
po.handler match { 
case Some(blob) => 
val fileName = s"/users/tiger/${po.ponum}.png" 
mgoBlobToFile(blob,fileName) 
println(s"picture saved to ${fileName}") 
case None => println("no picture provided") 
} 
} 
import org.mongodb.scala.model.Projections._ 
import org.mongodb.scala.model.Filters._ 
import org.mongodb.scala.model.Sorts._ 
val sort: MGOFilterResult = find => find.sort(descending("ponum")) 
val proj: MGOFilterResult = find => find.projection(and(include("ponum","podate"),include("vendor"),excludeId())) 
val ctxFind = ctx.setCommand(Find(andThen=Some(proj))) 
val ctxFindFirst = ctx.setCommand(Find(firstOnly=true,converter = Some(toPO _))) 
val ctxFindArrayItem = ctx.setCommand( 
Find(filter = Some(equal("podtl.qty",100)), converter = Some(toPO _)) 
) 
for { 
_ <- mgoExecute[List[Document]](ctxFind).andThen { 
case Success(docs) => docs.map(toPO).foreach(showPO) 
println("-------------------------------") 
case Failure(e) => println(e.getMessage) 
} 
_ <- mgoExecute[PO](ctxFindFirst).andThen { 
case Success(doc) => showPO(doc) 
println("-------------------------------") 
case Failure(e) => println(e.getMessage) 
} 
_ <- mgoExecute[List[PO]](ctxFindArrayItem).andThen { 
case Success(docs) => docs.foreach(showPO) 
println("-------------------------------") 
case Failure(e) => println(e.getMessage) 
} 
} yield() 
scala.io.StdIn.readLine() 
system.terminate() 
}

 

 

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/12806.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论