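I: The official Kafka API documentation covers only the consumer and producer APIs and says nothing about an API for creating topics. When a project needs to integrate Kafka's admin functionality, there is no documented method to refer to. The logic for creating topics can, however, be found by reading the Kafka source code; what follows is the author's summary of it.
II: Creating a topic with the Java API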
public class KafkaTopicBean {
    // topic name
    private String topic;
    // number of partitions
    private Integer partition;
    // replication factor
    private Integer replication;
    // free-text description
    private String descrbe;
    // operation type
    private Integer operationType;

    public Integer getOperationType() {
        return operationType;
    }

    public void setOperationType(Integer operationType) {
        this.operationType = operationType;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public Integer getPartition() {
        return partition;
    }

    public void setPartition(Integer partition) {
        this.partition = partition;
    }

    public Integer getReplication() {
        return replication;
    }

    public void setReplication(Integer replication) {
        this.replication = replication;
    }

    public String getDescrbe() {
        return descrbe;
    }

    public void setDescrbe(String descrbe) {
        this.descrbe = descrbe;
    }

    @Override
    public String toString() {
        return "KafkaTopicBean [topic=" + topic + ", partition=" + partition
                + ", replication=" + replication + ", descrbe=" + descrbe
                + ", operationType=" + operationType + "]";
    }
}
Maven dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.0.1</version>
</dependency>
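1: Creating a Kafka topic
// Imports needed: java.util.Properties, kafka.admin.AdminUtils, kafka.admin.RackAwareMode,
// kafka.utils.ZkUtils, org.apache.kafka.common.security.JaasUtils
public static void createKafkaTopic(KafkaTopicBean bean) {
    // Placeholder ZooKeeper addresses; replace with your own host:port list.
    // The two 30000 values are the session and connection timeouts in ms.
    ZkUtils zkUtils = ZkUtils.apply(
            "172.30.251.331:2181,172.30.251.341:2181", 30000, 30000,
            JaasUtils.isZkSecurityEnabled());
    try {
        // Enforced$.MODULE$ is how Java refers to the Scala object RackAwareMode.Enforced.
        AdminUtils.createTopic(zkUtils, bean.getTopic(),
                bean.getPartition(), bean.getReplication(),
                new Properties(), RackAwareMode.Enforced$.MODULE$);
    } finally {
        // Always release the ZooKeeper connection.
        zkUtils.close();
    }
}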
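Before the bean is handed to AdminUtils.createTopic, it can help to reject obviously bad input early. The following is a minimal sketch, not part of Kafka: TopicArgsValidator is a hypothetical helper, and the rules it applies (letters, digits, '.', '_' and '-' only, at most 249 characters, not "." or "..") are assumptions based on Kafka's commonly documented topic-name restrictions, so check them against the version you run.

```java
// Hypothetical helper, not part of Kafka: checks topic arguments before
// they are passed to AdminUtils.createTopic.
final class TopicArgsValidator {
    // Assumption: topic names are limited to 249 characters.
    static final int MAX_NAME_LENGTH = 249;

    private TopicArgsValidator() {
    }

    /** Returns null when the arguments look valid, otherwise an error message. */
    static String validate(String topic, Integer partitions, Integer replication) {
        if (topic == null || topic.trim().isEmpty()) {
            return "topic name is empty";
        }
        if (topic.equals(".") || topic.equals("..")) {
            return "topic name cannot be \".\" or \"..\"";
        }
        if (topic.length() > MAX_NAME_LENGTH) {
            return "topic name is longer than " + MAX_NAME_LENGTH + " characters";
        }
        // Assumed legal characters: ASCII letters, digits, '.', '_' and '-'.
        if (!topic.matches("[a-zA-Z0-9._-]+")) {
            return "topic name contains illegal characters";
        }
        if (partitions == null || partitions < 1) {
            return "partition count must be at least 1";
        }
        if (replication == null || replication < 1) {
            return "replication factor must be at least 1";
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(validate("orders", 3, 1));    // prints "null"
        System.out.println(validate("bad topic", 3, 1)); // prints "topic name contains illegal characters"
    }
}
```

A caller could run validate(bean.getTopic(), bean.getPartition(), bean.getReplication()) first and only open the ZooKeeper connection when the result is null.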
2: Deleting a Kafka topic
// Marks a topic for deletion by creating its node under ZooKeeper's delete-topics path.
// The brokers only carry out the deletion when delete.topic.enable=true.
private static ResultDTO deleteTopic(ZkUtils zkUtils, KafkaTopicBean bean) {
    ResultDTO result;
    if (!checkDeleteTopic(zkUtils, bean)) {
        result = new ResultDTO(Global_Constant.DTO_ERROR_CODE,
                "delete topic failed: topic name is empty or the topic does not exist");
    } else if (Topic.isInternal(bean.getTopic())) {
        // Internal topics must never be deleted.
        result = new ResultDTO(Global_Constant.DTO_ERROR_CODE, String.format(
                "Topic %s is a kafka internal topic and is not allowed to be marked for deletion",
                bean.getTopic()));
    } else {
        zkUtils.createPersistentPath(
                ZkUtils.getDeleteTopicPath(bean.getTopic()), null,
                zkUtils.DefaultAcls());
        result = new ResultDTO(Global_Constant.DTO_SUCCESS_CODE,
                "success in delete topic");
    }
    return result;
}

// Returns true only when the topic name is non-blank and the topic exists in ZooKeeper.
private static boolean checkDeleteTopic(ZkUtils zkUtils, KafkaTopicBean bean) {
    String topic = bean.getTopic();
    if (topic == null || topic.trim().isEmpty()) {
        return false;
    }
    Seq<String> topics = zkUtils.getAllTopics();
    return topics.contains(topic);
}
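The delete code relies on two project classes, ResultDTO and Global_Constant, that the article does not show. To make the snippet compile, something like the following sketch could stand in for them. The class and constant names match the calls above, but the fields, the extra accessors and the numeric code values are assumptions rather than the author's actual definitions.

```java
// Hypothetical stand-ins for the project classes used by deleteTopic.
// Only the names come from the article; everything else is assumed.
final class Global_Constant {
    static final int DTO_SUCCESS_CODE = 0; // assumed value
    static final int DTO_ERROR_CODE = -1;  // assumed value

    private Global_Constant() {
    }
}

final class ResultDTO {
    private final int code;
    private final String message;

    ResultDTO(int code, String message) {
        this.code = code;
        this.message = message;
    }

    int getCode() {
        return code;
    }

    String getMessage() {
        return message;
    }

    // Convenience check so callers do not compare codes by hand.
    boolean isSuccess() {
        return code == Global_Constant.DTO_SUCCESS_CODE;
    }

    @Override
    public String toString() {
        return "ResultDTO [code=" + code + ", message=" + message + "]";
    }
}
```

Returning a result object instead of throwing keeps deleteTopic easy to call from a web or RPC layer, which appears to be the intent of the original code.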
Original article by 奋斗. If you repost it, please credit the source: https://blog.ytso.com/tech/bigdata/9402.html