【问题标题】:spring integration task executor queue filled with more recordsspring 集成任务执行器队列填充了更多记录
【发布时间】:2014-12-09 05:17:26
【问题描述】:

我开始构建一个 Spring Integration 应用程序,其中输入网关生成固定数量 (50) 的记录,然后停止生成新记录。中间有基本的filters/routers/transformers,结束服务激活器和任务执行器配置如下:

<int:service-activator input-channel="inChannel" output-channel="outChannel" ref="svcProcessor">
        <int:poller fixed-rate="100" task-executor="myTaskExecutor"/>
</int:service-activator>

<task:executor id = "myTaskExecutor" pool-size="5" queue-capacity="100"/>

我尝试在 svcProcessor 方法的请求处添加一些调试信息:

@Qualifier(value="myTaskExecutor")
@Autowired 
ThreadPoolTaskExecutor executor;

@ServiceActivator
public Order processOrder(Order order) {
  log.debug("---- " + "executor size: " + executor.getActiveCount() + 
        " q: " + executor.getThreadPoolExecutor().getQueue().size() + 
        " r: " + executor.getThreadPoolExecutor().getQueue().remainingCapacity()+
        " done: " + executor.getThreadPoolExecutor().getCompletedTaskCount() +
        " task: " + executor.getThreadPoolExecutor().getTaskCount()
    );
   //
   //process order takes up to 5 seconds.
   //
   return order;
}

有时程序运行后,日志显示队列已超过50,最终得到拒绝异常:

23:38:31.096 DEBUG [myTaskExecutor-2] ---- executor size: 5 q: 44 r: 56 done: 11 task: 60
23:38:31.870 DEBUG [myTaskExecutor-5] ---- executor size: 5 q: 51 r: 49 done: 11 task: 67
23:38:33.600 DEBUG [myTaskExecutor-4] ---- executor size: 5 q: 69 r: 31 done: 11 task: 85
23:32:46.792 DEBUG [myTaskExecutor-1] ---- executor size: 5 q: 72 r: 28 done: 11 task: 88

使用 5 和 100 的配置,看起来活动计数和队列大小/剩余总和看起来正确,但我不清楚为什么队列中有超过 50 条记录,并且 taskCount 也大于限制 50。

我是否从执行者和队列中查看了错误的信息?

谢谢

更新: (不确定我是否应该打开另一个问题)

我尝试了spring-integration(分支SI3.0.x)的cafeDemo的xml版本,并使用了文档中提供的池,但使用了100毫秒速率并增加了容量:

<int:service-activator input-channel="hotDrinks" ref="barista" method="prepareHotDrink" output-channel="preparedDrinks">
    <int:poller task-executor="pool" fixed-rate="100"/>          
</int:service-activator>
<task:executor id="pool" pool-size="5" queue-capacity="200"/>

我跑了之后,20号左右投递后也出现拒绝异常:

 org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6c31732b[Running, pool size = 5, active threads = 5, queued tasks = 200, completed tasks = 0]]

在异常发生之前只有大约 32 个订单,所以我不确定为什么排队的任务 = 200 和完成的任务 = 0?

谢谢

【问题讨论】:

  • 您正在寻找正确的信息。然而,由于这些值在运行时变化非常频繁,所以它并不完美
  • 您使用的是哪个版本的 Spring?
  • 我正在使用 SI 3.0.3.RELEASE,它使用 Spring 3.2.8。

标签: spring-integration


【解决方案1】:

getTaskCount() 这个方法给出了自启动以来分配给执行者的总任务数。因此,它会随着时间的推移而增加。

根据 java 的文档,其他变量是近似数字。

  1. getCompletedTaskCount() 返回已完成执行的大致任务总数。
  2. public int getActiveCount() 返回正在积极执行任务的线程的大致数量。

理想情况下,getTaskCount()getCompletedTaskCount() 将随时间线性增加,因为它包括自代码执行开始以来分配的所有先前任务。但是,activeCount 应该小于 50,但作为近似数字,它有时会超过 50,几乎没有余量。

参考:- https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html

【讨论】:

  • 谢谢黑豹。我知道这些是近似数字,但没想到它会更高并最终被拒绝。我使用了队列容量 100 希望不会发生拒绝。我尝试将服务时间减少到 0.5 秒(更快的消费者),getTaskCount() 返回的值较低但仍然 > 55。
  • 我相信您的服务激活器会在一段时间后不断推送记录,因此 getTaskCount 正在增加。因为它是所有任务的计数,包括已完成的任务。如果您正在寻找 pendingTask,您需要 getTaskCount - getCompletedTask
  • 你知道什么会导致这种持续推动吗?我没有使用循环将消息发送回源。或者,它是否需要在某处明确标记消息“已完成”,以免再次推送?
猜你喜欢
  • 1970-01-01
  • 2012-06-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-11
  • 2014-11-09
相关资源
最近更新 更多