【问题标题】:Terminal calls to Stream never execute对 Stream 的终端调用永远不会执行
【发布时间】:2015-03-10 22:16:10
【问题描述】:

我很难使用 Spring Reactor Stream API(类似于 rxjava)在我的服务中构造一个响应对象,该对象包装由两个下游服务提供的响应。

以下是我的 Channel Consumer 上的 accept() 方法。为了保护无辜者,有些名字已经改了..

@覆盖 公共无效接受(最终 NetChannel 频道){ log.info("消耗 FullHttpRequest 的 NetChannel"); 尝试 { // 我们的初始 Stream 是包含我们的 XML 文档的单个 HTTP 请求。 流请求流 = channel.in(); requestStream.filter((x) -> {return true;}) .dispatchOn(调度员) .map(new FooRequestFunction()) // 1) .flatMap(new BarRequestStreamFunction()) // 2) .flatMap(new DownstreamRequestZipFunction()) // 3) .toList() // 4) .onComplete(new ResponsesConsumer(channel)); // 5) } 捕捉(异常 e){ log.error("Channel处理过程中抛出异常", e); } }

因此,FooRequest 包装了许多 BarRequests,每个 BarRequests 都有一个关联的分类请求和一个关联的验证请求。我们想要 1) 转换为 FooRequest,2) 将 FooRequest 转换为一系列 BarRequests,3) 为每个 BarRequest 运行两个下游请求,4) 将我们所有的 BarResponse 对象聚合到整体响应,5) 将响应发送回客户端。

我遇到问题的地方是toList() 方法,它似乎永远不会执行。每次我尝试涉及Promise 的事情时,它似乎总是会崩溃,这也不例外。

FooRequestFunctionBarRequestStreamFunction 相当简单,而且似乎运行良好。它们的方法签名是:

// FooRequestFunction
public FooRequest apply(final FullHttpRequest request);

还有:

// BarRequestStreamFunction
public Stream<BarRequest> apply(FooRequest dsoRequests);

DownstreamRequestZipFunction 看起来像这样:

@覆盖 公共流应用(BarRequest t){ 流分类Res = 流 .just(t) .flatMap(new ClassifyDownstreamRequestFunction()); 流 validateRes = 流 .just(t) .flatMap(new ValidateDownstreamRequestFunction()); return Streams.zip(classifyRes, validateRes, (tuple) -> { BarResponse 响应 = 新 BarResponse(); response.setClassifyRes(tuple.getT1()); response.setValidateRes(tuple.getT2()); 返回响应; }); }

这似乎工作正常,只要两个下游请求函数都返回结果。

最后,链式调用结束时的 Consumer 有这个签名:

// ResponsesConsumer
public void accept(Promise<List<BarResponse>> responses)

它所做的是 await() 响应承诺,然后将所有这些响应聚合到单个 XML 文档中写回通道。我可以说执行永远不会达到这种方法,因为没有任何日志记录会触发。这一切似乎都停止在 .toList() 处。

有谁知道为什么这个设置似乎会执行toList() 或之后的任何内容?

编辑:好的,我有更多信息。在为应用程序中的每个线程提供命名约定以使调试更容易之后,我可以看到“shared-1”,运行 accept() 方法的线程进入 WAITING 状态,然后停留在那里。这可能与底层 Dispatcher 是一个单线程的环形缓冲区调度程序这一事实有关。

我修改了代码,使方法略有不同,并使用了多线程调度程序,并避免使用Promise,但我仍然有一个状态,即链式调用集的尾部不会执行。见下文:

