kafka源码解析之十六生产者流程(客户端如何向topic发送数据)详解编程语言

客户端向topic发送数据分为两种方式:1.异步,2同步。其配置为producer.type,如果为sync,则是同步发送;如果为async,则是异步发送。

客户端代码如下:

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 
import java.util.*; 
public class KafkaProducer { 
    private Producer<String,String> inner; 
    public KafkaProducer() throws Exception{ 
        Properties properties = new Properties(); 
        properties.load(ClassLoader.getSystemResourceAsStream("KafkaProducer.properties")); 
/* KafkaProducer.properties内容如下 
*metadata.broker.list=172.23.9.134:9092,172.23.9.135:9092,172.23.9.136:9092 
* producer.type=sync 
* compression.codec=0 
* serializer.class=kafka.serializer.StringEncoder 
* request.required.acks=1 
*/ 
//根据KfkaProducer.properties加载配置信息 
        ProducerConfig config = new ProducerConfig(properties); 
        inner = new Producer<String, String>(config); 
    } 
//一条一条发送 
    public void send(String topicName, String message) { 
        if(topicName == null || message == null){ 
            return; 
        } 
        KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message); 
        inner.send(km); 
    } 
//批量发送 
    public void send(String topicName, Collection<String> messages) { 
        if(topicName == null || messages == null){ 
            return; 
        } 
        if(messages.isEmpty()){ 
            return; 
        } 
        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(); 
        for(String entry : messages){ 
            KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry); 
            kms.add(km); 
        } 
        inner.send(kms); 
    } 
    public void close(){ 
        inner.close(); 
    } 
    public static void main(String[] args) { 
        KafkaProducer producer = null; 
        try{ 
            producer = new KafkaProducer(); 
            int i=0; 
            while(true){ 
                StringBuffer sbMsg = new StringBuffer(); 
                sbMsg.append("content"); 
//以KV对的形式发送 
                producer.send("key", sbMsg.toString()); 
            } 
        }catch(Exception e){ 
            e.printStackTrace(); 
        }finally{ 
            if(producer != null){ 
                producer.close(); 
            } 
        } 
    } 
}

那么接下去会发生什么呢?最终会调用到kafka.javaapi.producer

package kafka.javaapi.producer 
 
import kafka.producer.ProducerConfig 
import kafka.producer.KeyedMessage 
import scala.collection.mutable 
class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) 
{  //最终实现生成者功能的是kafka.producer.Producer 
  def this(config: ProducerConfig) = this(new kafka.producer.Producer[K,V](config)) 
  def send(message: KeyedMessage[K,V]) { 
    underlying.send(message) 
  } 
  def send(messages: java.util.List[KeyedMessage[K,V]]) { 
    import collection.JavaConversions._ 
    underlying.send((messages: mutable.Buffer[KeyedMessage[K,V]]).toSeq: _*) 
  } 
  def close = underlying.close 
}

其具体的发送逻辑如下:

kafka源码解析之十六生产者流程(客户端如何向topic发送数据)详解编程语言

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/11810.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论