同意https://stackoverflow.com/users/3371051/pedro,CompletableFuture 是 Java 内置的,而 ListenableFuture 则依赖 Guava。将 ListenableFuture 转换为 CompletableFuture 很容易。
- 在 ListenableFuture 可侦听回调中创建 CompletableFuture、complete() 或 completeExceptionally()。
- 使用 thenCompose 将 CompletableFuture 与 ListenableFuture 链接起来
- 使用 allOf 替换 Futures.allAsList
就像我为 com.datastax 驱动程序核心 3.11.0 所做的以下测试更改
CompletableFuture initAsync() {
CompletableFuture ret = new CompletableFuture();
如果(工厂.isShutdown){
ret.completeExceptionally(
new ConnectionException(endPoint, "连接工厂已关闭"));
返回 ret;
}
协议版本协议版本 =
factory.protocolVersion == null
? ProtocolVersion.NEWEST_SUPPORTED
: factory.protocolVersion;
尝试 {
引导引导 = factory.newBootstrap();
ProtocolOptions protocolOptions = factory.configuration.getProtocolOptions();
引导程序处理程序(
新的初始化器(
这,
协议版本,
protocolOptions.getCompression().compressor(),
协议选项.getSSLOptions(),
factory.configuration.getPoolingOptions().getHeartbeatIntervalSeconds(),
factory.configuration.getNettyOptions(),
factory.configuration.getCodecRegistry(),
factory.configuration.getMetricsOptions().isEnabled()
? factory.manager.metrics
: 空值));
ChannelFuture 未来 = bootstrap.connect(endPoint.resolve());
writer.incrementAndGet();
未来.addListener(
新的 ChannelFutureListener() {
@覆盖
公共无效操作完成(ChannelFuture 未来)抛出异常 {
writer.decrementAndGet();
// 注意:future.channel()在某些错误情况下可以为null,所以我们需要提防
// 它在下面的其余代码中。
频道=未来.频道();
if (isClosed() && 频道 != null) {
渠道
。关闭()
.addListener(
新的 ChannelFutureListener() {
@覆盖
公共无效操作完成(ChannelFuture 未来)抛出异常 {
ret.completeExceptionally(
新的运输异常(
Connection.this.endPoint,
“初始化期间连接关闭。”));
}
});
} 别的 {
如果(频道!= null){
Connection.this.factory.allChannels.add(channel);
}
如果(!future.isSuccess()){
if (logger.isDebugEnabled())
记录器.调试(
字符串格式(
"%s 连接到 %s%s 时出错",
连接.this,
Connection.this.endPoint,
extractMessage(future.cause())));
ret.completeExceptionally(
新的运输异常(
Connection.this.endPoint, "无法连接", future.cause()));
} 别的 {
断言通道!= null;
记录器.调试(
"{} 连接已建立,正在初始化传输", Connection.this);
channel.closeFuture().addListener(new ChannelCloseListener());
ret.complete(null);
}
}
}
});
} 捕捉(运行时异常 e){
closeAsync().force();
扔 e;
}
执行器 initExecutor =
factory.manager.configuration.getPoolingOptions().getInitializationExecutor();
返回 ret.thenCompose(
无效-> {
CompletableFuture ret2 = new CompletableFuture();
ProtocolOptions protocolOptions = factory.configuration.getProtocolOptions();
未来启动ResponseFuture =
写(
新的请求。启动(
协议选项.getCompression(),协议选项.isNoCompact()));
ListenableFuture channelReadyFuture =
GuavaCompatibility.INSTANCE.transformAsync(
启动响应未来,
onStartupResponse(protocolVersion, initExecutor),
初始化执行器);
GuavaCompatibility.INSTANCE.addCallback(
通道就绪未来,
新的未来回调(){
@覆盖
公共无效onSuccess(无效结果){
ret2.complete(null);
}
@覆盖
公共无效onFailure(Throwable t){
// 确保连接正确关闭。
if (t instanceof ClusterNameMismatchException
|| t instanceof UnsupportedProtocolVersionException) {
// 只是传播
closeAsync().force();
ret2.completeExceptionally(t);
} 别的 {
// Defunct 以确保将发出错误信号(将主机标记为关闭)
可投掷 e =
(t instanceof ConnectionException
|| t instanceof DriverException
|| t instanceof InterruptedException
|| t instanceof 错误)
?吨
:新的连接异常(
Connection.this.endPoint,
字符串格式(
"传输初始化期间出现意外错误 (%s)", t),
t);
ret2.completeExceptionally(defunct(e));
}
// 如果调用者取消返回的未来,确保连接关闭。
如果(!isClosed()){
closeAsync().force();
}
}
});
返回 ret2;
});
}
私有 CompletableFuture createPools(集合主机){
列表> 期货 = Lists.newArrayListWithCapacity(hosts.size());
对于(主机主机:主机)
if (host.state != Host.State.DOWN) futures.add(maybeAddPool(host, null));
返回 CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
}