admin管理员组文章数量:1611515
场景还原:
在测试环境进行数据清洗,由原始日志数据发送至HDFS,过程为:
原始日志文件–>flume->kafka->flume->HDFS
今天测试一亿条数据时出现问题,问题发生位置在flume->HDFS,问题整体描述如下:
ERROR kafka.KafkaSource: KafkaSource EXCEPTION, {}
org.apache.flume.ChannelFullException: The channel has reached it's capacity. This might be the result of a sink on the channel having too low of batch size, a downstream system running slower than normal, or that the channel capacity is just too low. [channel=c2]
at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:505)
at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:191)
at org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:311)
at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:60)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
at java.lang.Thread.run(Thread.java:748)
问题原因分析
主要看一下这一句:
org.apache.flume.ChannelFullException: The channel has reached it's capacity. This might be the result of a sink on the channel having too low of batch size, a downstream system running slower than normal, or that the channel capacity is just too low. [channel
译文:
这可能是由于通道上的接收器批大小过低、下游系统运行速度比正常慢,或者通道容量过低造成的。
分析:
日志打出来的很清楚了,意思就是HDFS Sink 接受的速度小于source的速度,还有一种可能就是channel的内存较小,导致数据溢出.所以根据日志在CDH中进行参数调整,将上述问题的情况调整一下.
操作步骤
调整channel的内存大小,设置超时时间,对大数据量进行缓冲.
参数 说明 capacity 在 channel 中 最多能保存多少个 event。默认是100 transactionCapacity 在每次从source中获取数据或者将数据sink出去的一次事务操作中,最多处理的 event 数。默认是100 byteCapacity 在 channel 中最多能容纳 所有event body的总字节数。默认是 JVM最大可用内存(-Xmx )的80% 。需要注意的是,如> 果你设置多个memory channel,并且它们的数据都是从同一个source来的,那么计算时是用所有channel中event body的字节数加起来计> 算的,也就是说会重复计算,所以需要注意一下。不建议将该值设置为0 byteCapacityBufferPercentage 这个值的含义跟上面一样,只不过这个是计算event header跟最大可用内存的字节占比。默认是20,也> > 就是最大可用内存的20% keep-alive 尝试添加或者删除一个event的超时时间,单位为秒。默认是3,也就是3秒 我的设置如下:
通道支持事务的最大大小 a1.channels.c1.transactionCapacity=1000000 #添加或者删除一个event的超时时间,单位为秒,默认是3 a1.channels.c1.keep-alive=60 #添加event,最多保存多少个event,默认是100 a1.channels.c1.capacity=1000000
C2也进行了如上配置,最后flume没报错,并且开始上传一亿条数据.
总结
在数据量大的时候,我们需要清楚flume以及kafka的运行机制,并且要知道flume的吞吐量,
比如说一亿条数据,采用flume的默认event保存机制100条是完全不够支撑的,
将超时时间换做60s给予缓冲,使用
1000000
的event来进行存储则可以解决面对的问题.
第二种解决办法(未尝试)
从java最大内存大小入手
修改java最大内存大小
vi bin/flume-ng
JAVA_OPTS="-Xmx2048m"
把jvm的堆空间设置大一点,防止flume本身的配置满足要求,但是堆大小不满足
参考链接
参考一:参考链接 1
参考二:参考链接 2
版权声明:本文标题:Kafka- flume报错:org.apache.flume.ChannelFullException: The channel has reached it‘s capacity. 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dianzi/1728620378a1166280.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论