SDP(10):文本式大数据运算环境-MongoDB-Engine功能设计详解(编程语言)

    为了让前面规划的互联网+数据平台能有效对电子商务数据进行管理及实现大数据统计功能,必须在平台上再增加一个MongoDB-Engine:数据平台用户通过传入一种Context来指示MongoDB-Engine运算。与前面JDBC-Engine和Cassandra-Engine通过文本式传递指令不同的是:MongoDB没有一套像SQL或CQL这样的文本式编程语言。但MongoDB的运算基本上都是通过Bson类型的参数来描述的,Bson是个java interface:

/**
 * An interface for types that are able to render themselves into a {@link BsonDocument}.
 *
 * @since 3.0
 */
public interface Bson {
    /**
     * Render the filter into a BsonDocument.
     *
     * @param documentClass the document class in scope for the collection.  This parameter may be ignored, but it may be used to alter
     *                      the structure of the returned {@link BsonDocument} based on some knowledge of the document class.
     * @param codecRegistry the codec registry.  This parameter may be ignored, but it may be used to look up {@link Codec} instances for
     *                      the document class or any other related class.
     * @param <TDocument> the type of the document class
     * @return the BsonDocument
     */
    <TDocument> BsonDocument toBsonDocument(Class<TDocument> documentClass, CodecRegistry codecRegistry);
}

任何实现Bson的类型都可以通过toBsonDocument来进行Bson到BsonDocument的转换。下面是Filter类型的例子:

   /**
    * A {@code Bson} filter made of a field name, an operator name and a value.
    * It renders itself as a nested document of the shape
    * {@code { fieldName: { operatorName: value } }}.
    *
    * @param <TItem> the type of the value the operator is applied with
    */
   private static final class OperatorFilter<TItem> implements Bson {
        private final String operatorName;
        private final String fieldName;
        private final TItem value;

        /**
         * @param operatorName the operator name, passed through {@code notNull}
         * @param fieldName    the field name, passed through {@code notNull}
         * @param value        the operand; stored as given (no null check is applied here)
         */
        OperatorFilter(final String operatorName, final String fieldName, final TItem value) {
            this.operatorName = notNull("operatorName", operatorName);
            this.fieldName = notNull("fieldName", fieldName);
            this.value = value;
        }

        /**
         * Writes the filter into a fresh {@code BsonDocument}.
         * {@code documentClass} is not used by this implementation.
         */
        @Override
        public <TDocument> BsonDocument toBsonDocument(final Class<TDocument> documentClass, final CodecRegistry codecRegistry) {
            BsonDocumentWriter writer = new BsonDocumentWriter(new BsonDocument());

            // Outer document: { fieldName: ... }
            writer.writeStartDocument();
            writer.writeName(fieldName);
            // Inner document: { operatorName: value }
            writer.writeStartDocument();
            writer.writeName(operatorName);
            // encodeValue is defined outside this excerpt; presumably it picks a codec
            // for the value from codecRegistry - confirm against the driver source.
            encodeValue(writer, value, codecRegistry);
            writer.writeEndDocument();
            writer.writeEndDocument();

            return writer.getDocument();
        }

        /** Delegates to {@code operatorFilterToString} (defined outside this excerpt). */
        @Override
        public String toString() {
            return operatorFilterToString(fieldName, operatorName, value);
        }
    }

对于MongoDB-Engine,我们需要实现下面这几大类功能才能满足平台要求,包括:

count 
distinct
find
aggregate 
mapReduce 
bulkWrite 
insert 
delete 
replace 
update

具体函数细节和功能描述如下:

  count: 
/**
* Counts the number of documents in the collection.
* @return an Observable with a single element indicating the number of documents
*/
def count(): SingleObservable[Long] = observeLong(wrapped.count(_: SingleResultCallback[java.lang.Long]))
/**
* Counts the number of documents in the collection that match the filter, according to the given options.
* @param filter  the query filter
* @param options the options describing the count
* @return an Observable with a single element indicating the number of documents
*/
def count(filter: Bson, options: CountOptions): SingleObservable[Long] =
observeLong(wrapped.count(filter, options, _: SingleResultCallback[java.lang.Long]))
  distinct: 
