【问题标题】:Java concurrent tasks implementationJava并发任务实现
【发布时间】:2014-04-14 21:44:10
【问题描述】:

我的问题是:我最多可以运行三个并发任务。这些任务可以同时处理 1 到 100 个作业。我有很多线程不断提交单个作业,我想尽快回复它们。在一项任务中处理 100 个作业所需的时间与在一项任务中处理 1 个作业所需的时间相同。作业以随机间隔出现。提交作业的线程需要阻塞,直到作业完成或超时。快速响应提交作业的线程是这里的驱动程序。

所以我目前的逻辑是这样的:如果有

我只是不太确定在 Java 中设置它的最佳方法。我创建了一个简单的信号量版本,它工作正常,但没有利用同时提交作业的能力。我应该如何最好地扩展它以完全满足我的要求? (不需要使用信号量,这正是我目前所拥有的)。

private static final Semaphore semaphore = new Semaphore(3);

public static Response doJob(Job job) throws Exception
{ 
    final boolean tryAcquire = this.semaphore.tryAcquire(this.maxWaitTime, TimeUnit.MILLISECONDS);

    if (tryAcquire)
    {
        try
        {
            return doJobInNewTask(job); // we'd actually like to do all the jobs which are queued up waiting for the semaphore (if there are any)
        }
        finally
        {
            this.semaphore.release()
        }       
    }
}

【问题讨论】:

  • 研究java.util.concurrent中的类,尤其是执行器和线程池。您可能不必重新发明轮子。
  • @JimGarrison 谢谢,我会的,到目前为止,我在该领域的研究还没有提出任何明显的答案。你能指出我更具体的吗?棘手的部分是在某些情况下而不是在其他情况下一起处理排队的任务。默认情况下,线程池和执行程序似乎没有给我那种级别的控制,但我可能看不正确。

标签: java asynchronous concurrency java.util.concurrent


【解决方案1】:

您可以使用具有固定大小线程池的Executor 服务:

class ExecutorExample {
    private final static ExecutorService executorService;
    private final static long maxWaitTime = 5000;

    static {
        executorService = Executors.newFixedThreadPool(3);
    }

    private static class Response {}
    private static class Job {}

    public static Response doJob(final Job job) throws Exception {
        final Future<Response> future = executorService.submit(
            new Callable<Response>() {
                @Override
                public Response call() throws Exception {
                    return doJobInNewTask(job);
                }
            }
        );
        try {
            // get() blocks until the task finishes.
            return future.get(maxWaitTime, TimeUnit.MILLISECONDS);
        }
        catch (final TimeoutException e) {
            // we timed out, so *try* to cancel the task (may be too late)
            future.cancel(/*mayInterruptIfRunning:*/false);
            throw e;
        }
    }

    private static Response doJobInNewTask(final Job job) {
        try { Thread.sleep(maxWaitTime / 2); }
        catch (final InterruptedException ignored) {}
        return new Response();
    }

    public static void main(final String[] args) {
        final List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < 10; i++) {
            final Thread t = new Thread() {
                @Override
                public void run() {
                    try {
                        System.out.println(doJob(new Job()));
                    }
                    catch (final Exception e) {
                        System.out.println(e.getClass().getSimpleName());
                    }
                }
            };
            threads.add(t);
            t.start();
        }

        for (final Thread thread : threads) {
            try { thread.join(); }
            catch (final InterruptedException ignored) {}
        }

        System.out.println("Done!");
    }
}

输出:

ExecutorExample$Response@1fe4169
ExecutorExample$Response@9fdee
ExecutorExample$Response@15b123b
ExecutorExample$Response@bbfa5c
ExecutorExample$Response@10d95cd
ExecutorExample$Response@131de9b
TimeoutException
TimeoutException
TimeoutException
TimeoutException
Done!

这里的一个潜在问题是取消。由于调度超出了您的控制,因此可能会在您等待超时后启动任务,但 before cancel() 有机会做它的事情。结果不会被传播,但如果任务有有意义的副作用,这种方法可能会产生问题。

【讨论】:

  • 感谢@MikeStrobel,据我了解,您的代码使用 ThreadPool/Future 模式实现了超时要求和 3 个同时任务的限制。但它没有利用在一项任务中一起处理作业的能力。假设你有 3 个正在运行的任务,5 个排队,当一个任务槽空闲时,它可以一次性完成所有 5 个排队的工作。
  • 啊,我错过了你说的“在一项任务中处理 100 个作业所需的时间与在一项任务中处理 1 个作业所需的时间相同。”。您可以修改我的示例,以便计划的 Callable 从队列中获取尽可能多的作业,而不是捕获和运行单个任务。基本上,不是调度单个作业的处理,而是调度一种“队列/批处理处理器”(限制它处理的项目数量)。
  • 我尝试了您的方法,但最终发现调整我早期的代码以满足要求更容易。但是您的方法可能会更好...如果您有任何进一步的建议,我将我的新代码发布在 stackexchange 代码审查网站上:codereview.stackexchange.com/questions/47352/…
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-05-30
相关资源
最近更新 更多