HBase2实战:HBase Flink和Kafka整合详解大数据

1.概述

Apache官方发布HBase2已经有一段时间了,HBase2中包含了许多个Features,从官方JIRA来看,大约有4500+个ISSUES(查看地址),从版本上来看是一个非常大的版本了。本篇博客将为大家介绍HBase2的新特性,以及如何在实战中与Flink、Kafka等组件进行整合。

2.内容

HBase2有哪些新特性值得我们去关注,这里给大家列举部分特定。

2.1 部分新特性预览

2.1.1 Region分配优化

在HBase中遇到比较频繁的问题就是RIT问题,而在新特性中,对于Region的管理和分配有了新的调整。AssignmentManager基于ProcedureV2实现,可以快速的分配Region,另外维护Region的State存储不再依赖Zookeeper,能够更好的面对Region长时间的RIT问题。

具体参考JIRA单:[HBASE-14614]、[HBASE-17844]、[HBASE-14350]

2.1.2 Offheap优化

在HBase2中减少了对Heap内存的使用,改为Offheap内存,减少垃圾的产生,以及减少GC的停顿时间。

参考JIRA单:[HBASE-11425]

2.1.3 Compaction优化

在HBase2中,引入了MemStore新的实现类CompactingMemstore,这个类和默认的DefaultMemStore类的区别在于实现了在内存中进行Compaction。

CompactingMemstore中,数据是通过Segment作为单位进行组织的,一个MemStore中包含多个Segment。数据最开始写入时会进入到一个处理Active状态的Segment中,这个Segment是可以被修改的。当该Active状态的Segment中的数据达到阀值后,不是直接Flush到HDFS的HFile文件中,而是先Flush到内存中的一个不可修改的Segment中。CompactingMemstore会在后台将多个不可修改的Segment合并为一个更大、更紧凑的Segment。

如果RegionServer需要把MemStore中的数据Flush到磁盘,会先选择其他类型的MemStore,然后在选择CompactingMemstore。这是由于CompactingMemstore对内存的管理更加高效,所以延长CompactingMemstore的生命周期可以减少总的I/O。当CompactingMemstore被Flush到磁盘时,不可修改的Segment会被移到一个快照中进行合并,然后写入HFile。

参考JIRA单:[HBASE-15991]

2.1.4 RegionServer Group

在引入RegionServer Group之前,HBase默认使用StochasticLoadBalancer策略将表的Region移到到RegionServer里面。在HBase2中,可以将RegionServer划分到多个逻辑组中,这样可以提供多租户的能力。

参考JIRA单:[HBASE-6721]、[HBASE-16430]、[HBASE-17589]、[HBASE-17350]、[HBASE-17349]

2.1.5 Add new AsyncRpcClient

在HBase2中,客户端请求改为异步RPC机制,不再是同步Wait,这样能大大有效的提高客户端请求的并发量,有效的提高资源利用率。

参考JIRA单:[HBASE-13784]、[HBASE-12684]

3.实战整合

了解了HBase2的一些新特性之后,如何将HBase2运用到实际项目中去,下面将为大家介绍如何将HBase整合到Flink和Kafka中。数据流向如下图所示:

HBase2实战:HBase Flink和Kafka整合详解大数据

3.1 基础环境

整合环境如下所示:

  • JDK1.8
  • HBase-2.1.1
  • Flink-1.7.1
  • Kafka-2.1.0

3.1.1 依赖JAR

整合实战项目,需要依赖的JAR信息如下:

<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId> 
    <version>1.7.1</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-streaming-java_2.12</artifactId> 
    <version>1.7.1</version> 
    <scope>provided</scope> 
</dependency> 
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-hbase_2.12</artifactId> 
    <version>1.7.1</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-core</artifactId> 
    <version>1.7.1</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.hadoop</groupId> 
    <artifactId>hadoop-common</artifactId> 
    <version>2.7.4</version> 
</dependency>

建议使用Maven来管理,可以很方便的将上述依赖信息配置到pom.xml文件中。

3.2 数据准备

准备数据源,将数据写入到Kafka集群,通过Flink进行消费,进行业务逻辑处理,然后将处理后的结果写入到HBase进行落地。数据准备的实现代码如下:

public class JProducer extends Thread { 
 
