【问题标题】:Is Guava's ListenableFuture still useful/ up to date (after Java 8)?Guava 的 ListenableFuture 是否仍然有用/最新(在 Java 8 之后)?
【发布时间】:2018-03-11 13:56:06
【问题描述】:

从 Java 8 开始,Guava 的一些特性已经过时(例如字符串连接器、可选值、前置条件检查、Futures 等)。

ListenableFuture documentation(目前自一年前以来没有更新)他们说:

“我们强烈建议您在所有代码中始终使用 ListenableFuture 而不是 Future,因为...”

我在一个 old 项目中使用番石榴(和 cassandra),我的问题是:Java 8 标准库是否已经具有使 ListenableFuture 过时的东西,或者它仍然是最好的 @987654330 @ 选择? 谢谢。

【问题讨论】:

  • 听起来你在找CompletableFuture
  • 我们更喜欢ListenableFuture API(带有FluentFutureFutures 静态实用程序)而不是CompletableFuture,因为我们认为我们已经找到了一个更简单但更强大的API,但主要是你我只想选择一个并坚持下去。我会尝试在那里扩展我们的文档——感谢您指出这个漏洞。
  • 感谢您的回答。我会坚持ListenableFuture。我最近几天一直在阅读标准CompletableFuture。我在这里留下一些链接,为可能感兴趣的其他人解释它:New Concurrency Utilities in Java 8CompletableFuture explained- in germanJava9 CompletableFuture improvements
  • 我个人认为没有理由在 java8 之后继续使用 ListenableFuture,除了 cassandra 驱动程序使用 ListenableFuture 的事实。虽然编写一个将 ListenableFuture 转换为 CompletableFuture 的静态函数很简单。最新版驱动使用java8期货,但还是alphadatastax.com/dev/blog/introducing-java-driver-4

标签: java concurrency guava


【解决方案1】:

同意https://stackoverflow.com/users/3371051/pedro,CompletableFuture 是 Java 内置的,而 ListenableFuture 则依赖 Guava。将 ListenableFuture 转换为 CompletableFuture 很容易。

  1. 在 ListenableFuture 可侦听回调中创建 CompletableFuture、complete() 或 completeExceptionally()。
  2. 使用 thenCompose 将 CompletableFuture 与 ListenableFuture 链接起来
  3. 使用 allOf 替换 Futures.allAsList

就像我为 com.datastax 驱动程序核心 3.11.0 所做的以下测试更改

CompletableFuture initAsync() { CompletableFuture ret = new CompletableFuture(); 如果(工厂.isShutdown){ ret.completeExceptionally( new ConnectionException(endPoint, "连接工厂已关闭")); 返回 ret; } 协议版本协议版本 = factory.protocolVersion == null ? ProtocolVersion.NEWEST_SUPPORTED : factory.protocolVersion; 尝试 { 引导引导 = factory.newBootstrap(); ProtocolOptions protocolOptions = factory.configuration.getProtocolOptions(); 引导程序处理程序( 新的初始化器( 这, 协议版本, protocolOptions.getCompression().compressor(), 协议选项.getSSLOptions(), factory.configuration.getPoolingOptions().getHeartbeatIntervalSeconds(), factory.configuration.getNettyOptions(), factory.configuration.getCodecRegistry(), factory.configuration.getMetricsOptions().isEnabled() ? factory.manager.metrics : 空值)); ChannelFuture 未来 = bootstrap.connect(endPoint.resolve()); writer.incrementAndGet(); 未来.addListener( 新的 ChannelFutureListener() { @覆盖 公共无效操作完成(ChannelFuture 未来)抛出异常 { writer.decrementAndGet(); // 注意:future.channel()在某些错误情况下可以为null,所以我们需要提防 // 它在下面的其余代码中。 频道=未来.频道(); if (isClosed() && 频道 != null) { 渠道 。关闭() .addListener( 新的 ChannelFutureListener() { @覆盖 公共无效操作完成(ChannelFuture 未来)抛出异常 { ret.completeExceptionally( 新的运输异常( Connection.this.endPoint, “初始化期间连接关闭。”)); } }); } 别的 { 如果(频道!= null){ Connection.this.factory.allChannels.add(channel); } 如果(!future.isSuccess()){ if (logger.isDebugEnabled()) 记录器.调试( 字符串格式( "%s 连接到 %s%s 时出错", 连接.this, Connection.this.endPoint, extractMessage(future.cause()))); ret.completeExceptionally( 新的运输异常( Connection.this.endPoint, "无法连接", future.cause())); } 别的 { 断言通道!= null; 记录器.调试( "{} 连接已建立,正在初始化传输", Connection.this); channel.closeFuture().addListener(new ChannelCloseListener()); ret.complete(null); } } } }); } 捕捉(运行时异常 e){ closeAsync().force(); 扔 e; } 执行器 initExecutor = factory.manager.configuration.getPoolingOptions().getInitializationExecutor(); 返回 ret.thenCompose( 无效-> { CompletableFuture ret2 = new CompletableFuture(); ProtocolOptions protocolOptions = factory.configuration.getProtocolOptions(); 未来启动ResponseFuture = 写( 新的请求。启动( 协议选项.getCompression(),协议选项.isNoCompact())); ListenableFuture channelReadyFuture = GuavaCompatibility.INSTANCE.transformAsync( 启动响应未来, onStartupResponse(protocolVersion, initExecutor), 初始化执行器); GuavaCompatibility.INSTANCE.addCallback( 通道就绪未来, 新的未来回调(){ @覆盖 公共无效onSuccess(无效结果){ ret2.complete(null); } @覆盖 公共无效onFailure(Throwable t){ // 确保连接正确关闭。 if (t instanceof ClusterNameMismatchException || t instanceof UnsupportedProtocolVersionException) { // 只是传播 closeAsync().force(); ret2.completeExceptionally(t); } 别的 { // Defunct 以确保将发出错误信号(将主机标记为关闭) 可投掷 e = (t instanceof ConnectionException || t instanceof DriverException || t instanceof InterruptedException || t instanceof 错误) ?吨 :新的连接异常( Connection.this.endPoint, 字符串格式( "传输初始化期间出现意外错误 (%s)", t), t); ret2.completeExceptionally(defunct(e)); } // 如果调用者取消返回的未来,确保连接关闭。 如果(!isClosed()){ closeAsync().force(); } } }); 返回 ret2; }); } 私有 CompletableFuture createPools(集合主机){ 列表> 期货 = Lists.newArrayListWithCapacity(hosts.size()); 对于(主机主机:主机) if (host.state != Host.State.DOWN) futures.add(maybeAddPool(host, null)); 返回 CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); }

【讨论】:

    猜你喜欢
    • 2018-10-14
    • 1970-01-01
    • 2023-03-05
    • 1970-01-01
    • 2010-12-10
    • 2020-12-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多