/**
* Gets the distinct values of the specified field name.
* @param fieldName the field name
* @tparam C       the target type of the observable.
* @return an Observable emitting the sequence of distinct values
*/
def distinct[C](fieldName: String)(implicit ct: ClassTag[C]): DistinctObservable[C] =
DistinctObservable(wrapped.distinct(fieldName, ct))
/**
* Gets the distinct values of the specified field name, restricted by a query filter.
* @param fieldName the field name
* @param filter  the query filter
* @tparam C       the target type of the observable.
* @return an Observable emitting the sequence of distinct values
*/
def distinct[C](fieldName: String, filter: Bson)(implicit ct: ClassTag[C]): DistinctObservable[C] =
DistinctObservable(wrapped.distinct(fieldName, filter, ct))
  find: 
/**
* Finds all documents in the collection.
* @tparam C   the target document type of the observable; per the implicit
*             `C DefaultsTo TResult` evidence it presumably falls back to `TResult`
*             when not given explicitly (DefaultsTo is defined outside this excerpt).
* @return the find Observable
*/
def find[C]()(implicit e: C DefaultsTo TResult, ct: ClassTag[C]): FindObservable[C] =
FindObservable(wrapped.find[C](ct))
/**
* Finds the documents in the collection that match the query filter.
* @param filter the query filter
* @tparam C    the target document type of the observable.
* @return the find Observable
*/
def find[C](filter: Bson)(implicit e: C DefaultsTo TResult, ct: ClassTag[C]): FindObservable[C] =
FindObservable(wrapped.find(filter, ct))
  aggregate: 
/**
* Aggregates documents according to the specified aggregation pipeline.
* @param pipeline the aggregate pipeline; converted with `asJava` before being handed to the wrapped collection
* @tparam C       the target document type of the observable.
* @return an Observable containing the result of the aggregation operation
*         [[http://docs.mongodb.org/manual/aggregation/ Aggregation]]
*/
def aggregate[C](pipeline: Seq[Bson])(implicit e: C DefaultsTo TResult, ct: ClassTag[C]): AggregateObservable[C] =
AggregateObservable(wrapped.aggregate[C](pipeline.asJava, ct))
mapReduce: 
/**
* Aggregates documents according to the specified map-reduce function.
* @param mapFunction    A JavaScript function that associates or "maps" a value with a key and emits the key and value pair.
* @param reduceFunction A JavaScript function that "reduces" to a single object all the values associated with a particular key.
* @tparam C            the target document type of the observable.
* @return an Observable containing the result of the map-reduce operation
*/
def mapReduce[C](mapFunction: String, reduceFunction: String)(implicit e: C DefaultsTo TResult, ct: ClassTag[C]): MapReduceObservable[C] =
MapReduceObservable(wrapped.mapReduce(mapFunction, reduceFunction, ct))
  bulkWrite: 
