另外找一个zk,有客户端命令的
./zkCli.cmd -server 127.0.0.1:2181
ls /brokers 查看注册信息
1,kafka不支持分布式事务消息 不支持消费失败重试 2,kafka的单机TPS能跑到每秒上百万,是因为Producer端将多个小消息合并,批量发向broker 3,RocketMQ写入性能上不如kafka, 主要因为kafka主要应用于日志场景,而RocketMQ应用于业务场景,为了保证消息必达牺牲了性能,且基于线上真实场景没有在RocketMQ层做消息合并,推荐在业务层自己做。 4,没有“中心主节点”的概念,集群中所有的服务器都是对等的,因此,可以在不做任何配置的更改的情况下实现服务器的的添加与删除 https://blog.csdn.net/pengweismile/article/details/117636252 https://kafka.apache.org/quickstart ./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties ./bin/windows/kafka-server-start.bat ./config/server.properties ./bin/windows/kafka-topics.bat --create --topic topic-xmh --bootstrap-server localhost:9092 ./bin/windows/kafka-topics.bat --describe --topic topic-xmh --bootstrap-server localhost:9092 ./bin/windows/kafka-console-consumer.bat --topic topic-xmh --from-beginning --bootstrap-server localhost:9092 ./bin/windows/kafka-console-producer.bat --topic topic-xmh --bootstrap-server localhost:9092 https://blog.csdn.net/syc0616/article/details/118156641 producer配置 bootstrap.servers: kafka的地址。 acks:消息的确认机制,默认值是0。 acks=0:如果设置为0,生产者不会等待kafka的响应。 acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。 acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。 retries:配置为大于0的值的话,客户端会在消息发送失败时重新发送。 batch.size:当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率。 key.serializer: 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer。 value.serializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。 public class ProMy { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092");//,slave1:9092,slave2:9092 props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 1);//16384 props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); KafkaProducer producer = new KafkaProducer(props); producer.send(new ProducerRecord<String, String>("topic-xmh","xingkey2","xingvalues5")); producer.close(); System.out.println("*************end-procuder"); } } consumer配置 bootstrap.servers: kafka的地址。 group.id:组名 不同组名可以重复消费。例如你先使用了组名A消费了kafka的1000条数据,但是你还想再次进行消费这1000条数据,并且不想重新去产生,那么这里你只需要更改组名就可以重复消费了。 enable.auto.commit:是否自动提交,默认为true。 auto.commit.interval.ms: 从poll(拉)的回话处理时长。 session.timeout.ms:超时时间。 max.poll.records:一次最大拉取的条数。 auto.offset.reset:消费规则,默认earliest 。 earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 。 latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 。 none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。 key.serializer: 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer。 value.deserializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。 public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092");//,slave1:9092,slave2:9092 props.put("group.id", "group_x3"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); // props.put("max.poll.records", 1); props.put("auto.offset.reset", "earliest"); // props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); // props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList("topic-xmh")); //for(;;) { ConsumerRecords<String, String> msgList = consumer.poll(1000); // System.out.println("consumer*******:" + msgList); for (ConsumerRecord<String, String> record:msgList){ System.out.println("**************consumer*******record1:"+record+","+record.key()+","+record.value()); } //} // consumer.close(); } <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>1.0.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>1.0.0</version> </dependency>
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/280912.html