kafka之七 sinkTask详解大数据

使用kafka connector 功能实现一个数据从kafka到MySQL的sinkTask

一:实现JdbcSinkConnector类

  

public class JdbcSinkConnector extends SinkConnector{ 
    private String url; 
	private String driven; 
	private String userName; 
	private String passwd; 
    public void start(Map<String, String> props) { 
		this.url = PropertyVerify.getOrElse(props, Constant_Global.URL, "jdbc:mysql://localhost/test", "'URL' is null"); 
		this.driven = PropertyVerify.getOrElse(props, Constant_Global.DRIVEN, "com.mysql.jdbc.Driver", "'DRIVEN' is null"); 
		this.userName = PropertyVerify.getOrElse(props, Constant_Global.USERNAME, "root", "'USERNAME' is null"); 
		this.passwd = PropertyVerify.getOrElse(props, Constant_Global.PASSED, "root", "'PASSED' is null"); 
	} 
	public Class<? extends Task> taskClass() { 
		return JdbcSinkTask.class; 
	} 
	public List<Map<String, String>> taskConfigs(int maxTasks) { 
		ArrayList<Map<String, String>> configs = new ArrayList<>(); 
		for(int i=0;i<maxTasks;i++){ 
			Map<String,String> conf = new HashMap<String,String>(); 
			conf.put("url", url); 
			conf.put("driven", driven); 
			conf.put("userName", userName); 
			conf.put("passwd", passwd); 
			configs.add(conf); 
		} 
		return configs; 
	} 
	@Override 
	public String version() { 
		return AppInfoParser.getVersion(); 
	} 
	@Override 
	public void stop() { 
		// TODO Auto-generated method stub	 
	} 
}

二:实现JdbcSinkConnector类

    

public class JdbcSinkTask  extends SinkTask{ 
	private static final Logger LOG = LoggerFactory.getLogger(JdbcSinkTask.class); 
	//private  Connection conn = null; 
	public String shcema; 
	private  JdbcDbWriter writer; 
	@Override 
	public String version() { 
		return  new JdbcSinkConnector().version(); 
	} 
	@Override 
	public void flush(Map<TopicPartition, OffsetAndMetadata> map) { 
		LOG.info("================flush Map start................==========================================================="); 
	} 
	@Override 
	public void put(Collection<SinkRecord> sinkRecords) { 
		if(sinkRecords.isEmpty()){ 
			return; 
		}		 
		try { 
				writer.write(sinkRecords,shcema,email); 
		} catch (SQLException | IOException e) { 
			try { 
				EmailUtil.init(Constant_Global.STMP, Constant_Global.EMAILUSER, Constant_Global.EMAILPASSWD, Constant_Global.EMAILTITAL, Constant_Global.EMAILADREE, email); 
				EmailUtil.send(" kafka sink 数据写入有问题 "); 
			} catch (MessagingException e1) { 
				e1.printStackTrace(); 
			} 
				throw new JDBCConntorException("数据写入有问题"); 
		}		 
	} 
	@Override 
	public void start(Map<String, String> pro)  { 
	    try { 
			DbPool.init(pro); 
			writer =new JdbcDbWriter(); 
		} catch (PropertyVetoException e1) { 
			e1.printStackTrace(); 
			LOG.info("数据库配置异常====="); 
		}		 
	} 
	@Override 
	public void stop() {		 
	} 
}

三 :打包运行

      3.1 单机版运行,配置文件在kafka/config目录下

          a: cp   connect-file-sink.properties    connect-jdbc-sink.properties

          b: vim connect-jdbc-sink.properties  配置如下

          

     # kafka connector properties 
     name=canal-sink-connector  #定义task名称 
     connector.class=com.trcloud.hamal.sink.jdbc.JdbcSinkConnector  #定义自己打包中的类 
     tasks.max=1  #task个数 
     topics=words-out1   #消费的topic 
     url=jdbc:mysql://172.30.50.213/test   #数据库参数 
     driven=com.mysql.jdbc.Driver #数据库驱动 
     userName=root  #数据库用户名 
     passwd=123456  #数据库密码 

        c: 启动命令

        
./connect-standalone.sh   ../config/connect-standalone.properties  ../config/connect-jdbc-sink.properties

      3.2 单机版用于测试,生产环境建议使用分布式

            a: 配置文件 vim  jdbc-sink-distributed.properties         

bootstrap.servers=node1:6667,node2:6667,node3:6667 
group.id=test-consumer-group 
key.converter=org.apache.kafka.connect.storage.StringConverter 
value.converter=org.apache.kafka.connect.json.JsonConverter 
key.converter.schemas.enable=false 
value.converter.schemas.enable=false 
internal.key.converter=org.apache.kafka.connect.json.JsonConverter 
internal.value.converter=org.apache.kafka.connect.json.JsonConverter 
internal.key.converter.schemas.enable=false 
internal.value.converter.schemas.enable=false 
offset.storage.topic=connect-offsets 
offset.flush.interval.ms=10000 
config.storage.topic=configs-topic 
status.storage.topic=connect-status

     3.2 启动命令 用rest接口启动

   

curl -X POST /connectors HTTP/1.1
Host: kafka.test.nd1
Content-Type: application/json
Accept: application/json
{
    “name”: “local-dw-sink”,
    “config”: {
    “connector.class”:”com.trcloud.hamal.sink.jdbc.JdbcSinkConnector”,
    “tasks.max”:”1″,
    “topics”:”sql-log” ,
    “url”:”jdbc:mysql://node4:3306/DW”,
    “driven”:”com.mysql.jdbc.Driver”,
    “userName”:”root”,
    “passwd”:”1234″
    }

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/9401.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论