/**
* Executes a mix of inserts, updates, replaces, and deletes.
* @param requests the writes to execute
* @return an Observable with a single element, the BulkWriteResult
*/
def bulkWrite(requests: Seq[_ <: WriteModel[_ <: TResult]]): SingleObservable[BulkWriteResult] =
observe(wrapped.bulkWrite(
// The Scala Seq is converted to a java.util.List and cast to the wildcard list type.
requests.asJava.asInstanceOf[util.List[_ <: WriteModel[_ <: TResult]]],
_: SingleResultCallback[BulkWriteResult]
))
/**
* Executes a mix of inserts, updates, replaces, and deletes.
* @param requests the writes to execute
* @param options  the options to apply to the bulk write operation
* @return an Observable with a single element, the BulkWriteResult
*/
def bulkWrite(requests: Seq[_ <: WriteModel[_ <: TResult]], options: BulkWriteOptions): SingleObservable[BulkWriteResult] =
observe(wrapped.bulkWrite(
// The Scala Seq is converted to a java.util.List and cast to the wildcard list type.
requests.asJava.asInstanceOf[util.List[_ <: WriteModel[_ <: TResult]]],
options,
_: SingleResultCallback[BulkWriteResult]
))
insert: 
/**
* Inserts the provided document. If the document is missing an identifier, the driver should generate one.
* @param document the document to insert
* @return an Observable with a single element indicating when the operation has completed or with either a
*         com.mongodb.DuplicateKeyException or com.mongodb.MongoException
*/
def insertOne(document: TResult): SingleObservable[Completed] = observeCompleted(wrapped.insertOne(document, _: SingleResultCallback[Void]))
/**
* Inserts the provided document using the given options. If the document is missing an identifier, the driver
* should generate one.
* @param document the document to insert
* @param options  the options to apply to the operation
* @return an Observable with a single element indicating when the operation has completed or with either a
*         com.mongodb.DuplicateKeyException or com.mongodb.MongoException
*/
def insertOne(document: TResult, options: InsertOneOptions): SingleObservable[Completed] =
observeCompleted(wrapped.insertOne(document, options, _: SingleResultCallback[Void]))
/**
* Inserts a batch of documents. The preferred way to perform bulk inserts is to use the BulkWrite API. However, when talking with
* server 2.6, using this method will be faster due to constraints in the bulk API related to error handling.
* @param documents the documents to insert; converted with `asJava` before being handed to the wrapped collection
* @return an Observable with a single element indicating when the operation has completed or with either a
*         com.mongodb.DuplicateKeyException or com.mongodb.MongoException
*/
def insertMany(documents: Seq[_ <: TResult]): SingleObservable[Completed] =
observeCompleted(wrapped.insertMany(documents.asJava, _: SingleResultCallback[Void]))
/**
* Inserts a batch of documents using the given options. The preferred way to perform bulk inserts is to use the BulkWrite API.
* However, when talking with server 2.6, using this method will be faster due to constraints in the bulk API related to error
* handling.
* @param documents the documents to insert; converted with `asJava` before being handed to the wrapped collection
* @param options   the options to apply to the operation
* @return an Observable with a single element indicating when the operation has completed or with either a
*         com.mongodb.DuplicateKeyException or com.mongodb.MongoException
*/
def insertMany(documents: Seq[_ <: TResult], options: InsertManyOptions): SingleObservable[Completed] =
observeCompleted(wrapped.insertMany(documents.asJava, options, _: SingleResultCallback[Void]))
  delete: 
/**
* Removes at most one document from the collection that matches the given filter.  If no documents match, the collection is not
* modified.
* @param filter the query filter to apply to the delete operation
* @return an Observable with a single element, the DeleteResult, or with a com.mongodb.MongoException
*/
def deleteOne(filter: Bson): SingleObservable[DeleteResult] = observe(wrapped.deleteOne(filter, _: SingleResultCallback[DeleteResult]))
/**
* Removes at most one document from the collection that matches the given filter.  If no documents match, the collection is not
* modified.
* @param filter the query filter to apply to the delete operation
* @param options the options to apply to the delete operation
* @return an Observable with a single element, the DeleteResult, or with a com.mongodb.MongoException
*/
def deleteOne(filter: Bson, options: DeleteOptions): SingleObservable[DeleteResult] =
observe(wrapped.deleteOne(filter, options, _: SingleResultCallback[DeleteResult]))
/**
* Removes all documents from the collection that match the given query filter.  If no documents match, the collection is not modified.
* @param filter the query filter to apply to the delete operation
* @return an Observable with a single element, the DeleteResult, or with a com.mongodb.MongoException
*/
def deleteMany(filter: Bson): SingleObservable[DeleteResult] = observe(wrapped.deleteMany(filter, _: SingleResultCallback[DeleteResult]))
/**
* Removes all documents from the collection that match the given query filter.  If no documents match, the collection is not modified.
* @param filter the query filter to apply to the delete operation
* @param options the options to apply to the delete operation
* @return an Observable with a single element, the DeleteResult, or with a com.mongodb.MongoException
*/
def deleteMany(filter: Bson, options: DeleteOptions): SingleObservable[DeleteResult] =
observe(wrapped.deleteMany(filter, options, _: SingleResultCallback[DeleteResult]))
  replace: 
