SDP(9): MongoDB-Scala – data access and modeling

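MongoDB is a document database: it imposes no rigid format on the data, so storage and retrieval can be very flexible. MongoDB is also a distributed database and, unlike a traditional relational database, a distributed database does not support table joins, so designing its collections is very different from designing relational tables. Distributed databases come with a data model that departs from traditional thinking: the structure has to be designed primarily around the queries that will read the data. Relational design asks for normalized tables, which are then stitched back together with table joins at query time. Because a distributed database has no table join, reading data that spans tables takes several fetches, which hurts processing efficiency. The most distinctive feature of MongoDB as a document database is that Documents can be embedded: related Documents can be nested inside the Document they relate to, so everything is read in one go, which gives us a denormalized data model. In this respect MongoDB has the edge over Cassandra. MongoDB also supports many flexible kinds of index, which makes it arguably the best choice among distributed databases for efficient reads. In addition, MongoDB supports rich statistics over large data sets through sort, aggregation and map-reduce.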
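
To make the difference between the two layouts concrete, here is a minimal sketch that uses plain Scala collections as stand-ins for storage; no database is involved. The names `EmbeddingSketch`, `Line`, `Order` and the sample values are hypothetical and only serve the illustration.

```scala
// Hypothetical in-memory stand-ins for two storage layouts; no database involved.
object EmbeddingSketch {
  final case class Line(item: String, qty: Int)

  // Normalized layout: header rows and detail rows live in separate "tables".
  val headers: Map[String, String] = Map("po1" -> "vendor A")
  val details: Map[String, Seq[Line]] =
    Map("po1" -> Seq(Line("phone", 10), Line("case", 20)))

  // Reading a whole order takes two lookups (a join, or two round trips).
  def readNormalized(ponum: String): Option[(String, Seq[Line])] =
    for {
      vendor <- headers.get(ponum) // first read
      lines  <- details.get(ponum) // second read
    } yield (vendor, lines)

  // Denormalized layout: the detail lines are embedded in the order itself.
  final case class Order(vendor: String, lines: Seq[Line])
  val orders: Map[String, Order] =
    Map("po1" -> Order("vendor A", Seq(Line("phone", 10), Line("case", 20))))

  // One lookup returns the header together with all of its lines.
  def readEmbedded(ponum: String): Option[Order] = orders.get(ponum)
}
```

The embedded layout trades some duplication and larger records for a single read per order, which is the trade-off the paragraph above describes.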

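Before using MongoDB we need to be familiar with its data model and design philosophy. In today's big-data era the way data is produced and used has changed fundamentally, and the traditional relational data model can no longer meet the needs of modern information systems. For example, when designing a personal-information table we have to allow for people who have two addresses, people who have none, people who have a fax number, and all sorts of other individual quirks. In a relational schema we have to make trade-offs and sacrifice some attributes. MongoDB's document model, however, allows records of different shapes, so the data can be captured and stored in full. Below is a Document design for a purchase order: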

  val po1 = Document ( 
    "ponum" -> "po18012301", 
    "vendor" -> "The smartphone compay", 
    "podate" -> podate1, 
    "remarks" -> "urgent, rush order", 
    "handler" -> pic, 
    "podtl" -> Seq( 
      Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"), 
      Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days") 
    ) 
  ) 
 
  val po2 = Document ( 
    "ponum" -> "po18022002", 
    "vendor" -> "The Samsung compay", 
    "podate" -> podate2, 
    "podtl" -> Seq( 
      Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"), 
      Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"), 
      Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury") 
    ) 
  )

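po1 and po2 both embed several purchase-item Documents under the podtl key. To begin with, po1 and po2 differ in structure: po1 has two extra keys, remarks and handler. The embedded Documents differ from one another as well. In this example I deliberately included date, binary and array values to demonstrate their use: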

  val ca = Calendar.getInstance() 
  ca.set(2011,10,23)   // Calendar months are 0-based: 10 = November 
  val podate1 = ca.getTime 
  ca.set(2012,12,23)   // month 12 rolls over to January 2013 
  val podate2 = ca.getTime 
 
  val pic = FileToByteArray("/users/tiger-macpro/sample.png",3 seconds)
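
A note on the Calendar calls above: `Calendar` counts months from 0, which is why the sample output later in this post shows November 2011 and January 2013 rather than October and December. The sketch below reproduces that behaviour with `GregorianCalendar` and shows a `java.time` alternative whose months are 1-based. The object name `CalendarMonths` and the helper `toDate` are hypothetical names for this illustration.

```scala
import java.time.{LocalDate, ZoneId}
import java.util.{Calendar, Date, GregorianCalendar}

object CalendarMonths {
  // Calendar months are 0-based, so 10 means November.
  val nov: Calendar = new GregorianCalendar()
  nov.set(2011, 10, 23)

  // Month 12 does not exist; a lenient Calendar rolls it over
  // to January of the following year.
  val rolled: Calendar = new GregorianCalendar()
  rolled.set(2012, 12, 23)

  // java.time uses 1-based months; convert when a java.util.Date is needed.
  def toDate(year: Int, month: Int, day: Int): Date =
    Date.from(
      LocalDate.of(year, month, day)
        .atStartOfDay(ZoneId.systemDefault())
        .toInstant
    )
}
```

With `toDate(2012, 12, 23)` the result really is in December 2012, so the intent of the call is visible at the call site.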

A MongoDB date corresponds to java.util.Date, which can be manipulated through Calendar. Now look at how the data types map in the conversions below:

 

  case class PO ( 
                 ponum: String, 
                 podate: java.util.Date, 
                 vendor: String, 
                 remarks: Option[String], 
                 podtl: Option[BsonArray], 
                 handler: Option[BsonBinary] 
                 ) 
  def toPO(doc: Document): PO = { 
      val ks = doc.keySet 
      PO( 
        ponum = doc.getString("ponum"), 
        podate = doc.getDate("podate"), 
        vendor = doc.getString("vendor"), 
        remarks = { 
          if (ks.contains("remarks")) 
            Some(doc.getString("remarks")) 
          else 
            None 
        }, 
        podtl = { 
          if (ks.contains("podtl")) 
            doc.get("podtl").asInstanceOf[Option[BsonArray]] 
          else 
            None 
        }, 
        handler = { 
          if (ks.contains("handler")) 
            doc.get("handler").asInstanceOf[Option[BsonBinary]] 
          else 
            None 
        } 
      ) 
    } 
 
   case class PODTL( 
                   item: String, 
                   price: Double, 
                   qty: Int, 
                   packing: Option[String], 
                   payTerm: Option[String] 
                   ) 
   def toPODTL(podtl: Document): PODTL = { 
     val ks = podtl.keySet 
     PODTL( 
       item = podtl.getString("item"), 
       price = podtl.getDouble("price"), 
       qty = podtl.getInteger("qty"), 
       packing = { 
         if (ks.contains("packing")) 
           Some(podtl.getString("packing")) 
         else None 
       }, 
       payTerm = { 
         if(ks.contains("payterm")) 
           Some(podtl.getString("payterm")) 
         else None 
       } 
     ) 
   }
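
The optional keys can also be read without the keySet check and the asInstanceOf cast. As far as I know, the Scala driver's `Document.get[T]` returns `Some` only when the key exists and holds a value of the requested BSON type. The sketch below assumes the mongo-scala-driver 2.x dependency from build.sbt is on the classpath; the object name `OptionalFields` and its method names are hypothetical.

```scala
import org.mongodb.scala.bson.{BsonArray, BsonBinary, BsonString}
import org.mongodb.scala.bson.collection.immutable.Document

object OptionalFields {
  // Some(text) if "remarks" exists and is a BSON string, otherwise None.
  def remarks(doc: Document): Option[String] =
    doc.get[BsonString]("remarks").map(_.getValue)

  // Some(array) if "podtl" exists and is a BSON array, otherwise None.
  def podtl(doc: Document): Option[BsonArray] =
    doc.get[BsonArray]("podtl")

  // Some(binary) if "handler" exists and is BSON binary data, otherwise None.
  def handler(doc: Document): Option[BsonBinary] =
    doc.get[BsonBinary]("handler")
}
```

Each helper could replace the corresponding if/else branch in toPO; the PO case class itself would stay unchanged.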

Note the two types BsonBinary and BsonArray and how they are used. A key of an embedded Document can serve as a query condition:

   poCollection.find(equal("podtl.qty",100)).toFuture().onComplete { 
    case Success(docs) => docs.map(toPO).foreach (showPO) 
      println("-------------------------------") 
    case Failure(e) => println(e.getMessage) 
  }
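
A dotted key such as "podtl.qty" matches a purchase order when any of its embedded lines satisfies the condition. When several conditions must hold for one and the same line, the driver's `elemMatch` filter is the usual tool. The sketch below only builds the filters and assumes the mongo-scala-driver from build.sbt; the object and value names are hypothetical.

```scala
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.Filters.{and, elemMatch, equal}

object EmbeddedQueries {
  // Matches an order if ANY embedded line has qty == 100
  // (the same condition as the query above).
  val anyLineQty100: Bson = equal("podtl.qty", 100)

  // Matches only if ONE AND THE SAME embedded line meets both conditions.
  val luxuryLineOf100: Bson =
    elemMatch("podtl", and(equal("qty", 100), equal("packing", "luxury")))

  // Without elemMatch the two conditions may be met by different lines.
  val looseMatch: Bson =
    and(equal("podtl.qty", 100), equal("podtl.packing", "luxury"))
}
```

Any of these values can be passed to `poCollection.find(...)` in place of the filter used above.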

With toPO and toPODTL we can map po and podtl onto case classes and then work with them in a strongly typed way:

   def showPO(po: PO) = { 
     println(s"po number: ${po.ponum}") 
     println(s"po date: ${po.podate.toString}") 
     println(s"vendor: ${po.vendor}") 
     if (po.remarks != None) 
       println(s"remarks: ${po.remarks.get}") 
     po.podtl match { 
       case Some(barr) => 
         val docs = barr.getValues.asScala.toList 
         docs.map { dc => 
           toPODTL(dc.asInstanceOf[org.bson.BsonDocument]) 
         }.foreach { doc: PODTL => 
             print(s"==>Item: ${doc.item} ") 
             print(s"price: ${doc.price} ") 
             print(s"qty: ${doc.qty} ") 
             doc.packing.foreach(pk => print(s"packing: ${pk} ")) 
             doc.payTerm.foreach(pt => print(s"payTerm: ${pt} ")) 
             println("") 
           } 
       case _ => 
     } 
 
     po.handler match { 
       case Some(bs) => 
         val fileName = s"/users/tiger-macpro/${po.ponum}.png" 
         ByteArrayToFile(bs.getData,fileName) 
         println(s"picture saved to ${fileName}") 
       case None => println("no picture provided") 
     } 
   } 
   poCollection.find(equal("podtl.qty",100)).toFuture().onComplete { 
     case Success(docs) => docs.map(toPO).foreach (showPO) 
       println("------------------------------") 
     case Failure(e) => println(e.getMessage) 
   } 
   poCollection.find().toFuture().onComplete { 
    case Success(docs) => docs.map(toPO).foreach (showPO) 
      println("-------------------------------") 
    case Failure(e) => println(e.getMessage) 
  }

A trial run prints the following:

po number: po18022002 
po date: Wed Jan 23 11:57:50 HKT 2013 
vendor: The Samsung compay 
==>Item: samsung galaxy s8 price: 2300.0 qty: 100 packing: standard  
==>Item: samsung galaxy s7 price: 1897.0 qty: 1000 payTerm: 30 days  
==>Item: apple iphone7 price: 6500.0 qty: 100 packing: luxury  
no picture provided 
------------------------------- 
po number: po18012301 
po date: Wed Nov 23 11:57:50 HKT 2011 
vendor: The smartphone compay 
remarks: urgent, rush order 
==>Item: sony smartphone price: 2389.0 qty: 1239 packing: standard  
==>Item: ericson smartphone price: 897.0 qty: 1000 payTerm: 30 days  
picture saved to /users/tiger-macpro/po18012301.png 
po number: po18022002 
po date: Wed Jan 23 11:57:50 HKT 2013 
vendor: The Samsung compay 
==>Item: samsung galaxy s8 price: 2300.0 qty: 100 packing: standard  
==>Item: samsung galaxy s7 price: 1897.0 qty: 1000 payTerm: 30 days  
==>Item: apple iphone7 price: 6500.0 qty: 100 packing: luxury  
no picture provided 
------------------------------
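
The two queries are independent Futures whose onComplete callbacks run asynchronously, so the order of the two result blocks is not guaranteed. If a fixed order matters, the Futures can be chained. The sketch below uses only the standard library; `findAll` and `findQty100` are hypothetical stand-ins for `poCollection.find(...).toFuture()`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object OrderedQueries {
  // Hypothetical stand-ins for poCollection.find(...).toFuture()
  def findAll(): Future[Seq[String]] = Future(Seq("po18012301", "po18022002"))
  def findQty100(): Future[Seq[String]] = Future(Seq("po18022002"))

  // The for-comprehension (flatMap) starts the second query only after
  // the first has completed, so the results arrive in a fixed order.
  def both(): Future[(Seq[String], Seq[String])] =
    for {
      all      <- findAll()
      filtered <- findQty100()
    } yield (all, filtered)

  // Blocking is acceptable in a small demo and avoids waiting on readLine().
  def run(): (Seq[String], Seq[String]) = Await.result(both(), 5.seconds)
}
```

Printing from the tuple returned by `run()` keeps the output deterministic, at the cost of running the two queries one after the other.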

The full source code for this demonstration follows:

build.sbt

name := "learn-mongo" 
 
version := "0.1" 
 
scalaVersion := "2.12.4" 
 
libraryDependencies := Seq( 
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1", 
  "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.17" 
)

FileStreaming.scala

import java.io.{ByteArrayInputStream, InputStream} 
import java.nio.ByteBuffer 
import java.nio.file.Paths 
 
import akka.stream.{Materializer} 
import akka.stream.scaladsl.{FileIO, StreamConverters} 
 
import scala.concurrent.{Await} 
import akka.util._ 
import scala.concurrent.duration._ 
 
object FileStreaming { 
  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration)( 
    implicit mat: Materializer):ByteBuffer = { 
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => 
      hd ++ bs 
    } 
    (Await.result(fut, timeOut)).toByteBuffer 
  } 
 
  def FileToByteArray(fileName: String, timeOut: FiniteDuration)( 
    implicit mat: Materializer): Array[Byte] = { 
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => 
      hd ++ bs 
    } 
    (Await.result(fut, timeOut)).toArray 
  } 
 
  def FileToInputStream(fileName: String, timeOut: FiniteDuration)( 
    implicit mat: Materializer): InputStream = { 
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => 
      hd ++ bs 
    } 
    val buf = (Await.result(fut, timeOut)).toArray 
    new ByteArrayInputStream(buf) 
  } 
 
  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)( 
    implicit mat: Materializer) = { 
    val ba = new Array[Byte](byteBuf.remaining()) 
    byteBuf.get(ba,0,ba.length) 
    val baInput = new ByteArrayInputStream(ba) 
    val source = StreamConverters.fromInputStream(() => baInput) 
    source.runWith(FileIO.toPath(Paths.get(fileName))) 
  } 
 
  def ByteArrayToFile(bytes: Array[Byte], fileName: String)( 
    implicit mat: Materializer) = { 
    val baInput = new ByteArrayInputStream(bytes) 
    val source = StreamConverters.fromInputStream(() => baInput) 
    source.runWith(FileIO.toPath(Paths.get(fileName))) 
  } 
 
  def InputStreamToFile(is: InputStream, fileName: String)( 
    implicit mat: Materializer) = { 
    val source = StreamConverters.fromInputStream(() => is) 
    source.runWith(FileIO.toPath(Paths.get(fileName))) 
  } 
}
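
For small files such as the sample picture, the same round trip can be done without streams or a Materializer. This is a simplified alternative using `java.nio.file.Files`, not the approach used in FileStreaming above; the object and method names are hypothetical.

```scala
import java.nio.file.{Files, Paths}

object SimpleFileBytes {
  // Reads the whole file into memory; fine for small files such as a thumbnail.
  def fileToByteArray(fileName: String): Array[Byte] =
    Files.readAllBytes(Paths.get(fileName))

  // Writes the bytes, creating the file or truncating an existing one.
  def byteArrayToFile(bytes: Array[Byte], fileName: String): Unit = {
    Files.write(Paths.get(fileName), bytes)
    ()
  }
}
```

Both calls block the calling thread and return only when the I/O is done, so the write has finished by the time the next statement runs. The Akka Streams versions remain the better fit for large files or for pipelines that are already streaming.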

MongoScala103.scala

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import java.util.Calendar 
 
import org.bson.BsonBinary 
 
import scala.util._ 
import FileStreaming._ 
 
import scala.concurrent.duration._ 
import org.mongodb.scala._ 
import org.mongodb.scala.bson.{BsonArray, BsonDocument} 
 
import scala.collection.JavaConverters._ 
 
import org.mongodb.scala.connection.ClusterSettings 
import org.mongodb.scala.model.Filters._ 
object MongoScala103 extends App { 
  // Helpers is not listed in this post; it must provide headResult() for the driver's Observables 
  import Helpers._ 
 
  val clusterSettings = ClusterSettings.builder() 
    .hosts(List(new ServerAddress("localhost:27017")).asJava).build() 
  val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build() 
  val client = MongoClient(clientSettings) 
 
  implicit val system = ActorSystem() 
  implicit val mat = ActorMaterializer() 
  implicit val ec = system.dispatcher 
 
 
  val db: MongoDatabase = client.getDatabase("testdb") 
  val poOrgCollection: MongoCollection[Document] = db.getCollection("po") 
  poOrgCollection.drop.headResult() 
  val poCollection: MongoCollection[Document] = db.getCollection("po") 
 
 
  val ca = Calendar.getInstance() 
  ca.set(2011,10,23)   // Calendar months are 0-based: 10 = November 
  val podate1 = ca.getTime 
  ca.set(2012,12,23)   // month 12 rolls over to January 2013 
  val podate2 = ca.getTime 
 
  val pic = FileToByteArray("/users/tiger-macpro/sample.png",3 seconds) 
 
  val po1 = Document ( 
    "ponum" -> "po18012301", 
    "vendor" -> "The smartphone compay", 
    "podate" -> podate1, 
    "remarks" -> "urgent, rush order", 
    "handler" -> pic, 
    "podtl" -> Seq( 
      Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"), 
      Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days") 
    ) 
  ) 
 
  val po2 = Document ( 
    "ponum" -> "po18022002", 
    "vendor" -> "The Samsung compay", 
    "podate" -> podate2, 
    "podtl" -> Seq( 
      Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"), 
      Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"), 
      Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury") 
    ) 
  ) 
 
 
  poCollection.insertMany(Seq(po1,po2)).headResult() 
 
  case class PO ( 
                 ponum: String, 
                 podate: java.util.Date, 
                 vendor: String, 
                 remarks: Option[String], 
                 podtl: Option[BsonArray], 
                 handler: Option[BsonBinary] 
                 ) 
  def toPO(doc: Document): PO = { 
      val ks = doc.keySet 
      PO( 
        ponum = doc.getString("ponum"), 
        podate = doc.getDate("podate"), 
        vendor = doc.getString("vendor"), 
        remarks = { 
          if (ks.contains("remarks")) 
            Some(doc.getString("remarks")) 
          else 
            None 
        }, 
        podtl = { 
          if (ks.contains("podtl")) 
            doc.get("podtl").asInstanceOf[Option[BsonArray]] 
          else 
            None 
        }, 
        handler = { 
          if (ks.contains("handler")) 
            doc.get("handler").asInstanceOf[Option[BsonBinary]] 
          else 
            None 
        } 
      ) 
    } 
 
   case class PODTL( 
                   item: String, 
                   price: Double, 
                   qty: Int, 
                   packing: Option[String], 
                   payTerm: Option[String] 
                   ) 
   def toPODTL(podtl: Document): PODTL = { 
     val ks = podtl.keySet 
     PODTL( 
       item = podtl.getString("item"), 
       price = podtl.getDouble("price"), 
       qty = podtl.getInteger("qty"), 
       packing = { 
         if (ks.contains("packing")) 
           Some(podtl.getString("packing")) 
         else None 
       }, 
       payTerm = { 
         if(ks.contains("payterm")) 
           Some(podtl.getString("payterm")) 
         else None 
       } 
     ) 
   } 
 
   def showPO(po: PO) = { 
     println(s"po number: ${po.ponum}") 
     println(s"po date: ${po.podate.toString}") 
     println(s"vendor: ${po.vendor}") 
     if (po.remarks != None) 
       println(s"remarks: ${po.remarks.get}") 
     po.podtl match { 
       case Some(barr) => 
         val docs = barr.getValues.asScala.toList 
         docs.map { dc => 
           toPODTL(dc.asInstanceOf[org.bson.BsonDocument]) 
         }.foreach { doc: PODTL => 
             print(s"==>Item: ${doc.item} ") 
             print(s"price: ${doc.price} ") 
             print(s"qty: ${doc.qty} ") 
             doc.packing.foreach(pk => print(s"packing: ${pk} ")) 
             doc.payTerm.foreach(pt => print(s"payTerm: ${pt} ")) 
             println("") 
           } 
       case _ => 
     } 
 
     po.handler match { 
       case Some(bs) => 
         val fileName = s"/users/tiger-macpro/${po.ponum}.png" 
         ByteArrayToFile(bs.getData,fileName) 
         println(s"picture saved to ${fileName}") 
       case None => println("no picture provided") 
     } 
 
   } 
 
   poCollection.find().toFuture().onComplete { 
     case Success(docs) => docs.map(toPO).foreach (showPO) 
       println("------------------------------") 
     case Failure(e) => println(e.getMessage) 
   } 
 
 
   poCollection.find(equal("podtl.qty",100)).toFuture().onComplete { 
    case Success(docs) => docs.map(toPO).foreach (showPO) 
      println("-------------------------------") 
    case Failure(e) => println(e.getMessage) 
  } 
 
 
  scala.io.StdIn.readLine() 
  system.terminate() 
 
}

 

Original article by ItWorker. If you repost it, please credit the source: https://blog.ytso.com/12808.html
