|NO.Z.00007|——————————|Deployment|——|Hadoop&OLAP数据库管理系统.v07|———————————|ClickH



[BigDataHadoop:Hadoop&OLAP数据库管理系统.V07]                           [Deployment.OLAP数据库管理系统][|ClickHouse:ClickHouse链接kafka|]



一、ClickHouse链接Kafka

### --- ClickHouse链接Kafka:此引擎与 Apache Kafka 结合使用。

~~~     # Kafka 特性:
~~~     发布或者订阅数据流。
~~~     容错存储机制。
~~~     处理流数据。
### --- 链接语法格式

~~~     # 老版格式:
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers])
~~~     # 新版格式:

Kafka SETTINGS
kafka_broker_list = 'hadoop01:9092',
kafka_topic_list = 'topic1,topic2',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '/n',
kafka_schema = '',
kafka_num_consumers = 2
~~~     # 必要参数:

~~~     kafka_broker_list – 以逗号分隔的 brokers 列表 (localhost:9092)。
~~~     kafka_topic_list – topic 列表 (my_topic)。
~~~     kafka_group_name – Kafka 消费组名称 (group1)。
~~~     如果不希望消息在集群中重复,请在每个分片中使用相同的组名。
~~~     kafka_format – 消息体格式。使用与 SQL 部分的 FORMAT 函数相同表示方法,
~~~     例如 JSONEachRow。了解详细信息,请参考 Formats 部分。
~~~     # 可选参数:

~~~     kafka_row_delimiter - 每个消息体(记录)之间的分隔符。
~~~     kafka_schema – 如果解析格式需要一个 schema 时,此参数必填。
~~~     例如,普罗托船长 需要 schema 文件路径以及根对象 schema.capnp:Message 的名字。
~~~     kafka_num_consumers – 单个表的消费者数量。
~~~     默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。
~~~     消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。

二、ClickHouse连接kafka使用示例

### --- 链接说明

~~~     消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次。
~~~     如果希望获得两次数据,则使用另一个组名创建副本。
~~~     消费组可以灵活配置并且在集群之间同步。
~~~     例如,如果群集中有10个主题和5个表副本,则每个副本将获得2个主题。 
~~~     如果副本数量发生变化,主题将自动在副本中重新分配。
~~~     了解更多信息请访问 http://kafka.apache.org/intro。
~~~     SELECT 查询对于读取消息并不是很有用(调试除外),因为每条消息只能被读取一次。
~~~     使用物化视图创建实时线程更实用。
~~~     # 您可以这样做:

~~~     使用引擎创建一个 Kafka 消费者并作为一条数据流。
~~~     创建一个结构表。
~~~     创建物化视图,改视图会在后台转换引擎中的数据并将其放入之前创建的表中。
~~~     当 MATERIALIZED VIEW 添加至引擎,它将会在后台收集数据。
~~~     可以持续不断地从 Kafka 收集数据并通过 SELECT 将数据转换为所需要的格式。
### --- 块大小说明

~~~     为了提高性能,接受的消息被分组为 max_insert_block_size 大小的块。
~~~     如果未在 stream_flush_interval_ms 毫秒内形成块,
~~~     则不关心块的完整性,都会将数据刷新到表中。
### --- 停止接收主题数据或更改转换逻辑,请 detach 物化视图:

~~~     如果使用 ALTER 更改目标表,为了避免目标表与视图中的数据之间存在差异,推荐停止物化视图。
~~~     配置与 GraphiteMergeTree 类似,Kafka 引擎支持使用ClickHouse配置文件进行扩展配置。
~~~     可以使用两个配置键:全局(kafka) 和 主题级别 (kafka_*)。
~~~     首先应用全局配置,然后应用主题级配置(如果存在)。
                            
~~~     有关详细配置选项列表,请参阅 librdkafka配置参考。
~~~     在 ClickHouse 配置中使用下划线 (_) ,并不是使用点 (.)。例如,check.crcs=true 将是 true。
DETACH TABLE consumer;
ATTACH TABLE consumer;
    <!-- Global configuration options for all tables of Kafka engine type -->
    <kafka>
        <debug>cgrp</debug>
        <auto_offset_reset>smallest</auto_offset_reset>
    </kafka>
    
    <!-- Configuration specific for topic "logs" -->
    <kafka_logs>
        <retry_backoff_ms>250</retry_backoff_ms>
        <fetch_min_bytes>100000</fetch_min_bytes>
    </kafka_logs>

