1. The error log is as follows:
4:43:28.444 PM ERROR KafkaSink Failed to publish events
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:243)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
4:43:28.445 PM ERROR SinkRunner Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:267)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
	at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:243)
	... 3 more
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
4:43:31.657 PM INFO ReliableTaildirEventReader Last read was never committed - resetting position
2. Cause analysis
The metadata is exported from Atlas and then collected with Flume, so the sizes of the exported files had to be examined. A careful check showed that the largest JSON file was about 200 MB, so the root cause is that the JSON files being collected are too large. A payload of that size far exceeds Kafka's default message-size limits; in particular, a request larger than the broker's socket.request.max.bytes (100 MB by default) typically causes the broker to close the connection without replying, which would explain the NetworkException above.
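Before tuning anything, it helps to confirm which exported files are oversized. The following is a minimal sketch (not from the original post) that lists the ten largest .json files under an export directory; the default directory path and the class name are placeholders.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

// Lists exported JSON files in descending order of size so oversized files
// can be spotted before Flume tails them. Pass the real Atlas export
// directory as the first argument; "/tmp/atlas-export" is only a placeholder.
public class LargestJsonFiles {

    public static void main(String[] args) throws IOException {
        Path exportDir = Paths.get(args.length > 0 ? args[0] : "/tmp/atlas-export");
        try (Stream<Path> paths = Files.walk(exportDir)) {
            paths.filter(Files::isRegularFile)
                 .filter(p -> p.toString().endsWith(".json"))
                 .sorted(Comparator.comparingLong(LargestJsonFiles::sizeOf).reversed())
                 .limit(10)
                 .forEach(p -> System.out.printf("%,15d bytes  %s%n", sizeOf(p), p));
        }
    }

    // Files.size throws a checked IOException, so wrap it for use in streams.
    private static long sizeOf(Path p) {
        try {
            return Files.size(p);
        } catch (IOException e) {
            return 0L;
        }
    }
}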
3. Solution approach
(1) Write a custom Flume interceptor that handles large files separately and, where possible, splits the data inside the JSON file into smaller events (see the interceptor sketch after the configuration below).
(2) At the same time, adjust the following parameters:
# Number of lines read per batch
a1.sources.r1.batchSize = 10000
# Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.meiyijia.pd.flume.interceptor.BigDataInterceptor$Builder
a1.sources.r1.interceptors.i1.param = parameter
# Timeout (ms) after which files with no new content are closed
a1.sources.r1.idleTimeout = 1000
# Maximum request size
a1.sinks.k1.kafka.producer.max.request.size = 1053741824
# Total client-side (producer) buffer size
a1.sinks.k1.kafka.producer.buffer.memory = 15053741824
a1.sinks.k1.kafka.producer.max.block.ms = 30000
a1.sinks.k1.kafka.producer.request.timeout.ms = 10000
a1.sinks.k1.kafka.flumeBatchSize = 10000
a1.sinks.k1.kafka.linger.ms = 1
a1.sinks.k1.kafka.batch.size = 10000
# Number of retries on failure
a1.sinks.k1.kafka.producer.retries = 3
# Channel
a1.channels.c1.type = memory
# Maximum number of events the channel can hold
a1.channels.c1.capacity = 2000000
# Maximum number of events handed to the sink per transaction
a1.channels.c1.transactionCapacity = 12000
# Keep-alive time for events (seconds)
a1.channels.c1.keep-alive = 3
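The custom class com.meiyijia.pd.flume.interceptor.BigDataInterceptor referenced in the configuration is not shown in this post. As a rough illustration only, here is a minimal sketch of what such a splitting interceptor could look like, assuming Jackson is available on the Flume agent's classpath; the class name JsonSplitInterceptor and the maxBytes property are invented for this sketch and are not the author's implementation.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical interceptor: if an event body exceeds maxBytes and is a
// top-level JSON array, split it into one event per array element so that
// no single Kafka record exceeds the producer/broker size limits.
public class JsonSplitInterceptor implements Interceptor {

    private final ObjectMapper mapper = new ObjectMapper();
    private final long maxBytes;

    private JsonSplitInterceptor(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    @Override
    public void initialize() { /* no state to set up */ }

    @Override
    public Event intercept(Event event) {
        // The single-event method cannot return more than one event,
        // so splitting is done in intercept(List<Event>) below.
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<>(events.size());
        for (Event e : events) {
            byte[] body = e.getBody();
            if (body.length <= maxBytes) {
                out.add(e);
                continue;
            }
            try {
                JsonNode root = mapper.readTree(body);
                if (root.isArray()) {
                    // Emit one event per array element, copying the original headers.
                    for (JsonNode element : root) {
                        out.add(EventBuilder.withBody(
                                mapper.writeValueAsString(element).getBytes(StandardCharsets.UTF_8),
                                e.getHeaders()));
                    }
                    continue;
                }
            } catch (Exception parseError) {
                // Not parseable as JSON: fall through and keep the event unchanged.
            }
            out.add(e);
        }
        return out;
    }

    @Override
    public void close() { /* nothing to release */ }

    // Builder referenced from the Flume configuration (the $Builder suffix).
    public static class Builder implements Interceptor.Builder {
        private long maxBytes;

        @Override
        public void configure(Context context) {
            // "maxBytes" is a made-up property name for this sketch.
            maxBytes = context.getLong("maxBytes", 1024L * 1024L);
        }

        @Override
        public Interceptor build() {
            return new JsonSplitInterceptor(maxBytes);
        }
    }
}

Note that the splitting has to happen in intercept(List<Event>), because intercept(Event) can only return a single event. Also, raising max.request.size on the producer side only helps if the broker-side limit (message.max.bytes, or max.message.bytes at the topic level) allows records of that size as well.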
4. Reference:
https://www.csdn.net/tags/Ntjacg3sODcwMTUtYmxvZwO0O0OO0O0O.html