Kafka安装配置详解程序员

  Kafka是由Apache软件基金会开发的一个高吞吐量的分布式发布订阅消息系统,由Scala和Java编写。官网地址:http://kafka.apache.org

0.基本概念

  Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker。
  Topic:每条发布到Kafka集群的消息都有一个主题名称,这个主题名称就被称为Topic。
  Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition。
  Producer:消息生产者,负责发布消息到Kafka broker。
  Consumer:消息消费者,向Kafka broker读取消息的客户端。
  Consumer Group:每个Consumer属于一个特定的Consumer Group,group name可单独设定,若不设定则属于默认的group。

1.安装步骤

  系统:Centos7.2  

  JDK:   java1.8

  另外需要安装zookeeper,可以使用已有的zookeeper服务,新版的kafka已内置了一个zookeeper环境,也可以直接使用。

  (1)下载

  kafka安装包下载地址:http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz

  /bin 可执行脚本目录

  /config 配置文件目录

tar -xzf kafka_2.11-2.1.0.tgz 
cd kafka_2.11-2.1.0

  (2)启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

  (3)配置kafka相关属性

broker.id=0   //唯一标识 
listeners=PLAINTEXT://:9092  指定服务的端口 
advertised.listeners=PLAINTEXT://192.168.31.222:9092  如果要提供外网访问 必须配置此项 
log.dirs=/tmp/kafka-logs-0  //日志目录

  (4)启动服务

bin/kafka-server-start.sh config/server.properties &

  (5)检查服务

[[email protected] config]# netstat -tunlp|egrep "(2181|9092)" 
tcp6       0      0 :::9092       :::*       LISTEN      14610/java           
tcp6       0      0 :::2181       :::*       LISTEN      20494/java 

2.集群配置

  所谓的kafka集群就是指多个broker组成的集群,通过zookeeper来进行管理。以下在本机部署三个broker组成的集群。

  首先将config/server.properties的复制三份,分别命名为server-0.properties、server-1.properties、server-2.properties,分别配置如下:

  server-0.properties:

broker.id=0 
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.31.222:9092
log.dirs=/tmp/kafka-logs-0

  server-1.properties:

broker.id=1 
listeners=PLAINTEXT://:9093 
advertised.listeners=PLAINTEXT://192.168.31.222:9093 
log.dirs=/tmp/kafka-logs-1

  server-2.properties:

broker.id=2 
listeners=PLAINTEXT://:9094 
advertised.listeners=PLAINTEXT://192.168.31.222:9094 
log.dirs=/tmp/kafka-logs-2

   然后分别都启动就可以了:

bin/kafka-server-start.sh config/server-0.properties & 
bin/kafka-server-start.sh config/server-1.properties & 
bin/kafka-server-start.sh config/server-2.properties &

  查看是否启动了:

[[email protected] ~]# netstat -tunlp|egrep "(2181|9092|9093|9094)" 
tcp6       0      0 :::9092        :::*      LISTEN      14610/java           
tcp6       0      0 :::2181        :::*      LISTEN      20494/java           
tcp6       0      0 :::9093        :::*      LISTEN      15353/java           
tcp6       0      0 :::9094        :::*      LISTEN      14974/java

3.客户端调用

  客户端使用.net core测试,使用官网推荐的confluent-kafka-dotnet类库

  生产者代码:通过192.168.31.222:9092发布消息

public static void Main(string[] args) 
    { 
        var conf = new ProducerConfig { BootstrapServers = "192.168.31.222:9092" }; 
        Action<DeliveryReportResult<Null, string>> handler = r =>  
            Console.WriteLine(!r.Error.IsError 
                ? $"Delivered message to {r.TopicPartitionOffset}" 
                : $"Delivery Error: {r.Error.Reason}"); 
 
        using (var p = new Producer<Null, string>(conf)) 
        { 
            for (int i=0; i<100; ++i){ 
                p.BeginProduce("my-topic", new Message<Null, string> { Value = i.ToString() }, handler); 
            } 
            p.Flush(TimeSpan.FromSeconds(10)); 
        } 
    }

  消费者代码:通过192.168.31.222:9093订阅接收消息

public static void Main(string[] args) 
    { 
        var conf = new ConsumerConfig 
        {  
            GroupId = "test-consumer-group", 
            BootstrapServers = "192.168.31.222:9093", 
            AutoOffsetReset = AutoOffsetResetType.Earliest 
        }; 
        using (var c = new Consumer<Ignore, string>(conf)) 
        { 
            c.Subscribe("my-topic"); 
            bool consuming = true; 
            c.OnError += (_, e) => consuming = !e.IsFatal; 
            while (consuming) 
            { 
                try{ 
                    var cr = c.Consume(); 
                    Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); 
                } 
                catch (ConsumeException e){ 
                    Console.WriteLine($"Error occured: {e.Error.Reason}"); 
                } 
            } 
            c.Close(); 
        } 
    }

 

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/2756.html

(0)
上一篇 2021年7月16日
下一篇 2021年7月16日

相关推荐

发表回复

登录后才能评论