@覆盖 公共无效接受(最终 NetChannel 频道){ log.info("消耗 FullHttpRequest 的 NetChannel"); 尝试 { // 我们的初始 Stream 是包含我们的 XML 文档的单个 HTTP 请求。 流请求流 = channel.in(); requestStream.filter((x) -> {return true;}) .dispatchOn(调度员) .map(new FooRequestFunction()) // 1) .flatMap(new BarRequestStreamFunction()) // 2) .flatMap(new DownstreamRequestZipFunction()) // 3) .reduce(new ArrayList(), (list,resp) -> {log.info("Reducing"); list.add(resp); return list;}) // 4) .consumeOn((x)->{log.info("Consume");}, (x)->{log.error("error");}, (x)->{log.info("完成" );}, 调度员); // 5) } 捕捉(异常 e){ log.error("Channel处理过程中抛出异常", e); } }

在上面,我已将 toList() 替换为对 reduce() 的调用,并将所有内容折叠成一个 List&lt;BarResponse&gt;。我可以看到这个执行和记录很好。但是,无论我对最后一次调用做什么,在尝试了 consume()、consumeOn() 等之后 - 它永远不会执行,也永远不会记录您在上面看到的最终调用。

查看 VisualVM,我可以看到调度程序线程都在与阻塞队列关联的同一个对象监视器上等待 - 换句话说,它们都在等待工作到达。就像 tail consumeOn() 调用被完全忽略一样。

我在这里做错了什么?我不明白什么?

编辑 2:鉴于 Johns 在下面的回复,我怀疑问题出在服务器设置上。可能仅适用于 reactor 版本 2.0.0.M2,在 Application 主类中配置如下:

@豆 公共网络服务器 httpServer( 最终环境环境, 最终的 MetricRegistry 指标, 最终 ChannelConsumer 消费者)抛出 InterruptedException { NetServer 服务器 = 新的 TcpServerSpec( NettyTcpServer.class) .env(环境) 。选项( 新的 NettyServerSocketOptions() .pipelineConfigurer((ChannelPipeline 管道) -> 管道 .addLast(新的 HttpServerCodec()) .addLast(新的 HttpObjectAggregator(MAX_CONTENT_LENGTH)))) .consume(消费者) 。得到(); server.start().await(); 返回服务器; }

没有为此配置调度程序,它似乎在后台使用 LMAX 破坏程序,而不是 NettyEventLoopDispatcher。目前尚不清楚如何设置NettyEventLoopDispatcher并将其用作替代调度程序。

【问题讨论】:

  • 调度员能力够吗?
  • 是的。我正在使用一个 Http 请求对其进行测试。它一到达 toList() 就会挂起
  • 只是对上述内容的快速评论 - 这看起来像是 Netty 的某种问题。我们尝试了一个使用 RxJava/RxNetty 的等效程序,并且遇到了非常相似的问题。除此之外,我们无法了解更多信息,因为我们的任务预算已用完。

标签: java spring rx-java project-reactor


【解决方案1】:

consumeOn() 调用是多余的,因为您已经在那个 Dispatcher 上(除非您的“真实”代码使用与此示例不同的东西)。当您编写输出时,无论如何它都会在内部切换到NettyEventLoopDispatcher

我做了一个快速检查,以确保此流程在 TcpServer 之外工作,并且按预期工作:

@Test
public void testStream() throws InterruptedException {
    Stream<String> s1 = Streams.just("Hello World!");

    s1.filter(s -> s.startsWith("Hello"))
      .dispatchOn(Environment.sharedDispatcher())
      .map(s -> s.toUpperCase())
      .flatMap(s -> Streams.just(s, s))
      .flatMap(s -> Streams.just(s, s))
      .reduce(new ArrayList<>(), (l, s) -> {
          l.add(s);
          return l;
      })
      .consume(l -> {
          System.out.println("thread: " + Thread.currentThread() + ", l: " + l);
      });

    Thread.sleep(500);
}

通过简单地注释掉 .dispatchOn() 调用来了解流是否在 Netty 线程上工作会很有趣。

【讨论】:

  • 以前试过。发生的情况是,标记为“shared-2”的线程最终将所有内容记录到消费者,然后进入等待状态,消费者不执行。线程转储显示它正在等待队列的对象监视器。从线程转储上的堆栈跟踪来看,它看起来像是在使用 LMAX 破坏器。这是我能做到的。
  • 修改consumeOn()到consume()没有效果,我也试过了。
猜你喜欢
  • 2019-05-25
  • 1970-01-01
  • 2019-12-21
  • 1970-01-01
  • 2019-12-07
  • 1970-01-01
  • 2020-01-23
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多