自定义Hive Sql Job分析工具详解大数据

前言

我们都知道,在大数据领域,Hive的出现帮我降低了许多使用Hadoop书写方式的学习成本.使用用户可以使用类似Sql的语法规则写明查询语句,从hive表数据中查询目标数据.最为重要的是这些sql语句会最终转化为map reduce作业进行处理.这也是Hive最强大的地方.可以简单的理解为Hive就是依托在Hadoop上的1个壳.但是这里有一点点小小的不同,不是每段hive查询sql语句与最后生成的job一一对应,如果你的这段sql是一个大sql,他在转化掉之后,会衍生出许多小job,这些小job是独立存在运行的,以不同的job名称进行区别,但是也会保留公共的job名称.所以一个问题来了,对于超级长的hive sql语句,我想查看到底是哪段子sql花费了我大量的执行时间,在JobHistory上只有每个子Job的运行时间,没有子Job对应的sql语句,一旦这个功能有了之后,就会帮助我们迅速的定位到问题所在.

Hive子Job中的Sql

OK,带着上述的目标,我们要想分析出到底哪段子sql所衍生的job运行时间更长,就要先知道这些sql到底在存在与哪里.在前面的描述中,已经提到了,Hive是依托于Hadoop,自然Hive提交的job信息也是保存在Hadoop的HDFS上的.在联想一下JobHistory中的各个文件类型.你应该会发现带有下面后缀的文件存在.

自定义Hive Sql Job分析工具详解大数据

我们发现里面包含了之前分析过的.jhist文件,还有带conf字符的.xml格式文件,从文件名上来看就是job提交时的一些配置信息,然后我们用vim命令查阅conf.xml后缀的文件,看看里面是不是有我们想要的hive qury string 这样的属性

自定义Hive Sql Job分析工具详解大数据

OK,目标算是找到了,这的确就是我们想要的属性.说明这样的信息的确是存在的,后面的操作就是怎么去解析这段有用的信息了.

程序工具分析Hive Sql Job

知道了目标数据源,我们能想到的最简单快速的方法就是逐行解析文件,做做文本匹配,筛选关键信息.这些代码谁都会写,首先要传入一个HDFS目录地址,这个是在JobHistory的存储目录上加上一个具体日期目录,这段解析程序在文章的末尾会加上.下面列举在调试分析程序时遇到的一些问题,这个还是比较有用的.

1.hive sql中的中文导致解析出现乱码

这个又是非常讨厌的java解析出现乱码的原因,因为考虑到sql中存在中文注释,而Hadoop在存中文的时候都是用utf8的编码方式,所以读出文件数据后进行一次转utf-8编码方式的处理,就是下面所示代码.

