记flume部署过程中遇到的问题以及解决方法(持续更新)_ubuntu发送flume文件夹到节点一直处于发送状态-程序员宅基地

技术标签: flume  

项目需求是将线上服务器生成的日志信息实时导入kafka,采用agent和collector分层传输,app的数据通过thrift传给agent,agent通过avro sink将数据发给collector,collector将数据汇集后,发送给kafka,拓扑结构如下:




现将调试过程中遇到的问题以及解决方法记录如下:

1、 [ERROR - org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:484)] Unexpected throwable while invoking!

java.lang.OutOfMemoryError: Java heap space

原因:flume启动时的默认最大的堆内存大小是20M,实际环境中数据量较大时,很容易出现OOM问题,在flume的基础配置文件conf下的flume-env.sh中添加

export JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"

并且在flume启动脚本flume-ng中,修改JAVA_OPTS="-Xmx20m"JAVA_OPTS="-Xmx2048m"

此处我们将堆内存的阈值跳转到了2G,实际生产环境中可以根据具体的硬件情况作出调整


2、  [ERROR - org.apache.thrift.server.TThreadedSelectorServer$SelectorThread.run(TThreadedSelectorServer.java:544)] run() exiting due to uncaught error
  java.lang.OutOfMemoryError: unable to create new native thread

  原因:如果App给flume的thrift source发送数据时,采用短连接,会无限地创建线程,使用命令 pstree 时发现java的线程数随着发送数据量的增长在不停增长,最终达到了65500多个,超过了linux系统对线程的限制,解决方法是在thrift source配置项中增加一个线程数的限制。

agent.sources.r1.threads = 50

重新启动agent发现java的线程数达到70多就不再增长了


3、 Caused by: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity or increasing thread count

原因:这是memory channel被占满导致的错误,memory channel默认最多只缓存100条数据,在生产环境中明显不够,需要将capacity参数加大


4、warn:"Thrift source %s could not append events to the channel."。

原因:查看flume的配置文档可以发现,各种类型的sink(thrift、avro、kafka等)的默认batch-size都是100,file channel、memory channel的transactioncapacity默认也都是100,如果修改了sink的batch-size,需要将batch-size设置为小于等于channel的transactioncapacity的值,否则就会出现上面的warn导致数据无法正常发送


5、agent处报

(SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
        at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:392)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 10.200.197.82, port: 5150 }: Failed to send batch
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:315)
        at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:376)
        ... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 10.200.197.82, port: 5150 }: Exception thrown from remote handler
        at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:397)
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:374)
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:303)
        ... 4 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Connection reset by peer
        at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128)
        at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:389)
        ... 6 more
Caused by: java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:59)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        ... 1 more


collector报

2017-08-21 16:36:43,010 (New I/O  worker #12) [WARN - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught(NettyServer.java:201)] Unexpected exception from downstream.
org.apache.avro.AvroRuntimeException: Excessively large list allocation request detected: 349070535 items! Connection closed.
        at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decodePackHeader(NettyTransportCodec.java:167)
        at org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decode(NettyTransportCodec.java:139)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:422)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:478)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.channelDisconnected(FrameDecoder.java:366)
        at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:399)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:721)
        at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:111)
        at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:66)
        at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
        at org.jboss.netty.channel.Channels.close(Channels.java:820)
        at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197)
        at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught(NettyServer.java:202)
        at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
        at org.jboss.netty.handler.codec.frame.FrameDecoder.exceptionCaught(FrameDecoder.java:378)
        at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:533)
        at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:48)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:84)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

原因:当agent到collector的数据在agent的avro sink处进行压缩时,在collector的avro source处必须解压,否则数据无法发送



6、org.apache.kafka.common.errors.RecordTooLargeException: There are some messages at [Partition=Offset]: {ssp_package-0=388595} whose size is larger than the fetch size 1048576 and hence cannot be ever returned. Increase the fetch size, or decrease the maximum message size the broker will allow.
2017-10-11 01:30:10,000 (PollableSourceRunner-KafkaSource-r1) [ERROR - org.apache.flume.source.kafka.KafkaSource.doProcess(KafkaSource.java:314)] KafkaSource EXCEPTION, {}

原因:配置kafka source时,flume作为kafka的consumer,在consumer消费kafka数据时,默认最大文件大小是1m,如果文件大小超过1m,需要手动在配置里面调整参数,

但是在flume官网的配置说明-kakka source中,并没有找到配置fetch size的地方,但是在配置的最后一行有一个

