Kafka数据每5分钟同步到Hive详解大数据

1.概述

最近有同学留言咨询Kafka数据落地到Hive的一些问题,今天笔者将为大家来介绍一种除Flink流批一体以外的方式(流批一体下次再单独写一篇给大家分享)。

2.内容

首先,我们简单来描述一下数据场景,比如有这样一个数据场景,有一批实时流数据实时写入Kafka,然后需要对Topic中的数据进行每隔5分钟进行落地到Hive,进行每5分钟分区存储。流程图如下所示:

Kafka数据每5分钟同步到Hive详解大数据

 2.1 环境依赖

整个流程,需要依赖的组件有Kafka、Flink、Hadoop。由于Flink提交需要依赖Hadoop的计算资源和存储资源,所以Hadoop的YARN和HDFS均需要启动。各个组件版本如下:

组件 版本
Kafka 2.4.0
Flink 1.10.0
Hadoop 2.10.0

2.2 每分钟落地HDFS实现

Flink消费Kafka集群中的数据,需要依赖Flink包,依赖如下:

<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-connector-filesystem_2.12</artifactId> 
    <version>${flink.connector.version}</version> 
 </dependency> 
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId> 
    <version>${flink.kafka.version}</version> 
 </dependency> 
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-streaming-java_2.12</artifactId> 
    <version>${flink.streaming.version}</version> 
 </dependency>

编写消费Topic的Flink代码,这里不对Topic中的数据做逻辑处理,在后面统一交给MapReduce来做数据预处理,直接消费并存储到HDFS上。代码如下:

public class Kafka2Hdfs { 
 
    private static Logger LOG = LoggerFactory.getLogger(Kafka2Hdfs.class); 
 
    public static void main(String[] args) { 
        if (args.length != 3) { 
            LOG.error("kafka(server01:9092), hdfs(hdfs://cluster01/data/), flink(parallelism=2) must be exist."); 
            return; 
        } 
        String bootStrapServer = args[0]; 
        String hdfsPath = args[1]; 
        int parallelism = Integer.parseInt(args[2]); 
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
        env.enableCheckpointing(5000); 
        env.setParallelism(parallelism); 
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
 
        DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<>("test_bll_data", new SimpleStringSchema(), configByKafkaServer(bootStrapServer))); 
 
        // Storage into hdfs 
        BucketingSink<String> sink = new BucketingSink<>(hdfsPath); 
 
        sink.setBucketer(new JDateTimeBucketer<String>("HH-mm"));// 自定义存储到HDFS上的文件名,用小时和分钟来命名,方便后面算策略 
 
        sink.setBatchSize(1024 * 1024 * 4); // this is 5MB 
        sink.setBatchRolloverInterval(1000 * 30); // 30s producer a file into hdfs 
        transction.addSink(sink); 
 
        env.execute("Kafka2Hdfs"); 
    } 
 
    private static Object configByKafkaServer(String bootStrapServer) { 
        Properties props = new Properties(); 
        props.setProperty("bootstrap.servers", bootStrapServer); 
        props.setProperty("group.id", "test_bll_group"); 
        props.put("enable.auto.commit", "true"); 
        props.put("auto.commit.interval.ms", "1000"); 
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        return props; 
    } 
 
}

2.3 注意事项

  • 这里我们把时间窗口设置小一些,每30s做一次检查,如果该批次的时间窗口没有数据过来,就生成一个文件落地到HDFS上;
  • 另外,我们重写了DateTimeBucketer为JDateTimeBucketer,逻辑并不复杂,在原有的方法上加一个年-月-日/时-分的文件生成路径,例如在HDFS上的生成路径:xxxx/2020-12-26/00-00

2.4 数据预处理

这里,我们需要对落地到HDFS上的文件进行预处理,处理的逻辑是这样的。比如,现在是2020-12-26 14:00,那么我们需要将当天的13:55,13:56,13:57,13:58,13:59这最近5分钟的数据处理到一起,并加载到Hive的最近5分钟的一个分区里面去。那么,我们需要生成这样一个逻辑策略集合,用HH-mm作为key,与之最近的5个文件作为value,进行数据预处理合并。

实现代码如下:

public class DateRange { 
 
