SDP(4):ScalikeJDBC- JDBC-Engine:Updating详解编程语言

    在上一篇博文里我们把JDBC-Engine的读取操作部分分离出来进行了讨论,在这篇准备把更新Update部分功能介绍一下。当然,JDBC-Engine的功能是基于ScalikeJDBC的,所有的操作和属性都包嵌在SQL这个类型中:

/** 
 * SQL abstraction. 
 * 
 * Excerpt only: the class body is elided as `{...}` in this listing. 
 * 
 * @param statement SQL template 
 * @param rawParameters parameters (visible only inside the scalikejdbc package) 
 * @param f  extractor function turning a WrappedResultSet into an A 
 * @tparam A return type 
 * @tparam E type bounded by WithExtractor; NOTE(review): its role is not visible in this excerpt 
 */ 
abstract class SQL[A, E <: WithExtractor]( 
  val statement: String, 
  private[scalikejdbc] val rawParameters: Seq[Any] 
)(f: WrappedResultSet => A) 
{...}

Update功能置于下面这几个子类中:

/**
 * SQL which execute java.sql.Statement#executeUpdate().
 *
 * The statement is run through `updateWithFilters`; which session it runs in
 * depends on the kind of the implicit session passed to `apply`.
 *
 * @param statement SQL template
 * @param parameters parameters
 * @param tags tags attached to the session attributes used for the call
 * @param before before filter
 * @param after after filter
 */
class SQLUpdate(val statement: String, val parameters: Seq[Any], val tags: Seq[String] = Nil)(
    val before: (PreparedStatement) => Unit
)(
    val after: (PreparedStatement) => Unit
) {

  /** Runs the update against the given session and returns the Int produced by `updateWithFilters`. */
  def apply()(implicit session: DBSession): Int = {
    val switcher = new DBSessionAttributesSwitcher(SQL("").tags(tags: _*))
    // The same closure is used in every branch; only the session it is applied to differs.
    val runUpdate: DBSession => Int =
      DBSessionWrapper(_, switcher).updateWithFilters(before, after, statement, parameters: _*)
    session match {
      case AutoSession                       => DB.autoCommit(runUpdate)
      case NamedAutoSession(name, _)         => NamedDB(name, session.settings).autoCommit(runUpdate)
      case ReadOnlyAutoSession               => DB.readOnly(runUpdate)
      case ReadOnlyNamedAutoSession(name, _) => NamedDB(name, session.settings).readOnly(runUpdate)
      case _                                 => runUpdate(session)
    }
  }

}
 
/**
 * SQL which execute java.sql.Statement#execute().
 *
 * @param statement SQL template
 * @param parameters parameters
 * @param tags tags attached to the session attributes used for the call
 * @param before before filter
 * @param after after filter
 */
class SQLExecution(val statement: String, val parameters: Seq[Any], val tags: Seq[String] = Nil)(
    val before: (PreparedStatement) => Unit
)(
    val after: (PreparedStatement) => Unit
) {

  /** Runs the statement via `executeWithFilters` in a session chosen from the implicit one. */
  def apply()(implicit session: DBSession): Boolean = {
    val switcher = new DBSessionAttributesSwitcher(SQL("").tags(tags: _*))
    val runExecute: DBSession => Boolean = { s =>
      DBSessionWrapper(s, switcher).executeWithFilters(before, after, statement, parameters: _*)
    }
    // format: OFF
    session match {
      case AutoSession =>
        DB.autoCommit(runExecute)
      case NamedAutoSession(name, _) =>
        NamedDB(name, session.settings).autoCommit(runExecute)
      case ReadOnlyAutoSession =>
        DB.readOnly(runExecute)
      case ReadOnlyNamedAutoSession(name, _) =>
        NamedDB(name, session.settings).readOnly(runExecute)
      case _ =>
        runExecute(session)
    }
    // format: ON
  }

}
/**
 * SQL which execute java.sql.Statement#executeBatch().
 *
 * @param statement SQL template
 * @param parameters one parameter list per batch entry
 * @param tags tags attached to the session attributes used for the call
 */
class SQLBatch(val statement: String, val parameters: Seq[Seq[Any]], val tags: Seq[String] = Nil) {

  /**
   * Runs the batch in a session chosen from the implicit one and collects the
   * Int results into the requested collection type `C`.
   */
  def apply[C[_]]()(implicit session: DBSession, cbf: CanBuildFrom[Nothing, Int, C[Int]]): C[Int] = {
    val switcher = new DBSessionAttributesSwitcher(SQL("").tags(tags: _*))
    val runBatch: DBSession => C[Int] = { s =>
      DBSessionWrapper(s, switcher).batch(statement, parameters: _*)
    }
    // format: OFF
    session match {
      case AutoSession =>
        DB.autoCommit(runBatch)
      case NamedAutoSession(name, _) =>
        NamedDB(name, session.settings).autoCommit(runBatch)
      case ReadOnlyAutoSession =>
        DB.readOnly(runBatch)
      case ReadOnlyNamedAutoSession(name, _) =>
        NamedDB(name, session.settings).readOnly(runBatch)
      case _ =>
        runBatch(session)
    }
    // format: ON
  }

}

按照JDBC-Engine的功能设计要求,我们大约把Update功能分成数据表构建操作DDL、批次运算Batch、和普通Update几种类型。我们是通过JDBCContext来定义具体的Update功能类型:

/** Constants used to describe what kind of command a [[JDBCContext]] carries. */
object JDBCContext {
  type SQLTYPE = Int
  val SQL_SELECT: Int = 0
  val SQL_EXEDDL = 1
  val SQL_UPDATE = 2
  val RETURN_GENERATED_KEYVALUE = true
  val RETURN_UPDATED_COUNT = false
}

/**
 * Immutable description of one JDBC operation (query, DDL, update or batch).
 * Every helper returns a modified copy; the receiver is never mutated.
 *
 * @param dbName             name of the database the command is meant for
 * @param statements         SQL templates, one per command
 * @param parameters         one parameter list per statement (or per batch entry when `batch` is true)
 * @param fetchSize          fetch size setting
 * @param queryTimeout       optional query timeout setting
 * @param queryTags          tags attached to the commands
 * @param sqlType            one of the `JDBCContext.SQL_*` constants
 * @param batch              true when `statements.head` is to be run as a batch
 * @param returnGeneratedKey per statement: None = no key returned, Some(1) = by index, Some("id") = by name
 * @param preAction          optional action on the PreparedStatement before execution
 * @param postAction         optional action on the PreparedStatement after execution
 */
