【问题标题】:Re-execute async RxJS stream after delay延迟后重新执行异步 RxJS 流
【发布时间】:2020-01-27 04:53:48
【问题描述】:

我正在使用 RxJS 6 使用类似于下面运行的示例的代码来懒惰地逐步遍历可迭代对象。这运行良好,但我无法解决我的最终用例。

完整代码here

import { EMPTY, defer, from, of } from "rxjs";
import { delay, expand, mergeMap, repeat } from "rxjs/operators";

function stepIterator (iterator) {
  return defer(() => of(iterator.next())).pipe(
    mergeMap(result => result.done ? EMPTY : of(result.value))
  );
}

function iterateValues ({ params }) {
  const { values, delay: delayMilliseconds } = params;
  const isIterable = typeof values[Symbol.iterator] === "function";

  // Iterable values which are emitted over time are handled manually. Otherwise
  // the values are provided to Rx for resolution.
  if (isIterable && delayMilliseconds > 0) {
    const iterator = values[Symbol.iterator]();

    // The first value is emitted immediately, the rest are emitted after time.
    return stepIterator(iterator).pipe(
      expand(v => stepIterator(iterator).pipe(delay(delayMilliseconds)))
    );
  } else {
    return from(values);
  }
}

const options = { 
  params: {
    // Any iterable object is walked manually. Otherwise delegate to `from()`.
    values: ["Mary", "had", "a", "little", "lamb"],
    // Delay _between_ values.
    delay: 350,
    // Delay before the stream restarts _after the last value_.
    runAgainAfter: 1000,
  }
};

iterateValues(options)
  // Is not repeating?!
  .pipe(repeat(3))
  .subscribe(
    v => {
      console.log(v, Date.now());
    },
    console.error,
    () => {
      console.log('Complete');
    }
  );

我想添加另一个选项,它将在延迟后无限次重新执行流 (runAgainAfter)。如果没有更深入地考虑result.done 的情况,我很难干净地编写这个。到目前为止,我一直无法围绕iterateValues 编写重新运行的行为。

完成用例的最佳方法是什么?

谢谢!

编辑 1:repeat 打我的脸。也许这意味着友好。 编辑2:不,重复不是重复,但可观察的正在完成。谢谢你的帮助。我很困惑。


为了后代,修订版的完整代码示例是repeat-able,并在项目之间使用一致的延迟。

import { concat, EMPTY, defer, from, interval, of, throwError } from "rxjs";
import { delay, expand, mergeMap, repeat } from "rxjs/operators";

function stepIterator(iterator) {
  return defer(() => of(iterator.next())).pipe(
    mergeMap(result => (result.done ? EMPTY : of(result.value)))
  );
}

function iterateValues({ params }) {
  const { values, delay: delayMilliseconds, times = 1 } = params;
  const isIterable =
    values != null && typeof values[Symbol.iterator] === "function";

  if (!isIterable) {
    return throwError(new Error(`\`${values}\` is not iterable`));
  }

  // Iterable values which are emitted over time are handled manually. Otherwise
  // the values are provided to Rx for resolution.
  const observable =
    delayMilliseconds > 0
      ? defer(() => of(values[Symbol.iterator]())).pipe(
          mergeMap(iterator =>
            stepIterator(iterator).pipe(
              expand(v => stepIterator(iterator).pipe(delay(delayMilliseconds)))
            )
          )
        )
      : from(values);

  return observable.pipe(repeat(times));
}

【问题讨论】:

  • 我认为你应该只使用 from reactivex.io/rxjs/class/es6/… 你可以将一个可迭代对象传递给它,它会将其转换为可观察对象
  • 另外,你真的需要一个可迭代对象吗?因为来自optionsvalues 不是以懒惰的方式构建的?如果您只想以给定的延迟发出数组,您绝对可以做一些更简单的事情
  • 我无法将延迟与from 屈服相结合。可迭代使得从 Set/Map/Array/String 等发出值变得更容易。你说得对,发出值索引查找非常容易。我发现repeat 不起作用,因为iterator 是在帮助程序中创建的,而不是defer

标签: rxjs


【解决方案1】:

我会说实话,但肯定会有更好的解决方案。在我的解决方案中,我最终将延迟逻辑封装在自定义 runAgainAfter 运算符中。让它成为一个独立的部分,不会直接影响你的代码逻辑。

完整的工作代码是here

还有runAgainAfter的代码,如果有人需要的话:

import { Observable } from "rxjs";

export const runAgainAfter = delay => observable => {
  return new Observable(observer => {
    let timeout;
    let subscription;
    const subscribe = () => {
      return observable.subscribe({
        next(value) {
          observer.next(value);
        },
        error(err) {
          observer.error(err);
        },
        complete() {
          timeout = setTimeout(() => {
            subscription = subscribe();
          }, delay);
        }
      });
    };
    subscription = subscribe();

    return () => {
      subscription.unsubscribe();
      clearTimeout(timeout);
    };
  });
};

希望对你有帮助

【讨论】:

  • 谢谢!在使用内部可观察对象之前,我没有成功。非常感谢工作代码并提出另一种方法来考虑。 :)
  • 在我的原始示例中修复了iterator 问题后(它是在工厂中创建的,而不是在订阅时创建的)然后repeat 工作。它也提供了一个清理界面的机会。这个版本不是两个延迟,而是一个延迟,并且应该发出该系列的多次:stackblitz.com/edit/…。再次感谢。 :)
猜你喜欢
  • 2017-05-15
  • 2018-10-25
  • 2011-05-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-02-02
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多