Kafka, Part 5: The Kafka Admin API Explained

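   I: Kafka's official API documentation covers only the consumer and producer APIs and says nothing about an API for creating topics. When a project needs to integrate Kafka's admin functionality, there is no reference method to follow. Browsing the Kafka source code, however, reveals the logic used to create topics. What follows is the author's summary of it.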

II: Creating a topic with the Java API

    

public class KafkaTopicBean { 
 
	// topic name 
	private String topic; 
	// partition num 
	private Integer partition; 
	// replication num 
	private Integer replication; 
	private String descrbe; 
	// operation type 
	private Integer operationType; 
 
	public Integer getOperationType() { 
		return operationType; 
	} 
 
	public void setOperationType(Integer operationType) { 
		this.operationType = operationType; 
	} 
 
	public String getTopic() { 
		return topic; 
	} 
 
	public void setTopic(String topic) { 
		this.topic = topic; 
	} 
 
	public Integer getPartition() { 
		return partition; 
	} 
 
	public void setPartition(Integer partition) { 
		this.partition = partition; 
	} 
 
	public Integer getReplication() { 
		return replication; 
	} 
 
	public void setReplication(Integer replication) { 
		this.replication = replication; 
	} 
 
	public String getDescrbe() { 
		return descrbe; 
	} 
 
	public void setDescrbe(String descrbe) { 
		this.descrbe = descrbe; 
	} 
 
	@Override 
	public String toString() { 
		return "KafkaTopicBean [topic=" + topic + ", partition=" + partition 
				+ ", replication=" + replication + ", descrbe=" + descrbe 
				+ ", operationType=" + operationType + "]"; 
	} 
 
} 

The topic-management code below depends on the following Maven artifact:

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.0.1</version>
  </dependency>

    1: Creating a Kafka topic


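import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;

public static void createKafkaTopic(KafkaTopicBean bean) {
	// Arguments: ZooKeeper connect string, session timeout (ms), connection timeout (ms).
	// Replace the addresses with those of your own ZooKeeper ensemble.
	ZkUtils zkUtils = ZkUtils.apply(
			"172.30.251.331:2181,172.30.251.341:2181", 30000, 30000,
			JaasUtils.isZkSecurityEnabled());
	try {
		// An empty Properties object means no topic-level configuration overrides.
		AdminUtils.createTopic(zkUtils, bean.getTopic(),
				bean.getPartition(), bean.getReplication(),
				new Properties(), RackAwareMode.Enforced$.MODULE$);
	} finally {
		zkUtils.close();
	}
}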
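Before handing the bean's fields to AdminUtils.createTopic, it can help to reject obviously bad input up front. The following is a minimal sketch of such a pre-flight check, not code from Kafka or from the original project: the class name TopicRequestValidator and its validate method are hypothetical, and the rules (legal characters, the 249-character limit, and the ban on "." and "..") are assumed to match the topic-name rules of the Kafka version in use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka: pre-flight checks for a topic request.
final class TopicRequestValidator {

    // Assumption: topic names may contain only ASCII letters, digits, '.', '_' and '-'.
    private static final Pattern LEGAL_NAME = Pattern.compile("[a-zA-Z0-9._-]+");
    // Assumption: maximum topic name length enforced by the broker version in use.
    private static final int MAX_NAME_LENGTH = 249;

    // Returns a list of problems; an empty list means the request looks valid.
    static List<String> validate(String topic, Integer partitions, Integer replication) {
        List<String> errors = new ArrayList<>();
        if (topic == null || topic.trim().isEmpty()) {
            errors.add("topic name is empty");
        } else if (topic.equals(".") || topic.equals("..")) {
            errors.add("topic name cannot be \".\" or \"..\"");
        } else if (topic.length() > MAX_NAME_LENGTH) {
            errors.add("topic name is longer than " + MAX_NAME_LENGTH + " characters");
        } else if (!LEGAL_NAME.matcher(topic).matches()) {
            errors.add("topic name contains illegal characters");
        }
        if (partitions == null || partitions < 1) {
            errors.add("partition count must be at least 1");
        }
        if (replication == null || replication < 1) {
            errors.add("replication factor must be at least 1");
        }
        return errors;
    }

    public static void main(String[] args) {
        // A well-formed request and a request with three separate problems.
        System.out.println(validate("orders", 3, 2));
        System.out.println(validate("bad topic!", 0, null));
    }
}
```

A caller could pass bean.getTopic(), bean.getPartition() and bean.getReplication() to validate and only proceed to create the topic when the returned list is empty. Note that a replication factor larger than the number of live brokers will still be rejected by Kafka itself; this sketch does not check for that.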


2: Deleting a Kafka topic

import kafka.common.Topic;
import kafka.utils.ZkUtils;
import scala.collection.Seq;

// ResultDTO and Global_Constant are project-specific classes that are not shown in this article.
private static ResultDTO deleteTopic(ZkUtils zkUtils, KafkaTopicBean bean) {
	ResultDTO result;
	if (checkDeleteTopic(zkUtils, bean)) {
		if (Topic.isInternal(bean.getTopic())) {
			// Kafka's internal topics must not be marked for deletion.
			result = new ResultDTO(
					Global_Constant.DTO_ERROR_CODE,
					String.format("Topic %s is a kafka internal topic and is not allowed to be marked for deletion",
							bean.getTopic()));
		} else {
			// Mark the topic for deletion by creating its node under the delete-topics path.
			zkUtils.createPersistentPath(
					ZkUtils.getDeleteTopicPath(bean.getTopic()), null,
					zkUtils.DefaultAcls());
			result = new ResultDTO(Global_Constant.DTO_SUCCESS_CODE,
					"success in delete topic");
		}
	} else {
		result = new ResultDTO(Global_Constant.DTO_ERROR_CODE,
				"delete topic but topic name is empty or topic does not exist");
	}
	return result;
}

// Returns true only if the topic name is non-blank and the topic exists in ZooKeeper.
private static boolean checkDeleteTopic(ZkUtils zkUtils, KafkaTopicBean bean) {
	String topic = bean.getTopic();
	if (topic == null || topic.trim().isEmpty()) {
		return false;
	}
	Seq<String> topics = zkUtils.getAllTopics();
	return topics.contains(topic);
}
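Note that deleteTopic does not remove anything by itself: it only writes a marker node to ZooKeeper, and the brokers carry out the actual deletion asynchronously, provided delete.topic.enable=true is set in the broker configuration. The sketch below illustrates which path that marker ends up at and which topic name is refused. It is a standalone illustration, not Kafka code: the class DeleteMarkerPaths and its methods are hypothetical, and it assumes the /admin/delete_topics parent path and the single internal topic __consumer_offsets used by Kafka 0.10.0.x.

```java
// Hypothetical helper, not part of Kafka: shows where a delete marker would be written.
final class DeleteMarkerPaths {

    // Assumption: ZooKeeper parent node under which topics marked for deletion are listed.
    static final String DELETE_TOPICS_PATH = "/admin/delete_topics";
    // Assumption: the only internal topic in Kafka 0.10.0.x.
    static final String OFFSETS_TOPIC = "__consumer_offsets";

    static boolean isInternal(String topic) {
        return OFFSETS_TOPIC.equals(topic);
    }

    // Builds the marker path for a topic, refusing blank names and the internal topic.
    static String deleteMarkerPath(String topic) {
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("topic name is empty");
        }
        if (isInternal(topic)) {
            throw new IllegalArgumentException(
                    "internal topic cannot be marked for deletion: " + topic);
        }
        return DELETE_TOPICS_PATH + "/" + topic;
    }

    public static void main(String[] args) {
        System.out.println(deleteMarkerPath("orders")); // prints /admin/delete_topics/orders
    }
}
```

Because deletion is asynchronous, a success result from deleteTopic means only that the topic was marked. The topic can still appear in the topic list for some time afterwards.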



Original article by 奋斗. If you repost it, please credit the source: https://blog.ytso.com/9402.html