Other Kafka Consumer Properties--These properties are used to configure the Kafka Consumer. Any consumer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.consumer. For example: kafka.consumer.auto.offset.reset

此处配置用的是kafka的配置方法,在kafka官网的配置文档-consumer configs-max.partition.fetch.bytes有相关说明

agent.sources.r1.kafka.consumer.max.partition.fetch.bytes = 10240000

此处将consumer的fetch.byte加到10m


7、2017-10-13 01:19:47,991 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:240)] Failed to publish events
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2606058 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2606058 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

原因:与上一点类似,此处是kafka sink时,flume作为producer,也要设置文件的fetch大小,同样是参考kafka官网的配置

agent.sinks.k1.kafka.producer.max.request.size = 10240000


8、java.io.IOException: Too many open files
        at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
        at org.mortbay.jetty.nio.SelectChannelConnector$1.acceptChannel(SelectChannelConnector.java:75)
        at org.mortbay.io.nio.SelectorManager$SelectSet.doSelect(SelectorManager.java:686)
        at org.mortbay.io.nio.SelectorManager.doSelect(SelectorManager.java:192)
        at org.mortbay.jetty.nio.SelectChannelConnector.accept(SelectChannelConnector.java:124)
        at org.mortbay.jetty.AbstractConnector$Acceptor.run(AbstractConnector.java:708)
        at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)

原因:文件句柄占用太多,首先查看flume占用句柄个数

lsof -p pid | wc -l 

pid是flume进程号,

vim /etc/security/limits.conf 

在最后加入  
* soft nofile 4096  
* hard nofile 4096  

最前的 * 表示所有用户,改完后重启下flume服务


9、(kafka-producer-network-thread | producer-1) [ERROR - org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:130)] Uncaught 
error in kafka producer I/O thread:
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'throttle_time_ms': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
        at java.lang.Thread.run(Thread.java:744)

原因:kafka集群版本较老,flume版本较新,此处kafka使用的版本是较老的0.8.2, flume使用1.7则会报上述错误,只能将flume降为1.6版本


9、sink到kafka上的数据没有均匀的分布在各个partition上,而是全部放在了同一个partition上

原因:这是老版本flume遗留下的一个bug,需要在event中构造一个包含key为 key 的header 键值对就能达到目的

[plain]  view plain   copy
  1. a1.sources.flume0.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder  
  2. a1.sources.flume0.interceptors.i1.headerName = key  
真正没有随机的原因本文并没有直接去找到,是借助另一种方式解决了问题
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/StrideBin/article/details/80797373

智能推荐

硬石-专题04 步进电机控制(第4节)_28BYJ-48步进电机介绍_28byj步进电机发烫-程序员宅基地

文章浏览阅读605次。一,步进电机的几个概念1,步进电机普遍存在发热的情况二,28BYJ-48步进电机介绍1,28BYJ-48步进电机自带减速器,四相五线(减速器不是用来减速的,注意和刹车的区别)_28byj步进电机发烫

ARM架构概览-程序员宅基地

文章浏览阅读2.3k次。ARM架构概览_arm架构

8.S5PV210之RTC实时时钟代码_s5pv210 rtc 驱动-程序员宅基地

文章浏览阅读649次。RTC.h:RTC.c:main.c_s5pv210 rtc 驱动

零基础学AI,轻松搞定人工智能6大方向【附资料】_搞定ai-程序员宅基地

文章浏览阅读4.5k次。在当前这个市场最不缺的是资料,不缺的原因是因为市场大量泛滥着各种姿势的学习资料;最缺的也是资料,缺的原因是因为真正有体系,有价值的资料少之又少。所以,我们送你一份价值2980的学习路线图+AI课程资料+线上直播课学习,这些资料足够你入门AI领域!添加CSDN小姐姐仅限500人!领取价值2980人工智能学习路线图+课程资料+视频资料AI初级应用工程师的完整学习路径图~添加CSDN小姐姐仅限500人!领取价值2980人工智能学习路线图+课程资料+视频资料._搞定ai

以算法岗为例:我最想对入职前的自己说些什么?-程序员宅基地

文章浏览阅读251次。来自:机器学习与推荐系统进入算法岗的正式工作虽然只有半年多,但从 19 年 1 月就在算法岗开始实习,算下来到现在都有两年多的工作经验了,不过正式参加工作以后还是有了很多新的感悟,这篇文章..._算法岗入职很紧张