/**
* Replace a document in the collection according to the specified arguments.
* [[http://docs.mongodb.org/manual/tutorial/modify-documents/#replace-the-document Replace]]
* @param filter      the query filter to apply to the replace operation
* @param replacement the replacement document
* @return an Observable with a single element, the UpdateResult
*/
def replaceOne(filter: Bson, replacement: TResult): SingleObservable[UpdateResult] =
observe(wrapped.replaceOne(filter, replacement, _: SingleResultCallback[UpdateResult]))
/**
* Replace a document in the collection according to the specified arguments.
* @param filter      the query filter to apply to the replace operation
* @param replacement the replacement document
* @param options     the options to apply to the replace operation
* @return an Observable with a single element, the UpdateResult
*/
def replaceOne(filter: Bson, replacement: TResult, options: UpdateOptions): SingleObservable[UpdateResult] =
observe(wrapped.replaceOne(filter, replacement, options, _: SingleResultCallback[UpdateResult]))
update: 
/**
* Update a single document in the collection according to the specified arguments.
* @param filter  a document describing the query filter, which may not be null. This can be of any type for which a `Codec` is
*                registered
* @param update  a document describing the update, which may not be null. The update to apply must include only update operators. This
*                can be of any type for which a `Codec` is registered
* @return an Observable with a single element, the UpdateResult
*/
def updateOne(filter: Bson, update: Bson): SingleObservable[UpdateResult] =
observe(wrapped.updateOne(filter, update, _: SingleResultCallback[UpdateResult]))
/**
* Update a single document in the collection according to the specified arguments.
* @param filter  a document describing the query filter, which may not be null. This can be of any type for which a `Codec` is
*                registered
* @param update  a document describing the update, which may not be null. The update to apply must include only update operators. This
*                can be of any type for which a `Codec` is registered
* @param options the options to apply to the update operation
* @return an Observable with a single element, the UpdateResult
*/
def updateOne(filter: Bson, update: Bson, options: UpdateOptions): SingleObservable[UpdateResult] =
observe(wrapped.updateOne(filter, update, options, _: SingleResultCallback[UpdateResult]))
/**
* Update documents in the collection according to the specified arguments.
* @param filter  a document describing the query filter, which may not be null. This can be of any type for which a `Codec` is
*                registered
* @param update  a document describing the update, which may not be null. The update to apply must include only update operators. This
*                can be of any type for which a `Codec` is registered
* @return an Observable with a single element, the UpdateResult
*/
def updateMany(filter: Bson, update: Bson): SingleObservable[UpdateResult] =
observe(wrapped.updateMany(filter, update, _: SingleResultCallback[UpdateResult]))

可以看到:函数传入参数大致两个类型:Bson、XXOptions。我们还需要适度增加一些数据库管理功能,包括:

  createCollection 
listCollection 
dropCollection 
createIndex 
dropIndex 
createView

具体函数细节和功能描述如下:

dropCollection 
/**
* Drops this collection from the Database.
* @return an Observable with a single element indicating when the operation has completed
*/
def drop(): SingleObservable[Completed] = observeCompleted(wrapped.drop(_: SingleResultCallback[Void]))
createIndex 
/**
* Creates an index from the given key description.
* @param key     an object describing the index key(s), which may not be null. This can be of any type for which a `Codec` is
*                registered
* @return an Observable with a single element indicating when the operation has completed; the element is a String
*         (presumably the name of the created index - confirm against the driver documentation)
*/
def createIndex(key: Bson): SingleObservable[String] =
observe(wrapped.createIndex(key, _: SingleResultCallback[String]))
/**
* Creates an index from the given key description and index options.
* @param key     an object describing the index key(s), which may not be null. This can be of any type for which a `Codec` is
*                registered
* @param options the options for the index
* @return an Observable with a single element indicating when the operation has completed; the element is a String
*         (presumably the name of the created index - confirm against the driver documentation)
*/
def createIndex(key: Bson, options: IndexOptions): SingleObservable[String] =
observe(wrapped.createIndex(key, options, _: SingleResultCallback[String]))
 dropIndex 
/**
* Drops the given index.
* @param indexName the name of the index to remove
* @return an Observable with a single element indicating when the operation has completed
*/
def dropIndex(indexName: String): SingleObservable[Completed] = observeCompleted(wrapped.dropIndex(indexName, _: SingleResultCallback[Void]))
/**
* Drops the given index.
* @param indexName the name of the index to remove
* @param dropIndexOptions options to use when dropping indexes
* @return an Observable with a single element indicating when the operation has completed
*/
def dropIndex(indexName: String, dropIndexOptions: DropIndexOptions): SingleObservable[Completed] =
observeCompleted(wrapped.dropIndex(indexName, dropIndexOptions, _: SingleResultCallback[Void]))
/**
* Drops the index given the keys used to create it.
* @param keys the keys of the index to remove
* @return an Observable with a single element indicating when the operation has completed
*/
def dropIndex(keys: Bson): SingleObservable[Completed] = observeCompleted(wrapped.dropIndex(keys, _: SingleResultCallback[Void]))
/**
* Drops the index given the keys used to create it.
* @param keys the keys of the index to remove
* @param dropIndexOptions options to use when dropping indexes
* @return an Observable with a single element indicating when the operation has completed
*/
def dropIndex(keys: Bson, dropIndexOptions: DropIndexOptions): SingleObservable[Completed] =
observeCompleted(wrapped.dropIndex(keys, dropIndexOptions, _: SingleResultCallback[Void]))
/**
* Drop all the indexes on this collection, except for the default on _id.
* @return an Observable with a single element indicating when the operation has completed
*/
def dropIndexes(): SingleObservable[Completed] =
observeCompleted(wrapped.dropIndexes(_: SingleResultCallback[Void]))
/**
* Drop all the indexes on this collection, except for the default on _id.
* @param dropIndexOptions options to use when dropping indexes
* @return an Observable with a single element indicating when the operation has completed
*/
def dropIndexes(dropIndexOptions: DropIndexOptions): SingleObservable[Completed] =
observeCompleted(wrapped.dropIndexes(dropIndexOptions, _: SingleResultCallback[Void]))
  listCollection 
