【发布时间】:2021-12-14 12:12:11
【问题描述】:
我使用 Java 16 通过 HTTP 向 API 发出请求。为了加快整体速度,我已将其加载到自定义 ForkJoinPool 上。我在下面编译了一个复制示例。
自从迁移到 Java 17(openjdk build 17.0.1+12-39)后,这会引发 RejectedExecutionException:
Caused by: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
at java.base/java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:1819)
at java.base/java.util.concurrent.ForkJoinPool.compensatedBlock(ForkJoinPool.java:3446)
at java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3432)
at java.base/java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1898)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2072)
at java.net.http/jdk.internal.net.http.HttpClientImpl.send(HttpClientImpl.java:553)
at java.net.http/jdk.internal.net.http.HttpClientFacade.send(HttpClientFacade.java:119)
at Test.lambda$retrieveMany$1(Test.java:30)
为什么会这样? ForkJoinPool 是否发生了一些我不知道的变化?
代码
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.stream.Collectors.toList;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final List<String> urls = List.of("https://stackoverflow.com", "https://stackoverflow.com", "https://stackoverflow.com");
// This succeeds on JDK 16, 17
retrieveMany(urls, 4);
// This fails on JDK 17, but succeeds on 16
retrieveMany(urls, 3);
}
private static List<String> retrieveMany(List<String> urls, int threads) throws InterruptedException, ExecutionException {
return new ForkJoinPool(threads, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (t, e) -> {}, true, 0, threads, 1, null, 1, MINUTES)
.submit(() -> urls.parallelStream()
.map(url -> {
try {
return HttpClient.newBuilder().build().send(HttpRequest.newBuilder(URI.create(url)).build(), BodyHandlers.ofString()).body();
} catch (IOException | InterruptedException aE) { }
return null;
})
.collect(toList()))
.get();
}
}
【问题讨论】:
-
这种并行流黑客从来都不是预期的功能。您可以通过向 ExecutorService 提交任务来轻松并行化网络调用。
-
@MikeFHay 是也不是。如果开发人员决定不假设当调用者恰好在这样的工作线程中时并行流将使用当前工作线程的池,我们可以认为这个特性是一种令人惊讶的行为,它可能会以中描述的方式破坏程序这个问题。如果它不是开发人员可以使用的预期功能,那么它仍然是开发人员必须处理的错误来源。 JDK 开发人员从 JDK 8 到 JDK 17 有足够的时间来消除这种令人惊讶的行为,但没有。故意的。
标签: java forkjoinpool java-17 openjdk-17