【发布时间】:2020-01-27 04:53:48
【问题描述】:
我正在使用 RxJS 6 使用类似于下面运行的示例的代码来懒惰地逐步遍历可迭代对象。这运行良好,但我无法解决我的最终用例。
完整代码here
import { EMPTY, defer, from, of } from "rxjs";
import { delay, expand, mergeMap, repeat } from "rxjs/operators";
function stepIterator (iterator) {
return defer(() => of(iterator.next())).pipe(
mergeMap(result => result.done ? EMPTY : of(result.value))
);
}
function iterateValues ({ params }) {
const { values, delay: delayMilliseconds } = params;
const isIterable = typeof values[Symbol.iterator] === "function";
// Iterable values which are emitted over time are handled manually. Otherwise
// the values are provided to Rx for resolution.
if (isIterable && delayMilliseconds > 0) {
const iterator = values[Symbol.iterator]();
// The first value is emitted immediately, the rest are emitted after time.
return stepIterator(iterator).pipe(
expand(v => stepIterator(iterator).pipe(delay(delayMilliseconds)))
);
} else {
return from(values);
}
}
const options = {
params: {
// Any iterable object is walked manually. Otherwise delegate to `from()`.
values: ["Mary", "had", "a", "little", "lamb"],
// Delay _between_ values.
delay: 350,
// Delay before the stream restarts _after the last value_.
runAgainAfter: 1000,
}
};
iterateValues(options)
// Is not repeating?!
.pipe(repeat(3))
.subscribe(
v => {
console.log(v, Date.now());
},
console.error,
() => {
console.log('Complete');
}
);
我想添加另一个选项,它将在延迟后无限次重新执行流 (runAgainAfter)。如果没有更深入地考虑result.done 的情况,我很难干净地编写这个。到目前为止,我一直无法围绕iterateValues 编写重新运行的行为。
完成用例的最佳方法是什么?
谢谢!
编辑 1:repeat 打我的脸。也许这意味着友好。
编辑2:不,重复不是重复,但可观察的正在完成。谢谢你的帮助。我很困惑。
为了后代,修订版的完整代码示例是repeat-able,并在项目之间使用一致的延迟。
import { concat, EMPTY, defer, from, interval, of, throwError } from "rxjs";
import { delay, expand, mergeMap, repeat } from "rxjs/operators";
function stepIterator(iterator) {
return defer(() => of(iterator.next())).pipe(
mergeMap(result => (result.done ? EMPTY : of(result.value)))
);
}
function iterateValues({ params }) {
const { values, delay: delayMilliseconds, times = 1 } = params;
const isIterable =
values != null && typeof values[Symbol.iterator] === "function";
if (!isIterable) {
return throwError(new Error(`\`${values}\` is not iterable`));
}
// Iterable values which are emitted over time are handled manually. Otherwise
// the values are provided to Rx for resolution.
const observable =
delayMilliseconds > 0
? defer(() => of(values[Symbol.iterator]())).pipe(
mergeMap(iterator =>
stepIterator(iterator).pipe(
expand(v => stepIterator(iterator).pipe(delay(delayMilliseconds)))
)
)
)
: from(values);
return observable.pipe(repeat(times));
}
【问题讨论】:
-
我认为你应该只使用
fromreactivex.io/rxjs/class/es6/… 你可以将一个可迭代对象传递给它,它会将其转换为可观察对象 -
另外,你真的需要一个可迭代对象吗?因为来自
options的values不是以懒惰的方式构建的?如果您只想以给定的延迟发出数组,您绝对可以做一些更简单的事情 -
我无法将延迟与
from屈服相结合。可迭代使得从 Set/Map/Array/String 等发出值变得更容易。你说得对,发出值索引查找非常容易。我发现repeat不起作用,因为iterator是在帮助程序中创建的,而不是defer。
标签: rxjs