/**
* Finds all the collections in this database.
* @tparam TResult the target document type of the observable; the implicit `TResult DefaultsTo Document`
*                 evidence presumably makes it fall back to `Document` when not given explicitly.
* @return the fluent list collections interface, as a ListCollectionsObservable
*/
def listCollections[TResult]()(implicit e: TResult DefaultsTo Document, ct: ClassTag[TResult]): ListCollectionsObservable[TResult] = ListCollectionsObservable(wrapped.listCollections(ct))
createCollection 
/**
* Create a new collection with the given name.
* @param collectionName the name for the new collection to create
* @return an Observable identifying when the collection has been created
*/
def createCollection(collectionName: String): SingleObservable[Completed] =
observeCompleted(wrapped.createCollection(collectionName, _: SingleResultCallback[Void]))
/**
* Create a new collection with the given name and the selected options.
* @param collectionName the name for the new collection to create
* @param options        various options for creating the collection
* @return an Observable identifying when the collection has been created
*/
def createCollection(collectionName: String, options: CreateCollectionOptions): SingleObservable[Completed] =
observeCompleted(wrapped.createCollection(collectionName, options, _: SingleResultCallback[Void]))
 createView 
/**
* Creates a view with the given name, backing collection/view name, and aggregation pipeline that defines the view.
* @param viewName the name of the view to create
* @param viewOn   the backing collection/view for the view
* @param pipeline the pipeline that defines the view; converted with `asJava` before being handed to the wrapped database
* @return an Observable with a single element indicating when the operation has completed
*/
def createView(viewName: String, viewOn: String, pipeline: Seq[Bson]): SingleObservable[Completed] =
observeCompleted(wrapped.createView(viewName, viewOn, pipeline.asJava, _: SingleResultCallback[Void]))
/**
* Creates a view with the given name, backing collection/view name, aggregation pipeline, and options that defines the view.
* @param viewName          the name of the view to create
* @param viewOn            the backing collection/view for the view
* @param pipeline          the pipeline that defines the view; converted with `asJava` before being handed to the wrapped database
* @param createViewOptions various options for creating the view
* @return an Observable with a single element indicating when the operation has completed
*/
def createView(viewName: String, viewOn: String, pipeline: Seq[Bson], createViewOptions: CreateViewOptions): SingleObservable[Completed] =
observeCompleted(wrapped.createView(viewName, viewOn, pipeline.asJava, createViewOptions, _: SingleResultCallback[Void]))

下面就是根据以上需求分析初步做出的功能框架设计方案:

import org.bson.conversions.Bson 
import org.mongodb.scala._ 
/**
 * Common super-type of every command that can be carried by an `MGOContext`.
 * It declares no members; the concrete commands are the case classes below
 * and those in `MGOAdmins`.
 */
trait MGOCommands

/**
 * Data-access commands. Each case class only records the arguments of a
 * request; nothing is executed here.
 *
 * Every `options` field is typed `Option[Any]`. Presumably it is meant to hold
 * the driver options object matching the operation (e.g. a `CountOptions` for
 * [[MGOCommands.Count]]) - TODO confirm against the code that interprets
 * these commands.
 */
object MGOCommands {

  /** Count request with an optional query filter. */
  case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands

