【问题标题】:Time bound tasks using ExectutorService使用 ExecutorService 的限时任务
【发布时间】:2015-07-20 11:07:03
【问题描述】:

我有长时间运行的任务提交给ExecutorService。该任务可能会运行相当长的一段时间。同时,新的任务被提交到内部阻塞队列。

提交的任务完成后,会发回通知以从队列中释放任务以供执行。但是,有时由于编程错误或网络问题,通知不会被触发。在这种情况下,我的任务队列可能会变得很大,并且我可能会遇到任务可能永远位于队列中的情况。

为了克服这个问题,我正在考虑编写一个线程,它会定期检查任务在队列中空闲的时间。如果任务在队列中等待了 15 分钟,我会假设之前提交的任务遇到了错误,因此没有返回。然后我将从队列中逐出任务并允许它执行。

是否有任何现有的机制来处理这个问题,否则我将不得不编写这个自定义逻辑?

注意:
我不喜欢ScheduledExecutor 服务的原因是因为并非所有任务都要定期执行。只有失败的场景应该在一定的延迟之后执行。

编辑 架构的简要概述 我正在设计的解决方案应该支持许多并发静态文件下载。通常可能有数千个下载请求。下载请求是从基于 UI 的应用程序触发的。这样我就知道什么时候会触发请求。利用这种方法,我打算限制下载请求。

当用户创建一个请求比如 300 个下载请求时会发生什么?

  1. 应用程序工作线程创建 300 个任务供下载
  2. 提交了 100 个任务。我定义的 MAX HTTP 线程池大小为 100。这意味着我最多可以支持 100 个同步并行下载(servlet 2.5) 该任务反过来要求远程 HTTP 客户端执行 HTTP get .请注意,HTTP 线程尚未发挥作用
  3. 剩余 200 个请求已排队。
  4. HTTP 客户端执行 HTTP 获取。 HTTP 线程现在以阻塞方式流式传输响应。
  5. 收到 200 OK 后,我会创建一个通知,通知其中一个客户端已完成下载。
  6. 现在,节流阀将从之前排队的 200 个请求中释放/提交其中一项任务。

在我能够接收响应(HTTP 200/HTTP 500 等)的情况下,节流机制就像一个魅力。但是,例如,在 servlet 本身抛出异常的情况下,我没有收到任何响应以指示 HTTP 工作线程是空闲的。因此,该任务有可能永远保留在队列中。为了克服这个问题,我正在考虑一种基于计时器的方法,如果 15 分钟内没有 HTTP 响应,则提交下一个队列任务以执行。一种回退机制,以避免重大内存泄漏。

【问题讨论】:

  • 请显示一些代码。不需要通知已完成的执行。 ExecutorService 应该自动选择下一个任务。你为什么这么做?提供有界队列,您还可以编写自己的“溢出处理程序”。自从我这样做以来已经有一段时间了,但是如果您有兴趣,我会去找一些例子。
  • @Fildor 在我的情况下这是必要的。这些任务触发客户端进行 HTTP GET,这反过来又消耗来自 http 线程池的工作线程,依此类推。设计比它看起来的复杂,需要相当多的解释才能让这个清晰。但假设我需要通知。
  • 然后你应该将所有任务代码包装在 try catch 中并在 finally 块中进行通知。 (当然还要适当地处理异常。)
  • @Fildor 希望就这么简单。有些问题不是 java Exceptions 。例如,假设响应是 NACK,因为没有提交任务执行所需的所有参数。我确实在处理我迄今为止遇到的所有异常和 NACK。但我怀疑还有更多可能导致内存泄漏的情况。只是为了确保任务在某个时间点得到执行。
  • 您能否再给我们一些关于您的架构的概述?如果架构真的这么复杂,真的很难给出复杂的建议。

标签: java multithreading executorservice threadpoolexecutor


【解决方案1】:

通过超时调用get() 并在捕获TimeoutException 时限制任务允许的最大时间量,执行清理。

这是一个在等待时不会阻塞主线程的实现:

ExecutorService executor = Executors.newCachedThreadPool();
ExecutorService monitor = Executors.newFixedThreadPool(99);

public void submit(Runnable task) {
    Runnable monitorTask = new Runnable() {
        @Override
        public void run() {
            Future<?> future = executor.submit(task);
            try {
                future.get(15, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                // retry waiting. iterative approach not shown here
            } catch (ExecutionException e) {
                // your task exploded
            } catch (TimeoutException e) {
                // your task timed out - perform clean up, eg
                future.cancel(true);
            }               
        }
    };
    monitor.submit(monitorTask);
}

单独的线程池用于防止没有可监视的线程,但有一个线程可用于执行导致未监视任务的任务。

【讨论】:

  • 我非常喜欢这个解决方案。但是,等待获取会使我的应用程序成为单线程的吗?我有 100 个并行线程,迫不及待地等待返回 1 get ,除非我没有正确解释它。
  • future.get(timeout, timeUnit)如果超过超时不会取消任务:超时仅意味着调用者停止等待。如果 get(...) 调用抛出 TimeoutException,您还应该调用 future.cancel(true)
  • @james 我最初打算这只是最基本的代码,但您的评论是很好的建议,并且已添加到代码中。
  • @Bohemian 这是一个绝妙的解决方案。从来没有想过可以以这种方式使用线程池 API。我对这种方法的唯一担心是过度使用线程进行监控。 100 个额外的线程来监控未来。我不优雅的解决方案是记录插入任务的时间,然后定期(15 分钟)运行一个线程来检查哪些任务已超过 15 分钟。如果有,则驱逐较旧的任务并允许执行较新的任务。这是有道理的还是我对这种方法很天真。
  • @VishalP 虽然线程确实会消耗资源,但等待的线程(如这些监视器)会“停在”一边,不会造成太大问题。
猜你喜欢
  • 1970-01-01
  • 2019-04-11
  • 2012-01-31
  • 2017-04-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-02-15
  • 1970-01-01
相关资源
最近更新 更多