一个flume agent异常的解决过程记录 - 施文涛 - 博客园 https://www.cnblogs.com/dongqingswt/p/5287778.html
一个flume agent异常的解决过程记录
今天在使用flume agent的时候,遇到了一个异常, 现把解决的过程记录如下:
问题的背景:
我使用flume agent 来接收从storm topology发送下来的accesslog , 做本地文件落盘。flume配置文件如下:
查看源码, 发现thrift client有一个20s的请求超时,这时候 , 查看thrift 错误日志,发现了如下异常:
直观的看这个异常,会让人想到是堆内存不够用, 调节-Xmx1024m 参数以后,问题依旧。
接着用jconsole attach到flume agent这个进程,查看堆内存的使用量, 远没有达到预设的xmx阈值。
但是·有一个表现就是线程数猛涨到2000以后,直接掉到了20左右,为什么会创建这么多线程呢, 都是什么线程呢?
结合thrift source 的源码, 发现thrift server 采用的也是java nio , 有SelectorThread做socket的read/write 就绪select
AcceptorThread做socket的accept的select ,而socket读就绪以后,收到的FrameBuffer会被包装成一个Runnable丢到线程池处理(查看
TThreadedSelectorServer的304 行),代码如下:
protected boolean requestInvoke(FrameBuffer frameBuffer) {
Runnable invocation = getRunnable(frameBuffer);
if (invoker != null) {
try {
invoker.execute(invocation);
return true;
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected execution!", rx);
return false;
}
} else {
// Invoke on the caller's thread
invocation.run();
return true;
}
}
这个任务invocation的处理逻辑是在ThriftSource的ThriftSourceHandler定义的,也就是把thrift flume event直接丢到memchannel以后返回。起初 ,我怀疑是不是flume event丢到memchannel处理太慢(比如有线程死锁),导致线程堆积, 但后面换成file channel
问题依旧,于是继续看jconsole上thrift 的Flume 线程,因为线程工厂在创建线程的时候,指定了线程名:
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
"Flume Thrift IPC Thread %d").build();
if (maxThreads == 0) {
sourceService = Executors.newCachedThreadPool(threadFactory);
} else {
sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory);
}
所以, 查看这些线程发现这些线程:
结合ThriftSource源码,发现不指定最大线程数时, thrift server的线程池的确是不停的新建线程,而maxThreads又是一个Integer.MAX_VALUE,
if (maxThreads == 0) {
sourceService = Executors.newCachedThreadPool(threadFactory);
} else {
sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory);
}
试着在flume 配置文件中指定最大线程数:
a1.channels.r1.threads=10
问题解决。
这个问题的解决也让我们反思线程池的使用,不要设定太大的最大线程数。
今天在使用flume agent的时候,遇到了一个异常, 现把解决的过程记录如下:
问题的背景:
我使用flume agent 来接收从storm topology发送下来的accesslog , 做本地文件落盘。flume配置文件如下:
查看源码, 发现thrift client有一个20s的请求超时,这时候 , 查看thrift 错误日志,发现了如下异常:
直观的看这个异常,会让人想到是堆内存不够用, 调节-Xmx1024m 参数以后,问题依旧。
接着用jconsole attach到flume agent这个进程,查看堆内存的使用量, 远没有达到预设的xmx阈值。
但是·有一个表现就是线程数猛涨到2000以后,直接掉到了20左右,为什么会创建这么多线程呢, 都是什么线程呢?
结合thrift source 的源码, 发现thrift server 采用的也是java nio , 有SelectorThread做socket的read/write 就绪select
AcceptorThread做socket的accept的select ,而socket读就绪以后,收到的FrameBuffer会被包装成一个Runnable丢到线程池处理(查看
TThreadedSelectorServer的304 行),代码如下:
protected boolean requestInvoke(FrameBuffer frameBuffer) {
Runnable invocation = getRunnable(frameBuffer);
if (invoker != null) {
try {
invoker.execute(invocation);
return true;
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected execution!", rx);
return false;
}
} else {
// Invoke on the caller's thread
invocation.run();
return true;
}
}
这个任务invocation的处理逻辑是在ThriftSource的ThriftSourceHandler定义的,也就是把thrift flume event直接丢到memchannel以后返回。起初 ,我怀疑是不是flume event丢到memchannel处理太慢(比如有线程死锁),导致线程堆积, 但后面换成file channel
问题依旧,于是继续看jconsole上thrift 的Flume 线程,因为线程工厂在创建线程的时候,指定了线程名:
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
"Flume Thrift IPC Thread %d").build();
if (maxThreads == 0) {
sourceService = Executors.newCachedThreadPool(threadFactory);
} else {
sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory);
}
所以, 查看这些线程发现这些线程:
结合ThriftSource源码,发现不指定最大线程数时, thrift server的线程池的确是不停的新建线程,而maxThreads又是一个Integer.MAX_VALUE,
if (maxThreads == 0) {
sourceService = Executors.newCachedThreadPool(threadFactory);
} else {
sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory);
}
试着在flume 配置文件中指定最大线程数:
a1.channels.r1.threads=10
问题解决。
这个问题的解决也让我们反思线程池的使用,不要设定太大的最大线程数。