    public static void main(String[] args) { 
        JProducer jproducer = new JProducer(); 
        jproducer.start(); 
    } 
     
    @Override 
    public void run() { 
        producer(); 
    } 
 
    private void producer() { 
        Properties props = config(); 
        Producer<String, String> producer = new KafkaProducer<>(props); 
        for (int i = 0; i < 10; i++) { 
            String json = "{/"id/":" + i + ",/"ip/":/"192.168.0." + i + "/",/"date/":" + new Date().toString() + "}"; 
            String k = "key" + i; 
            producer.send(new ProducerRecord<String, String>("flink_topic", k, json)); 
        } 
        producer.close(); 
    } 
 
    private Properties config() { 
        Properties props = new Properties(); 
        props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092"); 
        props.put("acks", "1"); 
        props.put("retries", 0); 
        props.put("batch.size", 16384); 
        props.put("linger.ms", 1); 
        props.put("buffer.memory", 33554432); 
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
        props.put("partitioner.class", "org.smartloli.kafka.connector.flink.producer.TestSimplePartitioner"); 
        return props; 
    } 
}

通过上述应用程序,将生产的消息数据写入到Kafka的Topic中,准备好数据源。

3.3 处理数据并落地到HBase

使用Flink消费Kafka集群中刚刚准备好的数据源,然后进行逻辑处理后,将结果写入到HBase集群进行存储,具体实现代码如下:

public class FlinkHBase { 
 
    private static String zkServer = "dn1,dn2,dn3"; 
    private static String port = "2181"; 
    private static TableName tableName = TableName.valueOf("testflink"); 
    private static final String cf = "ke"; 
    private static final String topic = "flink_topic"; 
 
    public static void main(String[] args) { 
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
        env.enableCheckpointing(1000); 
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
 
        DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), configByKafka())); 
        transction.rebalance().map(new MapFunction<String, Object>() { 
            private static final long serialVersionUID = 1L; 
 
            public String map(String value) throws IOException { 
                write2HBase(value); 
                return value; 
            } 
        }).print(); 
        try { 
            env.execute(); 
        } catch (Exception ex) { 
            ex.printStackTrace(); 
        } 
    } 
 
    public static Properties configByKafka() { 
        Properties props = new Properties(); 
        props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092"); 
        props.put("group.id", "kv_flink"); 
        props.put("enable.auto.commit", "true"); 
        props.put("auto.commit.interval.ms", "1000"); 
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        return props; 
    } 
 
    public static void write2HBase(String value) throws IOException { 
        Configuration config = HBaseConfiguration.create(); 
 
        config.set("hbase.zookeeper.quorum", zkServer); 
        config.set("hbase.zookeeper.property.clientPort", port); 
        config.setInt("hbase.rpc.timeout", 30000); 
        config.setInt("hbase.client.operation.timeout", 30000); 
        config.setInt("hbase.client.scanner.timeout.period", 30000); 
 
        Connection connect = ConnectionFactory.createConnection(config); 
        Admin admin = connect.getAdmin(); 
        if (!admin.tableExists(tableName)) { 
            admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(cf))); 
        } 
        Table table = connect.getTable(tableName); 
        TimeStamp ts = new TimeStamp(new Date()); 
        Date date = ts.getDate(); 
        Put put = new Put(Bytes.toBytes(date.getTime())); 
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("test"), Bytes.toBytes(value)); 
        table.put(put); 
        table.close(); 
        connect.close(); 
    } 
}

将该应用程序提交到Flink集群,通过Flink消费Kafka集群中的数据,成功执行该应用程序后,可以到HBase集群进行验证,看数据是否有写入成功。

3.4 数据验证

进入到HBase集群,执行hbase shell命令进入到Console界面,然后执行如下命令查看数据是否有写入成功:

hbase(main):009:0> scan 'testflink',LIMIT=>2

执行上述命令,结果如下所示:

HBase2实战:HBase Flink和Kafka整合详解大数据

4.总结

HBase2发布的新特性很有必要去研究和剖析,对于优化HBase集群或多或少有些许帮助。通过研究这些新特性,来帮助我们有效的应用到实战项目中。

5.结束语

这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

另外,博主出书了《Kafka并不难学》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/9867.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论