    public static void main(String[] args) { 
        for (int i = 0; i < 24; i++) { 
            for (int j = 0; j < 60; j++) { 
                if (j % 5 == 0) { 
                    if (j < 10) { 
                        if (i < 10) { 
                            if (i == 0 && j == 0) { 
                                System.out.println("0" + i + "-0" + j + "=>23-59,23-58,23-57,23-56,23-55"); 
                            } else { 
                                if (j == 0) { 
                                    String tmp = ""; 
                                    for (int k = 1; k <= 5; k++) { 
                                        tmp += "0" + (i - 1) + "-" + (60 - k) + ","; 
                                    } 
                                    System.out.println("0" + i + "-0" + j + "=>" + tmp.substring(0, tmp.length() - 1)); 
                                } else { 
                                    String tmp = ""; 
                                    for (int k = 1; k <= 5; k++) { 
                                        if (j - k < 10) { 
                                            tmp += "0" + i + "-0" + (j - k) + ","; 
                                        } else { 
                                            tmp += "0" + i + "-" + (j - k) + ","; 
                                        } 
                                    } 
                                    System.out.println("0" + i + "-0" + j + "=>" + tmp.substring(0, tmp.length() - 1)); 
                                } 
                            } 
                        } else { 
                            if (j == 0) { 
                                String tmp = ""; 
                                for (int k = 1; k <= 5; k++) { 
                                    if (i - 1 < 10) { 
                                        tmp += "0" + (i - 1) + "-" + (60 - k) + ","; 
                                    } else { 
                                        tmp += (i - 1) + "-" + (60 - k) + ","; 
                                    } 
                                } 
                                System.out.println(i + "-0" + j + "=>" + tmp.substring(0, tmp.length() - 1)); 
                            } else { 
                                String tmp = ""; 
                                for (int k = 1; k <= 5; k++) { 
                                    if (j - k < 10) { 
                                        tmp += i + "-0" + (j - k) + ","; 
                                    } else { 
                                        tmp += i + "-" + (j - k) + ","; 
                                    } 
                                } 
                                System.out.println(i + "-0" + j + "=>" + tmp.substring(0, tmp.length() - 1)); 
                            } 
                        } 
                    } else { 
                        if (i < 10) { 
                            String tmp = ""; 
                            for (int k = 1; k <= 5; k++) { 
                                if (j - k < 10) { 
                                    tmp += "0" + i + "-0" + (j - k) + ","; 
                                } else { 
                                    tmp += "0" + i + "-" + (j - k) + ","; 
                                } 
                            } 
                            System.out.println("0" + i + "-" + j + "=>" + tmp.substring(0, tmp.length() - 1)); 
                        } else { 
                            String tmp = ""; 
                            for (int k = 1; k <= 5; k++) { 
                                if (j - 1 < 10) { 
                                    tmp += i + "-0" + (j - k) + ","; 
                                } else { 
                                    tmp += i + "-" + (j - k) + ","; 
                                } 
                            } 
                            System.out.println(i + "-" + j + "=>" + tmp.substring(0, tmp.length() - 1)); 
                        } 
                    } 
                } 
            } 
        } 
    } 
 
}

预览结果如下:

Kafka数据每5分钟同步到Hive详解大数据

 需要注意的是,如果发生了第二天00:00,那么我们需要用到前一天的00-00=>23-59,23-58,23-57,23-56,23-55这5个文件中的数据来做预处理。

2.5 数据加载

准备好数据后,我们可以使用Hive的load命令直接加载HDFS上预处理的文件,把数据加载到对应的表中,实现命令如下:

load data inpath '/cluster01/hive/hfile/data/min/2020-12-26/14-05/' overwrite into table jketable partition(day='2020-12-26-14-05')

这里,我们在执行命令时,可能文件不存在会导致加载出错。那我们在加载HDFS路径之前,先判断一下路径是否存在。

实现脚本如下所示:

hdfs dfs -ls /cluster01/hive/hfile/data/min/2020-12-26/14-05/ | wc -l > /tmp/hdfs_check_files.txt 
 
hdfs_check_files=`cat /tmp/hdfs_check_files.txt` 

# 判断HDFS上文件是否存在
if [ $hdfs_check_files -eq 0 ] then echo "Match file is null.Stop hive load script." else echo "Match file is exist.Start hive load script." hive -e "load data inpath '/cluster01/hive/hfile/data/min/2020-12-26/14-05/' overwrite into table jketable partition(day='2020-12-26-14-05')" fi

3.总结

整个流程为,先使用Flink消费存储在Kafka中的数据,按照每分钟进行存储,然后将具体需要聚合的时间段进行策略生成,比如每5分钟、10分钟、15分钟等等,可以在DateRange类中修改对应的策略逻辑。最后,再将预处理好的数据使用hive命令进行加载。整个过程,流程较多。如果我们使用Flink的流批一体特性,可以通过Flink直接建表,然后使用Flink消费Kafka中的数据后,直接分区落地到Hive表,这个就留到下次再给大家分享吧。

4.结束语

这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/9851.html

(0)
上一篇 2021年7月19日 11:37
下一篇 2021年7月19日 11:37

相关推荐

发表回复

登录后才能评论