【问题标题】:How not to depend on subscription time in Reactor如何不依赖 Reactor 中的订阅时间
【发布时间】:2018-09-22 21:58:36
【问题描述】:

我一直在阅读 Reactor 文档,但我无法为以下问题找到合适的模式。 我有一个应该异步执行某些操作的方法。我以 Flux 的形式返回结果响应,消费者可以订阅它。

该方法有如下定义:

Flux<ResultMessage> sendRequest(RequestMessage message);

返回的通量是热通量,结果可以在任何给定时间异步出现。

潜在消费者可以通过以下方式使用它:

sendRequest(message).subscribe(response->doSomethinWithResponse(response);

实现可以是这样的:

Flux<ResultMessage> sendRequest(RequestMessage message) {
   Flux<ResultMessage> result = incomingMessageStream
            .filter( resultMessage -> Objects.equals( resultMessage.getId(), message.getId() ) )
            .take( 2 );
// The message sending is done here...

    return result;
}

incomingMessageStream 是通过此通道的所有消息的Flux。 这个实现的问题是消费者在结果消息到来之后才被订阅,它可能会错过其中的一些。

所以,我正在寻找一种解决方案,让消费者不依赖订阅时间。可能根本不需要潜在消费者订阅生成的Flux。我正在寻找一个通用的解决方案,但如果不可能,您可以假设结果消息的数量不大于 2。

【问题讨论】:

    标签: java reactive-programming project-reactor


    【解决方案1】:

    一段时间后,我创建了一个似乎可行的解决方案:

    Flux<ResultMessage> sendRequest(RequestMessage message) {
      final int maxResponsesCount = 2;
      final Duration responseTimeout = Duration.ofSeconds( 10 );
      final Duration subscriptionTimeout = Duration.ofSeconds( 5 );
    
      // (1) 
      ConnectableFlux<ResultMessage> result = incomingMessageStream
          .ofType( ResultMessage.class )
          .filter( resultMessage ->Objects.equals(resultMessage.getId(), message.getId() ) )
          .take( maxResponsesCount )
          .timeout( responseTimeout )
          .replay( maxResponsesCount );
      Disposable connectionDisposable = result.connect();
    
      // (2)
      AtomicReference<Subscription> subscriptionForCancelSubscription = new AtomicReference<>();
      Mono.delay( subscriptionTimeout )
        .doOnSubscribe( subscriptionForCancelSubscription::set )
        .subscribe( x -> connectionDisposable.dispose() );
    
      // The message sending is done here...
    
      // (3)
      return result
        .doOnSubscribe(s ->subscriptionForCancelSubscription.get().cancel())
        .doFinally( signalType -> connectionDisposable.dispose() );
    }
    

    我正在使用 ConnectableFlux 立即连接到流,无需订阅,它设置为使用 reply() 方法来存储所有消息,因此以后任何订阅者都不会错过响应消息 (1)。

    可以执行的路径很少:

    1. 调用了方法,但未对通量执行订阅
      • 解决方案 - 如果未完成订阅,有一个计时器会在 5 秒后删除连接的通量资源。 (2)
    2. 方法被调用并订阅了通量

      2.1。没有消息被返回

      • 解决方案 - 设置了获取响应的超时时间 (.timeout( responseTimeout ))。之后.doFinally(..) 清理资源 (1)(3)。

      2.2。部分响应消息已返回

      • 解决方案 - 与 2.1 相同。

      2.3。所有响应消息均已返回

      • 解决方案 - 由于已达到最大元素数 (.take( maxResponsesCount )) (1)(3)
      • ,因此执行 doFinally()

    我还没有对此进行一些认真的测试,如果出现问题,我会在这个答案中添加更正。

    【讨论】:

      猜你喜欢
      • 2020-10-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-06-20
      • 2020-04-01
      • 1970-01-01
      • 2020-11-27
      相关资源
      最近更新 更多