【问题标题】:How do you implement heartbeat-like functionality using Reactor's Flux?如何使用 Reactor 的 Flux 实现类似心跳的功能?
【发布时间】:2020-10-11 12:00:02
【问题描述】:

假设有一个Flux 发射一些有效载荷:

Flux<Integer> payloads = Flux.range(0, 5)
        .delayElements(Duration.ofSeconds(2));

这个只发出 0 到 4 之间的整数,每 2 秒一次。

任务是将另一个信号混合到这个Flux 中。另一个信号只是每 N 个单位时间发出一些东西。像这样的:

Flux<Integer> heartbeats = Flux.just(-1)
        .repeat()
        .delayElements(Duration.ofSeconds(1));
  1. 生成的 Flux 应该每 2 秒输出一次从 0 到 4 的数字,也应该每 1 秒输出一次数字 -1
  2. 此外,生成的 Flux 必须在 payloads Flux 完成后立即完成
  3. 它应该传播两个Fluxes 中的任何一个发出的任何错误信号

(请注意,上面描述的示例仅用于说明目的。真正的Flux 发射有效载荷可能会以无法预测的速度发射:有时快,有时慢,有时很长一段时间根本不发射)。

我可以使用“开箱即用”运算符实现的最佳效果是

Flux<Integer> flux = payloads.mergeWith(heartbeats);

但这违反了要求 2,因为合并结果仅在所有合并组件完成时才完成,而 heartbeats 永远不会完成。

(实际上,如果在payloads 完成时有一个设置为true 的标志,并且在heartbeats 上使用repeat(BooleanSupplier) 变体而不是repeat(),这可能会起作用,但这会延迟完成生成的Flux 直到下一个心跳的时刻。)

接下来要尝试的是编写我自己的 Publisher 实现,该实现将以我的任务所需的方式工作,但我想避免这种情况,因为实现 Publisher+Subscription 似乎正确这是一项棘手的任务。

有没有更简单的方法?

