1. The error log is as follows:
4:43:28.444 PM ERROR KafkaSink
Failed to publish events
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:243)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
4:43:28.445 PM ERROR SinkRunner
Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:267)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:243)
... 3 more
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
4:43:31.657 PM INFO ReliableTaildirEventReader
Last read was never committed - resetting position
2. Root cause analysis
After the metadata was exported from Atlas, Flume was used to collect it, so the sizes of the exported files had to be examined. A careful check showed that the largest JSON file was about 200 MB, so the cause is that the collected JSON file is too large: an event of that size far exceeds Kafka's default message-size limits (the producer's max.request.size defaults to roughly 1 MB), which is consistent with the broker dropping the connection before returning a response.
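As a side note on the file-size check, the following is a minimal, self-contained sketch (not part of the original post) that walks an export directory and prints any JSON file above a size threshold. The default directory path and the 100 MB threshold are assumptions used only for illustration.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class FindLargeJsonFiles {
    public static void main(String[] args) throws IOException {
        // Directory of the Atlas export; the default path here is an assumption.
        Path exportDir = Paths.get(args.length > 0 ? args[0] : "/data/atlas-export");
        long thresholdBytes = 100L * 1024 * 1024; // flag files larger than ~100 MB

        try (Stream<Path> files = Files.walk(exportDir)) {
            files.filter(p -> p.toString().endsWith(".json"))
                 .forEach(p -> {
                     try {
                         long size = Files.size(p);
                         if (size > thresholdBytes) {
                             System.out.printf("%s  %.1f MB%n", p, size / 1024.0 / 1024.0);
                         }
                     } catch (IOException e) {
                         // skip files that cannot be read
                     }
                 });
        }
    }
}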
3. Solution approach
1. Write a custom Flume interceptor that handles large files separately, to see whether the data in the JSON file can be split into smaller events (a sketch of this idea follows the configuration below).
2. At the same time, adjust the following parameters:
# number of lines processed per batch
a1.sources.r1.batchSize = 10000
# interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.meiyijia.pd.flume.interceptor.BigDataInterceptor$Builder
a1.sources.r1.interceptors.i1.param = parameter
# timeout (milliseconds) for closing files that have no new content
a1.sources.r1.idleTimeout = 1000
# maximum request size
a1.sinks.k1.kafka.producer.max.request.size = 1053741824
# total client-side buffer size
a1.sinks.k1.kafka.producer.buffer.memory = 15053741824
a1.sinks.k1.kafka.producer.max.block.ms = 30000
a1.sinks.k1.kafka.producer.request.timeout.ms = 10000
a1.sinks.k1.kafka.flumeBatchSize = 10000
a1.sinks.k1.kafka.linger.ms = 1
a1.sinks.k1.kafka.batch.size = 10000
# number of retries on failure
a1.sinks.k1.kafka.producer.retries = 3
# channel
a1.channels.c1.type = memory
# maximum number of events the channel can buffer
a1.channels.c1.capacity = 2000000
# maximum number of events the channel hands to the sink in one transaction
a1.channels.c1.transactionCapacity = 12000
# event keep-alive time
a1.channels.c1.keep-alive = 3
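For step 1, the sketch below shows one way such a splitting interceptor could look. It is only an outline under assumptions of my own: the class name BigJsonSplitInterceptor, the 512 KB threshold, and the fixed-size chunking are illustrative and are not the actual com.meiyijia.pd.flume.interceptor.BigDataInterceptor; a real implementation would need to split on JSON element boundaries so that each chunk remains valid JSON.

package com.example.flume.interceptor;

import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

public class BigJsonSplitInterceptor implements Interceptor {

    // Events larger than this threshold (bytes) are split; smaller ones pass through.
    // The value is an illustrative assumption, not a recommendation.
    private static final int MAX_BODY_BYTES = 512 * 1024;

    @Override
    public void initialize() {
        // no state to set up
    }

    @Override
    public Event intercept(Event event) {
        // The splitting happens in intercept(List) so that one input event
        // can fan out into several output events.
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<>();
        for (Event e : events) {
            byte[] body = e.getBody();
            if (body.length <= MAX_BODY_BYTES) {
                out.add(e);
                continue;
            }
            // Naive split: cut the body into fixed-size chunks, reusing the
            // original headers. A real implementation would cut on JSON
            // element boundaries instead.
            for (int off = 0; off < body.length; off += MAX_BODY_BYTES) {
                int len = Math.min(MAX_BODY_BYTES, body.length - off);
                byte[] chunk = new byte[len];
                System.arraycopy(body, off, chunk, 0, len);
                out.add(EventBuilder.withBody(chunk, e.getHeaders()));
            }
        }
        return out;
    }

    @Override
    public void close() {
        // nothing to release
    }

    // Flume instantiates interceptors through a Builder.
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new BigJsonSplitInterceptor();
        }

        @Override
        public void configure(Context context) {
            // read custom parameters (e.g. the "param" key from the config) here
        }
    }
}

If a class like this were used, a1.sources.r1.interceptors.i1.type in the configuration above would point at its Builder, e.g. com.example.flume.interceptor.BigJsonSplitInterceptor$Builder.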