客户端向topic发送数据分为两种方式:1.异步,2同步。其配置为producer.type,如果为sync,则是同步发送;如果为async,则是异步发送。
客户端代码如下:
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.*;
public class KafkaProducer {
private Producer<String,String> inner;
public KafkaProducer() throws Exception{
Properties properties = new Properties();
properties.load(ClassLoader.getSystemResourceAsStream("KafkaProducer.properties"));
/* KafkaProducer.properties内容如下
*metadata.broker.list=172.23.9.134:9092,172.23.9.135:9092,172.23.9.136:9092
* producer.type=sync
* compression.codec=0
* serializer.class=kafka.serializer.StringEncoder
* request.required.acks=1
*/
//根据KfkaProducer.properties加载配置信息
ProducerConfig config = new ProducerConfig(properties);
inner = new Producer<String, String>(config);
}
//一条一条发送
public void send(String topicName, String message) {
if(topicName == null || message == null){
return;
}
KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
inner.send(km);
}
//批量发送
public void send(String topicName, Collection<String> messages) {
if(topicName == null || messages == null){
return;
}
if(messages.isEmpty()){
return;
}
List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
for(String entry : messages){
KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);
kms.add(km);
}
inner.send(kms);
}
public void close(){
inner.close();
}
public static void main(String[] args) {
KafkaProducer producer = null;
try{
producer = new KafkaProducer();
int i=0;
while(true){
StringBuffer sbMsg = new StringBuffer();
sbMsg.append("content");
//以KV对的形式发送
producer.send("key", sbMsg.toString());
}
}catch(Exception e){
e.printStackTrace();
}finally{
if(producer != null){
producer.close();
}
}
}
}
那么接下去会发生什么呢?最终会调用到kafka.javaapi.producer
package kafka.javaapi.producer
import kafka.producer.ProducerConfig
import kafka.producer.KeyedMessage
import scala.collection.mutable
class Producer[K,V](private val underlying: kafka.producer.Producer[K,V])
{ //最终实现生成者功能的是kafka.producer.Producer
def this(config: ProducerConfig) = this(new kafka.producer.Producer[K,V](config))
def send(message: KeyedMessage[K,V]) {
underlying.send(message)
}
def send(messages: java.util.List[KeyedMessage[K,V]]) {
import collection.JavaConversions._
underlying.send((messages: mutable.Buffer[KeyedMessage[K,V]]).toSeq: _*)
}
def close = underlying.close
}
其具体的发送逻辑如下:
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/11810.html