SDP(3): ScalikeJDBC - JDBC-Engine: Fetching

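  ScalikeJDBC covers the basic JDBC functionality fairly completely and implements it concisely, which should also keep its runtime overhead low. In principle, ScalikeJDBC is a good fit for a JDBC-Engine: a layer that sits between the various JDBC tool libraries and the database instances, receives JDBC instructions, connects to the target database, runs the operation and returns the result. Typically, JDBC tool libraries such as ORM and FRM software use their own DSLs to program data management over complex table relationships. In the end they produce SQL, that is, a (prepared) statement plus parameters, which is handed to the JDBC driver of the target database to execute and produce a result. Described this way, the main job of a JDBC-Engine is to support the following function: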

jdbcRunSQL(context: JDBCContext): JDBCResultSet

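The caller of this function supplies a value of type JDBCContext, and jdbcRunSQL carries out the rest of the operation and returns the result. From this angle, JDBCContext needs to provide at least the following attributes:

1. Database connection: selects the database connection pool

2. Execution parameters: fetchSize, queryTimeout, queryTags. These all apply to the SQL currently being run

3. Query parameters:

    Query type: select/execute/update, single/batch, pre/post query actions, generateKey

    SQL statement: statements: Seq[String], parameters: Seq[Seq[Any]]

Here is the definition of the JDBCContext type: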

import java.sql.PreparedStatement 
import scala.collection.generic.CanBuildFrom 
import scalikejdbc._ 
 
  object JDBCContext { 
    type SQLTYPE = Int 
    val SQL_SELECT: Int = 0 
    val SQL_EXECUTE = 1 
    val SQL_UPDATE = 2 
 
    def returnColumnByIndex(idx: Int) = Some(idx) 
 
    def returnColumnByName(col: String) = Some(col) 
  } 
 
  case class JDBCContext( 
                          dbName: Symbol, 
                          statements: Seq[String], 
                          parameters: Seq[Seq[Any]] = Nil, 
                          fetchSize: Int = 100, 
                          queryTimeout: Option[Int] = None, 
                          queryTags: Seq[String] = Nil, 
                          sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT, 
                          batch: Boolean = false, 
                          returnGeneratedKey: Option[Any] = None, 
                          // no return: None, return by index: Some(1), by name: Some("id") 
                          preAction: Option[PreparedStatement => Unit] = None, 
                          postAction: Option[PreparedStatement => Unit] = None)
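
As a minimal sketch of how the default values in such a context are meant to be used, the example below builds a base context and derives a variant with copy(). MiniContext is a hypothetical, cut-down stand-in for JDBCContext (it only keeps a few fields so the snippet compiles without ScalikeJDBC); the field names and defaults are taken from the definition above.

```scala
// MiniContext is a hypothetical, simplified stand-in for the article's JDBCContext.
// It keeps only the fields needed to illustrate defaults and copy().
final case class MiniContext(
  dbName: Symbol,
  statements: Seq[String],
  parameters: Seq[Seq[Any]] = Nil,
  fetchSize: Int = 100,
  queryTimeout: Option[Int] = None)

object MiniContextDemo {
  // Only dbName and statements are mandatory; the rest falls back to defaults.
  val base: MiniContext = MiniContext(
    dbName = Symbol("h2"),
    statements = Seq("select * from members where id = ?"),
    parameters = Seq(Seq(2)))

  // copy() derives a variant for another pool without mutating the original.
  val tuned: MiniContext =
    base.copy(dbName = Symbol("mysql"), fetchSize = 500, queryTimeout = Some(30))

  def main(args: Array[String]): Unit = {
    println(s"base:  ${base.dbName.name}, fetchSize=${base.fetchSize}")
    println(s"tuned: ${tuned.dbName.name}, fetchSize=${tuned.fetchSize}")
  }
}
```

Because the context is an immutable case class, the same statement can be pointed at a different connection pool just by copying it, which is the pattern the demo code later in this article relies on.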
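On reflection, implementing JDBC reads and writes as two separate functions is easier to use, because it matches common programming patterns and habits. So it is best to run SQL of type sqlType=SQL_SELECT in a function of its own: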
   def jdbcQueryResult[C[_] <: TraversableOnce[_], A]( 
         ctx: JDBCContext, rowConverter: WrappedResultSet => A)( 
          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = { 
      ctx.sqlType match { 
        case SQL_SELECT => { 
          val params: Seq[Any] = ctx.parameters match { 
            case Nil => Nil 
            case p@_ => p.head 
          } 
          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statements.head, params)(noExtractor("")) 
          ctx.queryTimeout.foreach(rawSql.queryTimeout(_)) 
          ctx.queryTags.foreach(rawSql.tags(_)) 
          rawSql.fetchSize(ctx.fetchSize) 
          implicit val session = NamedAutoSession(ctx.dbName) 
          val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter) 
          sql.collection.apply[C]() 
        } 
        case _ => throw new IllegalStateException("sqlType must be 'SQL_SELECT'!") 
      } 
    }
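
Note that jdbcQueryResult only uses the first statement and the first parameter group of the context, and that ctx.statements.head throws on an empty sequence. The following is a small sketch of that selection step written with headOption, so the empty case is explicit. The helper name selectInput and its Either result are assumptions made for illustration; they are not part of the code above.

```scala
object SelectInput {
  // Hypothetical helper: picks the first statement and the first parameter group.
  // An empty statement list is reported as Left instead of throwing;
  // a missing parameter group falls back to Nil, like the pattern match above.
  def selectInput(
      statements: Seq[String],
      parameters: Seq[Seq[Any]]): Either[String, (String, Seq[Any])] =
    statements.headOption match {
      case Some(stmt) => Right((stmt, parameters.headOption.getOrElse(Nil)))
      case None       => Left("statements must not be empty")
    }

  def main(args: Array[String]): Unit = {
    println(selectInput(Seq("select * from members where id = ?"), Seq(Seq(2))))
    println(selectInput(Nil, Nil))
  }
}
```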

A noExtractor function is also needed to satisfy the parameter list of SQLToCollectionImpl:

  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) => 
      throw new IllegalStateException(message) 
    }
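
The reason a function returning Nothing can be passed where an extractor returning A is expected is that Scala functions are covariant in their result type and Nothing is a subtype of every type. The sketch below shows this in isolation; Row is a hypothetical stand-in for WrappedResultSet so the snippet runs without ScalikeJDBC.

```scala
import scala.util.Try

object NothingExtractorDemo {
  // Row is a hypothetical stand-in for WrappedResultSet.
  final case class Row(values: Map[String, Any])

  // Same shape as noExtractor above: it never returns, it only throws.
  def noExtractor(message: String): Row => Nothing =
    (_: Row) => throw new IllegalStateException(message)

  // Function1 is covariant in its result and Nothing conforms to every type,
  // so the placeholder type-checks as an extractor for any result type.
  val asIntExtractor: Row => Int = noExtractor("no extractor was supplied")
  val asStringExtractor: Row => String = noExtractor("no extractor was supplied")

  def main(args: Array[String]): Unit = {
    // Calling the placeholder fails; a real extractor has to replace it via map.
    val outcome = Try(asIntExtractor(Row(Map("id" -> 1))))
    println(s"placeholder failed: ${outcome.isFailure}")
  }
}
```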
Let's try out jdbcQueryResult:
import scalikejdbc._ 
import JDBCEngine._ 
import configdbs._ 
import org.joda.time._ 
object JDBCQueryDemo extends App { 
  ConfigDBsWithEnv("dev").setupAll() 
 
  val ctx = JDBCContext( 
    dbName = 'h2, 
    statements = Seq("select * from members where id = ?"), 
    parameters = Seq(Seq(2)) 
  ) 
 
  //data model 
  case class Member( 
                     id: Long, 
                     name: String, 
                     description: Option[String] = None, 
                     birthday: Option[LocalDate] = None, 
                     createdAt: DateTime) 
 
  //data row converter 
  val toMember = (rs: WrappedResultSet) => Member( 
    id = rs.long("id"), 
    name = rs.string("name"), 
    description = rs.stringOpt("description"), 
    birthday = rs.jodaLocalDateOpt("birthday"), 
    createdAt = rs.jodaDateTime("created_at") 
  ) 
 
  val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember) 
 
  println(s"members in vector: $vecMember") 
 
  val ctx1 = ctx.copy(dbName = 'mysql) 
 
  val names: List[String] = jdbcQueryResult[List,String](ctx1,{rs: WrappedResultSet => rs.string("name")}) 
 
  println(s"selected name: $names") 
 
  val ctx2 = ctx1.copy(dbName = 'postgres) 
  val idname: List[(Long,String)] = jdbcQueryResult[List,(Long,String)](ctx2,{rs: WrappedResultSet => (rs.long("id"),rs.string("name"))}) 
 
  println(s"selected id+name: $idname") 
}

If the database programming is done with the Slick DSL, this is how it can be connected to the JDBC-Engine:

 object SlickDAO { 
    import slick.jdbc.H2Profile.api._ 
 
    case class CountyModel(id: Int, name: String) 
    case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") { 
      def id = column[Int]("ID",O.AutoInc,O.PrimaryKey) 
      def name = column[String]("NAME",O.Length(64)) 
      def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply) 
    } 
    val CountyQuery = TableQuery[CountyTable] 
    val filter = "Kansas" 
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"} 
    val statement = qry.result.statements.head 
  } 
  import SlickDAO._ 
 
 
  val slickCtx = JDBCContext( 
    dbName = 'h2, 
    statements = Seq(statement), 
  ) 
 
  val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx,{ 
    rs: WrappedResultSet => CountyModel(id=rs.int("id"),name=rs.string("name"))}) 
  vecCounty.foreach(r => println(s"${r.id},${r.name}"))

The output is correct.

Below is the source code for this demonstration:

 build.sbt

name := "learn-scalikeJDBC" 
 
version := "0.1" 
 
scalaVersion := "2.12.4" 
 
// Scala 2.10, 2.11, 2.12 
libraryDependencies ++= Seq( 
  "org.scalikejdbc" %% "scalikejdbc"       % "3.1.0", 
  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.1.0"   % "test", 
  "org.scalikejdbc" %% "scalikejdbc-config"  % "3.1.0", 
  "com.h2database"  %  "h2"                % "1.4.196", 
  "mysql" % "mysql-connector-java" % "6.0.6", 
  "org.postgresql" % "postgresql" % "42.2.0", 
  "commons-dbcp" % "commons-dbcp" % "1.4", 
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2", 
  "com.zaxxer" % "HikariCP" % "2.7.4", 
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE", 
  "com.typesafe.slick" %% "slick" % "3.2.1", 
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3" 
)

resources/application.conf, covering H2, MySQL and PostgreSQL

# JDBC settings
test {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "commons-dbcp2"
    }
  }
  db.mysql.driver = "com.mysql.cj.jdbc.Driver"
  db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
  db.mysql.user = "root"
  db.mysql.password = "123"
  db.mysql.poolInitialSize = 5
  db.mysql.poolMaxSize = 7
  db.mysql.poolConnectionTimeoutMillis = 1000
  db.mysql.poolValidationQuery = "select 1 as one"
  db.mysql.poolFactoryName = "bonecp"
  # scalikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}
dev {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
    mysql {
      driver = "com.mysql.cj.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/testdb"
      user = "root"
      password = "123"
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "bonecp"
    }
    postgres {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://localhost:5432/testdb"
      user = "root"
      password = "123"
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
  }
  # scalikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}

HikariConfig.scala: HikariCP connection pool implementation

package configdbs
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
import com.typesafe.config._
import java.util.concurrent.TimeUnit
import java.util.Properties
import scalikejdbc.config._
import com.typesafe.config.Config
import com.zaxxer.hikari._
import scalikejdbc.ConnectionPoolFactoryRepository

/** Extension methods to make Typesafe Config easier to use */
class ConfigExtensionMethods(val c: Config) extends AnyVal {
  import scala.collection.JavaConverters._

  def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default
  def getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else default
  def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default
  def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default

  def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default
  def getDurationOr(path: String, default: => Duration = Duration.Zero) =
    if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default

  def getPropertiesOr(path: String, default: => Properties = null): Properties =
    if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default

  def toProperties: Properties = {
    def toProps(m: mutable.Map[String, ConfigValue]): Properties = {
      val props = new Properties(null)
      m.foreach { case (k, cv) =>
        val v =
          if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala)
          else if(cv.unwrapped eq null) null
          else cv.unwrapped.toString
        if(v ne null) props.put(k, v)
      }
      props
    }
    toProps(c.root.asScala)
  }

  def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None
  def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None
  def getStringOpt(path: String) = Option(getStringOr(path))
  def getPropertiesOpt(path: String) = Option(getPropertiesOr(path))
}

object ConfigExtensionMethods {
  @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c)
}

trait HikariConfigReader extends TypesafeConfigReader {
  self: TypesafeConfig =>      // with TypesafeConfigReader => //NoEnvPrefix =>

  import ConfigExtensionMethods.configExtensionMethods

  def getFactoryName(dbName: Symbol): String = {
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
    c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)
  }

  def hikariCPConfig(dbName: Symbol): HikariConfig = {

    val hconf = new HikariConfig()
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)

    // Connection settings
    if (c.hasPath("dataSourceClass")) {
      hconf.setDataSourceClassName(c.getString("dataSourceClass"))
    } else {
      Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).map(hconf.setDriverClassName _)
    }
    hconf.setJdbcUrl(c.getStringOr("url", null))
    c.getStringOpt("user").foreach(hconf.setUsername)
    c.getStringOpt("password").foreach(hconf.setPassword)
    c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties)

    // Pool configuration
    hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000))
    hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000))
    hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000))
    hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000))
    hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0))
    hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false))
    c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery)
    c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql)
    val numThreads = c.getIntOr("numThreads", 20)
    hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5))
    hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))
    hconf.setPoolName(c.getStringOr("poolName", dbName.name))
    hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false))

    // Equivalent of ConnectionPreparer
    hconf.setReadOnly(c.getBooleanOr("readOnly", false))
    c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation)
    hconf.setCatalog(c.getStringOr("catalog", null))

    hconf
  }
}

import scalikejdbc._

trait ConfigDBs {
  self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>

  def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    getFactoryName(dbName) match {
      case "hikaricp" => {
        val hconf = hikariCPConfig(dbName)
        val hikariCPSource = new HikariDataSource(hconf)
        if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) {
          Class.forName(hconf.getDriverClassName)
        }
        ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource))
      }
      case _ => {
        val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
        val cpSettings = readConnectionPoolSettings(dbName)
        if (driver != null && driver.trim.nonEmpty) {
          Class.forName(driver)
        }
        ConnectionPool.add(dbName, url, user, password, cpSettings)
      }
    }
  }

  def setupAll(): Unit = {
    loadGlobalSettings()
    dbNames.foreach { dbName => setup(Symbol(dbName)) }
  }

  def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    ConnectionPool.close(dbName)
  }

  def closeAll(): Unit = {
    ConnectionPool.closeAll
  }
}

object ConfigDBs extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader

case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader
  with EnvPrefix {

  override val env = Option(envValue)
}
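
To make the link between ConfigDBsWithEnv("dev") and the dev { db { ... } } block of application.conf explicit, here is a small sketch of how the lookup path is composed. It mirrors the expression envPrefix + "db." + dbName.name used in HikariConfigReader. The helper dbConfigPath is hypothetical, and the assumption that the prefix is the environment name followed by a dot (or empty when no environment is set) is mine, not something shown in the code above.

```scala
object ConfigPathDemo {
  // Hypothetical helper mirroring: envPrefix + "db." + dbName.name
  // Assumption: envPrefix is "<env>." when an environment is set, otherwise "".
  def dbConfigPath(env: Option[String], dbName: String): String =
    env.map(_ + ".").getOrElse("") + "db." + dbName

  def main(args: Array[String]): Unit = {
    println(dbConfigPath(Some("dev"), "h2"))  // path looked up for ConfigDBsWithEnv("dev")
    println(dbConfigPath(None, "mysql"))      // path looked up without an environment
  }
}
```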

JDBCEngine.scala: implementation of the jdbcQueryResult function

import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import scalikejdbc._

object JDBCContext {
  type SQLTYPE = Int
  val SQL_SELECT: Int = 0
  val SQL_EXECUTE = 1
  val SQL_UPDATE = 2

  def returnColumnByIndex(idx: Int) = Some(idx)

  def returnColumnByName(col: String) = Some(col)
}

case class JDBCContext(
    dbName: Symbol,
    statements: Seq[String],
    parameters: Seq[Seq[Any]] = Nil,
    fetchSize: Int = 100,
    queryTimeout: Option[Int] = None,
    queryTags: Seq[String] = Nil,
    sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
    batch: Boolean = false,
    returnGeneratedKey: Option[Any] = None,
    // no return: None, return by index: Some(1), by name: Some("id")
    preAction: Option[PreparedStatement => Unit] = None,
    postAction: Option[PreparedStatement => Unit] = None)

object JDBCEngine {

  import JDBCContext._

  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
    throw new IllegalStateException(message)
  }

  def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
      ctx: JDBCContext, rowConverter: WrappedResultSet => A)(
      implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {

    ctx.sqlType match {
      case SQL_SELECT => {
        val params: Seq[Any] = ctx.parameters match {
          case Nil => Nil
          case p@_ => p.head
        }
        val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statements.head, params)(noExtractor("boom!"))
        ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
        ctx.queryTags.foreach(rawSql.tags(_))
        rawSql.fetchSize(ctx.fetchSize)
        implicit val session = NamedAutoSession(ctx.dbName)
        val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter)
        sql.collection.apply[C]()
      }
      case _ => throw new IllegalStateException("sqlType must be 'SQL_SELECT'!")
    }
  }
}

JDBCQueryDemo.scala: functional test code

import scalikejdbc._
import JDBCEngine._
import configdbs._
import org.joda.time._

object JDBCQueryDemo extends App {
  ConfigDBsWithEnv("dev").setupAll()

  val ctx = JDBCContext(
    dbName = 'h2,
    statements = Seq("select * from members where id = ?"),
    parameters = Seq(Seq(2))
  )

  //data model
  case class Member(
      id: Long,
      name: String,
      description: Option[String] = None,
      birthday: Option[LocalDate] = None,
      createdAt: DateTime)

  //data row converter
  val toMember = (rs: WrappedResultSet) => Member(
    id = rs.long("id"),
    name = rs.string("name"),
    description = rs.stringOpt("description"),
    birthday = rs.jodaLocalDateOpt("birthday"),
    createdAt = rs.jodaDateTime("created_at")
  )

  val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember)

  println(s"members in vector: $vecMember")

  val ctx1 = ctx.copy(dbName = 'mysql)

  val names: List[String] = jdbcQueryResult[List,String](ctx1,{rs: WrappedResultSet => rs.string("name")})

  println(s"selected name: $names")

  val ctx2 = ctx1.copy(dbName = 'postgres)
  val idname: List[(Long,String)] = jdbcQueryResult[List,(Long,String)](ctx2,{rs: WrappedResultSet => (rs.long("id"),rs.string("name"))})

  println(s"selected id+name: $idname")

  object SlickDAO {
    import slick.jdbc.H2Profile.api._

    case class CountyModel(id: Int, name: String)
    case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
      def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
      def name = column[String]("NAME",O.Length(64))
      def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
    }
    val CountyQuery = TableQuery[CountyTable]
    val filter = "Kansas"
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val statement = qry.result.statements.head
  }
  import SlickDAO._

  val slickCtx = JDBCContext(
    dbName = 'h2,
    statements = Seq(statement),
  )

  val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx,{
    rs: WrappedResultSet => CountyModel(id=rs.int("id"),name=rs.string("name"))})
  vecCounty.foreach(r => println(s"${r.id},${r.name}"))
}

Original article by ItWorker. If you republish it, please credit the source: https://blog.ytso.com/12814.html