【问题标题】:rxjs zip is not lazy?rxjs zip 不偷懒?
【发布时间】:2014-08-27 10:24:48
【问题描述】:

我已经删除了样板文件以达到重点

// a.js

// My observables from stream and event
this.a = Rx.Node.fromStream(this.aStream());
this.itemSource = Rx.Observable.fromEvent(ee, 'addItem');

// Zip 'em
this.itemcombo = Rx.Observable.zip(this.a, this.itemSource, function (s1, s2) {
    return {item: s2, a: s1.toString()};
});

// Streams the lowercase alphabet
rb.prototype.aStream = function aStream() {
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    console.log('Hit!');
    if (c > 'z'.charCodeAt(0)) {
        rs.push(null);
    }
};

return rs;
};

// b.js(需要上面导出的模块)

rb.enqueue('a'); // The method simply does an ee.emit('addItem', ...)  in the module to trigger the itemSource observable

我期望看到的:

{item: 'a', a: 'a'} 打印在控制台中

发生了什么:

Hit!{item: 'a', a: 'a'} 之前被打印了 24 次。这意味着zipaStream 中获取所有值,缓冲它们,然后执行它应该做的事情。

我如何获得与zip 提供的相同功能但又懒惰?我的目标是使用无限流/可观察的并用有限(异步)流压缩它。

编辑

通过runnable查看/编辑它:RX Zip test Edit 2根据答案更新代码 -> 现在没有输出。

【问题讨论】:

  • 请重新添加样板并简化示例。
  • @DaveSexton 请参阅:pastebin.com/mnc82KuVpastebin.com/8HxURWYc 以获取复制/粘贴/运行版本 - 谢谢!我认为这个例子不能再简化了。它是 2 个流和 zip 功能。我已经包含了我用作参考的流,它可能被忽略了,忽略那部分是安全的,但我认为它可能有用。
  • @DaveSexton 查看我的编辑以获取点击并运行网络版本。

标签: system.reactive reactive-programming rxjs


【解决方案1】:

zip 确实很懒。它只订阅ab 并在其中任何一个产生新值时执行其工作。

您的问题是fromStream 会在zip 订阅它时同步发出它的所有值。发生这种情况是因为您的自定义 Readable 一直在说“有更多可用数据!”

使您的 Readable 异步,您将获得所需的行为。

试试这样的东西(未经测试)

var rs = Readable();
var subscription = null;
rs._read = function () {
    if (!subscription) {
        // produce the values once per second
        subscription = Rx.Observable
            .generateWithRelativeTime(
                97, // start value
                function (c) { return c > 'z'.charCodeAt(0); }, // end condition
                function (c) { return c + 1; }, // step function
                function (c) { return String.fromCharCode(c); }, // result selector
                function () { return 1000; }) // 1000ms between values
            .subscribe(
                function (s) {
                    rs.push(s);
                    console.log("Hit!");
                },
                function (error) { rs.push(null); },
                function () { rs.push(null); });
    }
};

【讨论】:

  • 感谢您的回答。我已经更新了可运行的小提琴以使用您的代码,它只是输出“已完成”,现在没有其他任何事情发生。此外,即使我们通过在内部修改此特定流来解决此问题,我的问题是,有没有办法获取任意流并使其不会溢出zip(以便我可以在任何项目中使用该模式)?跨度>
  • 我接受这个答案。在了解了更多关于响应式编程的知识之后,我认为我需要重构我的方法。
猜你喜欢
  • 2012-01-19
  • 1970-01-01
  • 2020-04-05
  • 1970-01-01
  • 2017-08-27
  • 1970-01-01
  • 1970-01-01
  • 2011-03-14
  • 1970-01-01
相关资源
最近更新 更多