SDP(6): Distributed database runtime environment - Cassandra-Engine

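    Modern information systems can hardly avoid big-data processing, so a general-purpose system integration tool must also be able to store and read large volumes of data. Cassandra is a distributed database that offers the high availability expected of such systems, which makes it a core pillar of a large, real-time distributed integration system. Compared with a traditional relational database, Cassandra is entirely different in how it stores and reads data. For example, Cassandra tables are denormalized, and table structures are designed backwards from the queries they must serve. Fortunately, Cassandra provides CQL to support database operations. Put simply, CQL is Cassandra's SQL: a query language whose syntax is close to SQL. Most importantly, CQL presents Cassandra's underlying storage model in SQL-like terms, so programmers who know relational SQL can get started with Cassandra easily. Like SQL, CQL is a plain-text language, and CQL scripts can be run through many client interfaces, including the Java client. There are some ready-made Cassandra client libraries on the market, and some of them provide a LINQ-style DSL (language-integrated query) for type safety. Because we need to serve all kinds of Cassandra users, however, we decided to provide a CQL script runtime instead: Cassandra-Engine accepts CQL scripts and runs them to produce results.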

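The execution model closely resembles JDBC: a CQL operation also builds a statement first and then executes it. Unlike JDBC, the CQL Java driver also offers non-blocking execution: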

   /** 
     * Executes the provided query asynchronously. 
     * <p/> 
     * This method does not block. It returns as soon as the query has been 
     * passed to the underlying network stack. In particular, returning from 
     * this method does not guarantee that the query is valid or has even been 
     * submitted to a live node. Any exception pertaining to the failure of the 
     * query will be thrown when accessing the {@link ResultSetFuture}. 
     * <p/> 
     * Note that for queries that don't return a result (INSERT, UPDATE and 
     * DELETE), you will need to access the ResultSetFuture (that is, call one of 
     * its {@code get} methods to make sure the query was successful. 
     * 
     * @param statement the CQL query to execute (that can be any {@link Statement}). 
     * @return a future on the result of the query. 
     * @throws UnsupportedFeatureException if the protocol version 1 is in use and 
     *                                     a feature not supported has been used. Features that are not supported by 
     *                                     the version protocol 1 include: BatchStatement, ResultSet paging and binary 
     *                                     values in RegularStatement. 
     */ 
    ResultSetFuture executeAsync(Statement statement);

executeAsync returns a ResultSetFuture, which is a Google Guava future (ListenableFuture). We can use an implicit conversion to turn it into a Scala Future:

 implicit def listenableFutureToFuture[T]( 
               listenableFuture: ListenableFuture[T]): Future[T] = { 
    val promise = Promise[T]() 
    Futures.addCallback(listenableFuture, new FutureCallback[T] { 
      def onFailure(error: Throwable): Unit = { 
        promise.failure(error) 
        () 
      } 
      def onSuccess(result: T): Unit = { 
        promise.success(result) 
        () 
      } 
    }) 
    promise.future 
  }
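
The same Promise-based bridging works for any callback-style future. As a minimal sketch that runs without Guava on the classpath, the version below applies the technique to the JDK's CompletableFuture instead of ListenableFuture. The names PromiseBridgeSketch and toScalaFuture are hypothetical and not part of the engine, and the conversion is explicit rather than implicit to keep the call site visible:

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import scala.concurrent.{Future, Promise}

object PromiseBridgeSketch {
  // Complete a Scala Promise from the Java future's completion callback.
  def toScalaFuture[T](cf: CompletableFuture[T]): Future[T] = {
    val promise = Promise[T]()
    cf.whenComplete(new BiConsumer[T, Throwable] {
      override def accept(result: T, error: Throwable): Unit = {
        // exactly one of result/error carries the outcome
        if (error != null) promise.failure(error) else promise.success(result)
        ()
      }
    })
    promise.future
  }
}
```

The returned Future completes as soon as the Java future does, so map, flatMap and for-comprehensions can be used on it like on any other Scala Future.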

With this implicit conversion in scope, the result of executeAsync is converted to a Future[?] automatically, as follows:

  def cqlSingleUpdate(ctx: CQLContext)( 
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = { 
... 
      session.executeAsync(boundStmt).map(_.wasApplied()) 
  }

As before, we use a context object to build a complete, executable statement:

case class CQLContext( 
                       statements: Seq[String], 
                       parameters: Seq[Seq[Object]] = Nil, 
                       consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None 
                     ) { ctx => 
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext = 
    ctx.copy(consistency = Some(_consistency)) 
  def setCommand(_statement: String, _parameters: Object*): CQLContext = 
    ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters)) 
  def appendCommand(_statement: String, _parameters: Object*): CQLContext = 
    ctx.copy(statements = ctx.statements :+ _statement, 
      parameters = ctx.parameters ++ Seq(_parameters)) 
}
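
To make the builder methods concrete, here is a small usage sketch. It uses a trimmed copy of the case class (named Ctx here, with the consistency field left out) so that it runs on its own; the table and column names are hypothetical and only serve as illustration:

```scala
object ContextSketch {
  // Trimmed copy of CQLContext: the consistency field is omitted for brevity.
  case class Ctx(statements: Seq[String], parameters: Seq[Seq[Object]] = Nil) { ctx =>
    def setCommand(stmt: String, params: Object*): Ctx =
      ctx.copy(statements = Seq(stmt), parameters = Seq(params))
    def appendCommand(stmt: String, params: Object*): Ctx =
      ctx.copy(statements = ctx.statements :+ stmt,
        parameters = ctx.parameters ++ Seq(params))
  }

  // Hypothetical statements: each command carries its own parameter list.
  val batch: Ctx = Ctx(Nil)
    .setCommand("insert into demo.items(id, name) values(?, ?)", "k1", "first")
    .appendCommand("update demo.items set name = ? where id = ?", "renamed", "k1")
    .appendCommand("delete from demo.items where id = ?", "k0")
}
```

Because setCommand and appendCommand always add a statement together with its parameter list, the two sequences stay the same length and can be paired up position by position later.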

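What differs from JDBCContext is the consistencyLevel. Because data is replicated across multiple cluster nodes, a consistency level is needed to specify how many replicas must acknowledge a read or a write: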

  def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => { 
    consistency match { 
      case ALL => ConsistencyLevel.ALL 
      case ONE => ConsistencyLevel.ONE 
      case TWO => ConsistencyLevel.TWO 
      case THREE => ConsistencyLevel.THREE 
      case ANY => ConsistencyLevel.ANY 
      case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM 
      case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE 
      case QUORUM => ConsistencyLevel.QUORUM 
      case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM 
      case SERIAL => ConsistencyLevel.SERIAL 
      case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL 
    } 
  }
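
When the levels are plain Int constants, the compiler cannot check that a match covers all of them, so a forgotten case only shows up as a MatchError at run time. One possible alternative, sketched here under the assumption that changing the level type is acceptable, is a sealed trait, for which the compiler warns about missing cases. Only four levels are shown, the names are hypothetical, and plain strings stand in for the driver's ConsistencyLevel so that the sketch needs no driver jar:

```scala
object ConsistencySketch {
  sealed trait Level
  case object One extends Level
  case object Quorum extends Level
  case object LocalQuorum extends Level
  case object All extends Level

  // Strings stand in for the driver's ConsistencyLevel values.
  def driverName(level: Level): String = level match {
    case One         => "ONE"
    case Quorum      => "QUORUM"
    case LocalQuorum => "LOCAL_QUORUM"
    case All         => "ALL"
  }

  val all: Seq[Level] = Seq(One, Quorum, LocalQuorum, All)
}
```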

CQL statements come in three kinds: SimpleStatement, PreparedStatement and BoundStatement. A BoundStatement can cover every kind of CQL statement we need to build. Here is an example of building one:

   val prepStmt = session.prepare(ctx.statement) 
 
    var boundStmt =  prepStmt.bind() 
    if (ctx.parameter != Nil) { 
      val params = processParameters(ctx.parameter) 
      boundStmt = prepStmt.bind(params:_*) 
    }

CQL statement parameter types are fairly complex; values such as date and timestamp must first be preprocessed by the processParameters function:

  case class CQLDate(year: Int, month: Int, day: Int) 
  case object CQLTodayDate 
  case class CQLDateTime(year: Int, Month: Int, 
                         day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0) 
  case object CQLDateTimeNow 
 
  def processParameters(params: Seq[Object]): Seq[Object] = { 
    params.map { obj => 
      obj match { 
        case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd) 
        case CQLTodayDate => 
          val today = java.time.LocalDate.now() 
          LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth) 
        case CQLDateTimeNow => Instant.now() 
        case CQLDateTime(yy, mm, dd, hr, mn, sc, ms) => 
          // zero-padded ISO-8601 instant; the fields are interpreted as UTC 
          Instant.parse(f"$yy%04d-$mm%02d-$dd%02dT$hr%02d:$mn%02d:$sc%02d.$ms%03dZ") 
        case p@_ => p 
      } 
    } 
  }
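
Instant.parse only accepts ISO-8601 instants such as 2018-01-02T03:04:05.006Z, so every field has to be zero-padded, the milliseconds need a leading dot and the string must end in a zone designator. The sketch below shows two ways of building the Instant from separate fields, assuming the fields are meant as UTC; TimestampSketch and its method names are hypothetical:

```scala
import java.time.{Instant, LocalDateTime, ZoneOffset}

object TimestampSketch {
  // Route 1: build a zero-padded ISO-8601 string and parse it.
  def viaString(y: Int, mo: Int, d: Int, h: Int, mi: Int, s: Int, ms: Int): Instant =
    Instant.parse(f"$y%04d-$mo%02d-$d%02dT$h%02d:$mi%02d:$s%02d.$ms%03dZ")

  // Route 2: skip the string and let java.time assemble the fields.
  // LocalDateTime.of takes nanoseconds, hence the multiplication.
  def viaFields(y: Int, mo: Int, d: Int, h: Int, mi: Int, s: Int, ms: Int): Instant =
    LocalDateTime.of(y, mo, d, h, mi, s, ms * 1000000).toInstant(ZoneOffset.UTC)
}
```

The second route avoids format strings altogether and rejects out-of-range fields with a DateTimeException instead of a parse error.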

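CassandraEngine update operations come in two forms: a single update and a batch update. A batch update is similar in spirit to a transaction: with a logged batch (the default type of the driver's BatchStatement), either all statements in the batch eventually take effect or none do, although Cassandra offers no isolation and no relational-style rollback: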

 def cqlExecute(ctx: CQLContext)( 
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = { 
    if (ctx.statements.size == 1) 
      cqlSingleUpdate(ctx) 
    else 
      cqlMultiUpdate(ctx) 
  } 
  def cqlSingleUpdate(ctx: CQLContext)( 
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = { 
 
      val prepStmt = session.prepare(ctx.statements.head) 
 
      // bind parameters only when the first statement actually has some 
      val boundStmt = ctx.parameters.headOption.filter(_.nonEmpty) match { 
        case Some(params) => prepStmt.bind(processParameters(params): _*) 
        case None => prepStmt.bind() 
      } 
 
    ctx.consistency.foreach {consistency => 
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))} 
      session.executeAsync(boundStmt).map(_.wasApplied()) 
  } 
 
  def cqlMultiUpdate(ctx: CQLContext)( 
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = { 
    val commands: Seq[(String,Seq[Object])] = ctx.statements zip ctx.parameters 
    var batch = new BatchStatement() 
    commands.foreach { case (stm, params) => 
      val prepStmt = session.prepare(stm) 
      if (params == Nil) 
        batch.add(prepStmt.bind()) 
      else { 
        val p = processParameters(params) 
        batch.add(prepStmt.bind(p: _*)) 
      } 
    } 
 
    ctx.consistency.foreach {consistency => 
      batch.setConsistencyLevel(consistencyLevel(consistency))} 
    session.executeAsync(batch).map(_.wasApplied()) 
  }
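
One detail of pairing statements with parameters is worth spelling out: zip stops at the shorter sequence, so a context built directly with statements but without parameter lists would yield an empty batch. The sketch below contrasts zip with zipAll, which pads the missing parameter lists with Nil. PairingSketch is a hypothetical name, and the sketch assumes there are never more parameter lists than statements:

```scala
object PairingSketch {
  // Plain zip stops at the shorter sequence, so statements without a
  // matching parameter list are silently dropped.
  def pairStrict(stmts: Seq[String], params: Seq[Seq[Object]]): Seq[(String, Seq[Object])] =
    stmts zip params

  // zipAll pads the shorter side: a missing parameter list becomes Nil.
  // If params were longer than stmts, "" would be used as a filler statement.
  def pairPadded(stmts: Seq[String], params: Seq[Seq[Object]]): Seq[(String, Seq[Object])] =
    stmts.zipAll(params, "", Nil)
}
```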

CassandraEngine updates return their status as a Future[Boolean]. Here is an update example:

  val createCQL =""" 
  CREATE TABLE testdb.members ( 
    id UUID primary key, 
    name TEXT, 
    description TEXT, 
    birthday DATE, 
    created_at TIMESTAMP, 
    picture BLOB 
  )""" 
 
  val ctxCreate = CQLContext().setCommand(createCQL) 
 
  val ctxInsert = CQLContext().setCommand("insert into testdb.members(id,name,description,birthday,created_at,picture)" + 
    " values(uuid(),?,?,?,?,?)", "alan xu", "alan-xu", CQLDate(1966, 11, 27), CQLDateTimeNow, cqlFileToBytes("/users/tiger/Nobody.png")) 
   
  val createData = for { 
    createTable <- cqlExecute(ctxCreate) 
    insertData <- cqlExecute(ctxInsert) 
  } yield(createTable, insertData) 
 
  createData.onComplete { 
    case Success((c,i)) => println(s"Create Table: $c, Insert Data $i") 
    case Failure(e) => println(e.getMessage) 
  }

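In the example above, a for-comprehension chains the operations so that they run in sequence. Note that the example already covers input parameter types such as date, datetime and blob.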
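
The sequencing comes from flatMap: a Future created inside a generator is only started once the previous generator has succeeded, and a failure skips everything after it. The following self-contained sketch illustrates this with a hypothetical step function standing in for cqlExecute; it records the order of execution in a log instead of talking to a database:

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.concurrent.{ExecutionContext, Future}

object SequencingSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global
  val log = new CopyOnWriteArrayList[String]()

  // Hypothetical stand-in for cqlExecute: records its name, then succeeds or fails.
  def step(name: String, ok: Boolean = true): Future[Boolean] = Future {
    log.add(name)
    if (ok) true else throw new RuntimeException(s"$name failed")
  }

  // The second step is only created after the first one has succeeded.
  def run(firstOk: Boolean): Future[(Boolean, Boolean)] = for {
    created  <- step("create", firstOk)
    inserted <- step("insert")
  } yield (created, inserted)
}
```

Had both futures been created before the for-comprehension, they would have started concurrently and the insert could have raced the table creation.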

The statement-building information for a fetch query is as follows:

case class CQLQueryContext[M]( 
                       statement: String, 
                       parameter: Seq[Object] = Nil, 
                       consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None, 
                       extractor: Row => M 
                     )

A fetch query is also run through execute:

    /** 
     * Executes the provided query. 
     * <p/> 
     * This method blocks until at least some result has been received from the 
     * database. However, for SELECT queries, it does not guarantee that the 
     * result has been received in full. But it does guarantee that some 
     * response has been received from the database, and in particular 
     * guarantees that if the request is invalid, an exception will be thrown 
     * by this method. 
     * 
     * @param statement the CQL query to execute (that can be any {@link Statement}). 
     * @return the result of the query. That result will never be null but can 
     * be empty (and will be for any non SELECT query). 
     * @throws NoHostAvailableException    if no host in the cluster can be 
     *                                     contacted successfully to execute this query. 
     * @throws QueryExecutionException     if the query triggered an execution 
     *                                     exception, i.e. an exception thrown by Cassandra when it cannot execute 
     *                                     the query with the requested consistency level successfully. 
     * @throws QueryValidationException    if the query is invalid (syntax error, 
     *                                     unauthorized or any other validation problem). 
     * @throws UnsupportedFeatureException if the protocol version 1 is in use and 
     *                                     a feature not supported has been used. Features that are not supported by 
     *                                     the version protocol 1 include: BatchStatement, ResultSet paging and binary 
     *                                     values in RegularStatement. 
     */ 
    ResultSet execute(Statement statement);

The returned ResultSet is then converted into a Scala collection:

  def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A], pageSize: Int = 100)( 
    implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= { 
 
    val prepStmt = session.prepare(ctx.statement) 
 
    var boundStmt =  prepStmt.bind() 
    if (ctx.parameter != Nil) { 
      val params = processParameters(ctx.parameter) 
      boundStmt = prepStmt.bind(params:_*) 
    } 
 
    ctx.consistency.foreach {consistency => 
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))} 
 
    val resultSet = session.execute(boundStmt.setFetchSize(pageSize)) 
    (resultSet,(resultSet.asScala.view.map(ctx.extractor)).to[C]) 
 
  }
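
A point to keep in mind when converting a paged result: iterating the result set loads further pages on demand, so converting the whole iterator drains every page, not just the first. To stop at the rows already in memory, the iterator can be limited with take to the number of rows available without fetching. The sketch below shows the difference with FakePagedResult, a hypothetical in-memory stand-in for a driver ResultSet that counts how many extra pages it had to load:

```scala
object PagingSketch {
  // Hypothetical stand-in for a driver ResultSet: rows arrive in pages, and
  // iteration silently loads the next page when the current one runs out.
  final class FakePagedResult(pages: List[List[Int]]) extends Iterator[Int] {
    private var remaining: List[List[Int]] = pages.drop(1)
    private var current: List[Int] = pages.headOption.getOrElse(Nil)
    var fetches: Int = 0 // number of extra pages loaded so far

    def availableWithoutFetching: Int = current.size

    def hasNext: Boolean = {
      while (current.isEmpty && remaining.nonEmpty) {
        current = remaining.head
        remaining = remaining.tail
        fetches += 1
      }
      current.nonEmpty
    }

    def next(): Int = {
      if (!hasNext) throw new NoSuchElementException("no more rows")
      val row = current.head
      current = current.tail
      row
    }
  }

  // Reads only the rows of the current page: take(n) never asks the
  // underlying iterator for more once n rows have been delivered.
  def firstPageOnly(rs: FakePagedResult): Vector[Int] =
    rs.take(rs.availableWithoutFetching).toVector

  // Reads everything, loading each remaining page along the way.
  def drainAll(rs: FakePagedResult): Vector[Int] = rs.toVector
}
```

With the real driver, the counterpart of availableWithoutFetching is ResultSet.getAvailableWithoutFetching, which also appears in the driver documentation quoted further down.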

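fetchResultPage reads with paging, where pageSize is the number of rows fetched per round trip. Note that iterating a ResultSet transparently fetches further pages as needed, so a conversion that iterates the whole result set keeps pulling rows until it is exhausted. fetchMoreResults can be used to keep reading ahead: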

    /** 
     * Force fetching the next page of results for this result set, if any. 
     * <p/> 
     * This method is entirely optional. It will be called automatically while 
     * the result set is consumed (through {@link #one}, {@link #all} or iteration) 
     * when needed (i.e. when {@code getAvailableWithoutFetching() == 0} and 
     * {@code isFullyFetched() == false}). 
     * <p/> 
     * You can however call this method manually to force the fetching of the 
     * next page of results. This can allow to prefetch results before they are 
     * strictly needed. For instance, if you want to prefetch the next page of 
     * results as soon as there is less than 100 rows readily available in this 
     * result set, you can do: 
     * <pre> 
     *   ResultSet rs = session.execute(...); 
     *   Iterator<Row> iter = rs.iterator(); 
     *   while (iter.hasNext()) { 
     *       if (rs.getAvailableWithoutFetching() == 100 && !rs.isFullyFetched()) 
     *           rs.fetchMoreResults(); 
     *       Row row = iter.next() 
     *       ... process the row ... 
     *   } 
     * </pre> 
     * This method is not blocking, so in the example above, the call to {@code 
     * fetchMoreResults} will not block the processing of the 100 currently available 
     * rows (but {@code iter.hasNext()} will block once those rows have been processed 
     * until the fetch query returns, if it hasn't yet). 
     * <p/> 
     * Only one page of results (for a given result set) can be 
     * fetched at any given time. If this method is called twice and the query 
     * triggered by the first call has not returned yet when the second one is 
     * performed, then the 2nd call will simply return a future on the currently 
     * in progress query. 
     * 
     * @return a future on the completion of fetching the next page of results. 
     * If the result set is already fully retrieved ({@code isFullyFetched() == true}), 
     * then the returned future will return immediately but no particular error will be 
     * thrown (you should thus call {@link #isFullyFetched()} to know if calling this 
     * method can be of any use). 
     */ 
    ListenableFuture<S> fetchMoreResults();

Here is the implementation of continued page reads:

  def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)( 
       extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) = 
    if (resultSet.isFullyFetched) { 
      (resultSet, None) 
    } else { 
      try { 
        val result = Await.result(resultSet.fetchMoreResults(), timeOut) 
        (result, Some((result.asScala.view.map(extractor)).to[C])) 
      } catch { case e: Throwable => (resultSet, None) } 
    }

We use these two functions to read back the data that the insert script above added to Cassandra:

  //data model 
  case class Member( 
                     id: String, 
                     name: String, 
                     description: Option[String] = None, 
                     birthday: LocalDate, 
                     createdAt: java.util.Date, 
                     picture: Option[ByteBuffer] = None) 
 
  //data row converter 
  val toMember = (rs: Row) => Member( 
    id = rs.getUUID("id").toString, 
    name = rs.getString("name"), 
    // Option(...) maps a null column value to None 
    description = Option(rs.getString("description")), 
    birthday = rs.getDate("birthday"), 
    createdAt = rs.getTimestamp("created_at"), 
    picture = { 
      val pic = rs.getBytes("picture") 
      if (pic == null) 
        None 
      else 
        Some(pic) 
 
    } 
  ) 
 
 try { 
   val qtx = CQLQueryContext(statement = "select * from testdb.members", extractor = toMember) 
   val (resultSet, vecResults) = fetchResultPage[Vector, Member](qtx) 
 
   var vecMembers: Vector[Member] = vecResults 
 
   var isExh = resultSet.isExhausted 
   var nextPage: (ResultSet, Option[Vector[Member]]) = (resultSet, Some(vecResults)) 
   while (!isExh) { 
     nextPage = fetchMorePages[Vector,Member](nextPage._1,1 second)(toMember) 
     nextPage._2.foreach {vec => 
       vecMembers = vecMembers ++ vec 
     } 
     isExh = resultSet.isExhausted 
   } 
   vecMembers.foreach { m => 
     println(s"id: ${m.id}-name:${m.name}-${m.description} birthday: ${m.birthday.toString}") 
     println(s"created_at: ${cqlDateTimeString(m.createdAt,"yyyy-MM-dd HH:mm:ss.SSS")}") 
     m.picture match { 
       case Some(buf) => 
         val fname = s"/users/tiger/pic-${m.name}.png" 
         cqlBytesToFile(buf,fname) 
         println(s"saving picture to $fname") 
       case _ => println("empty picture!") 
     } 
   } 
 } catch { 
   case e: Exception => println(e.getMessage) 
 }

The example above also uses a few helper functions:

 def cqlFileToBytes(fileName: String): ByteBuffer = { 
    // read the whole file; readAllBytes sizes the array exactly and closes the file 
    val bytes = java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(fileName)) 
    ByteBuffer.wrap(bytes) 
  } 
 
 
  def cqlBytesToFile(bytes: ByteBuffer, fileName: String)( 
        implicit mat: Materializer): Future[IOResult] = { 
    val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes)) 
    source.runWith(FileIO.toPath(Paths.get(fileName))) 
  } 
 
  def cqlDateTimeString(date: java.util.Date, fmt: String): String = { 
    val outputFormat = new java.text.SimpleDateFormat(fmt) 
    outputFormat.format(date) 
  } 
 
  def useJava8DateTime(cluster: Cluster) = { 
    //for jdk8 datetime format 
    cluster.getConfiguration().getCodecRegistry() 
      .register(InstantCodec.instance) 
  }

A ByteBufferInputStream type is also needed to read blob content:

 class ByteBufferInputStream(buf: ByteBuffer) extends InputStream { 
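    override def read: Int = { 
      // InputStream.read must return 0..255, or -1 at end of stream 
      if (!buf.hasRemaining) -1 
      else buf.get & 0xFF 
    } 
 
    override def read(bytes: Array[Byte], off: Int, len: Int): Int = { 
      if (len == 0) 0 
      else if (!buf.hasRemaining) -1 // signal end of stream 
      else { 
        val length: Int = Math.min(len, buf.remaining) 
        buf.get(bytes, off, length) 
        length 
      } 
    } 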
  } 
  object ByteBufferInputStream { 
    def apply(buf: ByteBuffer): ByteBufferInputStream = { 
      new ByteBufferInputStream(buf) 
    } 
  }
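
InputStream.read() is specified to return the next byte as an Int between 0 and 255, and -1 only at the end of the stream. This matters for binary data such as images, where the byte 0xFF is common: returned as a signed value it would be -1 and read as a premature end of stream. The sketch below, with the hypothetical names UnsignedReadSketch and BufferStream, shows the masking that keeps the two cases apart:

```scala
import java.io.InputStream
import java.nio.ByteBuffer

object UnsignedReadSketch {
  // Minimal stream over a ByteBuffer that honours the InputStream contract.
  final class BufferStream(buf: ByteBuffer) extends InputStream {
    override def read(): Int =
      if (!buf.hasRemaining) -1 // end of stream
      else buf.get() & 0xFF     // 0xFF becomes 255, not -1
  }

  // Reads single bytes until the stream reports its end.
  def readAll(in: InputStream): Vector[Int] =
    Iterator.continually(in.read()).takeWhile(_ != -1).toVector
}
```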

Here is the complete source code for this discussion:

build.sbt

name := "learn_cassandra" 
 
version := "0.1" 
 
scalaVersion := "2.12.4" 
 
libraryDependencies ++= Seq( 
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0", 
  "com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0", 
  "com.typesafe.akka" %% "akka-actor" % "2.5.4", 
  "com.typesafe.akka" %% "akka-stream" % "2.5.4", 
  "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.16", 
  "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1", 
  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test", 
  "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1", 
  "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1", 
  "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1", 
  "com.h2database"  %  "h2"                % "1.4.196", 
  "mysql" % "mysql-connector-java" % "6.0.6", 
  "org.postgresql" % "postgresql" % "42.2.0", 
  "commons-dbcp" % "commons-dbcp" % "1.4", 
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2", 
  "com.zaxxer" % "HikariCP" % "2.7.4", 
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE", 
  "com.typesafe.slick" %% "slick" % "3.2.1", 
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3")

CassandraEngine.scala

import com.datastax.driver.core._ 
import scala.concurrent._ 
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture} 
import scala.collection.JavaConverters._ 
import scala.collection.generic.CanBuildFrom 
import scala.concurrent.duration.Duration 
 
object CQLContext { 
  // Consistency Levels 
  type CONSISTENCY_LEVEL = Int 
  val ANY: CONSISTENCY_LEVEL          =                                        0x0000 
  val ONE: CONSISTENCY_LEVEL          =                                        0x0001 
  val TWO: CONSISTENCY_LEVEL          =                                        0x0002 
  val THREE: CONSISTENCY_LEVEL        =                                        0x0003 
  val QUORUM : CONSISTENCY_LEVEL      =                                        0x0004 
  val ALL: CONSISTENCY_LEVEL          =                                        0x0005 
  val LOCAL_QUORUM: CONSISTENCY_LEVEL =                                        0x0006 
  val EACH_QUORUM: CONSISTENCY_LEVEL  =                                        0x0007 
  val LOCAL_ONE: CONSISTENCY_LEVEL    =                                      0x000A 
  val LOCAL_SERIAL: CONSISTENCY_LEVEL =                                     0x000B 
  val SERIAL: CONSISTENCY_LEVEL       =                                      0x000C 
 
  def apply(): CQLContext = CQLContext(statements = Nil) 
 
  def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => { 
    consistency match { 
      case ALL => ConsistencyLevel.ALL 
      case ONE => ConsistencyLevel.ONE 
      case TWO => ConsistencyLevel.TWO 
      case THREE => ConsistencyLevel.THREE 
      case ANY => ConsistencyLevel.ANY 
      case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM 
      case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE 
      case QUORUM => ConsistencyLevel.QUORUM 
      case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM 
      case SERIAL => ConsistencyLevel.SERIAL 
      case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL 
 
    } 
  } 
 
} 
case class CQLQueryContext[M]( 
                       statement: String, 
                       parameter: Seq[Object] = Nil, 
                       consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None, 
                       extractor: Row => M 
                     ) 
 
case class CQLContext( 
                       statements: Seq[String], 
                       parameters: Seq[Seq[Object]] = Nil, 
                       consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None 
                     ) { ctx => 
 
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext = 
    ctx.copy(consistency = Some(_consistency)) 
  def setCommand(_statement: String, _parameters: Object*): CQLContext = 
    ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters)) 
  def appendCommand(_statement: String, _parameters: Object*): CQLContext = 
    ctx.copy(statements = ctx.statements :+ _statement, 
      parameters = ctx.parameters ++ Seq(_parameters)) 
} 
 
object CQLEngine { 
  import CQLContext._ 
  import CQLHelpers._ 
 
  def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A], pageSize: Int = 100)( 
    implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= { 
 
    val prepStmt = session.prepare(ctx.statement) 
 
    var boundStmt =  prepStmt.bind() 
    if (ctx.parameter != Nil) { 
      val params = processParameters(ctx.parameter) 
      boundStmt = prepStmt.bind(params:_*) 
    } 
 
    ctx.consistency.foreach {consistency => 
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))} 
 
    val resultSet = session.execute(boundStmt.setFetchSize(pageSize)) 
    (resultSet,(resultSet.asScala.view.map(ctx.extractor)).to[C]) 
  } 
  def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)( 
       extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) = 
    if (resultSet.isFullyFetched) { 
      (resultSet, None) 
    } else { 
      try { 
        val result = Await.result(resultSet.fetchMoreResults(), timeOut) 
        (result, Some((result.asScala.view.map(extractor)).to[C])) 
      } catch { case e: Throwable => (resultSet, None) } 
    } 
  def cqlExecute(ctx: CQLContext)( 
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = { 
    if (ctx.statements.size == 1) 
      cqlSingleUpdate(ctx) 
    else 
      cqlMultiUpdate(ctx) 
  } 
  def cqlSingleUpdate(ctx: CQLContext)( 
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = { 
 
      val prepStmt = session.prepare(ctx.statements.head) 
 
      // bind parameters only when the first statement actually has some 
      val boundStmt = ctx.parameters.headOption.filter(_.nonEmpty) match { 
        case Some(params) => prepStmt.bind(processParameters(params): _*) 
        case None => prepStmt.bind() 
      } 
 
    ctx.consistency.foreach {consistency => 
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))} 
      session.executeAsync(boundStmt).map(_.wasApplied()) 
  } 
  def cqlMultiUpdate(ctx: CQLContext)( 
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = { 
    val commands: Seq[(String,Seq[Object])] = ctx.statements zip ctx.parameters 
    var batch = new BatchStatement() 
    commands.foreach { case (stm, params) => 
      val prepStmt = session.prepare(stm) 
      if (params == Nil) 
        batch.add(prepStmt.bind()) 
      else { 
        val p = processParameters(params) 
        batch.add(prepStmt.bind(p: _*)) 
      } 
    } 
    ctx.consistency.foreach {consistency => 
      batch.setConsistencyLevel(consistencyLevel(consistency))} 
    session.executeAsync(batch).map(_.wasApplied()) 
  } 
} 
object CQLHelpers { 
  import java.nio.ByteBuffer 
  import java.io._ 
  import java.nio.file._ 
  import com.datastax.driver.core.LocalDate 
  import com.datastax.driver.extras.codecs.jdk8.InstantCodec 
  import java.time.Instant 
  import akka.stream.scaladsl._ 
  import akka.stream._ 
 
  implicit def listenableFutureToFuture[T]( 
               listenableFuture: ListenableFuture[T]): Future[T] = { 
    val promise = Promise[T]() 
    Futures.addCallback(listenableFuture, new FutureCallback[T] { 
      def onFailure(error: Throwable): Unit = { 
        promise.failure(error) 
        () 
      } 
      def onSuccess(result: T): Unit = { 
        promise.success(result) 
        () 
      } 
    }) 
    promise.future 
  } 
 
  case class CQLDate(year: Int, month: Int, day: Int) 
  case object CQLTodayDate 
  case class CQLDateTime(year: Int, Month: Int, 
                         day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0) 
  case object CQLDateTimeNow 
 
  def processParameters(params: Seq[Object]): Seq[Object] = { 
    params.map { obj => 
      obj match { 
        case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd) 
        case CQLTodayDate => 
          val today = java.time.LocalDate.now() 
          LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth) 
        case CQLDateTimeNow => Instant.now() 
        case CQLDateTime(yy, mm, dd, hr, mn, sc, ms) => 
          // zero-padded ISO-8601 instant; the fields are interpreted as UTC 
          Instant.parse(f"$yy%04d-$mm%02d-$dd%02dT$hr%02d:$mn%02d:$sc%02d.$ms%03dZ") 
        case p@_ => p 
      } 
    } 
  } 
  class ByteBufferInputStream(buf: ByteBuffer) extends InputStream { 
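    override def read: Int = { 
      // InputStream.read must return 0..255, or -1 at end of stream 
      if (!buf.hasRemaining) -1 
      else buf.get & 0xFF 
    } 
 
    override def read(bytes: Array[Byte], off: Int, len: Int): Int = { 
      if (len == 0) 0 
      else if (!buf.hasRemaining) -1 // signal end of stream 
      else { 
        val length: Int = Math.min(len, buf.remaining) 
        buf.get(bytes, off, length) 
        length 
      } 
    } 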
  } 
  object ByteBufferInputStream { 
    def apply(buf: ByteBuffer): ByteBufferInputStream = { 
      new ByteBufferInputStream(buf) 
    } 
  } 
  class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream { 
 
    override def write(b: Int): Unit = { 
      buf.put(b.toByte) 
    } 
 
    override def write(bytes: Array[Byte], off: Int, len: Int): Unit = { 
      buf.put(bytes, off, len) 
    } 
  } 
  object FixsizedByteBufferOutputStream { 
    def apply(buf: ByteBuffer) = new FixsizedByteBufferOutputStream(buf) 
  } 
  class ExpandingByteBufferOutputStream(var buf: ByteBuffer, onHeap: Boolean) extends OutputStream { 
 
    private val increasing = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR 
 
    override def write(b: Array[Byte], off: Int, len: Int): Unit = { 
      val position = buf.position 
      val limit = buf.limit 
      val newTotal: Long = position + len 
      if(newTotal > limit){ 
        var capacity = (buf.capacity * increasing) 
        while(capacity <= newTotal){ 
          capacity = (capacity*increasing) 
        } 
        increase(capacity.toInt) 
      } 
 
      buf.put(b, off, len) // honour the caller's offset 
    } 
 
    override def write(b: Int): Unit= { 
      if (!buf.hasRemaining) increase((buf.capacity * increasing).toInt) 
      buf.put(b.toByte) 
    } 
    protected def increase(newCapacity: Int): Unit = { 
      buf.limit(buf.position) 
      buf.rewind 
      val newBuffer = 
        if (onHeap) ByteBuffer.allocate(newCapacity) 
        else  ByteBuffer.allocateDirect(newCapacity) 
      newBuffer.put(buf) 
      buf.clear 
      buf = newBuffer 
    } 
    def size: Long = buf.position 
    def capacity: Long = buf.capacity 
    def byteBuffer: ByteBuffer = buf 
  } 
  object ExpandingByteBufferOutputStream { 
    val DEFAULT_INCREASING_FACTOR = 1.5f 
    def apply(size: Int, increasingBy: Float, onHeap: Boolean) = { 
      if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0") 
      val buffer: ByteBuffer = 
        if (onHeap) ByteBuffer.allocate(size) 
        else ByteBuffer.allocateDirect(size) 
      new ExpandingByteBufferOutputStream(buffer,onHeap) 
    } 
    def apply(size: Int): ExpandingByteBufferOutputStream = { 
      apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false) 
    } 
 
    def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = { 
      apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap) 
    } 
 
    def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = { 
      apply(size, increasingBy, false) 
    } 
 
  } 
  def cqlFileToBytes(fileName: String): ByteBuffer = { 
    // read the whole file; readAllBytes sizes the array exactly and closes the file 
    val bytes = Files.readAllBytes(Paths.get(fileName)) 
    ByteBuffer.wrap(bytes) 
  } 
  def cqlBytesToFile(bytes: ByteBuffer, fileName: String)( 
        implicit mat: Materializer): Future[IOResult] = { 
    val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes)) 
    source.runWith(FileIO.toPath(Paths.get(fileName))) 
  } 
  def cqlDateTimeString(date: java.util.Date, fmt: String): String = { 
    val outputFormat = new java.text.SimpleDateFormat(fmt) 
    outputFormat.format(date) 
  } 
  def useJava8DateTime(cluster: Cluster) = { 
    //for jdk8 datetime format 
    cluster.getConfiguration().getCodecRegistry() 
      .register(InstantCodec.instance) 
  } 
}

CQLEngineDemo.scala

import scala.util._ 
import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import com.datastax.driver.core._ 
import CQLEngine._ 
import CQLHelpers._ 
import com.datastax.driver.core.LocalDate 
import java.nio.ByteBuffer 
import scala.concurrent.duration._ 
 
object CQLEngineDemo extends App { 
 
  //#init-mat 
  implicit val cqlsys = ActorSystem("cqlSystem") 
  implicit val mat = ActorMaterializer() 
  implicit val ec = cqlsys.dispatcher 
 
  val cluster = Cluster.builder() 
    .addContactPoints("localhost") 
    .withPort(9042) 
    .build() 
 
  useJava8DateTime(cluster) 
  implicit val session = cluster.connect() 
 
  val createCQL =""" 
  CREATE TABLE testdb.members ( 
    id UUID primary key, 
    name TEXT, 
    description TEXT, 
    birthday DATE, 
    created_at TIMESTAMP, 
    picture BLOB 
  )""" 
 
  val ctxCreate = CQLContext().setCommand(createCQL) 
 
  val ctxInsert = CQLContext().setCommand("insert into testdb.members(id,name,description,birthday,created_at,picture)" + 
    " values(uuid(),?,?,?,?,?)", "alan xu", "alan-xu", CQLDate(1966, 11, 27), CQLDateTimeNow, cqlFileToBytes("/users/tiger/Nobody.png")) 
 
  val createData = for { 
    createTable <- cqlExecute(ctxCreate) 
    insertData <- cqlExecute(ctxInsert) 
  } yield(createTable, insertData) 
 
  createData.onComplete { 
    case Success((c,i)) => println(s"Create Table: $c, Insert Data $i") 
    case Failure(e) => println(e.getMessage) 
  } 
  scala.io.StdIn.readLine() 
  //data model 
  case class Member( 
                     id: String, 
                     name: String, 
                     description: Option[String] = None, 
                     birthday: LocalDate, 
                     createdAt: java.util.Date, 
                     picture: Option[ByteBuffer] = None) 
 
  //data row converter 
  val toMember = (rs: Row) => Member( 
    id = rs.getUUID("id").toString, 
    name = rs.getString("name"), 
    // Option(...) maps a null column value to None 
    description = Option(rs.getString("description")), 
    birthday = rs.getDate("birthday"), 
    createdAt = rs.getTimestamp("created_at"), 
    picture = { 
      val pic = rs.getBytes("picture") 
      if (pic == null) 
        None 
      else 
        Some(pic) 
 
    } 
  ) 
 
 try { 
   val qtx = CQLQueryContext(statement = "select * from testdb.members", extractor = toMember) 
   val (resultSet, vecResults) = fetchResultPage[Vector, Member](qtx) 
 
   var vecMembers: Vector[Member] = vecResults 
 
   var isExh = resultSet.isExhausted 
   var nextPage: (ResultSet, Option[Vector[Member]]) = (resultSet, Some(vecResults)) 
   while (!isExh) { 
     nextPage = fetchMorePages[Vector,Member](nextPage._1,1 second)(toMember) 
     nextPage._2.foreach {vec => 
       vecMembers = vecMembers ++ vec 
     } 
     isExh = resultSet.isExhausted 
   } 
   vecMembers.foreach { m => 
     println(s"id: ${m.id}-name:${m.name}-${m.description} birthday: ${m.birthday.toString}") 
     println(s"created_at: ${cqlDateTimeString(m.createdAt,"yyyy-MM-dd HH:mm:ss.SSS")}") 
     m.picture match { 
       case Some(buf) => 
         val fname = s"/users/tiger/pic-${m.name}.png" 
         cqlBytesToFile(buf,fname) 
         println(s"saving picture to $fname") 
       case _ => println("empty picture!") 
     } 
   } 
 } catch { 
   case e: Exception => println(e.getMessage) 
 } 
   
  scala.io.StdIn.readLine() 
  session.close() 
  cluster.close() 
  cqlsys.terminate() 
   
}

 

Original article by ItWorker. If you repost it, please credit the source: https://blog.ytso.com/industrynews/12811.html
