package org.hnsw

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.hnsw.streaming.ConnectionPool

object SparkLearn {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Jxq").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val dString = ssc.socketTextStream("192.168.3.66", 8888)

    // Count words within the specified window
    val words = dString.flatMap(_.split(" "))
    val wordMap = words.map(word => (word, 1))
    // 60-second window, sliding every 30 seconds (both multiples of the 5-second batch interval)
    val wordcount = wordMap.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Seconds(60), Seconds(30))

    // Print a sample of each windowed result
    wordcount.print()

    // Write the results to an external system via foreachRDD
    wordcount.foreachRDD { rdd =>           // this closure runs on the driver
      rdd.foreachPartition { partition =>   // this closure runs on the worker nodes
        // 1) obtain a database connection from the pool
        val connect = ConnectionPool.getConnection()
        val stmt = connect.createStatement()
        connect.setAutoCommit(false)        // auto-commit is on by default; disable it so the batch commits as one transaction
        // 2) build the INSERT statements on this connection and add them to the batch
        partition.foreach { x =>
          stmt.addBatch("insert into searcheKeyWord(insert_time,keyword,search_count) values(now(), '" + x._1 + "','" + x._2 + "')")
        }
        // 3) execute the batch and commit the transaction on the MySQL server
        stmt.executeBatch()
        connect.commit()
        stmt.close()
        // 4) return the connection to the pool
        ConnectionPool.returnConnection(connect)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
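The concatenated SQL above works, but a quote inside a keyword breaks the statement and the code is open to SQL injection. A minimal alternative sketch of the same per-partition write using a PreparedStatement batch (the searcheKeyWord table and ConnectionPool are the ones defined in this post; the placeholder-based statement is an assumption, not part of the original code):

wordcount.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val connect = ConnectionPool.getConnection()
    connect.setAutoCommit(false)
    // Placeholders let the JDBC driver handle quoting and escaping of the values
    val ps = connect.prepareStatement(
      "insert into searcheKeyWord(insert_time, keyword, search_count) values(now(), ?, ?)")
    partition.foreach { case (keyword, count) =>
      ps.setString(1, keyword)
      ps.setInt(2, count)
      ps.addBatch()
    }
    ps.executeBatch()
    connect.commit()
    ps.close()
    ConnectionPool.returnConnection(connect)
  }
}

As with the Statement version, the foreachPartition closure is shipped to the executors, so ConnectionPool and the MySQL driver must be on every worker's classpath; the pool object is initialized lazily once per executor JVM.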
Connection pool utility class
package org.hnsw.streaming

import java.sql.{Connection, DriverManager}

import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool, GenericObjectPoolConfig}
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}

object ConnectionPool {
  // Factory that creates the MySQL connections
  val conFactory = new MysqlConnectionFactory("jdbc:mysql://127.0.0.1:3306/test?useSSL=false&serverTimezone=GMT&characterEncoding=utf-8", "root", "root", "com.mysql.jdbc.Driver")

  // Pool configuration: allow at most 100 pooled connections
  val objectPoolConfig = new GenericObjectPoolConfig[Connection]()
  objectPoolConfig.setMaxTotal(100)

  // The connection pool itself
  private val pool = new GenericObjectPool[Connection](conFactory, objectPoolConfig)

  // Borrow a connection from the pool
  def getConnection(): Connection = pool.borrowObject()

  // Return a connection to the pool
  def returnConnection(conn: Connection): Unit = pool.returnObject(conn)
}

class MysqlConnectionFactory(url: String, userName: String, password: String, className: String) extends BasePooledObjectFactory[Connection] {
  // Create a new Connection
  override def create(): Connection = {
    Class.forName(className)
    DriverManager.getConnection(url, userName, password)
  }

  /**
   * DefaultPooledObject wraps the object held by the pool; through the wrapper the pool
   * tracks the object's state, creation time, last borrow/return times, usage time, and so on.
   */
  override def wrap(conn: Connection): PooledObject[Connection] = new DefaultPooledObject[Connection](conn)

  // Validate that a pooled connection is still usable before it is handed out
  override def validateObject(pObj: PooledObject[Connection]): Boolean = !pObj.getObject.isClosed

  // Destroy a pooled connection by closing it
  override def destroyObject(pObj: PooledObject[Connection]): Unit = pObj.getObject.close()
}
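For reference, a minimal standalone usage of the pool outside of Spark could look like the sketch below (ConnectionPoolDemo is a hypothetical driver object; the query assumes the searcheKeyWord table used earlier already exists):

object ConnectionPoolDemo {
  def main(args: Array[String]): Unit = {
    // Borrow a connection, use it, and always hand it back to the pool
    val conn = ConnectionPool.getConnection()
    try {
      val stmt = conn.createStatement()
      val rs = stmt.executeQuery("select keyword, search_count from searcheKeyWord")
      while (rs.next()) {
        println(rs.getString("keyword") + " -> " + rs.getInt("search_count"))
      }
      rs.close()
      stmt.close()
    } finally {
      ConnectionPool.returnConnection(conn)
    }
  }
}

Returning the connection in a finally block keeps the pool from leaking connections when a query fails.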