case class JDBCContext(
    dbName: Symbol,
    statements: Seq[String] = Nil,
    parameters: Seq[Seq[Any]] = Nil,
    fetchSize: Int = 100,
    queryTimeout: Option[Int] = None,
    queryTags: Seq[String] = Nil,
    sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
    batch: Boolean = false,
    returnGeneratedKey: Seq[Option[Any]] = Nil,
    // no return: None, return by index: Some(1), by name: Some("id")
    preAction: Option[PreparedStatement => Unit] = None,
    postAction: Option[PreparedStatement => Unit] = None) {

  ctx =>

  //helper functions

  /** True when the context holds exactly one non-batch update statement. */
  private def isSingleUpdate: Boolean =
    ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch && ctx.statements.size == 1

  /** Adds one tag to the end of `queryTags`. */
  def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)

  /** Adds several tags to the end of `queryTags`. */
  def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)

  /** Replaces the fetch size. */
  def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)

  /** Replaces the query timeout. */
  def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)

  /**
   * Sets the action run before execution.
   *
   * @throws IllegalStateException unless the context is a single, non-batch update
   */
  def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (isSingleUpdate)
      ctx.copy(preAction = action)
    else
      throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
  }

  /**
   * Sets the action run after execution.
   *
   * @throws IllegalStateException unless the context is a single, non-batch update
   */
  def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (isSingleUpdate)
      ctx.copy(postAction = action)
    else
      // Previously reported "preAction" here (copy-paste slip).
      throw new IllegalStateException("JDBCContex setting error: postAction not supported!")
  }

  /**
   * Appends another DDL statement with its parameters.
   *
   * @throws IllegalStateException when the context is not of type SQL_EXEDDL
   */
  def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
      ctx.copy(
        statements = ctx.statements :+ _statement,
        // One parameter list per statement; wrapping it twice would store the
        // whole list as a single parameter.
        parameters = ctx.parameters :+ _parameters
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: option not supported!")
  }

  /**
   * Appends another non-batch update statement.
   *
   * @param _returnGeneratedKey true to request the generated key (by index 1), false for none
   * @throws IllegalStateException when the context is not a non-batch SQL_UPDATE
   */
  def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
      val keyOption: Option[Any] = if (_returnGeneratedKey) Some(1) else None
      ctx.copy(
        statements = ctx.statements :+ _statement,
        parameters = ctx.parameters :+ _parameters,
        returnGeneratedKey = ctx.returnGeneratedKey :+ keyOption
      )
    } else
      throw new IllegalStateException("JDBCContex setting error: option not supported!")
  }

  /**
   * Appends one more parameter list to a batch command.
   *
   * @throws IllegalStateException when the context is not a batch SQL_UPDATE, or when the
   *                               list size differs from the first list already stored
   */
  def appendBatchParameters(_parameters: Any*): JDBCContext = {
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
      throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")

    // The first stored list (if any) fixes the arity for all later ones.
    val sameArity = ctx.parameters.headOption.forall(_.size == _parameters.size)
    if (sameArity)
      ctx.copy(parameters = ctx.parameters :+ _parameters)
    else
      throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
  }

  /**
   * Chooses whether a batch command returns generated keys.
   *
   * @throws IllegalStateException when the context is not a batch SQL_UPDATE
   */
  def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
      throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
    ctx.copy(
      returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
    )
  }

  /** Turns the context into a single SELECT command, replacing statements and parameters. */
  def setQueryCommand(_statement: String, _parameters: Any*): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      sqlType = JDBCContext.SQL_SELECT,
      batch = false
    )
  }

  /** Turns the context into a single DDL command, replacing statements and parameters. */
  def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      sqlType = JDBCContext.SQL_EXEDDL,
      batch = false
    )
  }

  /** Turns the context into a single non-batch update command. */
  def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = false
    )
  }

  /** Turns the context into a batch update command; existing parameters are left untouched. */
  def setBatchCommand(_statement: String): JDBCContext = {
    ctx.copy(
      statements = Seq(_statement),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = true
    )
  }
}

JDBCContext还提供了不少的Helper函数来协助构建特别功能的JDBCContext对象,如:setQueryCommand, setDDLCommand, setUpdateCommand, setBatchCommand。这些Helper函数提供Update功能定义的几个主要元素包括:SQL语句主体包括参数占位的statement、输入参数parameter、是否需要返回系统自动产生的主键returnGeneratedKey。在ScalikeJDBC中所有类型的Update功能可以用下面几类内部函数实现,包括:

  /**
   * Shared driver for batch execution: binds every parameter list to one
   * batch statement executor, adds it to the batch, then hands the executor
   * to `execute` and converts the resulting array into `C`.
   * An empty `paramsList` yields an empty collection without creating an executor.
   */
  private[this] def batchInternal[C[_], A](
    template: String,
    paramsList: Seq[Seq[Any]],
    execute: StatementExecutor => scala.Array[A]
  )(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
    ensureNotReadOnlySession(template)
    if (paramsList.isEmpty) {
      Seq.empty[A].to[C]
    } else {
      val batchExecutor = createBatchStatementExecutor(
        conn = conn,
        template = template,
        returnGeneratedKeys = false,
        generatedKeyName = None
      )
      using(batchExecutor) { exec =>
        for (rowParams <- paramsList) {
          exec.bindParams(rowParams)
          exec.addBatch()
        }
        execute(exec).to[C]
      }
    }
  }
  /**
   * Creates a statement executor for `template`/`params`, then runs
   * `before`, `execute` and `after` in that order and returns what
   * `execute` produced.
   */
  private[this] def updateWithFiltersInternal[A](
    returnGeneratedKeys: Boolean,
    before: (PreparedStatement) => Unit,
    after: (PreparedStatement) => Unit,
    template: String,
    execute: StatementExecutor => A,
    params: Seq[Any]
  ): A = {
    ensureNotReadOnlySession(template)
    val statementExecutor = createStatementExecutor(
      conn = conn,
      template = template,
      params = params,
      returnGeneratedKeys = returnGeneratedKeys
    )
    using(statementExecutor) { exec =>
      before(exec.underlying)
      val outcome = execute(exec)
      after(exec.underlying)
      outcome
    }
  }
  /**
   * Same flow as the plain filtered update, but the statement executor is
   * additionally given `generatedKeyName` (wrapped with `Option(...)`, so a
   * null name becomes None).
   */
  private[this] def updateWithAutoGeneratedKeyNameAndFiltersInternal[A](
    returnGeneratedKeys: Boolean,
    generatedKeyName: String,
    before: (PreparedStatement) => Unit,
    after: (PreparedStatement) => Unit,
    template: String,
    execute: StatementExecutor => A,
    params: Seq[Any]
  ): A = {
    ensureNotReadOnlySession(template)
    val statementExecutor = createStatementExecutor(
      conn = conn,
      template = template,
      params = params,
      returnGeneratedKeys = returnGeneratedKeys,
      generatedKeyName = Option(generatedKeyName)
    )
    using(statementExecutor) { exec =>
      before(exec.underlying)
      val outcome = execute(exec)
      after(exec.underlying)
      outcome
    }
  }