[java] 
view plain
copy
print
?

  1. …  
  2. fileSystem = path.getFileSystem(new Configuration());  
  3.             in = fileSystem.open(path);  
  4.             InputStreamReader isr;  
  5.             BufferedReader br;  
  6.   
  7.             isr = new InputStreamReader(in, “UTF-8”);  
  8.             br = new BufferedReader(isr);  
  9.   
  10.             while ((str = br.readLine()) != null) {  
  11. …  


2.单线程解析文件速度过慢

之前在测试环境中做文件解析看不出真实效果,文件一下解析就OK了,但是到真实环境中,多达几万个job文件,程序马上就吃不消了,算上解析文件,再把结果写入mysql,耗时达到60多分钟,后面改成了多线程的方式,后来开到10个线程去跑,速度才快了许多.

3.结果数据写入MySql过慢

后来处理速度是上去了,但是写入sql速度过慢,比如说,我有一次测试,开10个线程区解析,花了8分钟就解析好了几万个文件数据,但是插入数据库花了20分钟左右,而且量也就几万条语句.后来改成了批处理的方式,效果并没有什么大的改变,这个慢的问题具体并没有被解决掉,怀疑可能是因有些语句中存在超长的hive sql语句导致的.

下面是程序的主要分析代码,分为工具类代码,和解析线程类,代码全部链接再此处:https://github.com/linyiqun/yarn-jobhistory-crawler/tree/master/jobHiveSqlAnalyse

主工具代码

[java] 
view plain
copy
print
?

  1. package org.apache.hadoop.mapreduce.v2.hs.tool.sqlanalyse;  
  2.   
  3. import java.io.BufferedReader;  
  4. import java.io.FileNotFoundException;  
  5. import java.io.IOException;  
  6. import java.io.InputStream;  
  7. import java.io.InputStreamReader;  
  8. import java.io.PrintStream;  
  9. import java.util.ArrayList;  
  10. import java.util.HashMap;  
  11. import java.util.LinkedList;  
  12. import java.util.List;  
  13. import java.util.Map.Entry;  
  14.   
  15. import org.apache.hadoop.conf.Configuration;  
  16. import org.apache.hadoop.fs.FSDataInputStream;  
  17. import org.apache.hadoop.fs.FileContext;  
  18. import org.apache.hadoop.fs.FileStatus;  
  19. import org.apache.hadoop.fs.FileSystem;  
  20. import org.apache.hadoop.fs.Path;  
  21. import org.apache.hadoop.fs.RemoteIterator;  
  22. import org.apache.hadoop.fs.UnsupportedFileSystemException;  
  23. import org.apache.hadoop.io.IOUtils;  
  24.   
  25. public class HiveSqlAnalyseTool {  
  26.     private int threadNum;  
  27.     private String dirType;  
  28.     private String jobHistoryPath;  
  29.   
  30.     private FileContext doneDirFc;  
  31.     private Path doneDirPrefixPath;  
  32.   
  33.     private LinkedList<FileStatus> fileStatusList;  
  34.     private HashMap<String, String[]> dataInfos;  
  35.     private DbClient dbClient;  
  36.   
  37.     public HiveSqlAnalyseTool(String dirType, String jobHistoryPath,  
  38.             int threadNum) {  
  39.         this.threadNum = threadNum;  
  40.         this.dirType = dirType;  
  41.         this.jobHistoryPath = jobHistoryPath;  
  42.   
  43.         this.dataInfos = new HashMap<String, String[]>();  
  44.         this.fileStatusList = new LinkedList<FileStatus>();  
  45.         this.dbClient = new DbClient(BaseValues.DB_URL,  
  46.                 BaseValues.DB_USER_NAME, BaseValues.DB_PASSWORD,  
  47.                 BaseValues.DB_HIVE_SQL_STAT_TABLE_NAME);  
  48.   
  49.         try {  
  50.             doneDirPrefixPath = FileContext.getFileContext(new Configuration())  
  51.                     .makeQualified(new Path(this.jobHistoryPath));  
  52.             doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri());  
  53.         } catch (UnsupportedFileSystemException e) {  
  54.             // TODO Auto-generated catch block  
  55.             e.printStackTrace();  
  56.         } catch (IllegalArgumentException e) {  
  57.             // TODO Auto-generated catch block  
  58.             e.printStackTrace();  
  59.         }  
  60.     }  
  61.   
  62.     public void readJobInfoFiles() {  
  63.         List<FileStatus> files;  
  64.   
  65.         files = new ArrayList<FileStatus>();  
  66.         try {  
  67.             files = scanDirectory(doneDirPrefixPath, doneDirFc, files);  
  68.         } catch (IOException e) {  
  69.             // TODO Auto-generated catch block  
  70.             e.printStackTrace();  
  71.         }  
  72.   
  73.         if (files != null) {  
  74.             for (FileStatus fs : files) {  
  75.                 // parseFileInfo(fs);  
  76.             }  
  77.             System.out.println(“files num is “ + files.size());  
  78.             System.out  
  79.                     .println(“fileStatusList size is” + fileStatusList.size());  
  80.   
  81.             ParseThread[] threads;  
  82.             threads = new ParseThread[threadNum];  
  83.             for (int i = 0; i < threadNum; i++) {  
  84.                 System.out.println(“thread “ + i + “start run”);  
  85.                 threads[i] = new ParseThread(this, fileStatusList, dataInfos);  
  86.                 threads[i].start();  
  87.             }  
  88.   
  89.             for (int i = 0; i < threadNum; i++) {  
  90.                 System.out.println(“thread “ + i + “join run”);  
  91.                 try {  
  92.                     if (threads[i] != null) {  
  93.                         threads[i].join();  
  94.                     }  
  95.                 } catch (InterruptedException e) {  
  96.                     // TODO Auto-generated catch block  
  97.                     e.printStackTrace();  
  98.                 }  
  99.             }  
  100.         } else {  
  101.             System.out.println(“files is null”);  
  102.         }  
  103.   
  104.         printStatDatas();  
  105.     }  
  106.   
  107.     protected List<FileStatus> scanDirectory(Path path, FileContext fc,  
  108.             List<FileStatus> jhStatusList) throws IOException {  
  109.         path = fc.makeQualified(path);  
  110.         System.out.println(“dir path is “ + path.getName());  
  111.         try {  
  112.             RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);  
  113.             while (fileStatusIter.hasNext()) {  
  114.                 FileStatus fileStatus = fileStatusIter.next();  
  115.                 Path filePath = fileStatus.getPath();  
  116.   
  117.                 if (fileStatus.isFile()) {  
  118.                     jhStatusList.add(fileStatus);  
  119.                     fileStatusList.add(fileStatus);  
  120.                 } else if (fileStatus.isDirectory()) {  
  121.                     scanDirectory(filePath, fc, jhStatusList);  
  122.                 }  
  123.             }  
  124.         } catch (FileNotFoundException fe) {  
  125.             System.out.println(“Error while scanning directory “ + path);  
  126.         }  
  127.   
  128.         return jhStatusList;  
  129.     }  
  130.   
  131.     private void parseFileInfo(FileStatus fs) {  
  132.         String resultStr;  
  133.         String str;  
  134.         String username;  
  135.         String fileType;  
  136.         String jobId;  
  137.         String jobName;  
  138.         String hiveSql;  
  139.   
  140.         int startPos;  
  141.         int endPos;  
  142.         int hiveSqlFlag;  
  143.         long launchTime;  
  144.         long finishTime;  
  145.         int mapTaskNum;  
  146.         int reduceTaskNum;  
  147.         String xmlNameFlag;  
  148.         String launchTimeFlag;  
  149.         String finishTimeFlag;  
  150.         String launchMapFlag;  
  151.         String launchReduceFlag;  
  152.   
  153.         Path path;  
  154.         FileSystem fileSystem;  
  155.         InputStream in;  
  156.   
  157.         resultStr = “”;  
  158.         fileType = “”;  
  159.         hiveSql = “”;  
  160.         jobId = “”;  
  161.         jobName = “”;  
  162.         username = “”;  
  163.         hiveSqlFlag = 0;  
  164.         launchTime = 0;  
  165.         finishTime = 0;  
  166.         mapTaskNum = 0;  
  167.         reduceTaskNum = 0;  
  168.         xmlNameFlag = “<value>”;  
  169.         launchTimeFlag = “/”launchTime/”:”;  
  170.         finishTimeFlag = “/”finishTime/”:”;  
  171.         launchMapFlag = “/”Launched map tasks/””;  
  172.         launchReduceFlag = “/”Launched reduce tasks/””;  
  173.   
  174.         path = fs.getPath();  
  175.         str = path.getName();  
  176.         if (str.endsWith(“.xml”)) {  
  177.             fileType = “config”;  
  178.   
  179.             endPos = str.lastIndexOf(“_”);  
  180.             jobId = str.substring(0, endPos);  
  181.         } else if (str.endsWith(“.jhist”)) {  
  182.             fileType = “info”;  
  183.   
  184.             endPos = str.indexOf(“-“);  
  185.             jobId = str.substring(0, endPos);  
  186.         } else {  
  187.             return;  
  188.         }  
  189.   
  190.         try {  
  191.             fileSystem = path.getFileSystem(new Configuration());  
  192.             in = fileSystem.open(path);  
  193.             InputStreamReader isr;  
  194.             BufferedReader br;  
  195.   
  196.             isr = new InputStreamReader(in, “UTF-8”);  
  197.             br = new BufferedReader(isr);  
  198.   
  199.             while ((str = br.readLine()) != null) {  
  200.                 if (str.contains(“mapreduce.job.user.name”)) {  
  201.                     startPos = str.indexOf(xmlNameFlag);  
  202.                     endPos = str.indexOf(“</value>”);  
  203.                     username = str.substring(startPos + xmlNameFlag.length(),  
  204.                             endPos);  
  205.                 } else if (str.contains(“mapreduce.job.name”)) {  
  206.                     startPos = str.indexOf(xmlNameFlag);  
  207.                     endPos = str.indexOf(“</value>”);  
  208.                     jobName = str.substring(startPos + xmlNameFlag.length(),  
  209.                             endPos);  
  210.                 } else if (str.contains(“hive.query.string”)) {  
  211.                     hiveSqlFlag = 1;  
  212.                     hiveSql = str;  
  213.                 } else if (hiveSqlFlag == 1) {  
  214.                     hiveSql += str;  
  215.   
  216.                     if (str.contains(“</value>”)) {  
  217.                         startPos = hiveSql.indexOf(xmlNameFlag);  
  218.                         endPos = hiveSql.indexOf(“</value>”);  
  219.                         hiveSql = hiveSql.substring(  
  220.                                 startPos + xmlNameFlag.length(), endPos);  
  221.   
  222.                         hiveSqlFlag = 0;  
  223.                     }  
  224.                 } else if (str.startsWith(“{/”type/”:/”JOB_INITED/””)) {  
  225.                     startPos = str.indexOf(launchTimeFlag);  
  226.                     str = str.substring(startPos + launchTimeFlag.length());  
  227.                     endPos = str.indexOf(“,”);  
  228.                     launchTime = Long.parseLong(str.substring(0, endPos));  
  229.                 } else if (str.startsWith(“{/”type/”:/”JOB_FINISHED/””)) {  
  230.                     mapTaskNum = parseTaskNum(launchMapFlag, str);  
  231.                     reduceTaskNum = parseTaskNum(launchReduceFlag, str);  
  232.   
  233.                     startPos = str.indexOf(finishTimeFlag);  
  234.                     str = str.substring(startPos + finishTimeFlag.length());  
  235.                     endPos = str.indexOf(“,”);  
  236.                     finishTime = Long.parseLong(str.substring(0, endPos));  
  237.                 }  
  238.             }  
  239.   
  240.             System.out.println(“jobId is “ + jobId);  
  241.             System.out.println(“jobName is “ + jobName);  
  242.             System.out.println(“username is “ + username);  
  243.             System.out.println(“map task num is “ + mapTaskNum);  
  244.             System.out.println(“reduce task num is “ + reduceTaskNum);  
  245.             System.out.println(“launchTime is “ + launchTime);  
  246.             System.out.println(“finishTime is “ + finishTime);  
  247.             System.out.println(“hive query sql is “ + hiveSql);  
  248.         } catch (IOException e) {  
  249.             // TODO Auto-generated catch block  
  250.             e.printStackTrace();  
  251.         }  
  252.   
  253.         if (fileType.equals(“config”)) {  
  254.             insertConfParseData(jobId, jobName, username, hiveSql);  
  255.         } else if (fileType.equals(“info”)) {  
  256.             insertJobInfoParseData(jobId, launchTime, finishTime, mapTaskNum,  
  257.                     reduceTaskNum);  
  258.         }  
  259.     }  
  260.   
  261.     private void insertConfParseData(String jobId, String jobName,  
  262.             String username, String sql) {  
  263.         String[] array;  
  264.   
  265.         if (dataInfos.containsKey(jobId)) {  
  266.             array = dataInfos.get(jobId);  
  267.         } else {  
  268.             array = new String[BaseValues.DB_COLUMN_HIVE_SQL_LEN];  
  269.         }  
  270.   
  271.         array[BaseValues.DB_COLUMN_HIVE_SQL_JOBID] = jobId;  
  272.         array[BaseValues.DB_COLUMN_HIVE_SQL_JOBNAME] = jobName;  
  273.         array[BaseValues.DB_COLUMN_HIVE_SQL_USERNAME] = username;  
  274.         array[BaseValues.DB_COLUMN_HIVE_SQL_HIVE_SQL] = sql;  
  275.   
  276.         dataInfos.put(jobId, array);  
  277.     }  
  278.   
  279.     private void insertJobInfoParseData(String jobId, long launchTime,  
  280.             long finishedTime, int mapTaskNum, int reduceTaskNum) {  
  281.         String[] array;  
  282.   
  283.         if (dataInfos.containsKey(jobId)) {  
  284.             array = dataInfos.get(jobId);  
  285.         } else {  
  286.             array = new String[BaseValues.DB_COLUMN_HIVE_SQL_LEN];  
  287.         }  
  288.   
  289.         array[BaseValues.DB_COLUMN_HIVE_SQL_JOBID] = jobId;  
  290.         array[BaseValues.DB_COLUMN_HIVE_SQL_START_TIME] = String  
  291.                 .valueOf(launchTime);  
  292.         array[BaseValues.DB_COLUMN_HIVE_SQL_FINISH_TIME] = String  
  293.                 .valueOf(finishedTime);  
  294.         array[BaseValues.DB_COLUMN_HIVE_SQL_MAP_TASK_NUM] = String  
  295.                 .valueOf(mapTaskNum);  
  296.         array[BaseValues.DB_COLUMN_HIVE_SQL_REDUCE_TASK_NUM] = String  
  297.                 .valueOf(reduceTaskNum);  
  298.   
  299.         dataInfos.put(jobId, array);  
  300.     }  
  301.   
  302.     private int parseTaskNum(String flag, String jobStr) {  
  303.         int taskNum;  
  304.         int startPos;  
  305.         int endPos;  
  306.   
  307.         String tmpStr;  
  308.   
  309.         taskNum = 0;  
  310.         tmpStr = jobStr;  
  311.         startPos = tmpStr.indexOf(flag);  
  312.   
  313.         if (startPos == –1) {  
  314.             return 0;  
  315.         }  
  316.   
  317.         tmpStr = tmpStr.substring(startPos + flag.length());  
  318.         endPos = tmpStr.indexOf(“}”);  
  319.         tmpStr = tmpStr.substring(0, endPos);  
  320.         taskNum = Integer.parseInt(tmpStr.split(“:”)[1]);  
  321.   
  322.         return taskNum;  
  323.     }  
  324.   
  325.     private void printStatDatas() {  
  326.         String jobId;  
  327.         String jobInfo;  
  328.         String[] infos;  
  329.   
  330.         if (dbClient != null) {  
  331.             dbClient.createConnection();  
  332.         }  
  333.   
  334.         if (dataInfos != null) {  
  335.             System.out.println(“map data size is” + dataInfos.size());  
  336.               
  337.             if (dbClient != null && dirType.equals(“dateTimeDir”)) {  
  338.                 dbClient.insertDataBatch(dataInfos);  
  339.             }  
  340.         }  
  341.   
  342.         /*for (Entry<String, String[]> entry : this.dataInfos.entrySet()) { 
  343.             jobId = entry.getKey(); 
  344.             infos = entry.getValue(); 
  345.  
  346.             jobInfo = String 
  347.                     .format(“jobId is %s, jobName:%s, usrname:%s, launchTime:%s, finishTime:%s, mapTaskNum:%s, reduceTaskNum:%s, querySql:%s”, 
  348.                             jobId, infos[1], infos[2], infos[3], infos[4], 
  349.                             infos[5], infos[6], infos[7]); 
  350.             // System.out.println(“job detail info ” + jobInfo); 
  351.  
  352.             if (dbClient != null && dirType.equals(“dateTimeDir”)) { 
  353.                 dbClient.insertHiveSqlStatData(infos); 
  354.             } 
  355.         }*/  
  356.   
  357.         if (dbClient != null) {  
  358.             dbClient.closeConnection();  
  359.         }  
  360.     }  
  361.   
  362.     public synchronized FileStatus getOneFile() {  
  363.         FileStatus fs;  
  364.   
  365.         fs = null;  
  366.         if (fileStatusList != null & fileStatusList.size() > 0) {  
  367.             fs = fileStatusList.poll();  
  368.         }  
  369.   
  370.         return fs;  
  371.     }  
  372.   
  373.     public synchronized void addDataToMap(String jobId, String[] values) {  
  374.         if (dataInfos != null) {  
  375.             dataInfos.put(jobId, values);  
  376.         }  
  377.     }  
  378. }  



