【问题标题】:Consumer producer pattern where producer waits for all consumers to complete before retriggering itself?消费者生产者模式,生产者在重新触发之前等待所有消费者完成?
【发布时间】:2014-10-16 19:22:15
【问题描述】:

在这种情况下(任何版本)我试图找出 Java 中的最佳方法来实现单个生产者多个消费者,其中我使用 ExecutorService(优先选择,不需要)生产者需要“永远”运行但每次运行时,它都需要等待所有内容都被完全处理,例如所有消费者线程都已终止,队列为空,并且没有任何项目要生产。生产者也应该只以固定的时间间隔轮询其数据源。

举个例子:我希望我的生产者每 30 分钟轮询一次它的数据源以获取记录并将它们提交到队列中。如果消费者的处理时间超过 30 分钟,我希望生产者等到所有项目都处理完毕后再再次轮询其数据源(因为它会在 30 分钟过去后立即这样做)。

不找人为我编写代码。一些基本的提示/指导将不胜感激。

这是我尝试使用的简化示例实现。我已经采取了所有可怕的尝试来解决这个问题。请注意,用于构造 ThreadPoolExecutor 的硬编码参数最终将是动态的。

import java.util.concurrent.*;
public class ItemProcessorService {
    public static void main(String args[]) throws InterruptedException {
        RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        int corePoolSize = 5,
                maxPoolSize = 10,
                keepAlive = 10,
                queueCapacity = 1;
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAlive, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity), threadFactory, rejectionHandler);

        for (int i = 0; i < 10; i++) {
            executor.execute(new ItemConsumer());
        }
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Executor finished");
    }
}

class ItemConsumer implements Runnable {
    @Override
    public void run() {
        processItem();
    }

    private void processItem() {
        try {
            Thread.sleep(3000);
            System.out.println(Thread.currentThread().getName() + " - processed item");
        } catch (InterruptedException e) {
            //e.printStackTrace();
        }
    }
}

class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println(r.toString() + " - rejected");
    }
}

【问题讨论】:

  • 您应该使用executor.awaitTermination(...),而不是在循环中忙着等待。
  • 谢谢。这让我写起来很不舒服。我在一个示例中找到了它,并没有完全探索 executors 功能。我认为有一个体面的等待。

标签: java multithreading concurrency consumer producer


【解决方案1】:

您可以通过为生产者的每次运行创建一个ExecutorService 来很好地解决这个问题。生产者创建它,关闭它,并等待它的终止。

要安排生产者,请使用ScheduledExecutorService.scheduleWithFixedDelay(...)ScheduledExecutorService.scheduleAtFixedRate(...),具体取决于您的需要:

  • scheduleWithFixedDelay(...) 将在两次运行之间保持指定的时间,因此无论运行持续多长时间,下一次都会在指定的时间后进行:

    ...|XXXXXX|.............|XXXXX|.............|XXXXXXXXX|.............|X <--- fix ---> <--- fix ---> <--- fix --->

  • scheduleAtFixedRate(...)试图保持调度率,所以如果生产者需要更长的时间,两次运行之间的时间会减少,但两次运行永远不会重叠:

    ...|XXXXXXXX|...|XXXXX|......|XXXXXXXXXXXXXXX||XXX|....|XX <--- fix ---><--- fix ---><--- fix ---><--- fix ---><-

生产者外观的简单示例:

public class Producer implements Runnable {
    public void run() {
        ExecutorService executor = ... // create new executor

        // queue items
        for (Object item : itemSource) {
            executor.schedule(new Consumer(item));
        }

        // shutdown executor
        executor.shutdown();
        executor.awaitTermination(2, TimeUnit.HOURS);
    }
}

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

scheduler.scheduleWithFixedDelay(new Producer(), 30, 30, TimeUnit.MINUTES);
// or
scheduler.scheduleAtFixedRate(new Producer(), 30, 30, TimeUnit.MINUTES);

当然,您必须进行适当的异常处理,因为生产者运行中的每个未捕获的异常都会阻止调度程序重新调度它。

请注意,创建 executor 可能会很昂贵,因此这种方法仅适用于消耗项目比创建 executor 更昂贵的情况(在您的情况下 - 似乎是这种情况)。