我们可以看到所有类型的Update都是通过构建StatementExecutor并按其属性进行运算来实现的:

/** 
 * java.sql.Statement Executor. 
 * 
 * Excerpt only: the class body is elided as `{...}` in this listing. 
 * 
 * @param underlying preparedStatement 
 * @param template SQL template 
 * @param connectionAttributes attributes of the connection (NOTE(review): usage not visible in this excerpt) 
 * @param singleParams parameters for single execution (= not batch execution) 
 * @param tags tags for this execution; empty by default 
 * @param isBatch is batch flag 
 * @param settingsProvider settings source; defaults to SettingsProvider.default 
 */ 
case class StatementExecutor( 
    underlying: PreparedStatement, 
    template: String, 
    connectionAttributes: DBConnectionAttributes, 
    singleParams: Seq[Any] = Nil, 
    tags: Seq[String] = Nil, 
    isBatch: Boolean = false, 
    settingsProvider: SettingsProvider = SettingsProvider.default 
) extends LogSupport with UnixTimeInMillisConverterImplicits with AutoCloseable {...}

这个StatementExecutor类的属性和我们的JDBCContext属性很接近。好了,回到JDBC-Engine Update功能定义。首先是DDL功能:

 /**
  * Runs every statement of a DDL context inside one local transaction.
  *
  * The context's query tags and query timeout are applied, matching the
  * other update functions. Statement parameters are not used: each
  * statement is executed with an empty parameter list.
  *
  * @param ctx context whose sqlType must be SQL_EXEDDL
  * @return Success with a confirmation message, or Failure carrying the
  *         validation or execution error
  */
 def jdbcExcuteDDL(ctx: JDBCContext): Try[String] = {
   if (ctx.sqlType != SQL_EXEDDL) {
     Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
   } else {
     NamedDB(ctx.dbName) localTx { implicit session =>
       // The Try is the value of the transaction block, so a Failure here is
       // what the caller receives.
       Try {
         ctx.queryTimeout.foreach(session.queryTimeout(_))
         ctx.statements.foreach { stm =>
           // No before/after hooks are needed for DDL; both receive the PreparedStatement.
           val ddl = new SQLExecution(statement = stm, parameters = Nil, tags = ctx.queryTags)(
             before = _ => ())(
             after = _ => ())
           ddl.apply()
         }
         "SQL_EXEDDL executed succesfully."
       }
     }
   }
 }

 所有JDBC-Engine的Update功能都是一个事务处理Transaction中的多条更新语句。DDL语句不需要参数所以只需要提供statement就足够了。下面是这个函数的使用示范:

 // Demo: (re)create the members table with two DDL statements in one transaction.
 ConfigDBsWithEnv("dev").setup('h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()

  // "if exists" keeps the first run from failing (and rolling back the create)
  // when the table is not there yet.
  val dropSQL: String ="""
      drop table if exists members
    """

  val createSQL: String ="""
    create table members (
      id serial not null primary key,
      name varchar(30) not null,
      description varchar(1000),
      birthday date,
      created_at timestamp not null,
      picture blob
    )"""

  // ctx stays a var because the later demo sections reassign it.
  var ctx = JDBCContext('h2)
    try {
      ctx = ctx.setDDLCommand(dropSQL)
        .appendDDLCommand(createSQL)
    }
    catch {
       case e: Exception => println(e.getMessage)
    }

  val resultCreateTable = jdbcExcuteDDL(ctx)

  resultCreateTable match {
    case Success(msg) => println(msg)
    case Failure(err) => println(s"${err.getMessage}")
  }

在这里我们修改了上次使用的members表,增加了一个blob类的picture列。这个示范在一个完整的Transaction里包括了两条DDL语句。

批次更新batch-update是指多条输入参数在一条统一的statement上施用:

  /**
   * Runs the single statement of a batch context once per parameter list,
   * inside one local transaction.
   *
   * When the context asks for generated keys, the result holds the values
   * returned by SQLBatchWithGeneratedKey; otherwise it holds the per-entry
   * Int results of the batch, widened to Long (previously these were
   * discarded and an empty collection was always returned).
   *
   * @param ctx context with sqlType SQL_UPDATE and batch = true
   * @return Failure for a wrong sqlType/batch setting or an execution error
   * @throws IllegalStateException when the context holds no statement
   */
  def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      if (ctx.statements == Nil)
        throw new IllegalStateException("JDBCContex setting error: statements empty!")
      if (ctx.sqlType != SQL_UPDATE) {
        Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
      }
      else if (!ctx.batch) {
        Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
      }
      else if (noReturnKey(ctx)) {
        val usql = SQL(ctx.statements.head)
          .tags(ctx.queryTags: _*)
          .batch(ctx.parameters: _*)
        Try {
          NamedDB(ctx.dbName) localTx { implicit session =>
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            // Keep the batch results instead of throwing them away.
            val counts: Seq[Long] = usql.apply[Seq]().map(_.toLong)
            counts.to[C]
          }
        }
      }
      else {
        val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
        Try {
          NamedDB(ctx.dbName) localTx { implicit session =>
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            usql.apply[C]()
          }
        }
      }
    }

如果batch-update是某种Insert操作的话我们可以通过ctx.returnGeneratedKey(由setBatchReturnGeneratedKeyOption设置)注明返回由JDBC系统自动产生的唯一键。这些主键一般在构建表时注明,包括:serial, auto_increment等。如果不返回主键则返回update语句的更新状态如更新数据条数等。在上面这个函数里SQLBatchWithGeneratedKey.apply()返回insert数据主键,所以statement必须是INSERT语句。SQLBatch.apply()则用来运算update语句并返回更新数据的条数。下面是jdbcBatchUpdate函数的使用示范:

 // Demo: three inserts run as one batch; one of them stores a picture from a file.
 val insertSQL = "insert into members(name,birthday,description,created_at,picture) values (?, ?, ?, ?, ?)"
  val dateCreated = DateTime.now

  import java.io.{File, FileInputStream}

  val picfile = new File("/users/tiger/Nobody.png")
  val fis = new FileInputStream(picfile)

  ctx = JDBCContext('h2)
  try {
    ctx = ctx.setBatchCommand(insertSQL).appendBatchParameters(
      "John",new LocalDate("2008-03-01"),"youngest user",dateCreated,None).appendBatchParameters(
      "peter", None, "no birth date", dateCreated, fis)
      .appendBatchParameters(
        "susan", None, "no birth date", dateCreated, None)
      .setBatchReturnGeneratedKeyOption(JDBCContext.RETURN_GENERATED_KEYVALUE)
  }
  catch {
    case e: Exception => println(e.getMessage)
  }

  // The stream is only needed while the batch runs; release the file handle
  // afterwards whether or not the batch succeeded.
  var resultInserts =
    try jdbcBatchUpdate(ctx)
    finally fis.close()

  resultInserts match {
    case Success(msg) => println(msg)
    case Failure(err) => println(s"${err.getMessage}")
  }

上面这个例子里一个transaction批次包含了三条Insert语句,其中一条涉及存入picture字段:我们只需要把图像文件InputStream作为普通参数传入即可。我们也可以把任何类型的非batch-update语句捆绑在统一的transaction里运算,而且可以指定每条update返回类型:自动产生的主键或者更新数据条数:

/**
 * Entry point for non-batch updates: validates the context and dispatches
 * to the single-statement or multi-statement implementation.
 *
 * @param ctx context with sqlType SQL_UPDATE and batch = false
 * @return Failure when sqlType or batch is set wrongly, otherwise the result
 *         of the chosen implementation
 * @throws IllegalStateException when the context holds no statement
 */
def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
  if (ctx.statements == Nil)
    throw new IllegalStateException("JDBCContex setting error: statements empty!")

  if (ctx.sqlType != SQL_UPDATE)
    Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
  else if (ctx.batch)
    Failure(new IllegalStateException("JDBCContex setting error: must set batch = false !"))
  else if (ctx.statements.size == 1)
    singleTxUpdate(ctx)
  else
    multiTxUpdates(ctx)
}

这个update函数又被细分为单条语句singleTxUpdate和多条语句multiTxUpdates。无论单条或多条update函数又被分为返回主键或更新状态类型的函数:

 

 /**
  * Runs the single statement of `ctx` in a local transaction and returns the
  * value produced by SQLUpdateWithGeneratedKey as a one-element collection.
  *
  * The key selector is taken from the head of `ctx.returnGeneratedKey`. A
  * missing selector is reported as a Failure instead of escaping as a
  * MatchError, and any Seq implementation is accepted (the former `::`
  * pattern only matched a List).
  */
 private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
       implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
   ctx.returnGeneratedKey.headOption.flatten match {
     case None =>
       Failure(new IllegalStateException("JDBCContex setting error: returnGeneratedKey not set!"))
     case Some(key) =>
       // A context without parameter lists means "no parameters".
       val params: Seq[Any] = ctx.parameters.headOption.getOrElse(Nil)
       val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
       Try {
         NamedDB(ctx.dbName) localTx { implicit session =>
           session.fetchSize(ctx.fetchSize)
           ctx.queryTimeout.foreach(session.queryTimeout(_))
           val result = usql.apply()
           Seq(result).to[C]
         }
       }
   }
 }
 
      /**
       * Runs the single statement of `ctx` through SQLUpdate in a local
       * transaction and returns its Int result, widened to Long, as a
       * one-element collection. Missing pre/post actions are replaced by
       * no-op functions.
       */
      private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      val firstParams: Seq[Any] =
        if (ctx.parameters == Nil) Nil else ctx.parameters.head
      val doNothing: PreparedStatement => Unit = _ => ()
      val preHook: PreparedStatement => Unit = ctx.preAction.getOrElse(doNothing)
      val postHook: PreparedStatement => Unit = ctx.postAction.getOrElse(doNothing)
      val update = new SQLUpdate(ctx.statements.head, firstParams, ctx.queryTags)(preHook)(postHook)
      Try {
        NamedDB(ctx.dbName) localTx { implicit session =>
          session.fetchSize(ctx.fetchSize)
          ctx.queryTimeout.foreach(session.queryTimeout(_))
          val affected = update.apply()
          Seq(affected.toLong).to[C]
        }
      }

    }
 
    /**
     * Single-statement update: picks the variant without or with a returned
     * generated key, depending on `noReturnKey(ctx)`.
     */
    private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      val countOnly = noReturnKey(ctx)
      if (!countOnly) singleTxUpdateWithReturnKey(ctx)
      else singleTxUpdateNoReturnKey(ctx)
    }
 
    /**
     * True when the first entry of `ctx.returnGeneratedKey` requests no
     * generated key, i.e. the sequence is empty or starts with None.
     *
     * Uses `headOption` so any Seq implementation is accepted; the former
     * `val k :: xs = ...` pattern threw a MatchError for anything but a List.
     */
    private def noReturnKey(ctx: JDBCContext): Boolean =
      ctx.returnGeneratedKey.headOption.flatten.isEmpty
 
    /** A PreparedStatement hook that does nothing; used where no before/after action is wanted. */
    def noActon: PreparedStatement => Unit = _ => ()
 
    /**
     * Runs all statements of `ctx` in one local transaction, each with its own
     * parameter list and key option, and returns one Long per statement
     * (the SQLUpdate result for None, the SQLUpdateWithGeneratedKey result
     * for Some(key)).
     *
     * Changes from the earlier version: statements, parameters and key options
     * must have the same length, so a mismatch is a Failure rather than a
     * silent truncation by `zip`; and the context's query tags are passed on,
     * as the single-statement variants do.
     */
    def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
        Try {
          // An empty key list means "no generated key for any statement".
          val keys: Seq[Option[Any]] =
            if (ctx.returnGeneratedKey.isEmpty) Seq.fill(ctx.statements.size)(None)
            else ctx.returnGeneratedKey
          if (ctx.parameters.size != ctx.statements.size || keys.size != ctx.statements.size)
            throw new IllegalStateException(
              "JDBCContex setting error: statements, parameters and returnGeneratedKey sizes not match!")

          NamedDB(ctx.dbName) localTx { implicit session =>
            session.fetchSize(ctx.fetchSize)
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            val results: Seq[Long] = ctx.statements.zip(ctx.parameters).zip(keys).map {
              case ((stm, param), None) =>
                new SQLUpdate(stm, param, ctx.queryTags)(noActon)(noActon).apply().toLong
              case ((stm, param), Some(k)) =>
                new SQLUpdateWithGeneratedKey(stm, param, ctx.queryTags)(k).apply().toLong
            }
            results.to[C]
          }
        }
     }