解析线程代码ParseThread.java:

[java] 
view plain
copy
print
?

  1. package org.apache.hadoop.mapreduce.v2.hs.tool.sqlanalyse;  
  2.   
  3. import java.io.BufferedReader;  
  4. import java.io.IOException;  
  5. import java.io.InputStream;  
  6. import java.io.InputStreamReader;  
  7. import java.util.HashMap;  
  8. import java.util.LinkedList;  
  9.   
  10. import org.apache.hadoop.conf.Configuration;  
  11. import org.apache.hadoop.fs.FileStatus;  
  12. import org.apache.hadoop.fs.FileSystem;  
  13. import org.apache.hadoop.fs.Path;  
  14.   
  15. public class ParseThread extends Thread{  
  16.     private HiveSqlAnalyseTool tool;  
  17.     private LinkedList<FileStatus> fileStatus;  
  18.     private HashMap<String, String[]> dataInfos;  
  19.       
  20.     public ParseThread(HiveSqlAnalyseTool tool, LinkedList<FileStatus> fileStatus, HashMap<String, String[]> dataInfos){  
  21.         this.tool = tool;  
  22.         this.fileStatus = fileStatus;  
  23.         this.dataInfos = dataInfos;  
  24.     }  
  25.       
  26.     @Override  
  27.     public void run() {  
  28.         FileStatus fs;  
  29.           
  30.         while(fileStatus != null && !fileStatus.isEmpty()){  
  31.             fs = tool.getOneFile();  
  32.             parseFileInfo(fs);  
  33.         }  
  34.           
  35.         super.run();  
  36.     }  
  37.   
  38.     private void parseFileInfo(FileStatus fs) {  
  39.         String str;  
  40.         String username;  
  41.         String fileType;  
  42.         String jobId;  
  43.         String jobName;  
  44.         String hiveSql;  
  45.   
  46.         int startPos;  
  47.         int endPos;  
  48.         int hiveSqlFlag;  
  49.         long launchTime;  
  50.         long finishTime;  
  51.         int mapTaskNum;  
  52.         int reduceTaskNum;  
  53.         String xmlNameFlag;  
  54.         String launchTimeFlag;  
  55.         String finishTimeFlag;  
  56.         String launchMapFlag;  
  57.         String launchReduceFlag;  
  58.   
  59.         Path path;  
  60.         FileSystem fileSystem;  
  61.         InputStream in;  
  62.   
  63.         fileType = “”;  
  64.         hiveSql = “”;  
  65.         jobId = “”;  
  66.         jobName = “”;  
  67.         username = “”;  
  68.         hiveSqlFlag = 0;  
  69.         launchTime = 0;  
  70.         finishTime = 0;  
  71.         mapTaskNum = 0;  
  72.         reduceTaskNum = 0;  
  73.         xmlNameFlag = “<value>”;  
  74.         launchTimeFlag = “/”launchTime/”:”;  
  75.         finishTimeFlag = “/”finishTime/”:”;  
  76.         launchMapFlag = “/”Launched map tasks/””;  
  77.         launchReduceFlag = “/”Launched reduce tasks/””;  
  78.   
  79.         path = fs.getPath();  
  80.         str = path.getName();  
  81.         if (str.endsWith(“.xml”)) {  
  82.             fileType = “config”;  
  83.   
  84.             endPos = str.lastIndexOf(“_”);  
  85.             jobId = str.substring(0, endPos);  
  86.         } else if (str.endsWith(“.jhist”)) {  
  87.             fileType = “info”;  
  88.   
  89.             endPos = str.indexOf(“-“);  
  90.             jobId = str.substring(0, endPos);  
  91.         }else{  
  92.             return;  
  93.         }  
  94.   
  95.         try {  
  96.             fileSystem = path.getFileSystem(new Configuration());  
  97.             in = fileSystem.open(path);  
  98.             InputStreamReader isr;  
  99.             BufferedReader br;  
  100.   
  101.             isr = new InputStreamReader(in, “UTF-8”);  
  102.             br = new BufferedReader(isr);  
  103.   
  104.             while ((str = br.readLine()) != null) {  
  105.                 if (str.contains(“mapreduce.job.user.name”)) {  
  106.                     startPos = str.indexOf(xmlNameFlag);  
  107.                     endPos = str.indexOf(“</value>”);  
  108.                     username = str.substring(startPos + xmlNameFlag.length(),  
  109.                             endPos);  
  110.                 } else if (str.contains(“mapreduce.job.name”)) {  
  111.                     startPos = str.indexOf(xmlNameFlag);  
  112.                     endPos = str.indexOf(“</value>”);  
  113.                     jobName = str.substring(startPos + xmlNameFlag.length(),  
  114.                             endPos);  
  115.                 } else if (str.contains(“hive.query.string”)) {  
  116.                     hiveSqlFlag = 1;  
  117.   
  118.                     hiveSql = str;  
  119.                 } else if (hiveSqlFlag == 1) {  
  120.                     hiveSql += str;  
  121.   
  122.                     if (str.contains(“</value>”)) {  
  123.                         startPos = hiveSql.indexOf(xmlNameFlag);  
  124.                         endPos = hiveSql.indexOf(“</value>”);  
  125.                         hiveSql = hiveSql.substring(  
  126.                                 startPos + xmlNameFlag.length(), endPos);  
  127.   
  128.                         hiveSqlFlag = 0;  
  129.                     }  
  130.                 } else if (str.startsWith(“{/”type/”:/”JOB_INITED/””)) {  
  131.                     startPos = str.indexOf(launchTimeFlag);  
  132.                     str = str.substring(startPos + launchTimeFlag.length());  
  133.                     endPos = str.indexOf(“,”);  
  134.                     launchTime = Long.parseLong(str.substring(0, endPos));  
  135.                 } else if (str.startsWith(“{/”type/”:/”JOB_FINISHED/””)) {  
  136.                     mapTaskNum = parseTaskNum(launchMapFlag, str);  
  137.                     reduceTaskNum = parseTaskNum(launchReduceFlag, str);  
  138.   
  139.                     startPos = str.indexOf(finishTimeFlag);  
  140.                     str = str.substring(startPos + finishTimeFlag.length());  
  141.                     endPos = str.indexOf(“,”);  
  142.                     finishTime = Long.parseLong(str.substring(0, endPos));  
  143.                 }  
  144.             }  
  145.   
  146.             /*System.out.println(“jobId is ” + jobId); 
  147.             System.out.println(“jobName is ” + jobName); 
  148.             System.out.println(“username is ” + username); 
  149.             System.out.println(“map task num is ” + mapTaskNum); 
  150.             System.out.println(“reduce task num is ” + reduceTaskNum); 
  151.             System.out.println(“launchTime is ” + launchTime); 
  152.             System.out.println(“finishTime is ” + finishTime); 
  153.             System.out.println(“hive query sql is ” + hiveSql);*/  
  154.         } catch (IOException e) {  
  155.             // TODO Auto-generated catch block  
  156.             e.printStackTrace();  
  157.         }  
  158.   
  159.         if (fileType.equals(“config”)) {  
  160.             insertConfParseData(jobId, jobName, username, hiveSql);  
  161.         } else if (fileType.equals(“info”)) {  
  162.             insertJobInfoParseData(jobId, launchTime, finishTime, mapTaskNum,  
  163.                     reduceTaskNum);  
  164.         }  
  165.     }  
  166.   
  167.     private void insertConfParseData(String jobId, String jobName,  
  168.             String username, String sql) {  
  169.         String[] array;  
  170.   
  171.         if (dataInfos.containsKey(jobId)) {  
  172.             array = dataInfos.get(jobId);  
  173.         } else {  
  174.             array = new String[BaseValues.DB_COLUMN_HIVE_SQL_LEN];  
  175.         }  
  176.   
  177.         array[BaseValues.DB_COLUMN_HIVE_SQL_JOBID] = jobId;  
  178.         array[BaseValues.DB_COLUMN_HIVE_SQL_JOBNAME] = jobName;  
  179.         array[BaseValues.DB_COLUMN_HIVE_SQL_USERNAME] = username;  
  180.         array[BaseValues.DB_COLUMN_HIVE_SQL_HIVE_SQL] = sql;  
  181.   
  182.         tool.addDataToMap(jobId, array);  
  183.     }  
  184.   
  185.     private void insertJobInfoParseData(String jobId, long launchTime,  
  186.             long finishedTime, int mapTaskNum, int reduceTaskNum) {  
  187.         String[] array;  
  188.   
  189.         if (dataInfos.containsKey(jobId)) {  
  190.             array = dataInfos.get(jobId);  
  191.         } else {  
  192.             array = new String[BaseValues.DB_COLUMN_HIVE_SQL_LEN];  
  193.         }  
  194.   
  195.         array[BaseValues.DB_COLUMN_HIVE_SQL_JOBID] = jobId;  
  196.         array[BaseValues.DB_COLUMN_HIVE_SQL_START_TIME] = String  
  197.                 .valueOf(launchTime);  
  198.         array[BaseValues.DB_COLUMN_HIVE_SQL_FINISH_TIME] = String  
  199.                 .valueOf(finishedTime);  
  200.         array[BaseValues.DB_COLUMN_HIVE_SQL_MAP_TASK_NUM] = String  
  201.                 .valueOf(mapTaskNum);  
  202.         array[BaseValues.DB_COLUMN_HIVE_SQL_REDUCE_TASK_NUM] = String  
  203.                 .valueOf(reduceTaskNum);  
  204.   
  205.         tool.addDataToMap(jobId, array);  
  206.     }  
  207.   
  208.     private int parseTaskNum(String flag, String jobStr) {  
  209.         int taskNum;  
  210.         int startPos;  
  211.         int endPos;  
  212.   
  213.         String tmpStr;  
  214.   
  215.         taskNum = 0;  
  216.         tmpStr = jobStr;  
  217.         startPos = tmpStr.indexOf(flag);  
  218.           
  219.         if(startPos == –1){  
  220.             return 0;  
  221.         }  
  222.           
  223.         tmpStr = tmpStr.substring(startPos + flag.length());  
  224.         endPos = tmpStr.indexOf(“}”);  
  225.         tmpStr = tmpStr.substring(0, endPos);  
  226.         taskNum = Integer.parseInt(tmpStr.split(“:”)[1]);  
  227.   
  228.         return taskNum;  
  229.     }  
  230. }  

其他更多Yarn,Hadoop方面代码的分析请点击链接
https://github.com/linyiqun/hadoop-yarn
,后续将会继续更新YARN其他方面的代码分析。

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/9186.html

(0)
上一篇 2021年7月19日 09:17
下一篇 2021年7月19日 09:17

相关推荐

发表回复

登录后才能评论