【问题标题】:Using CompletableFuture and thread pool for a large number of tasks使用 CompletableFuture 和线程池处理大量任务
【发布时间】:2021-11-17 05:31:08
【问题描述】:

我有一个场景,我需要使用阻塞 API 向服务器发送 1M 消息。 API不接受批量请求,所以我必须一个一个发送1M消息。

我正在考虑使用多个线程来发送它们,而不是使用一个线程。

调用者必须等待所有 1M 消息发送完毕才能继续。

我的实现如下:

public class MySender {
    private final MyPublisher myPublisher;
    private final ExecutorService threadPool;
    private final Map<String, List<CompletableFuture<Void>>> jobMap = Maps.newConcurrentMap();

    public MySender (final MyPublisher myPublisher,
                     ExecutorService threadPool) {
        this.myPublisher= myPublisher;
        this.threadPool = threadPool;
    }

    public void send(final MyData event) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> doSubmit(event), threadPool);
        List<CompletableFuture<Void>> futureList = jobMap.computeIfAbsent(event.getID(), entry -> new ArrayList<>());
        futureList.add(future);
    }

    public void notifySendComplete(final String id) {
        if(!jobMap.containsKey(id)) {
            return;
        }

        jobMap.get(id).forEach(CompletableFuture::join);
        jobMap.remove(id);
    }

    private void doSubmit(final MyData event) {
         try {
             ....
             myPublisher.send(event);
             ....
         } catch(Exception e) {
             // log error
         }
    }
}

客户端类可以这样简单地使用这个类:

myInputList.forEach(input -> {
    MyData event = createData(input);
    mySender.send(event);
})

mySender.notifySendComplete();

我认为这个实现会起作用,但问题很明显。它需要在map中持有1M CompletableFuture,不符合垃圾回收条件。

这是一个大问题吗?如果是这样,有没有更好的方法?

限制:

  1. 无法关闭线程池
  2. 我可以使用 CountDownLatch 实现它,但不允许在我的项目中使用它。

【问题讨论】:

    标签: java multithreading


    【解决方案1】:

    这是个大问题吗?

    这取决于...如果内存不是问题,不。如果您想提高内存使用效率,那么可以。

    如果有,有没有更好的方法?

    忘掉地图吧,你没有用它做任何有价值的事情......相反,使用一个整数变量(我们称之为pending)来跟踪有多少消息排队等待发送......稍后,你的notifySendComplete 方法将检查是否有待处理的消息,如果有,它必须休眠/等待,直到没有更多消息...

    总结一下,

    send 应该将变量加一...

    doSubmit 应该将变量减一 ...

    notifySendComplete 应该检查变量的值,只有当变量的值为零时才返回...等待,你应该让你的线程休眠,然后再次检查值,直到变量变为零。

    为了保持对变量的控制(在线程之间同步其值),有多种 JAVA 模式:

    • 您可以使用 AtomicInteger ...
    • 您可以使用synchronized 方法...

    顺便说一句,对我来说,内存问题仍然存在!你可能会问“为什么”?因为,您在发送它们之前加载了 1M 条消息......方法应该是:只要有空闲线程或工作人员,我就会阅读和发送消息......在没有更多工作人员的那一刻,我应该避免阅读更多消息等等……

    【讨论】:

      【解决方案2】:

      您可能不仅希望解决方案多线程,还希望使用 Bulkhead 来限制等待请求的数量,因为如果目前没有限制,如果您点击,服务的所有者将为您设置配额他们有 1M 的请求。 看看 Histryx Bulkhead,它为你管理线程池,你可以调整最大并发线程 What is Bulkhead Pattern used by Hystrix? 另外,正如其他人提到的,您从内存中的 1M 记录开始,这些记录来自哪里?如果来自数据库,您可能需要考虑使用 R2DB 驱动程序来使用可以在加载消息时处理消息的反应流,而不是将所有消息加载到内存然后处理。 见https://www.baeldung.com/java-reactive-systems

      【讨论】:

        【解决方案3】:

        您可以使用并行流将事件转换为您需要发送的数据,只要工作人员准备好就可以发送。

        myInputList.stream().parallel().forEach(mySender::doSubmit)
        

        如果您可能同时在程序的其他地方使用并行流(或常见的ForkJoinPool),则可以使用您可以空闲的线程创建一个专用于该进程的线程池。

        int extraThreads = ...;
        ForkJoinPool pool = new ForkJoinPool(extraThreads);
        ForkJoinTask<?> ticket =
            pool.submit(() -> myInputList.stream().parallel().forEach(mySender::doSubmit));
        ticket.get();
        pool.shutdown();
        

        【讨论】:

          猜你喜欢
          • 2022-01-18
          • 2012-02-12
          • 1970-01-01
          • 2015-03-27
          • 1970-01-01
          • 2011-11-03
          • 1970-01-01
          • 1970-01-01
          • 2015-06-26
          相关资源
          最近更新 更多