下面是这个函数的使用示范: 

 // Demo: an insert (returning its generated key), an update and a delete in one transaction.
 val updateSQL = "update members set description = ? where id < ?"
  ctx = JDBCContext('h2)
  try {
     ctx = ctx.setUpdateCommand(JDBCContext.RETURN_GENERATED_KEYVALUE,insertSQL,
       "max", None, "no birth date", dateCreated, None)
       .appendUpdateCommand(JDBCContext.RETURN_UPDATED_COUNT, updateSQL, "id++", 10)
      // Standard SQL spells it "delete from <table>"; the short form is not portable.
      .appendUpdateCommand(JDBCContext.RETURN_UPDATED_COUNT,"delete from members where id = 1")
  }
  catch {
    case e: Exception => println(e.getMessage)
  }
  var resultUpdates = jdbcTxUpdates[Vector](ctx)

  resultUpdates match {
    case Success(msg) => println(msg)
    case Failure(err) => println(s"${err.getMessage}")
  }

在这个例子里我们把insert,update和delete混在了一个transaction里。最后,我们再把试验数据,包括blob字段读出来:

  //data model 
  /**
   * One row of the members table as read back by the sample query.
   *
   * @param id          value of the id column
   * @param name        value of the name column
   * @param description optional description; defaults to None
   * @param birthday    optional birthday; defaults to None
   * @param createdAt   value of the created_at column
   * @param picture     stream over the picture column; may be null, which the
   *                    sample reader treats as "no picture"
   */
  case class Member( 
                     id: Long, 
                     name: String, 
                     description: Option[String] = None, 
                     birthday: Option[LocalDate] = None, 
                     createdAt: DateTime, 
                     picture: InputStream) 
 
  // Row converter: reads the six member columns from the current row, in
  // column order, and builds a Member from them.
  val toMember = (rs: WrappedResultSet) => {
    val id = rs.long("id")
    val name = rs.string("name")
    val description = rs.stringOpt("description")
    val birthday = rs.jodaLocalDateOpt("birthday")
    val createdAt = rs.jodaDateTime("created_at")
    val picture = rs.binaryStream("picture")
    Member(id, name, description, birthday, createdAt, picture)
  }
 
  // Demo: read all members back and save each stored picture to a file.
  ctx = JDBCContext('h2)
  ctx = ctx.setQueryCommand("select * from members").setQueryTimeout(Some(1000))

  val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember)

  val buffer = new Array[Byte](1024)

  vecMember.foreach {row =>
    println(s"id: ${row.id} name: ${row.name}")
    println(s"name: ${row.name}")
    if (row.picture == null)
      println("picture empty")
    else {
      val fname = s"/users/tiger/pic${row.id}.png"
      val file = new File(fname)
      val output = new FileOutputStream(file)

      println(s"saving picture to $fname")

      try {
        // read() reports how many bytes it placed in the buffer (-1 at end of
        // stream). Only that many bytes may be written; writing the whole
        // buffer would append stale bytes from the previous chunk.
        Iterator
          .continually(row.picture.read(buffer))
          .takeWhile(_ != -1)
          .foreach(bytesRead => output.write(buffer, 0, bytesRead))
      } finally {
        // Release both streams even when the copy fails.
        try output.close()
        finally row.picture.close()
      }

    }
  }

下面是本次讨论的示范源代码:

build.sbt

// sbt build definition for the sample project.
name := "learn-scalikeJDBC" 
 
version := "0.1" 
 
scalaVersion := "2.12.4" 
 
// Scala 2.10, 2.11, 2.12 
// ScalikeJDBC modules first, then JDBC drivers (H2, MySQL, PostgreSQL),
// connection pools (commons-dbcp, tomcat-jdbc, HikariCP, BoneCP), Slick and logback.
libraryDependencies ++= Seq( 
  "org.scalikejdbc" %% "scalikejdbc"       % "3.1.0", 
  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.1.0"   % "test", 
  "org.scalikejdbc" %% "scalikejdbc-config"  % "3.1.0", 
  "com.h2database"  %  "h2"                % "1.4.196", 
  "mysql" % "mysql-connector-java" % "6.0.6", 
  "org.postgresql" % "postgresql" % "42.2.0", 
  "commons-dbcp" % "commons-dbcp" % "1.4", 
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2", 
  "com.zaxxer" % "HikariCP" % "2.7.4", 
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE", 
  "com.typesafe.slick" %% "slick" % "3.2.1", 
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3" 
)

resources/application.conf

 

# JDBC settings 
test { 
  db { 
    h2 { 
      driver = "org.h2.Driver" 
      url = "jdbc:h2:tcp://localhost/~/slickdemo" 
      user = "" 
      password = "" 
      poolInitialSize = 5 
      poolMaxSize = 7 
      poolConnectionTimeoutMillis = 1000 
      poolValidationQuery = "select 1 as one" 
      poolFactoryName = "commons-dbcp2" 
    } 
  } 
 
  db.mysql.driver = "com.mysql.cj.jdbc.Driver" 
  db.mysql.url = "jdbc:mysql://localhost:3306/testdb" 
  db.mysql.user = "root" 
  db.mysql.password = "123" 
  db.mysql.poolInitialSize = 5 
  db.mysql.poolMaxSize = 7 
  db.mysql.poolConnectionTimeoutMillis = 1000 
  db.mysql.poolValidationQuery = "select 1 as one" 
  db.mysql.poolFactoryName = "bonecp" 
 
  # scalikejdbc global settings 
  scalikejdbc.global.loggingSQLAndTime.enabled = true 
  scalikejdbc.global.loggingSQLAndTime.logLevel = info 
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true 
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000 
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn 
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false 
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false 
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 
} 
dev { 
  db { 
    h2 { 
      driver = "org.h2.Driver" 
      url = "jdbc:h2:tcp://localhost/~/slickdemo" 
      user = "" 
      password = "" 
      poolFactoryName = "hikaricp" 
      numThreads = 10 
      maxConnections = 12 
      minConnections = 4 
      keepAliveConnection = true 
    } 
    mysql { 
      driver = "com.mysql.cj.jdbc.Driver" 
      url = "jdbc:mysql://localhost:3306/testdb" 
      user = "root" 
      password = "123" 
      poolInitialSize = 5 
      poolMaxSize = 7 
      poolConnectionTimeoutMillis = 1000 
      poolValidationQuery = "select 1 as one" 
      poolFactoryName = "bonecp" 
 
    } 
    postgres { 
      driver = "org.postgresql.Driver" 
      url = "jdbc:postgresql://localhost:5432/testdb" 
      user = "root" 
      password = "123" 
      poolFactoryName = "hikaricp" 
      numThreads = 10 
      maxConnections = 12 
      minConnections = 4 
      keepAliveConnection = true 
    } 
  } 
  # scalikejdbc global settings 
  scalikejdbc.global.loggingSQLAndTime.enabled = true 
  scalikejdbc.global.loggingSQLAndTime.logLevel = info 
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true 
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000 
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn 
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false 
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false 
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 
}

JDBCEngine.scala

package jdbccontext 
import java.sql.PreparedStatement 
 
import scala.collection.generic.CanBuildFrom 
import scalikejdbc._ 
 
import scala.util._ 
import scalikejdbc.TxBoundary.Try._ 
 
  /**
   * Constants used by [[JDBCContext]]: the kind of SQL a context carries and
   * readable names for the "return generated key" flag of update commands.
   */
  object JDBCContext {
    type SQLTYPE = Int
    val SQL_SELECT: Int = 0
    val SQL_EXEDDL: Int = 1
    val SQL_UPDATE: Int = 2
    val RETURN_GENERATED_KEYVALUE: Boolean = true
    val RETURN_UPDATED_COUNT: Boolean = false

  }

  /**
   * Immutable description of one unit of JDBC work. Every helper returns a
   * modified copy, so calls can be chained.
   *
   * @param dbName             name of the configured database to run against
   * @param statements         SQL templates, one per command
   * @param parameters         one parameter list per statement (or per row when `batch` is true)
   * @param fetchSize          fetch size handed to the session / query
   * @param queryTimeout       optional query timeout
   * @param queryTags          tags attached to the SQL
   * @param sqlType            one of `SQL_SELECT`, `SQL_EXEDDL`, `SQL_UPDATE`
   * @param batch              true when `statements.head` is run once per parameter row
   * @param returnGeneratedKey per-statement key option:
   *                           no return: None, return by index: Some(1), by name: Some("id")
   * @param preAction          optional hook applied to the PreparedStatement before execution
   * @param postAction         optional hook applied to the PreparedStatement after execution
   */
  case class JDBCContext(
                          dbName: Symbol,
                          statements: Seq[String] = Nil,
                          parameters: Seq[Seq[Any]] = Nil,
                          fetchSize: Int = 100,
                          queryTimeout: Option[Int] = None,
                          queryTags: Seq[String] = Nil,
                          sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
                          batch: Boolean = false,
                          returnGeneratedKey: Seq[Option[Any]] = Nil,
                          preAction: Option[PreparedStatement => Unit] = None,
                          postAction: Option[PreparedStatement => Unit] = None) {

    ctx =>

    // Pre/post actions are only honoured for a single, non-batch update statement.
    private def isSingleUpdate: Boolean =
      ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch && ctx.statements.size == 1

    //helper functions

    /** Returns a copy with `tag` appended to the query tags. */
    def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)

    /** Returns a copy with all of `tags` appended to the query tags. */
    def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)

    /** Returns a copy with the fetch size replaced. */
    def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)

    /** Returns a copy with the query timeout replaced. */
    def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)

    /**
     * Sets the hook run before statement execution.
     *
     * @throws IllegalStateException unless the context holds exactly one non-batch update
     */
    def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
      if (!isSingleUpdate)
        throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
      ctx.copy(preAction = action)
    }

    /**
     * Sets the hook run after statement execution.
     *
     * @throws IllegalStateException unless the context holds exactly one non-batch update
     */
    def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
      if (!isSingleUpdate)
        throw new IllegalStateException("JDBCContex setting error: postAction not supported!")
      ctx.copy(postAction = action)
    }

    /**
     * Appends one more DDL statement with its parameters.
     *
     * @throws IllegalStateException when the context is not of type `SQL_EXEDDL`
     */
    def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
      if (ctx.sqlType != JDBCContext.SQL_EXEDDL)
        throw new IllegalStateException("JDBCContex setting error: option not supported!")
      ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        // one parameter list per statement; the varargs already form that list
        parameters = ctx.parameters ++ Seq(_parameters)
      )
    }

    /**
     * Appends one more update statement, its parameters and its key option
     * (`Some(1)` when a generated key is requested, `None` otherwise).
     *
     * @throws IllegalStateException when the context is not a non-batch `SQL_UPDATE`
     */
    def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
      if (ctx.sqlType != JDBCContext.SQL_UPDATE || ctx.batch)
        throw new IllegalStateException("JDBCContex setting error: option not supported!")
      val keyOption: Option[Any] = if (_returnGeneratedKey) Some(1) else None
      ctx.copy(
        statements = ctx.statements ++ Seq(_statement),
        parameters = ctx.parameters ++ Seq(_parameters),
        returnGeneratedKey = ctx.returnGeneratedKey ++ Seq(keyOption)
      )
    }

    /**
     * Appends one row of parameters to a batch command. Every row must have
     * the same number of values as the first row.
     *
     * @throws IllegalStateException when the context is not a batch `SQL_UPDATE`,
     *                               or the row size differs from the first row
     */
    def appendBatchParameters(_parameters: Any*): JDBCContext = {
      if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
        throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")

      // the first row (if any) fixes the arity for all later rows
      val sameArity = ctx.parameters.headOption.forall(_.size == _parameters.size)
      if (!sameArity)
        throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
      ctx.copy(
        parameters = ctx.parameters ++ Seq(_parameters)
      )
    }

    /**
     * Chooses whether a batch command returns generated keys.
     *
     * @throws IllegalStateException when the context is not a batch `SQL_UPDATE`
     */
    def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
      if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
         throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
      ctx.copy(
        returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
      )
    }

    /** Replaces the context's command with a single SELECT statement. */
    def setQueryCommand(_statement: String, _parameters: Any*): JDBCContext = {
      ctx.copy(
        statements = Seq(_statement),
        parameters = Seq(_parameters),
        sqlType = JDBCContext.SQL_SELECT,
        batch = false
      )
    }

    /** Replaces the context's command with a single DDL statement. */
    def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
      ctx.copy(
        statements = Seq(_statement),
        parameters = Seq(_parameters),
        sqlType = JDBCContext.SQL_EXEDDL,
        batch = false
      )
    }

    /** Replaces the context's command with a single update statement and its key option. */
    def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
      ctx.copy(
        statements = Seq(_statement),
        parameters = Seq(_parameters),
        returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
        sqlType = JDBCContext.SQL_UPDATE,
        batch = false
      )
    }

    /**
     * Turns the context into a batch update of `_statement`. Parameter rows
     * and the generated-key option already on the context are left untouched;
     * rows are added afterwards with `appendBatchParameters`.
     */
    def setBatchCommand(_statement: String): JDBCContext = {
      ctx.copy (
        statements = Seq(_statement),
        sqlType = JDBCContext.SQL_UPDATE,
        batch = true
      )
    }
  }
 
  /**
   * Executes the work described by a [[JDBCContext]] through ScalikeJDBC:
   * queries (`jdbcQueryResult`), DDL (`jdbcExcuteDDL`), batch updates
   * (`jdbcBatchUpdate`) and transactional updates (`jdbcTxUpdates`).
   */
  object JDBCEngine {

    import JDBCContext._

    // Placeholder extractor for SQL objects whose real row converter is attached later via `map`.
    private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
      throw new IllegalStateException(message)
    }

    // Parameters of the first (or only) statement; empty when none were supplied.
    private def firstParams(ctx: JDBCContext): Seq[Any] =
      ctx.parameters.headOption.getOrElse(Nil)

    /**
     * Runs the context's SELECT statement and converts each row with `rowConverter`.
     *
     * @param ctx          context whose `sqlType` must be `SQL_SELECT`
     * @param rowConverter maps one result-set row to an `A`
     * @tparam C collection type of the result
     * @tparam A element type produced by `rowConverter`
     * @return the converted rows collected into a `C[A]`
     * @throws IllegalStateException when `ctx.sqlType` is not `SQL_SELECT`
     */
    def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
         ctx: JDBCContext, rowConverter: WrappedResultSet => A)(
          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {

      ctx.sqlType match {
        case SQL_SELECT => {
          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statements.head, firstParams(ctx))(noExtractor(""))
          ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
          // hand over all tags in one call, the same way jdbcBatchUpdate does;
          // calling tags(...) once per tag would presumably keep only the last one
          rawSql.tags(ctx.queryTags: _*)
          rawSql.fetchSize(ctx.fetchSize)
          implicit val session: NamedAutoSession = NamedAutoSession(ctx.dbName)
          val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter)
          sql.collection.apply[C]()
        }
        case _ => throw new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_SELECT'!")
      }
    }

    /**
     * Executes every statement of a DDL context inside one local transaction.
     * Statement parameters are not passed on (each statement runs with `Nil`).
     *
     * @return `Success` with a confirmation message, or `Failure` when the
     *         context is not `SQL_EXEDDL` or a statement fails
     */
    def jdbcExcuteDDL(ctx: JDBCContext): Try[String] = {
       if (ctx.sqlType != SQL_EXEDDL) {
        Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
      }
      else {
        // Try is produced inside localTx; the file imports TxBoundary.Try._ for this shape
        NamedDB(ctx.dbName) localTx { implicit session =>
          Try {
            ctx.statements.foreach { stm =>
              val ddl = new SQLExecution(statement = stm, parameters = Nil)(
                before = noActon)(
                after = noActon)

              ddl.apply()
            }
            "SQL_EXEDDL executed succesfully."
          }
        }
      }
    }

    /**
     * Runs the context's single statement once per parameter row as a batch,
     * inside one local transaction.
     *
     * When no generated key is requested the per-row update counts are
     * discarded and an empty collection is returned; otherwise the result of
     * `SQLBatchWithGeneratedKey` is returned.
     *
     * @throws IllegalStateException when `ctx.statements` is empty
     * @return `Failure` when the context is not a batch `SQL_UPDATE` or execution fails
     */
    def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      if (ctx.statements == Nil)
        throw new IllegalStateException("JDBCContex setting error: statements empty!")
      if (ctx.sqlType != SQL_UPDATE) {
        Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
      }
      else if (!ctx.batch) {
        Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
      }
      else if (noReturnKey(ctx)) {
        val usql = SQL(ctx.statements.head)
          .tags(ctx.queryTags: _*)
          .batch(ctx.parameters: _*)
        Try {
          NamedDB(ctx.dbName) localTx { implicit session =>
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            usql.apply[Seq]()
            Seq.empty[Long].to[C]
          }
        }
      }
      else {
        val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
        Try {
          NamedDB(ctx.dbName) localTx { implicit session =>
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            usql.apply[C]()
          }
        }
      }
    }

    // Single statement, generated key requested: returns a one-element collection holding the key.
    private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      // `+:` works for any Seq; the List-only `::` pattern would throw MatchError on e.g. a Vector
      ctx.returnGeneratedKey match {
        case Some(key) +: _ =>
          val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, firstParams(ctx), ctx.queryTags)(key)
          Try {
            NamedDB(ctx.dbName) localTx { implicit session =>
              session.fetchSize(ctx.fetchSize)
              ctx.queryTimeout.foreach(session.queryTimeout(_))
              val result = usql.apply()
              Seq(result).to[C]
            }
          }
        case _ =>
          Failure(new IllegalStateException("JDBCContex setting error: returnGeneratedKey must be set!"))
      }
    }

    // Single statement, no generated key: returns a one-element collection holding the update count.
    private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      val before = ctx.preAction.getOrElse(noActon)
      val after = ctx.postAction.getOrElse(noActon)
      val usql = new SQLUpdate(ctx.statements.head, firstParams(ctx), ctx.queryTags)(before)(after)
      Try {
        NamedDB(ctx.dbName) localTx { implicit session =>
          session.fetchSize(ctx.fetchSize)
          ctx.queryTimeout.foreach(session.queryTimeout(_))
          val result = usql.apply()
          Seq(result.toLong).to[C]
        }
      }
    }

    // Dispatches a single-statement update on whether a generated key was requested.
    private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      if (noReturnKey(ctx))
        singleTxUpdateNoReturnKey(ctx)
      else
        singleTxUpdateWithReturnKey(ctx)
    }

    // True when no key option is set, or the first key option is None.
    private def noReturnKey(ctx: JDBCContext): Boolean =
      ctx.returnGeneratedKey.headOption.forall(_.isEmpty)

    /** A PreparedStatement hook that does nothing. */
    def noActon: PreparedStatement=>Unit = pstm => {}

    /**
     * Runs all statements of the context in order inside one local transaction.
     *
     * Statements without a matching parameter list run with no parameters, and
     * statements without a key option return their update count, so no
     * statement is silently skipped when those lists are shorter than
     * `ctx.statements`.
     *
     * @return per statement, either the update count or the generated key
     */
    def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
        Try {
          NamedDB(ctx.dbName) localTx { implicit session =>
            session.fetchSize(ctx.fetchSize)
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            val count = ctx.statements.size
            // pad instead of letting zip truncate to the shortest list
            val params: Seq[Seq[Any]] = ctx.parameters.padTo(count, Seq.empty[Any])
            val keys: Seq[Option[Any]] = ctx.returnGeneratedKey.padTo(count, Option.empty[Any])
            val sqlcmd = ctx.statements zip params zip keys
            val results = sqlcmd.map { case ((stm, param), key) =>
              key match {
                case None =>
                  new SQLUpdate(stm, param, ctx.queryTags)(noActon)(noActon).apply().toLong
                case Some(k) =>
                  new SQLUpdateWithGeneratedKey(stm, param, ctx.queryTags)(k).apply().toLong
              }
            }
            results.to[C]
          }
        }
     }


    /**
     * Runs the context's non-batch update statement(s) in one transaction.
     *
     * @throws IllegalStateException when `ctx.statements` is empty
     * @return `Failure` when the context is not a non-batch `SQL_UPDATE` or execution fails
     */
    def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      if (ctx.statements == Nil)
        throw new IllegalStateException("JDBCContex setting error: statements empty!")
      if (ctx.sqlType != SQL_UPDATE) {
        Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
      }
      else if (ctx.batch) {
        Failure(new IllegalStateException("JDBCContex setting error: must set batch = false !"))
      }
      else if (ctx.statements.size == 1)
        singleTxUpdate(ctx)
      else
        multiTxUpdates(ctx)
    }

  }

