【发布时间】:2021-11-17 05:31:08
【问题描述】:
我有一个场景,我需要使用阻塞 API 向服务器发送 1M 消息。 API不接受批量请求,所以我必须一个一个发送1M消息。
我正在考虑使用多个线程来发送它们,而不是使用一个线程。
调用者必须等待所有 1M 消息发送完毕才能继续。
我的实现如下:
public class MySender {
private final MyPublisher myPublisher;
private final ExecutorService threadPool;
private final Map<String, List<CompletableFuture<Void>>> jobMap = Maps.newConcurrentMap();
public MySender (final MyPublisher myPublisher,
ExecutorService threadPool) {
this.myPublisher= myPublisher;
this.threadPool = threadPool;
}
public void send(final MyData event) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> doSubmit(event), threadPool);
List<CompletableFuture<Void>> futureList = jobMap.computeIfAbsent(event.getID(), entry -> new ArrayList<>());
futureList.add(future);
}
public void notifySendComplete(final String id) {
if(!jobMap.containsKey(id)) {
return;
}
jobMap.get(id).forEach(CompletableFuture::join);
jobMap.remove(id);
}
private void doSubmit(final MyData event) {
try {
....
myPublisher.send(event);
....
} catch(Exception e) {
// log error
}
}
}
客户端类可以这样简单地使用这个类:
myInputList.forEach(input -> {
MyData event = createData(input);
mySender.send(event);
})
mySender.notifySendComplete();
我认为这个实现会起作用,但问题很明显。它需要在map中持有1M CompletableFuture,不符合垃圾回收条件。
这是一个大问题吗?如果是这样,有没有更好的方法?
限制:
- 无法关闭线程池
- 我可以使用 CountDownLatch 实现它,但不允许在我的项目中使用它。
【问题讨论】:
标签: java multithreading