三、ClickHouse链接Kafka示例

### --- 启动相关服务

~~~     # 启动zookeeper
[root@hadoop01 ~]# ./zk.sh start
~~~     # 启动ClickHouse
[root@hadoop01 ~]# systemctl start clickhouse-server
~~~     # 启动kafka
[root@hadoop01 ~]# cd /opt/yanqi/servers/kafka_2.12/bin/
[root@hadoop01 bin]# kafka-server-start.sh -daemon ../config/server.properties
~~~     # kafka创建主题
[root@hadoop01 ~]# kafka-topics.sh --create --zookeeper hadoop01:2181/myKafka --replication-factor 1 --partitions 1 --topic ch2
Created topic "ch2".

~~~     # kafka启动生产者
[root@hadoop01 ~]# kafka-console-producer.sh --broker-list hadoop01:9092 --topic ch2
~~~ 生产数据
>2021-11-01,level1,message    
>2021-11-01,level2,message
>2021-11-01,level3,message
### --- 创建ClickHouse链接kafka表

~~~     # 在ClickHouse创建表
[root@hadoop01 ~]#  clickhouse-client -m

hadoop01 :) CREATE TABLE queue (
            q_date String,
            level String,
            message String
            )
            ENGINE = Kafka SETTINGS kafka_broker_list = 'hadoop01:9092',
                                    kafka_topic_list = 'ch2',
                                    kafka_group_name = 'kafka_group_test',
                                    kafka_format = 'CSV',
                                    kafka_num_consumers = 1,
                                    kafka_skip_broken_messages = 10;
~~~     # 可以再ClickHouse表下查看到数据
~~~     但是查询完成之后就会被删除掉,需要创建一个物化视图,来存储到数据

hadoop01 :) select * from queue;

┌─q_date─────┬─level──┬─message─┐
│ 2021-11-01 │ level1 │ message │
│ 2021-11-01 │ level2 │ message │
│ 2021-11-01 │ level3 │ message │
└────────────┴────────┴─────────┘
### --- 创建物化视图保存数据

~~~     # 创建daily表
hadoop01 :) CREATE TABLE daily (
            day Date, 
            level String, 
            message String
            ) ENGINE = MergeTree(day, (day, level),8192);
~~~     # 创建物化视图:从原始queue表中查数据,粘贴到daily表当中,就可以在daily表中查询到数据

hadoop01 :) CREATE MATERIALIZED VIEW consumer TO daily
            AS SELECT q_date, level, message
            FROM queue;
### --- 验证物化视图存储的数据

~~~     # 在kafka生产者生产数据
[root@hadoop01 ~]# kafka-console-producer.sh --broker-list hadoop01:9092 --topic ch2
>2021-11-01,level1,message    
>2021-11-01,level2,message
>2021-11-01,level3,message
~~~     # 在daily表查询数据

hadoop01 :) select * from daily;

┌────────day─┬─level──┬─message─┐
│ 0000-00-00 │ level1 │ message │
└────────────┴────────┴─────────┘
┌────────day─┬─level──┬─message─┐
│ 0000-00-00 │ level2 │ message │
└────────────┴────────┴─────────┘
┌────────day─┬─level──┬─message─┐
│ 0000-00-00 │ level3 │ message │
└────────────┴────────┴─────────┘

===============================END===============================


Walter Savage Landor:strove with none,for none was worth my strife.Nature I loved and, next to Nature, Art:I warm’d both hands before the fire of life.It sinks, and I am ready to depart                                                                                                                                                   ——W.S.Landor


来自为知笔记(Wiz)

原创文章,作者:306829225,如若转载,请注明出处:https://blog.ytso.com/244970.html

(0)
上一篇 2022年4月18日
下一篇 2022年4月18日

相关推荐

发表回复

登录后才能评论