【问题讨论】:

    标签: java reactive-programming project-reactor


    【解决方案1】:

    FluxSink API 可能有助于实现您的要求。您可以通过Flux::create 访问API,并使用FluxSink 发射器将发射的信号发送到下游。

    这是一个在 Kotlin 中的简单实现:

    val flux = Flux.create<Int> { emitter ->
        payloads.subscribe(
            { emitter.next(it) },
            { emitter.error(it) },
            { emitter.complete() },
        )
    
        heartbeats.subscribe(
            { emitter.next(it) },
            { emitter.error(it) },
        )
    }
    

    请注意,在一般情况下,此实现会将 payloadsheartbeats 变为 hot Fluxes。

    【讨论】:

    • 感谢您的建议!它确实有帮助,但经过测试,我发现提供的代码存在一些缺陷。首先,它不处理取消:在取消对合并的订阅的订阅时,输入发布者都不会被销毁。此外,heartbeats 永远不会在 payloads 完成时被取消。并且需求将是无限的(Long.MAX_VALUE)。我试图在我自己的答案中解决这个问题。无论如何,非常感谢Flux.create()的提示!
    【解决方案2】:

    “似乎是一项棘手的任务”- 是的,除非您为此使用正确的工具,否则这很棘手。您需要的解决方案是一个具有一个输出端口和延迟能力的演员,取自我的图书馆df4j

    import org.df4j.core.port.OutFlow;
    import org.df4j.core.dataflow.Actor;
    
    class DelayActor extends Actor {
        OutFlow<Integer> out = new OutFlow<>(this) ;
        int period = 1000; // ms
        int cnt = 0;
        int maxValue = 4;
    
        @Override
        protected void runAction() throws Throwable {
            out.onNext(-1);
            if (cnt % 2 == 0) {
                int value = cnt / 2;
                out.onNext(value);
                if (value == maxValue) {
                    complete();
                    out.onComplete();
                    return;
                }
            }
            cnt++;
            delay(period);
        }
    }
    

    端口OutFlow out 实现org.reactivestreams.Publisher 并且可以接受所有org.reactivestreams.Subscribers。如果需要Flux,可以获取为Flux.from(actor.out)

    【讨论】:

    • 谢谢 Alexei,但我可能对我描述的玩具任务过于具体(只是为了说明)。实际上,我需要将心跳信号混合到与发射率没有已知关系的Flux。它每秒可以发射一千次,或者它可以每分钟发射一次,或者从不发射任何东西,或者它可以在其生命的不同时期完成上述所有操作。如果我理解正确,您的解决方案将“-1 每 2 秒,0-4 每秒一次”逻辑一起烘焙,它似乎不适用于一般情况。
    【解决方案3】:

    基于@zokni 的回答,这是我使用Flux.create() 能够实现的目标:

    Flux<Integer> flux = Flux.create(sink -> {
        AtomicReference<Subscription> payloadSubscription = new AtomicReference<>();
        AtomicReference<Subscription> heartbeatsSubscription = new AtomicReference<>();
    
        payloads.subscribe(
                t -> {
                    sink.next(t);
                    payloadSubscription.get().request(1);
                },
                sink::error,
                () -> {
                    sink.complete();
                    heartbeatsSubscription.get().cancel();
                },
                subscription -> {
                    payloadSubscription.set(subscription);
                    subscription.request(1);
                }
        );
        heartbeats.subscribe(sink::next, sink::error, () -> {}, subscription -> {
            heartbeatsSubscription.set(subscription);
            subscription.request(Long.MAX_VALUE);
        });
    
        sink.onCancel(() -> {
            payloadSubscription.get().cancel();
            heartbeatsSubscription.get().cancel();
        });
    });
    

    这个解决方案非常冗长,但它具有以下方便的属性:

    1. payloads 发布者的完成使合并的发布者也完成
    2. payloads's 和 heartbeats' 订阅都在合并发布者订阅取消时被取消
    3. heartbeats'订阅在payloads完成时被正确取消
    4. 数据是从payloads逐个元素请求的,而不是无限的需求
    5. 重复订阅合并的发布者(使用Flux.repeat())产生预期结果

    我仍然不确定是否可以从 heartbeats 请求 Long.MAX_VALUE,也许在那里做同样的事情是有意义的:只有在前一个元素被消耗后,一个一个地询问另一个元素。

    【讨论】:

      【解决方案4】:

      因此,我们希望将同一个订阅者订阅到两个不同的发布者。由于某些原因,它被反应流的规范所禁止。幸运的是,我的异步库 DF4J 实现了我命名为 ReverseFlow 的通信协议,其中多个生产者可以为同一个消费者提供数据——就像许多线程可以以异步方式写入同一个 BlockingQueue 一样。

      import java.time.Duration;
      import org.df4j.core.communicator.AsyncArrayBlockingQueue;
      import org.df4j.core.dataflow.Actor;
      import org.df4j.core.port.InpFlow;
      import org.df4j.core.port.OutChannel;
      import org.junit.Test;
      import reactor.core.publisher.Flux;
      
      /** converts reactive stream into ReverseFlow.Producer */
      class Adapter<T> extends Actor {
          public InpFlow<T> inp = new InpFlow<>(this);
          public OutChannel<T> out = new OutChannel<>(this);
      
          {start();}
      
          @Override
          protected void runAction() throws Throwable {
              if (inp.isCompletedExceptionally()) {
                  out.onError(inp.getCompletionException());
              } else if (inp.isCompleted()) {
                  out.onComplete();
              } else {
                  out.onNext(inp.remove());
              }
          }
      }
      
      @Test
      public void mergeTest() throws InterruptedException {
          Flux<Integer> payloads = Flux.range(0, 5)
              .delayElements(Duration.ofSeconds(2));
          Adapter<Integer> adapter1 = new Adapter<>();
          payloads.subscribe(adapter1.inp);
          Flux<Integer> heartbeats = Flux.just(-1)
              .repeat()
              .delayElements(Duration.ofSeconds(1));
          Adapter<Integer> adapter2 = new Adapter<>();
          heartbeats.subscribe(adapter2.inp);
          /* we use a queue with asynchronous interfaces on both ends */
          AsyncArrayBlockingQueue<Integer> queue = new AsyncArrayBlockingQueue<>();
          queue.feedFrom(adapter1.out);
          queue.feedFrom(adapter2.out);
          /* on the output end, AsyncArrayBlockingQueue is an ordinary Publisher */
          Flux<Integer> merged = Flux.from(queue);
          merged.subscribe(System.out::println);
          Thread.sleep(13000);
      }
      
      <dependencies>
          <dependency>
              <groupId>org.df4j</groupId>
              <artifactId>df4j-core</artifactId>
              <version>8.3</version>
          </dependency>
          <dependency>
              <groupId>io.projectreactor</groupId>
              <artifactId>reactor-core</artifactId>
              <version>3.3.2.RELEASE</version>
          </dependency>
      </dependencies>
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-02-28
        • 2020-09-07
        • 1970-01-01
        • 2020-01-26
        • 1970-01-01
        相关资源
        最近更新 更多