【讨论】:

  • 这似乎是最优雅的解决方案。我目前正试图弄清楚为什么我的线程池(x)值(pool-x-thread-n)不断增加。即使我正在关闭,是否保留了池的句柄?我应该编辑我的问题以显示我的代码的当前状态吗?
  • 默认线程工厂使用内部静态计数器poolNumber,每个实例都会增加。由于每个执行器都有自己的实例,因此每次创建执行器时,数量都会增加。没有异常。
  • 所以 Thread-Pool-99999-thread-1 等不应该关心我吗?它不代表仍然存在的多个池,每次我实例化一个执行器时只代表一些池?
  • 看Executors$DefaultThreadFactory的源码!它只是一个计数器,每次创建执行程序时都会递增。在调试模式下,您可以检查线程数。
【解决方案2】:

使用结合 CountDownLatch 的自调度生产者(另请参阅 here)。我知道你不需要这些代码,但我认为在这种情况下,有一些代码来试验并没有什么坏处:

import java.util.*;
import java.util.concurrent.*;

public class WaitForConsumers {

public static void main(String[] args) {

    try {
        new WaitForConsumers().demo();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public static final int NUMBER_OF_SLEEP_TASKS = 100;

public void demo() throws Exception {

    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    Producer p = new Producer(scheduler, 1000L);
    p.start();
    try {
        Thread.sleep(3000);
        p.stop();
        System.out.println("Stopped.");
    } finally {
        scheduler.shutdownNow();
    }
}

public static List<Long> produce() {
    List<Long> items = new ArrayList<Long>();
    for (int i = 0; i < NUMBER_OF_SLEEP_TASKS; i++) items.add(i * 1L);
    return items;
}

public static void consume(List<Runnable> tasks, CountDownLatch consumersRunning) throws Exception {

    ExecutorService executor = Executors.newCachedThreadPool();
    for (Runnable r : tasks) executor.execute(r);
    try {
        consumersRunning.await();
    } finally {
        executor.shutdownNow();
    }
}

class Producer implements Runnable {

    ScheduledExecutorService scheduler;
    long frequencyMs;
    volatile boolean stop;
    ScheduledFuture<?> producerTask;

    Producer(ScheduledExecutorService scheduler, long frequencyMs) {
        this.scheduler = scheduler;
        this.frequencyMs = frequencyMs;
    }

    void start() {
        scheduler.execute(this);
    }

    void stop() {

        System.out.println("Stopping producer.");
        stop = true;
        if (producerTask != null) {
            producerTask.cancel(true);
        }
    }

    @Override public void run() {

        long startTime = System.currentTimeMillis();
        List<Long> items = produce();
        CountDownLatch consumersRunning = new CountDownLatch(items.size());
        List<Runnable> tasks = wrap(items, consumersRunning);
        try {
            System.out.println("Consuming " + tasks.size() + " tasks.");
            consume(tasks, consumersRunning);
            System.out.println("Consumed tasks.");
        } catch (Exception e) {
            e.printStackTrace();
            stop = true;
        } finally {
            if (stop) {
                System.out.println("Producer stopping.");
            } else {
                long waitTime = frequencyMs - (System.currentTimeMillis() - startTime);
                if (waitTime < 1L) {
                    scheduler.execute(this);
                } else {
                    System.out.println("Next run in " + waitTime + " ms.");
                    producerTask = scheduler.schedule(this, waitTime, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

    List<Runnable> wrap(List<Long> items, final CountDownLatch consumersRunning) {

        List<Runnable> tasks = new ArrayList<Runnable>();
        for (Long l : items) {
            tasks.add(new SleepTask(l, consumersRunning));
        }
        return tasks;
    }
} // class Producer

class SleepTask implements Runnable {

    long sleepTime;
    CountDownLatch consumersRunning;

    public SleepTask(long sleepTime, CountDownLatch consumersRunning) {
        this.sleepTime = sleepTime;
        this.consumersRunning = consumersRunning;
    }

    @Override public void run() {

        try {
            Thread.sleep(sleepTime);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumersRunning.countDown();
        }
    }
} // class SleepTask
}

【讨论】:

    【解决方案3】:

    为未来的任务寻找线程池执行器。它会让你的生产者在所有线程完成之前等待。下面是例子:

     for (int job = 0; job < yourList.size(); job++) {
            Future<String[]> future = Service.getCompletionService().take();
    
            if (future.isDone()) {
            }
        }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-06-14
      • 2021-02-16
      • 2015-10-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多