Approach 1:
```java
package org.rowsequence;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;

/**
 * UDFRowSequence: returns a generated row sequence number starting from 1.
 */
@Description(name = "row_sequence",
    value = "_FUNC_() - Returns a generated row sequence number starting from 1")
@UDFType(deterministic = false)
public class RowSequence extends UDF {

  // One counter per UDF instance, i.e. per map task.
  private LongWritable result = new LongWritable();

  public RowSequence() {
    result.set(0);
  }

  public LongWritable evaluate() {
    result.set(result.get() + 1);
    return result;
  }
}
```
```sql
add jar /home/hadoop/hive_study/hive_udf/hive_udf.jar;
create temporary function row_sequence as 'org.rowsequence.RowSequence';
select row_sequence(), city from city;
```
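A typical use of such a function is assigning surrogate keys while loading a table. A minimal sketch, assuming a target table `city_dim(id bigint, name string)` that does not appear in the original post:

```sql
-- Hypothetical target table, for illustration only.
create table city_dim (id bigint, name string);

-- Each row receives the next value from the per-task counter.
insert into table city_dim
select row_sequence() as id, city from city;
```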
Note: the approach above is constrained by the number of map tasks. Each map task loads its own instance of the UDF, so each task counts 1, 2, 3, ... independently; with more than one map task the same values are emitted several times and the keys collide. The second approach below solves this by building a key that is unique across nodes and processes.
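For reference, on Hive 0.11 and later the built-in windowing function `row_number()` also yields a collision-free sequence without a custom UDF, at the cost of imposing a total order on the rows:

```sql
select row_number() over (order by city) as seq, city
from city;
```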
Approach 2:
```java
package com.hadoopbook.hive;

import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Enumeration;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.junit.Test;

/**
 * UDFRowSequence: guarantees a key that is unique across different nodes and
 * different processes.
 *
 * Key layout: ip + yyMMddHHmmss + pid + sequence number
 */
@Description(name = "row_sequence",
    value = "_FUNC_() - Returns a generated row sequence number starting from 1")
@UDFType(deterministic = false)
public class RowSequence extends UDF {

  private Text result = new Text();
  private static LongWritable seq_num = new LongWritable();
  private String lastdate = "";

  public RowSequence() {
    result.set("");
    seq_num.set(0);
  }

  public Text evaluate() {
    Date currentTime = new Date();
    SimpleDateFormat formatter = new SimpleDateFormat("yyMMddHHmmss");
    String dateString = formatter.format(currentTime);
    String ip = get_ip().replace(".", "");
    // Increment, reset and key assembly happen under one lock so the counter
    // and the timestamp stay consistent.
    synchronized (seq_num) {
      seq_num.set(seq_num.get() + 1);
      // Restart the count whenever the second changes.
      if (!lastdate.equals(dateString)) {
        seq_num.set(0);
      }
      lastdate = dateString;
      result.set(ip + dateString + getPid() + seq_num.get());
    }
    return result;
  }

  /** Returns the first IPv4 address found on any interface, or 127.0.0.1 on failure. */
  private String get_ip() {
    Enumeration<NetworkInterface> allNetInterfaces;
    try {
      allNetInterfaces = NetworkInterface.getNetworkInterfaces();
    } catch (SocketException e) {
      e.printStackTrace();
      return "127.0.0.1";
    }
    while (allNetInterfaces.hasMoreElements()) {
      NetworkInterface netInterface = allNetInterfaces.nextElement();
      Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
      while (addresses.hasMoreElements()) {
        InetAddress ip = addresses.nextElement();
        if (ip instanceof Inet4Address) {
          return ip.getHostAddress();
        }
      }
    }
    return "127.0.0.1";
  }

  /** Parses the pid out of the JVM name, which has the format "pid@hostname". */
  private int getPid() {
    RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
    String name = runtime.getName();
    try {
      return Integer.parseInt(name.substring(0, name.indexOf('@')));
    } catch (Exception e) {
      return -1;
    }
  }

  /** Quick local sanity check; not part of the UDF contract. */
  @Test
  public void testEvaluate() throws InterruptedException {
    Text val = this.evaluate();
    System.out.println(val.toString());
    val = this.evaluate();
    System.out.println(val.toString());
    Thread.sleep(1000);
    val = this.evaluate();
    System.out.println(val.toString());
  }
}
```
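The second version is registered the same way as the first; the jar path and the function name below are placeholders, not taken from the original post:

```sql
add jar /home/hadoop/hive_study/hive_udf/hive_udf2.jar;
create temporary function unique_key as 'com.hadoopbook.hive.RowSequence';
select unique_key(), city from city;
```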