Kafka Source Code Analysis, Part 17: Consumer Flow (How Clients Fetch Topic Data)

Kafka clients consume data through one of two roles: the simple (low-level) consumer and the high-level consumer. Both are described below.

17.1 Simple Consumer

Characteristics:

   1) A message can be read multiple times.

   2) In one processing pass, the client consumes only some of the messages of one partition on one broker.

   3) The program must track the offset itself.

   4) The program must find the lead broker of the target TopicPartition.

   5) The program must handle broker changes.

Client code must follow these steps:

   1) From all active brokers, find the leader broker of the target TopicPartition.

   2) Build the fetch request.

   3) Send the request and fetch the data.

   4) Handle leader broker changes.

The client code is as follows:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class KafkaSimpleConsumer { 
    private List<String> m_replicaBrokers = new ArrayList<String>(); 
    public KafkaSimpleConsumer() { 
        m_replicaBrokers = new ArrayList<String>(); 
    } 
    public static void main(String args[]) { 
        KafkaSimpleConsumer example = new KafkaSimpleConsumer(); 
        // maximum number of messages to read 
        long maxReads = Long.parseLong("3"); 
        // topic to subscribe to 
        String topic = "mytopic"; 
        // partition to read from 
        int partition = Integer.parseInt("0"); 
        // IP addresses of the seed broker nodes 
        List<String> seeds = new ArrayList<String>(); 
        seeds.add("192.168.4.30"); 
        seeds.add("192.168.4.31"); 
        seeds.add("192.168.4.32"); 
        // broker port 
        int port = Integer.parseInt("9092"); 
        try { 
            example.run(maxReads, topic, partition, seeds, port); 
        } catch (Exception e) { 
            System.out.println("Oops:" + e); 
            e.printStackTrace(); 
        } 
    } 
    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception { 
        // Fetch the metadata of the target topic partition 
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition); 
        if (metadata == null) { 
            System.out.println("Can't find metadata for Topic and Partition. Exiting"); 
            return; 
        } 
        if (metadata.leader() == null) { 
            System.out.println("Can't find Leader for Topic and Partition. Exiting"); 
            return; 
        } 
        // Found the leader broker 
        String leadBroker = metadata.leader().host(); 
        String clientName = "Client_" + a_topic + "_" + a_partition; 
        // Connect to the leader broker 
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); 
        // Get the starting offset; EarliestTime() asks for the oldest offset still available on the broker 
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName); 
        int numErrors = 0; 
        while (a_maxReads > 0) { 
            if (consumer == null) { 
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); 
            } 
            // Fetching is essentially sending a FetchRequest to the leader 
            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build(); 
            FetchResponse fetchResponse = consumer.fetch(req); 
            if (fetchResponse.hasError()) { 
                numErrors++; 
                // Something went wrong! 
                short code = fetchResponse.errorCode(a_topic, a_partition); 
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code); 
                if (numErrors > 5) 
                    break; 
                if (code == ErrorMapping.OffsetOutOfRangeCode()) { 
                    // We asked for an invalid offset. For simple case ask for 
                    // the last element to reset 
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName); 
                    continue; 
                } 
                consumer.close(); 
                consumer = null; 
                // Handle the case where the leader of this topic partition has changed 
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port); 
                continue; 
            } 
            numErrors = 0; 
            long numRead = 0; 
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) { 
                long currentOffset = messageAndOffset.offset(); 
                if (currentOffset < readOffset) { // skip messages older than the requested offset 
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset); 
                    continue; 
                } 
                readOffset = messageAndOffset.nextOffset(); 
                ByteBuffer payload = messageAndOffset.message().payload(); 
                byte[] bytes = new byte[payload.limit()]; 
                payload.get(bytes); 
                // Print the message 
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8")); 
                numRead++; 
                a_maxReads--; 
            } 
            if (numRead == 0) { 
                try { 
                    Thread.sleep(1000); 
                } catch (InterruptedException ie) { 
                } 
            } 
        } 
        if (consumer != null) 
            consumer.close(); 
    } 
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { 
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); 
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); 
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); 
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); 
        OffsetResponse response = consumer.getOffsetsBefore(request); 
 
        if (response.hasError()) { 
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition)); 
            return 0; 
        } 
        long[] offsets = response.offsets(topic, partition); 
        return offsets[0]; 
    } 
    /** 
     * Find a new leader broker; under the hood this simply sends a 
     * TopicMetadataRequest to the known replica brokers. 
     * 
     * @param a_oldLeader 
     * @param a_topic 
     * @param a_partition 
     * @param a_port 
     * @return the host of the new leader broker 
     * @throws Exception if no new leader is found after several retries 
     */ 
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception { 
        for (int i = 0; i < 3; i++) { 
            boolean goToSleep = false; 
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition); 
            if (metadata == null) { 
                goToSleep = true; 
            } else if (metadata.leader() == null) { 
                goToSleep = true; 
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) { 
                // first time through if the leader hasn't changed give 
                // ZooKeeper a second to recover 
                // second time, assume the broker did recover before failover, 
                // or it was a non-Broker issue 
                // 
                goToSleep = true; 
            } else { 
                return metadata.leader().host(); 
            } 
            if (goToSleep) { 
                try { 
                    Thread.sleep(1000); 
                } catch (InterruptedException ie) { 
                } 
            } 
        } 
        System.out.println("Unable to find new leader after Broker failure. Exiting"); 
        throw new Exception("Unable to find new leader after Broker failure. Exiting"); 
    } 
    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) { 
        PartitionMetadata returnMetaData = null; 
        loop: for (String seed : a_seedBrokers) { 
            SimpleConsumer consumer = null; 
            try { 
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup"); 
                List<String> topics = Collections.singletonList(a_topic); 
                TopicMetadataRequest req = new TopicMetadataRequest(topics); 
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); 
                List<TopicMetadata> metaData = resp.topicsMetadata(); 
                for (TopicMetadata item : metaData) { 
                    for (PartitionMetadata part : item.partitionsMetadata()) { 
                        if (part.partitionId() == a_partition) { 
                            returnMetaData = part; 
                            break loop; 
                        } 
                    } 
                } 
            } catch (Exception e) { 
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e); 
            } finally { 
                if (consumer != null) 
                    consumer.close(); 
            } 
        } 
        if (returnMetaData != null) { 
            m_replicaBrokers.clear(); 
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) { 
                m_replicaBrokers.add(replica.host()); 
            } 
        } 
        return returnMetaData; 
    } 
}
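For comparison, the SimpleConsumer API above is the legacy low-level client; in modern client libraries the same flow is expressed with the unified Java consumer, which performs leader lookup and leader failover internally, so steps 1) and 4) above disappear. The following is only a minimal sketch under stated assumptions: it reuses the same placeholder broker address, topic, and partition as above, and assumes a kafka-clients jar new enough to provide poll(Duration) (2.0 or later).

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ModernPartitionReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Any live broker works as a bootstrap address; the client discovers the leader itself
        props.put("bootstrap.servers", "192.168.4.30:9092");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props)) {
            TopicPartition tp = new TopicPartition("mytopic", 0);
            consumer.assign(Collections.singletonList(tp));          // manual assignment, no consumer group
            consumer.seekToBeginning(Collections.singletonList(tp)); // equivalent of EarliestTime() above
            long maxReads = 3; // read at most 3 messages, as in the example above
            while (maxReads > 0) {
                for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
                    System.out.println(record.offset() + ": " + new String(record.value(), StandardCharsets.UTF_8));
                    if (--maxReads == 0) {
                        break;
                    }
                }
            }
        }
    }
}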

17.2 High-Level Consumer

Characteristics:

1) Data that has been consumed cannot be consumed again by the same group; to re-consume it, switch to a different consumer group.

2) To record the consumption position, the offset of each TopicAndPartition must be committed. Two commit targets are supported (a config sketch follows this list):

① commit to ZooKeeper (frequent ZooKeeper writes are inefficient)

② commit to a topic internal to Kafka

3) The client reads data through streams; a stream is the sequence of messages coming from one or more partitions on one or more servers. Each stream is consumed by a single thread, so the client can create however many streams suit its needs. In short, a stream may aggregate messages from partitions on several servers, but each partition feeds exactly one stream.

4) The relationship between consumers and partitions:

       ① If there are more consumers than partitions, the extra consumers are wasted: Kafka's design does not allow concurrent consumption within a single partition, so the number of consumers should not exceed the number of partitions.

       ② If there are fewer consumers than partitions, one consumer reads from several partitions. Size the two counts sensibly, otherwise the partitions will be consumed unevenly (for example, 12 partitions cannot be split evenly among 5 consumer threads).

       ③ When a consumer reads from multiple partitions, there is no ordering guarantee across them: Kafka only guarantees order within a single partition, so the interleaving across partitions depends on the order in which you read.
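To make the two commit targets in point 2) concrete, here is a sketch of a createConsumerConfig variant for the full example that follows, with the offset-storage choice made explicit. This assumes a 0.8.2-or-later high-level consumer, where offsets.storage and dual.commit.enabled are recognized properties.

import java.util.Properties;
import kafka.consumer.ConsumerConfig;

// A sketch only: createConsumerConfig from the example below, extended with
// the offset-storage switch (assumes a 0.8.2+ client).
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
    Properties props = new Properties();
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    // Option ①: commit offsets to ZooKeeper (the old default; frequent ZK writes are slow)
    props.put("offsets.storage", "zookeeper");
    // Option ②: commit offsets to Kafka's internal __consumer_offsets topic instead:
    // props.put("offsets.storage", "kafka");
    // When migrating from ① to ②, commit to both stores during the transition:
    // props.put("dual.commit.enabled", "true");
    props.put("auto.commit.interval.ms", "1000"); // offsets are committed automatically at this interval
    return new ConsumerConfig(props);
}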

 

Client code must follow these steps:

1) Design the topic-to-stream mapping: a map whose key K is the topic and whose value V is the number of streams N.

2) Start N consumer threads to consume the N streams.

The client code is as follows:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * For more detail see: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
 *
 * @author Fung
 */
public class KafkaHighConsumer {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public KafkaHighConsumer(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null)
            consumer.shutdown();
        if (executor != null)
            executor.shutdown();
    }

    public void run(int numThreads) {
        // Design the topic-to-stream mapping: the key K is the topic, the value V is the number of streams N
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(numThreads));
        // Obtain numThreads streams
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                .createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        executor = Executors.newFixedThreadPool(numThreads);
        int threadNumber = 0;
        // Start N consumer threads, one per stream
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerMsgTask(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public static void main(String[] arg) {
        String[] args = {"172.168.63.221:2188", "group-1", "page_visits", "12"};
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);
        KafkaHighConsumer demo = new KafkaHighConsumer(zooKeeper, groupId, topic);
        demo.run(threads);
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }
        demo.shutdown();
    }

    public class ConsumerMsgTask implements Runnable {
        private KafkaStream m_stream;
        private int m_threadNumber;

        public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
            m_threadNumber = threadNumber;
            m_stream = stream;
        }

        public void run() {
            // A KafkaStream is essentially an iterator over the network
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext())
                System.out.println("Thread " + m_threadNumber + ": "
                        + new String(it.next().message()));
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }
}
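Two usage notes on this example: ConsumerIterator.hasNext() blocks until a message arrives, so the worker threads only terminate when demo.shutdown() closes the connector (alternatively, setting consumer.timeout.ms makes an idle iterator throw ConsumerTimeoutException). And because auto-commit is enabled by default, offsets are committed every auto.commit.interval.ms (1,000 ms here) without any explicit commit call in the code.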

The detailed consumption flow is illustrated in the figure below:

[Figure: high-level consumer flow (ka.png)]