2017 Wuhan University Programming Contest (Online Round) D. Events,线段树区间更新+最值查询!...-程序员宅基地

文章浏览阅读87次。D. Events 线段树区间更新查询区间历史最小值,看似很简单的题意写了两天才写出来。 题意:n个数,Q次操作,每次操作对一个区间[l,r]的数同时加上C,然后输出这段区间的历史最小值。 思路:在线段树区间更新最值查询的基础上再用一个变量表示...

随便推点

如何清除计算机连接网络的记录,彻底清除上网记录-程序员宅基地

文章浏览阅读9k次。很多朋友电脑使用过一段时间后会发现浏览器窗口中有很多浏览过得网址记录。比如只要我们在浏览器中输入www.pc841.com之类的网站,刚输到3个WWW就会发现浏览器输入框中有一大堆的网址记录,如下图所示:相关阅读:电脑收藏夹位置在哪? 如何将网址加入收藏夹!如此多上网记录该如何清除上网记录呢如上图,浏览器网址输入框下面就显示了一大堆我原来输入的一些查找的网址词条.我们知道浏览器中含有过多的网上记录..._清除外接网卡记录

事务故障、介质故障、系统故障恢复方法及区别_系统故障和介质故障的区别-程序员宅基地

文章浏览阅读1.1w次,点赞21次,收藏61次。1. 数据库的事务故障指的是什么,发生事务故障后,DBMS如何恢复数据库?事务故障指某个事务在运行过程中由于种种原因未运行至正常终止点就夭折了。恢复方法:撤销事务。即清除该事务对数据库的所有修改,使得这个事务像根本没有启动过一样。(需要从后到前撤销,最新完成的操作的更新影响要先消失。因此,需要从后到前扫描日志文件。)2. ..._系统故障和介质故障的区别

ITK:从灰度图像计算直方图_itk 图片直方图-程序员宅基地

文章浏览阅读523次。ITK:从灰度图像计算直方图内容提要输入输出C++实现代码内容提要输入输出Frequency = [ 0,8593,17734,11515,5974,2225,2400,3422,3531,3283,2125,2628,1954,152,0,0 ]C++实现代码#include "itkImage.h"#include "itkImageFileReader.h"#include "itkImageToHistogramFilter.h"#include _itk 图片直方图

2006-2018全国省级以上开发区空间特征变化_全国开发区 四至 shp-程序员宅基地

文章浏览阅读194次。之前写过空间统计工具中的标准椭圆差,GIS的空间统计工具—标准椭圆差和线性方向平均值,这次用这个工具水一篇文章,结果不代表任何实际意义,感谢大家不取关之恩。开发区数据:全球变化科学研究出版系统,2006年、2018年全国省级以上开发区shp点数据,地理坐标为WGS84。行政区划数据:网络【省级以上开发区标准椭圆差】然后以下都是用一个标准椭圆差做的,即包含了68%的数据。打开属性表分别统计一下2006年和2018年的结果。XstdDist和YStdDist表示X轴的长度和Y轴的长度_全国开发区 四至 shp

【转载】笔记:计算机_体系结构_操作系统_软件_操作系统内核_GNU_Linux_C_Python_Latex_Java_TCP/IP_MacOS_Windows这些词语的历史,关系-程序员宅基地

文章浏览阅读381次。一、计算机的发明世上本无路,走的人多了,就有了路。世上本无计算机,琢磨的人多了……没有计算机,一切无从谈起。三个人对计算机的发明功不可没,居功至伟。阿兰·图灵(Alan Mathison Turing)、阿塔那索夫(John Vincent Atanasoff)、和冯·诺依曼(John von Neumann)。图灵从理论上证明了计算机的可行性;阿塔那索夫实践了图灵的理论;冯·诺依曼奠...

c语言json写配置文件,在C语言中解析json配置文件-程序员宅基地

文章浏览阅读411次。业务需求在C或者C++项目中常常需要解析配置文件,我们常见的配置文件格式一般就是.ini,xml,lua或者是一般的text文件,这些格式比较恼人的一个问题就是数据格式过于冗余,或者功能不够强大,不支持正则匹配,或者实现解析文件的代码过多,效率不高等等。比较大型的开源项目,比如Nginx,ATS等都有自己比较庞大的配置文件格式,特别是Nginx,语言十分独特简洁,功能强大,但是往往代码较为繁杂。那..._c语言配置文件一般用什么格式

推荐文章

热门文章

相关标签