  /**
   * Distinct-values request for `fieldName`, with an optional query filter.
   * NOTE(review): the name is spelled `Distict`; it is kept as-is because
   * renaming it would break existing references.
   */
  case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands

  /** Find request with an optional query filter. */
  case class Find(filter: Option[Bson]) extends MGOCommands

  /** Aggregation request described by a pipeline of stages. */
  case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands

  /** Map-reduce request; both functions are passed as strings. */
  case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands

  /** Insert request for one or more documents. */
  case class Insert(newdocs: Seq[Document], options: Option[Any]) extends MGOCommands

  /**
   * Delete request. `onlyOne` defaults to `false`; presumably it selects
   * between deleting a single match and deleting all matches - TODO confirm.
   */
  case class Delete(filter: Bson, options: Option[Any], onlyOne: Boolean = false) extends MGOCommands

  /** Replace request: the document to swap in for a match of `filter`. */
  case class Replace(filter: Bson, replacement: Document, options: Option[Any]) extends MGOCommands

  /** Update request: `update` describes the change to apply to matches of `filter`. */
  case class Update(filter: Bson, update: Bson, options: Option[Any]) extends MGOCommands

  /** Bulk request wrapping a sequence of other commands. */
  case class BulkWrite(commands: Seq[MGOCommands], options: Option[Any]) extends MGOCommands
}
/**
 * Administrative commands (collections, views and indexes). Like the
 * data-access commands they extend `MGOCommands` and only record arguments.
 *
 * Every `options` field is typed `Option[Any]`; presumably it is meant to hold
 * the matching driver options object - TODO confirm against the interpreter.
 */
object MGOAdmins {

  /** Request to drop the collection named `collName`. */
  case class DropCollection(collName: String) extends MGOCommands

  /** Request to create a collection named `collName`. */
  case class CreateCollection(collName: String, options: Option[Any]) extends MGOCommands

  /** Request to list the collections of database `dbName`. */
  case class ListCollection(dbName: String) extends MGOCommands

  /** Request to create view `viewName` on `viewOn`, defined by `pipeline`. */
  case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any]) extends MGOCommands

  /**
   * Request to create an index.
   * NOTE(review): the field is called `filter` and is optional; presumably it
   * carries the index key description - confirm against the interpreter.
   */
  case class CreateIndex(filter: Option[Bson], options: Option[Any]) extends MGOCommands

  /** Request to drop an index identified by its name. */
  case class DropIndexByName(indexName: String, options: Option[Any]) extends MGOCommands

  /** Request to drop an index identified by its key description. */
  case class DropIndexByKey(key: Bson, options: Option[Any]) extends MGOCommands

  /** Request to drop all indexes. */
  case class DropAllIndexes(options: Option[Any]) extends MGOCommands
}
/**
 * Immutable description of one request: the database, the collection and the
 * command to apply.
 *
 * @param dbName   name of the database
 * @param collName name of the collection
 * @param action   the command; defaults to `null` when none has been set, so
 *                 readers of this field must be prepared for `null`
 */
case class MGOContext(
    dbName: String,
    collName: String,
    action: MGOCommands = null
) {

  /** Returns a copy of this context with `dbName` replaced by `name`. */
  def setDbName(name: String): MGOContext = copy(dbName = name)

  /** Returns a copy of this context with `collName` replaced by `name`. */
  def setCollName(name: String): MGOContext = copy(collName = name)

  /** Returns a copy of this context with `action` replaced by `cmd`. */
  def setCommand(cmd: MGOCommands): MGOContext = copy(action = cmd)
}
/**
 * Factory methods for `MGOContext`.
 *
 * NOTE(review): the three-argument overload has the same parameter types as
 * the `apply` the compiler would otherwise synthesise for the case class.
 * Whether a hand-written duplicate is accepted depends on the Scala version in
 * use - confirm for the project's compiler.
 */
object MGOContext {

  /**
   * Builds a context without a command.
   *
   * @param db   name of the database
   * @param coll name of the collection
   * @return a context whose `action` is the case-class default (`null`)
   */
  def apply(db: String, coll: String): MGOContext = new MGOContext(db, coll)

  /**
   * Builds a context carrying a command.
   *
   * @param db      name of the database
   * @param coll    name of the collection
   * @param command the command to run
   * @return a context holding the three given values
   */
  def apply(db: String, coll: String, command: MGOCommands): MGOContext =
    new MGOContext(db, coll, command)
}

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/12807.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论