经常会有这样的需求:在现有数仓表的基础上,写一些sql,然后生成hive表并同步到mysql。
次数多了,就像写一个工具完成这个工作
一:背景、功能、流程介绍
1.背景:
1.数仓使用hive存储,datax导数据、airflow调度
2.不知道怎么利用hive解析sql,拿到对应的schema,但是spark知道
spark.sql(sql).schema.toList所以就用了scala
2.功能
就是通过配置完成hive,mysql的建表,airflow调度任务的生成
3.流程
1.配置mysql链接
2.根据输入sparksql,生成对应的hive,mysql表结构,建表
3.生成airflow调度任务(插入hive数据,调用datax同步数据到mysql)
二:代码
1.配置文件介绍:
MysqlToHive.properties
jdbcalias:ptx_read #mysql别名要和同步的数据库的别名保持一致
table:be_product #要同步的表名
owner=owner ##airflow任务的owner
lifecycle=180 ##hive表的生命周期,数据数据产品删除数据
airflowpath=/airflow/dags/ods/ ##生成airflow任务文件的路径
jdbc1alias : hive ##可以写多个mysql链接,不用一个来回改
jdbc1host : 127.0.0.1
jdbc1port : 3306
jdbc1user : root
jdbc1passwd : **
jdbc1db_name : test
jdbc2alias:read
jdbc2host : 127.0.0.1
jdbc2port : 3306
jdbc2user : root
jdbc2passwd :**
jdbc2db_name :test
2.基本代码:
MysqlToHive.java
object HiveToMysql {
//mysql配置内部类
case class Database(host: String,port: Int,user: String,passwd: String,db_name: String){}
//读取配置文件
def readDbPropertiesFile(fileName: String,spark:SparkSession,sql: String): Unit = {
val pp = new Properties
val fps = new FileInputStream("HiveToMysql.properties")
// val fps = Thread.currentThread.getContextClassLoader.getResourceAsStream(fileName)
pp.load(fps)
parseProperties(pp,spark,sql)
fps.close()
}
//解析配置文件对应配置
def parseProperties(pp: Properties,spark:SparkSession,sql: String): Unit = {
val table = pp.getProperty("table")
val owner = pp.getProperty("owner")
val lifecycle = pp.getProperty("lifecycle")
val jdbcalias = pp.getProperty("jdbcalias")
val airflowpath = pp.getProperty("airflowpath")
import scala.collection.mutable.ArrayBuffer
var tableColumn: ArrayBuffer[String] = new ArrayBuffer[String]();
var dbindex = 1
while (pp.getProperty("jdbc" + dbindex + "alias") != null && !pp.getProperty("jdbc" + dbindex + "alias").equals(jdbcalias)) {
dbindex += 1
}
var database = new Database(pp.getProperty("jdbc" + dbindex + "host"),pp.getProperty("jdbc" + dbindex + "port").toInt,
pp.getProperty("jdbc" + dbindex + "user"),pp.getProperty("jdbc" + dbindex + "passwd"),pp.getProperty("jdbc" + dbindex + "db_name"))
val mysqlSelectBuilder = new StringBuilder
val schemaList = spark.sql(sql).schema.toList
//sparksql 利用schema生成hive建表语句和mysql建表语句
for ( i <- 0 until schemaList.length ) {
println(schemaList.apply(i).name+"|"+schemaList.apply(i).dataType.typeName)
tableColumn += (schemaList.apply(i).name+"|"+schemaList.apply(i).dataType.typeName)
mysqlSelectBuilder.append(schemaList.apply(i).name+",")
}
mysqlSelectBuilder.deleteCharAt(mysqlSelectBuilder.length - 1)
buildExecuteHiveSql(table,tableColumn,lifecycle,owner)
buildExecuteMysql(table,tableColumn,database);
printAirflowJob(airflowpath,table,owner,jdbcalias,mysqlSelectBuilder.toString(),sql: String)
}
//airflow封装太多了,就不写了
def printAirflowJob(airflowpath:String,table:String,owner:String,jdbcalias:String,mysqlSelect:String,sql:String){
val db = table.substring(0, table.indexOf("."));
val tableNoDatabase = table.substring(table.indexOf(".") + 1);
System.out.println(airflowpath +db+"/"+ tableNoDatabase)
if (new File(airflowpath +db+"/"+ tableNoDatabase).exists())
System.out.println("folder exist,please delete the folder " + airflowpath +db+"/"+ tableNoDatabase)
else {
val dir = new File(airflowpath +db+"/"+ tableNoDatabase);
dir.mkdirs();
val pw = new PrintWriter(airflowpath +db+"/"+ tableNoDatabase + "/" + tableNoDatabase + "_dag.py")
pw.println("import airflow");
pw.println("from airflow import DAG");
pw.println(")");
pw.println("");
pw.println("");
pw.flush()
pw.close()
}
}
@throws[IOException]
def buildExecuteMysql(table:String,tableColumn:ArrayBuffer[String],database:Database): Unit = {
val mysqlSqlBuilder = new StringBuilder
mysqlSqlBuilder.append("CREATE TABLE " + table.substring(table.indexOf(".")+1)+ " ( /n")
mysqlSqlBuilder.append("dt varchar(10) DEFAULT NULL,"+"/n")
println("tableColumnCopy"+tableColumn.size)
val tableColumnCopy = tableColumn.toArray[String];
for (i <- 0 until tableColumnCopy.size) {
val fieldAndType = tableColumnCopy.apply(i).split("//|")
mysqlSqlBuilder.append(fieldAndType(0)+ " ")
if (fieldAndType(1).contains("integer") || fieldAndType(1).contains("long"))
mysqlSqlBuilder.append(" bigint(10)")
else if (fieldAndType(1).contains("float") || fieldAndType(1).contains("double") || fieldAndType(1).contains("decimal"))
mysqlSqlBuilder.append(" decimal(36,6)")
else if (fieldAndType(1).contains("string") )
mysqlSqlBuilder.append(" varchar(36)")
else if (fieldAndType(1).contains("boolean") )
mysqlSqlBuilder.append(" boolean")
else if (fieldAndType(1).contains("date") || fieldAndType(1).contains("timestamp") )
mysqlSqlBuilder.append(" varchar(36)")
mysqlSqlBuilder.append(" DEFAULT NULL," +"/n")
}
mysqlSqlBuilder.deleteCharAt(mysqlSqlBuilder.length - 2) //去除最后的回车和,
mysqlSqlBuilder.append(") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4")
System.out.println(mysqlSqlBuilder.toString)
Class.forName("com.mysql.cj.jdbc.Driver")
var con = DriverManager.getConnection("jdbc:mysql://" + database.host + ":" + database.port + "/" + database.db_name + "?serverTimezone=UTC", database.user, database.passwd)
var st = con.createStatement
st.execute(mysqlSqlBuilder.toString)
st.close();con.close();
}
@throws[IOException]
@throws[InterruptedException]
def buildExecuteHiveSql(table:String,tableColumn:ArrayBuffer[String],lifecycle:String,owner:String): Unit = {
val mysqlSqlBuilder = new StringBuilder
mysqlSqlBuilder.append("CREATE TABLE " + table+ " ( /n")
println("tableColumnCopy"+tableColumn.size)
val tableColumnCopy = tableColumn.toArray[String];
for (i <- 0 until tableColumnCopy.size) {
val fieldAndType = tableColumnCopy.apply(i).split("//|")
if(fieldAndType.apply(1).contains("integer") || fieldAndType.apply(1).contains("long"))
mysqlSqlBuilder.append(fieldAndType.apply(0)+" bigint,")
else if(fieldAndType.apply(1).contains("float") || fieldAndType.apply(1).contains("double") || fieldAndType.apply(1).contains("decimal"))
mysqlSqlBuilder.append(fieldAndType.apply(0)+" "+"double ,")
else if(fieldAndType.apply(1).contains("string"))
mysqlSqlBuilder.append(fieldAndType.apply(0)+" string,")
else if(fieldAndType.apply(1).contains("boolean"))
mysqlSqlBuilder.append(fieldAndType.apply(0)+" boolean,")
else if(fieldAndType.apply(1).contains("date")||fieldAndType.apply(1).contains("timestamp"))
mysqlSqlBuilder.append(fieldAndType.apply(0)+" string,")
mysqlSqlBuilder.append("/n")
}
mysqlSqlBuilder.deleteCharAt(mysqlSqlBuilder.length - 2) //去除最后的回车和,
mysqlSqlBuilder.append(") PARTITIONED BY ( dt string COMMENT '(一级分区)' ) /n")
mysqlSqlBuilder.append("ROW FORMAT DELIMITED STORED AS PARQUET /n")
mysqlSqlBuilder.append("TBLPROPERTIES ('lifecycle'='" + lifecycle + "','owner'='" + owner + "','parquet.compression'='snappy');")
System.out.println(mysqlSqlBuilder.toString)
val process = new ProcessBuilder("hive", "-e", "/"" + mysqlSqlBuilder.toString + "/"").redirectErrorStream(true).start
val br = new BufferedReader(new InputStreamReader(process.getInputStream))
var line = ""
do {
line = br.readLine()
Thread.sleep(1000)
println(line)
}while(line!=null)
process.waitFor
}
def main(args: Array[String]): Unit = {
// val sparkconf = new SparkConf().setAppName("test_Spark_sql").setMaster("local[2]")
// val spark = SparkSession.builder().config(sparkconf).config("spark.driver.host", "localhost").getOrCreate()
val spark= SparkSession.builder.appName("HiveToMysql").enableHiveSupport().getOrCreate()
readDbPropertiesFile("HiveToMysql.properties",spark,args(0))
}
}
3.脚本文件:HiveToMysql.sh
#!/bin/bash
mv bigData.jar .
mv HiveToMysql.properties .
sql=`cat /sql`
spark-submit /
--class HiveToMysql /
--master yarn /
--deploy-mode client /
--num-executors 1 /
--executor-memory 4g /
--executor-cores 1 /
--driver-memory 1g /
--name "HiveToMysql" /
--conf spark.speculation=true /
--conf spark.speculation.interval=30000 /
--conf spark.speculation.quantile=0.8 /
--conf spark.speculation.multiplier=1.5 /
--conf spark.dynamicAllocation.enabled=false /
--files HiveToMysql.properties /
--jars fastjson-1.2.62.jar,mysql-connector-java-8.0.18.jar /
bigData.jar "$sql"
4.可能的问题
1.scala比较烂,代码比较难阅读
2.调度的时间一样(可做需改)
3.数据类型的处理,根据业务需求
原创文章,作者:,如若转载,请注明出处:https://blog.ytso.com/273186.html