如何进行Kafka 1.0.0 d代码示例分析

这篇文章将为大家详细讲解有关如何进行Kafka 1.0.0 d代码示例分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

package kafka.demo;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * 
 *  <p>Description: kafka 1.0.0</p> 
 * @author guangshihao
 * @date 2018年9月19日 
 *
 */
public class KafkaProduderDemo {
	public static void main(String[] args) {
		Map<String,Object> props = new HashMap<>();
		/*
         * acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1
		 * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。
		 * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。
		 * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。
		 * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。
		 * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。
		 * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。
		 * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失
		 */
		props.put("acks", "1");
		//配置默认的分区方式
		props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
		//配置topic的序列化类
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		//配置value的序列化类
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /*
		 * kafka broker对应的主机,格式为host1:port1,host2:port2
		 */
		props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
		//topic
		String topic = "test7";
		KafkaProducer< String, String> producer = new KafkaProducer< String, String>(props);
		for(int i = 1 ;i <= 100 ; i++) {
			String line = i+" this is a test ";
			ProducerRecord<String,String> record = new ProducerRecord<String,String>(topic,line );
			producer.send(record);
		}
		producer.close();
	}
	
	
}
//---------------------------------------------------------------------------------------------------------------------------
package kafka.demo;
import java.util.Arrays;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
public class KafkaConsumerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");
props.put("group.id", "group_test7");
//配置topic的序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//配置value的序列化类
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//自动同步offset
        props.put("enable.auto.commit","true");
        //自动同步offset的时间间隔
        props.put("auto.commit.intervals.ms", "2000");
        //当在zookeeper中发现要消费的topic没有或者topic的offset不合法时自动设置为最小值,可以设的值为 latest, earliest, none,默认为largest
        props.put("auto.offset.reset", "earliest ");
        
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
consumer.subscribe(Arrays.asList("test7"));
//consumer.beginningOffsets("");
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for(ConsumerRecord<String, String> record: records) {
System.out.println("partition:"+record.partition()+"  "+record.value());
}
//consumer.commitSync();
if((new Random(10)).nextInt()>5) {
consumer.wakeup();
}
}
}catch(WakeupException e) {
e.printStackTrace();
}finally {
consumer.close();
}
}
}

关于如何进行Kafka 1.0.0 d代码示例分析就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

原创文章,作者:bd101bd101,如若转载,请注明出处:https://blog.ytso.com/223796.html

(0)
上一篇 2022年1月7日
下一篇 2022年1月7日

相关推荐

发表回复

登录后才能评论