JDBCEngineDemo.scala

import java.io.File 
import java.io.FileOutputStream 
import java.io.InputStream 
import jdbccontext._ 
import configdbs._ 
import org.joda.time._ 
import scala.util._ 
import JDBCEngine._ 
 
import scalikejdbc._ 
/**
 * End-to-end demo of JDBCEngine against the "dev" H2 database: recreates the
 * `members` table, batch-inserts rows, runs a multi-statement transactional
 * update, then queries the rows back and saves each stored picture to disk.
 */
object CrudDemo extends App {
  ConfigDBsWithEnv("dev").setup('h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()

  // "if exists" keeps the first run (no table yet) from failing the DDL
  // transaction before the create statement is reached
  val dropSQL: String ="""
      drop table if exists members
    """

  val createSQL: String ="""
    create table members (
      id serial not null primary key,
      name varchar(30) not null,
      description varchar(1000),
      birthday date,
      created_at timestamp not null,
      picture blob
    )"""

  var ctx = JDBCContext('h2)
    try {
      ctx = ctx.setDDLCommand(dropSQL)
        .appendDDLCommand(createSQL)
    }
    catch {
       case e: Exception => println(e.getMessage)
    }

  val resultCreateTable = jdbcExcuteDDL(ctx)

  resultCreateTable match {
    case Success(msg) => println(msg)
    case Failure(err) => println(s"${err.getMessage}")
  }

  val insertSQL = "insert into members(name,birthday,description,created_at,picture) values (?, ?, ?, ?, ?)"
  val dateCreated = DateTime.now

  import java.io.FileInputStream

  val picfile = new File("/users/tiger/Nobody.png")
  val fis = new FileInputStream(picfile)

  ctx = JDBCContext('h2)
  try {
    ctx = ctx.setBatchCommand(insertSQL).appendBatchParameters(
      "John",new LocalDate("2008-03-01"),"youngest user",dateCreated,None).appendBatchParameters(
      "peter", None, "no birth date", dateCreated, fis)
      .appendBatchParameters(
        "susan", None, "no birth date", dateCreated, None)
      .setBatchReturnGeneratedKeyOption(JDBCContext.RETURN_GENERATED_KEYVALUE)
  }
  catch {
    case e: Exception => println(e.getMessage)
  }

  // the picture stream is a batch parameter, so it must stay open until the
  // batch has run; release it afterwards whether or not the insert succeeded
  val resultInserts =
    try jdbcBatchUpdate(ctx)
    finally fis.close()

  resultInserts match {
    case Success(msg) => println(msg)
    case Failure(err) => println(s"${err.getMessage}")
  }


  val updateSQL = "update members set description = ? where id < ?"
  ctx = JDBCContext('h2)
  try {
     ctx = ctx.setUpdateCommand(JDBCContext.RETURN_GENERATED_KEYVALUE,insertSQL,
       "max", None, "no birth date", dateCreated, None)
       .appendUpdateCommand(JDBCContext.RETURN_UPDATED_COUNT, updateSQL, "id++", 10)
      .appendUpdateCommand(JDBCContext.RETURN_UPDATED_COUNT,"delete members where id = 1")
  }
  catch {
    case e: Exception => println(e.getMessage)
  }
  val resultUpdates = jdbcTxUpdates[Vector](ctx)

  resultUpdates match {
    case Success(msg) => println(msg)
    case Failure(err) => println(s"${err.getMessage}")
  }


  //data model
  case class Member(
                     id: Long,
                     name: String,
                     description: Option[String] = None,
                     birthday: Option[LocalDate] = None,
                     createdAt: DateTime,
                     picture: InputStream)

  //data row converter
  // NOTE(review): `picture` is handed out as a stream and read after the query
  // has returned - confirm the driver keeps it readable at that point.
  val toMember = (rs: WrappedResultSet) => Member(
    id = rs.long("id"),
    name = rs.string("name"),
    description = rs.stringOpt("description"),
    birthday = rs.jodaLocalDateOpt("birthday"),
    createdAt = rs.jodaDateTime("created_at"),
    picture = rs.binaryStream("picture")
  )

  ctx = JDBCContext('h2)
  ctx = ctx.setQueryCommand("select * from members").setQueryTimeout(Some(1000))

  val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember)

  val buffer = new Array[Byte](1024)

  vecMember.foreach {row =>
    println(s"id: ${row.id} name: ${row.name}")
    println(s"name: ${row.name}")
    if (row.picture == null)
      println("picture empty")
    else {
      val fname = s"/users/tiger/pic${row.id}.png"
      val file = new File(fname)
      val output = new FileOutputStream(file)

      println(s"saving picture to $fname")

      try {
        // write only the bytes actually read; the last chunk is usually
        // shorter than the buffer and the remainder holds stale data
        var bytesRead = row.picture.read(buffer)
        while (bytesRead > 0) {
          output.write(buffer, 0, bytesRead)
          bytesRead = row.picture.read(buffer)
        }
      } finally {
        output.close()
      }

    }
  }

}

 

 

 

 

 

 

 

 

 

 

 

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/industrynews/12813.html

(0)
上一篇 2021年7月19日 14:55
下一篇 2021年7月19日 14:55

相关推荐

发表回复

登录后才能评论