Kafka – A Detailed Look at the Low-Level Consumer API

1. Overview

  Kafka officially provides two consumer APIs: a high-level consumer API and a low-level consumer API. An earlier article covered the implementation of the high-level consumer API; today I will introduce the other kind.

2. Content

  Having worked with Kafka's high-level consumer API, we know that it is a highly abstracted API that is simple and convenient to use, but for some special requirements we may need the second, lower-level API. So first we need to know what the low-level consumer API can do for us:

  • Read a message multiple times (see the sketch after this list)
  • Consume only a subset of the messages in a Partition
  • Add transaction management to ensure a message is processed exactly once
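
  As a minimal sketch of the first point, re-reading is just fetching again from an offset you have already consumed. The broker host, topic, partition, and offset below are placeholder values; the API calls are the same ones used in the full example in Section 3.3:

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class ReplayExample {
	public static void main(String[] args) {
		// Placeholder broker/topic/partition; adjust to your cluster.
		SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "replay-client");
		long replayOffset = 42L; // any offset the broker still retains can be fetched again
		FetchRequest req = new FetchRequestBuilder().clientId("replay-client")
				.addFetch("test_topic", 0, replayOffset, 100000).build();
		FetchResponse resp = consumer.fetch(req);
		// Error handling omitted for brevity; see the full example in Section 3.3.
		for (MessageAndOffset mao : resp.messageSet("test_topic", 0)) {
			// The broker does not track our position, so nothing prevents repeated reads.
			System.out.println(mao.offset());
		}
		consumer.close();
	}
}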

  Of course, this API also comes with some drawbacks:

  • You must track the Offset values in your own code
  • You must find the Lead Broker of the target Topic Partition
  • You must handle Broker changes yourself

  The overall workflow for using this API is as follows (the implementation in Section 3.3 walks through exactly these steps):

  • From all Brokers in the Active state, find the Lead Broker of the target Topic Partition
  • Find all the replica Brokers of the target Topic Partition
  • Build the request
  • Send the request and fetch the data
  • Handle Leader Broker changes

3. Code Implementation

3.1 Java Project

  If you implement this code in a plain Java project, you need to add the relevant dependency JAR files yourself, including the following:

  • scala-xml_${version}-${version}.jar
  • scala-library-${version}.jar
  • metrics-core-${version}.jar
  • kafka-clients-${version}.jar
  • kafka_${version}-${version}.jar

  For a plain Java project, you have to pick out and add these JARs by hand to make sure the code runs.

3.2 Maven Project

  For a Maven project, simply add the corresponding dependency to the pom.xml file and let Maven manage the dependency JARs, which is much more convenient. The dependency looks like this:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>

  With that, the dependency JARs for the Maven project are all set up.

3.3 Code Implementation

  The low-level consumer API implementation is shown below. Note that SystemConfig is the author's own configuration helper for reading settings from a properties file; a sample configuration is sketched after the code:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

// Note: SystemConfig is the author's own configuration helper; its import is omitted here.

/** 
 * @Date Mar 2, 2016 
 * 
 * @Author dengjie 
 * 
 * @Note Simple consumer api 
 */ 
public class SimpleKafkaConsumer { 
	private static Logger log = LoggerFactory.getLogger(SimpleKafkaConsumer.class); 
	private List<String> m_replicaBrokers = new ArrayList<String>(); 
 
	public SimpleKafkaConsumer() { 
		m_replicaBrokers = new ArrayList<String>(); 
	} 
 
	public static void main(String[] args) { 
		SimpleKafkaConsumer example = new SimpleKafkaConsumer(); 
		// Max read number 
		long maxReads = SystemConfig.getIntProperty("kafka.read.max"); 
		// To subscribe to the topic 
		String topic = SystemConfig.getProperty("kafka.topic"); 
		// Find partition 
		int partition = SystemConfig.getIntProperty("kafka.partition"); 
		// Broker node's ip 
		List<String> seeds = new ArrayList<String>(); 
		String[] hosts = SystemConfig.getPropertyArray("kafka.server.host", ","); 
		for (String host : hosts) { 
			seeds.add(host); 
		} 
		int port = SystemConfig.getIntProperty("kafka.server.port"); 
		try { 
			example.run(maxReads, topic, partition, seeds, port); 
		} catch (Exception e) { 
			log.error("Oops:" + e); 
			e.printStackTrace(); 
		} 
	} 
 
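	/** 
	 * Core loop: locate the partition leader, then fetch and print messages 
	 * until a_maxReads messages have been consumed. 
	 */ 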
	public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) 
			throws Exception { 
		// Get point topic partition's meta 
		PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition); 
		if (metadata == null) { 
			log.info("[SimpleKafkaConsumer.run()] - Can't find metadata for Topic and Partition. Exiting"); 
			return; 
		} 
		if (metadata.leader() == null) { 
			log.info("[SimpleKafkaConsumer.run()] - Can't find Leader for Topic and Partition. Exiting"); 
			return; 
		} 
		String leadBroker = metadata.leader().host(); 
		String clientName = "Client_" + a_topic + "_" + a_partition; 
 
		SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); 
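		// SimpleConsumer takes (host, port, socket timeout in ms, buffer size in bytes, client id). 
		// EarliestTime() resolves to the oldest offset the broker still retains; 
		// LatestTime() would start from newly arriving messages only. 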
		long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), 
				clientName); 
		int numErrors = 0; 
		while (a_maxReads > 0) { 
			if (consumer == null) { 
				consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); 
			} 
			FetchRequest req = new FetchRequestBuilder().clientId(clientName) 
					.addFetch(a_topic, a_partition, readOffset, 100000).build(); 
			FetchResponse fetchResponse = consumer.fetch(req); 
 
			if (fetchResponse.hasError()) { 
				numErrors++; 
				// Something went wrong! 
				short code = fetchResponse.errorCode(a_topic, a_partition); 
				log.info("[SimpleKafkaConsumer.run()] - Error fetching data from the Broker:" + leadBroker 
						+ " Reason: " + code); 
				if (numErrors > 5) 
					break; 
				if (code == ErrorMapping.OffsetOutOfRangeCode()) { 
					// We asked for an invalid offset. For simple case ask for 
					// the last element to reset 
					readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), 
							clientName); 
					continue; 
				} 
				consumer.close(); 
				consumer = null; 
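			// For any other fetch error, assume the leader moved and rediscover it via the replicas. 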
				leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port); 
				continue; 
			} 
			numErrors = 0; 
 
			long numRead = 0; 
			for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) { 
				long currentOffset = messageAndOffset.offset(); 
				if (currentOffset < readOffset) { 
					log.info("[SimpleKafkaConsumer.run()] - Found an old offset: " + currentOffset + " Expecting: " 
							+ readOffset); 
					continue; 
				} 
 
				readOffset = messageAndOffset.nextOffset(); 
				ByteBuffer payload = messageAndOffset.message().payload(); 
 
				byte[] bytes = new byte[payload.limit()]; 
				payload.get(bytes); 
				System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8")); // message handling entry point 
				numRead++; 
				a_maxReads--; 
			} 
 
			if (numRead == 0) { 
				try { 
					Thread.sleep(1000); 
				} catch (InterruptedException ie) { 
				} 
			} 
		} 
		if (consumer != null) 
			consumer.close(); 
	} 
 
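	/** 
	 * Ask the broker for an offset of the partition: the earliest or the latest, 
	 * depending on whether whichTime is EarliestTime() or LatestTime(). 
	 */ 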
	public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, 
			String clientName) { 
		TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); 
		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); 
		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); 
		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, 
				kafka.api.OffsetRequest.CurrentVersion(), clientName); 
		OffsetResponse response = consumer.getOffsetsBefore(request); 
 
		if (response.hasError()) { 
			log.info("[SimpleKafkaConsumer.getLastOffset()] - Error fetching data Offset Data the Broker. Reason: " 
					+ response.errorCode(topic, partition)); 
			return 0; 
		} 
		long[] offsets = response.offsets(topic, partition); 
		return offsets[0]; 
	} 
 
	/** 
	 * Find the new leader broker after the old one fails. 
	 * 
	 * @param a_oldLeader 
	 * @param a_topic 
	 * @param a_partition 
	 * @param a_port 
	 * @return the new leader's host 
	 * @throws Exception if no new leader can be found 
	 */ 
	private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception { 
		for (int i = 0; i < 3; i++) { 
			boolean goToSleep = false; 
			PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition); 
			if (metadata == null) { 
				goToSleep = true; 
			} else if (metadata.leader() == null) { 
				goToSleep = true; 
			} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) { 
				// first time through if the leader hasn't changed give 
				// ZooKeeper a second to recover 
				// second time, assume the broker did recover before failover, 
				// or it was a non-Broker issue 
				// 
				goToSleep = true; 
			} else { 
				return metadata.leader().host(); 
			} 
			if (goToSleep) { 
				try { 
					Thread.sleep(1000); 
				} catch (InterruptedException ie) { 
				} 
			} 
		} 
		throw new Exception("Unable to find new leader after Broker failure. Exiting"); 
	} 
 
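	/** 
	 * Query each seed broker for topic metadata until the target partition's 
	 * leader is found; record the replica hosts for later failover. 
	 */ 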
	private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) { 
		PartitionMetadata returnMetaData = null; 
		loop: for (String seed : a_seedBrokers) { 
			SimpleConsumer consumer = null; 
			try { 
				consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup"); 
				List<String> topics = Collections.singletonList(a_topic); 
				TopicMetadataRequest req = new TopicMetadataRequest(topics); 
				kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); 
 
				List<TopicMetadata> metaData = resp.topicsMetadata(); 
				for (TopicMetadata item : metaData) { 
					for (PartitionMetadata part : item.partitionsMetadata()) { 
						if (part.partitionId() == a_partition) { 
							returnMetaData = part; 
							break loop; 
						} 
					} 
				} 
			} catch (Exception e) { 
				log.error("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " 
						+ a_partition + "] Reason: " + e); 
			} finally { 
				if (consumer != null) 
					consumer.close(); 
			} 
		} 
		if (returnMetaData != null) { 
			m_replicaBrokers.clear(); 
			for (kafka.cluster.Broker replica : returnMetaData.replicas()) { 
				m_replicaBrokers.add(replica.host()); 
			} 
		} 
		return returnMetaData; 
	} 
} 
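
  The SystemConfig class used above is the author's own configuration helper, not part of Kafka; only its key names are visible in the code. Assuming it reads a plain Java properties file, the expected configuration would look roughly like this (all values are placeholders):

# maximum number of messages to read before exiting
kafka.read.max=100
# topic and partition to consume
kafka.topic=test_topic
kafka.partition=0
# seed broker hosts (comma separated) and their port
kafka.server.host=broker1,broker2,broker3
kafka.server.port=9092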

4. Summary

  When using Kafka's low-level consumer API, be clear about the business scenario you are dealing with. In general the high-level consumer API is still the recommended choice unless you have special needs. Also, while using the low-level API, pay attention to handling Leader Broker changes and to Offset management.

5. Closing Remarks

  That's all for this post. If you have any questions while studying this material, you can join the group for discussion or send me an email, and I will do my best to answer them. Good luck, and let's keep learning together!
