【问题标题】:What is the best way to implement a poller with timeout as a reactive stream.将超时的轮询器作为反应流实现的最佳方法是什么。
【发布时间】:2018-05-08 17:54:12
【问题描述】:

对具有超时的轮询器建模的最佳方法是什么,其中某种条件会导致提前退出为“反应性流”?

例如

如果我有一个 observable 每秒产生一个递减的正整数序列

9,8,7,6,5,4,3,2,1,0

编写消费者的最佳方法是在 5 秒后获取最新的单个事件,或者如果它在超时之前产生,则获取“0”事件。

这是我目前的代码:(Java 中的示例)

    int intialValue = 10;

    AtomicInteger counter = new AtomicInteger(intialValue);
    Integer val = Observable.interval(1, TimeUnit.SECONDS)
                            .map(tick -> counter.decrementAndGet())
                            .takeUntil(it -> it == 0)
                            .takeUntil(Observable.timer(5, TimeUnit.SECONDS))
                            .lastElement()
                            .blockingGet();

    System.out.println(val);

如果 initialValue = 10,我希望打印 6。如果 initialValue = 2,我希望在 5 秒超时到期之前打印 0。

如果有更好的方法可以做到这一点,我很感兴趣。

【问题讨论】:

  • observable 链中包含三个元素:源(interval().map())、值选择(it == 0)、时间选择(timer())。没有比这更紧凑的了。

标签: rxjs rx-java reactive-programming system.reactive rx-java2


【解决方案1】:

我认为没有比您所做的更好的方法了。您必须具备以下条件:

  • 发射时间间隔 (interval)
  • 用于递减和存储最后一个值的聚合器 (scan)
  • 值的终止条件 (takeWhile)
  • 按时终止条件 (takeUntil(timer(...)))
  • 在完成时获取最后一个值 (last)

每一个都由一个运算符表示。你不能做太多的事情来解决这个问题。我使用了几个不同的运算符(scan 用于聚合,takeWhile 用于终止值)但它们的运算符数量相同。

const { interval, timer } = rxjs;
const { scan, takeWhile, takeUntil, last, tap } = rxjs.operators;

function poll(start) {
  console.log('start', start);
  interval(1000).pipe(
    scan((x) => x - 1, start),
    takeWhile((x) => x >= 0),
    takeUntil(timer(5000)),
    tap((x) => { console.log('tap', x); }),
    last()
  ).subscribe(
    (x) => { console.log('next', x); },
    (e) => { console.log('error', e); },
    () => { console.log('complete'); }
  );
}

poll(10);
setTimeout(() => { poll(2); }, 6000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.1.0/rxjs.umd.min.js"></script>

我不清楚您希望它如何在边界上发挥作用。在您的示例中,您总是在发出之前递减,所以如果您的初始值为 10,那么您发出 9、8、7、6(4 个值)。如果您想从 10 开始,那么 .你可以做 scan(..., start + 1) 但这会在 7 结束,因为 takeUntil(...) 中的计时器与源间隔对齐,因此 6 将被排除在外。如果你想发出 5 个值,那么你可以做takeUntil(timer(5001))。此外,如果您不想等待一秒钟来发出第一个值,那么您可以将startWith(start) 放在scan(...) 之后。或者你可以用scan(..., start + 1) 代替源区间来做timer(0, 1000)

还要注意,在产生无效值 (-1) 之前,值终止 (takeWhile) 不会终止。所以它会在收到终止值(0)后持续一秒钟。似乎大多数终止运算符都是这样工作的,如果它们以某个值终止,那么它们不会让其他运算符通过。

您可以使用take(5) 而不是takeUntil(timer(5000)),因为您知道如果这适用于您的场景,它会在匹配的时间间隔内触发。这也可以解决由于计时器排队而排除最后一个值的问题。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-12-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多