Hive 自定义 RowSequence 函数


第一种方案:

package org.rowsequence;  

import org.apache.hadoop.hive.ql.exec.Description;  
import org.apache.hadoop.hive.ql.exec.UDF;  
import org.apache.hadoop.hive.ql.udf.UDFType;  
import org.apache.hadoop.io.LongWritable;  

/** 
 * UDFRowSequence. 
 */  
@Description(name = "row_sequence",  
    value = "_FUNC_() - Returns a generated row sequence number starting from 1")  
@UDFType(deterministic = false)  
public class RowSequence extends UDF {
          
     
    private LongWritable result = new LongWritable();  

      public RowSequence() {  
        result.set(0);  
      }  

      public LongWritable evaluate() {  
        result.set(result.get() + 1);  
        return result;  
      }  
}
-- Make the UDF jar available to the current Hive session.
add jar /home/hadoop/hive_study/hive_udf/hive_udf.jar;
-- Register the UDF; the implementing class name must be a quoted string literal.
create temporary function row_sequence as 'org.rowsequence.RowSequence';
-- Example: number the rows of the city table.
select row_sequence(), city from city;

注:上述程序受限于 Map 个数:每个 map 任务中的 UDF 实例都各自从 1 开始计数,map 数越多,越容易出现重复的 key。为了解决这个问题,可以采用下面的第二种方案。

第二种方案:

package com.hadoopbook.hive;

import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Enumeration;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.junit.Test;

/**
 * UDFRowSequence.确保在不同的node,取的唯一的key,不同的进程
*/
@Description(name ="row_sequence", value ="_FUNC_() - Returns a generated row sequence number starting from 1")
@UDFType(deterministic = false)
public class RowSequence extends UDF {
          
   
// 结构类型
// ip+年月日小时分钟秒+进程id+序列数
private Text result = new Text();
private static LongWritable seq_num = new LongWritable();
private String lastdate ="";

public RowSequence() {
result.set("");
seq_num.set(0);
}

public Text evaluate() {
synchronized (seq_num) {
seq_num.set(seq_num.get() + 1);
}
Date currentTime = new Date();
SimpleDateFormat formatter = new SimpleDateFormat("yyMMddHHmmss");
String dateString = formatter.format(currentTime);
// 若时间改变,从新开始计数
if (!lastdate.equals(dateString)) {
seq_num.set(0);
}
lastdate = dateString;
String ip = get_ip().replace(".","");
synchronized (seq_num) {
result.set(ip + dateString + getPid()+seq_num.get());
}
return result;
}

private String get_ip() {
Enumeration allNetInterfaces = null;
try {
allNetInterfaces = NetworkInterface.getNetworkInterfaces();
} catch (SocketException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return"127.0.0.1";
}
InetAddress ip = null;
while (allNetInterfaces.hasMoreElements()) {
NetworkInterface netInterface = (NetworkInterface) allNetInterfaces
.nextElement();
Enumeration addresses = netInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
ip = (InetAddress) addresses.nextElement();
if (ip != null && ip instanceof Inet4Address) {
return ip.getHostAddress().toString();
}
}
}
return"127.0.0.1";
}
 private int getPid() { 
 RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); 
 String name = runtime.getName(); // format:"pid@hostname"
 try { 
 return Integer.parseInt(name.substring(0, name.indexOf(@))); 
 } catch (Exception e) { 
 return -1; 
}
}
@Test
public void testEvaluate() throws InterruptedException {
Text val = this.evaluate();
System.out.println(val.toString());
val = this.evaluate();
System.out.println(val.toString());
Thread.sleep(1000);
val = this.evaluate();
System.out.println(val.toString());
}
}

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/aiops/290385.html

(0)
上一篇 2022年10月2日 16:35
下一篇 2022年10月2日 16:35

相关推荐

发表回复

登录后才能评论