DStream 输出操作:通过 foreachRDD 将结果写入外部 MySQL


package org.hnsw

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
 * Spark Streaming job that counts words read from a text socket over a
 * sliding window and writes each window's counts to the MySQL table
 * `searcheKeyWord`.
 */
object SparkLearn {

  /**
   * Entry point: builds the streaming pipeline, starts it and blocks until
   * the streaming context terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    import scala.util.control.NonFatal

    val conf = new SparkConf().setAppName("Jxq").setMaster("local[*]")
    val sc = new StreamingContext(conf, Seconds(5))

    val dString = sc.socketTextStream("192.168.3.66", 8888)

    // Count words inside a 60-second window that slides every 30 seconds.
    val words = dString.flatMap(line => line.split(" "))
    val wordMap = words.map(word => (word, 1))
    val wordcount = wordMap.reduceByKeyAndWindow(
      (v1: Int, v2: Int) => v1 + v2,
      Seconds(60),
      Seconds(30)
    )

    // Simple console output of each window.
    wordcount.print()

    // Push every window's result to the external database.
    // The foreachRDD body runs on the driver; the foreachPartition body runs
    // on the workers, so the connection is obtained there, once per partition.
    wordcount.foreachRDD { rdd =>
      rdd.foreachPartition { partRdd =>
        // Skip empty partitions so no connection is borrowed for nothing.
        if (partRdd.nonEmpty) {
          // 1) Borrow a connection from the pool.
          val connect = ConnectionPool.getConnection()
          try {
            // Turn off auto-commit so the whole partition is one transaction.
            connect.setAutoCommit(false)
            // 2) Bind the values as parameters instead of concatenating them
            //    into the SQL text: the words come from a socket and may
            //    contain quotes or hostile SQL.
            val state = connect.prepareStatement(
              "insert into searcheKeyWord(insert_time,keyword,search_count) values(now(), ?, ?)"
            )
            try {
              partRdd.foreach { case (keyword, count) =>
                state.setString(1, keyword)
                state.setInt(2, count)
                state.addBatch()
              }
              // 3) Send the batch and commit it on the MySQL server.
              state.executeBatch()
              connect.commit()
            } catch {
              case NonFatal(e) =>
                // Undo the partial batch so the pooled connection is not
                // handed back with an open, half-written transaction.
                try connect.rollback()
                catch { case NonFatal(rollbackError) => e.addSuppressed(rollbackError) }
                throw e
            } finally {
              state.close()
            }
          } finally {
            // 4) Always give the connection back, even when the write failed.
            ConnectionPool.returnConnection(connect)
          }
        }
      }
    }

    sc.start()
    sc.awaitTermination()
  }
}

连接池工具类

package org.hnsw.streaming

import java.sql.{Connection, DriverManager}

import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool, GenericObjectPoolConfig}
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}

/**
 * Process-wide pool of MySQL JDBC connections backed by Apache Commons Pool 2.
 *
 * Callers borrow a connection with [[getConnection]] and must hand it back
 * with [[returnConnection]] when they are done.
 */
object ConnectionPool {
  // Factory that opens the physical JDBC connections.
  val conFactory = new MysqlConnectionFactory("jdbc:mysql://127.0.0.1:3306/test?useSSL=false&serverTimezone=GMT&characterEncoding=utf-8", "root", "root", "com.mysql.jdbc.Driver")

  // Configuration of the connection (object) pool.
  val objectPoolConfig = new GenericObjectPoolConfig[Connection]()
  objectPoolConfig.setMaxTotal(100)
  // Validate each connection when it is borrowed; without this flag the
  // factory's validateObject is never invoked by the pool, so a closed or
  // stale connection could be handed to a caller.
  objectPoolConfig.setTestOnBorrow(true)

  // The connection pool itself.
  private val pool = new GenericObjectPool[Connection](conFactory, objectPoolConfig)

  /**
   * Borrows a connection from the pool.
   *
   * @return a connection obtained via the pool's `borrowObject`
   */
  def getConnection(): Connection = pool.borrowObject()

  /**
   * Returns a previously borrowed connection to the pool.
   *
   * @param conn the connection obtained from [[getConnection]]
   */
  def returnConnection(conn: Connection): Unit = pool.returnObject(conn)
}

/**
 * Commons Pool 2 factory that creates, validates and destroys JDBC connections.
 *
 * @param url       JDBC URL passed to `DriverManager.getConnection`
 * @param userName  database user
 * @param password  database password
 * @param className fully qualified JDBC driver class, loaded before connecting
 */
class MysqlConnectionFactory(url: String, userName: String, password: String, className: String) extends BasePooledObjectFactory[Connection] {

  /** Loads the driver class and opens a new physical connection. */
  override def create(): Connection = {
    Class.forName(className)
    DriverManager.getConnection(url, userName, password)
  }

  /**
   * Wraps the connection in a `DefaultPooledObject`, the pool's wrapper type
   * for pooled instances.
   */
  override def wrap(conn: Connection): PooledObject[Connection] = new DefaultPooledObject[Connection](conn)

  /**
   * Validates a pooled connection.
   *
   * Uses `Connection.isValid` rather than `!isClosed`: `isClosed` only reports
   * whether `close()` was called on this object and cannot detect a connection
   * that the server has already dropped.
   *
   * @return true when the connection is open and still usable
   */
  override def validateObject(pObj: PooledObject[Connection]): Boolean =
    pObj.getObject.isValid(MysqlConnectionFactory.ValidationTimeoutSeconds)

  /** Destroys a pooled connection by closing it. */
  override def destroyObject(pObj: PooledObject[Connection]): Unit = pObj.getObject.close()
}

private object MysqlConnectionFactory {
  // Upper bound, in seconds, for the validation round trip to the server.
  val ValidationTimeoutSeconds: Int = 2
}

下载链接

https://mooc1-1.chaoxing.com/ueditorupload/read?objectId=1a799b40192dba564212eb9b7c6222d3&fileOriName=ConnectionPool.scala

原创文章,作者:carmelaweatherly,如若转载,请注明出处:https://blog.ytso.com/267217.html

(0)
上一篇 2022年6月14日
下一篇 2022年6月14日

相关推荐

发表回复

登录后才能评论