【发布时间】:2014-08-27 10:24:48
【问题描述】:
我已经删除了样板文件以达到重点
// a.js
// My observables from stream and event
this.a = Rx.Node.fromStream(this.aStream());
this.itemSource = Rx.Observable.fromEvent(ee, 'addItem');
// Zip 'em
this.itemcombo = Rx.Observable.zip(this.a, this.itemSource, function (s1, s2) {
return {item: s2, a: s1.toString()};
});
// Streams the lowercase alphabet
rb.prototype.aStream = function aStream() {
var rs = Readable();
var c = 97;
rs._read = function () {
rs.push(String.fromCharCode(c++));
console.log('Hit!');
if (c > 'z'.charCodeAt(0)) {
rs.push(null);
}
};
return rs;
};
// b.js(需要上面导出的模块)
rb.enqueue('a'); // The method simply does an ee.emit('addItem', ...) in the module to trigger the itemSource observable
我期望看到的:
{item: 'a', a: 'a'} 打印在控制台中
发生了什么:
Hit! 在 {item: 'a', a: 'a'} 之前被打印了 24 次。这意味着zip 从aStream 中获取所有值,缓冲它们,然后执行它应该做的事情。
我如何获得与zip 提供的相同功能但又懒惰?我的目标是使用无限流/可观察的并用有限(异步)流压缩它。
编辑
通过runnable查看/编辑它:RX Zip test Edit 2根据答案更新代码 -> 现在没有输出。
【问题讨论】:
-
请重新添加样板并简化示例。
-
@DaveSexton 请参阅:pastebin.com/mnc82KuV 和 pastebin.com/8HxURWYc 以获取复制/粘贴/运行版本 - 谢谢!我认为这个例子不能再简化了。它是 2 个流和 zip 功能。我已经包含了我用作参考的流,它可能被忽略了,忽略那部分是安全的,但我认为它可能有用。
-
@DaveSexton 查看我的编辑以获取点击并运行网络版本。
标签: system.reactive reactive-programming rxjs