【问题标题】:Implementing Stream in JavaScript在 JavaScript 中实现流
【发布时间】:2019-04-22 12:46:04
【问题描述】:

我想实现一个可以做到这一点的流对象:

// a -------1------2----3
// map -----\------\----\
// b --------2------4----6

const a = new Stream();
const b = a.map(value => value * 2);

b.subscribe(console.log);

a.push(1);
// 2
a.push(2);
// 4
a.push(3);
// 6

这里的想法是对象b 可以订阅新的回调到流amap 函数应该在调用 push 时进行监听,并应用映射出的函数以及最初订阅的函数。这是我到目前为止的实现:

class Stream {
  constructor(queue = []) {
    this.queue = queue;
  }

  subscribe(action) {
    if (typeof action === 'function') {
      this.queue.push(action);
    }
  }

  map(callback) {
     this.queue = this.queue.map(
        actionFn => arg => action(callback(arg))
     );

     return this;
  }

  push(value) {
    this.queue.forEach(actionFn => {
      actionFn.call(this, value);
    });
  }
}

当前实现的问题是,最初类 Stream 中的 queue 是空的,所以它不会通过它。将不胜感激任何建议或帮助。我不想为此使用任何库。

【问题讨论】:

    标签: javascript node.js stream


    【解决方案1】:

    您的地图需要创建一个新的Transform 流并将其返回。您可以简单地使用标准的on('data') 事件来代替subscribe,或者最好使用read 方法。

    最后 - 您可以简单地使用我的工作,并通过使用 scramjet 有效地实现您的 map 方法,这与您上面显示的完全一样 - 而且它支持异步函数。 :)

    这是你如何使用它(在一些 getStream 函数中):

    const {DataStream} = require('scramjet');
    
    const stream = new DataStream();
    
    stream.write(1); // you can also use await stream.whenWrote(1);
    stream.write(2);
    stream.write(3);
    
    return stream.map(x => x * 2);
    

    然后在其他地方阅读:

    stream.on('data', x => console.log(`x: ${x}`));
    // x: 2
    // x: 4
    // x: 6
    

    在这里查看scramjet docs

    【讨论】:

      【解决方案2】:

      在问了这个问题这么长时间之后,我能够回到这个问题并想出一个简单的解决方案。由于一个流应该监听它订阅的那个,我们应该返回原始实例以保留前一个流的值。这是我发现运行良好的代码:

      class Stream {
          constructor() {
              this.subscriptions = [];
              this.mappedActions = [];
          }
      
          subscribe(callback) {
              this.subscriptions.push(callback);
          }
      
          map(actionFunc) {
              this.mappedActions.push(actionFunc);
      
              return this;
          }
      
          push(opValue) {
              this.subscriptions.forEach(cb => {
                  if (this.mappedActions.length) {
                      this.mappedActions.forEach(action => {
                          cb(action.call(this, opValue));
                      });
                  } else {
                      cb(opValue);
                  }
              });
          }
      }
      
      const a = new Stream();
      const b = a.map(value => value * 1 / 2);
      const c = b.map(value => value * 3);
      
      c.subscribe(console.log);
      
      c.push(1);
      c.push(2);
      c.push(3);
      
      // expected output in the console:
      // 0.5
      // 3
      // 1
      // 6
      // 1.5
      // 9

      希望任何偶然发现这个有趣问题的人都会发现我的解决方案很有用。如果您想进行任何更改,请随时这样做或联系我!

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2017-09-18
        • 1970-01-01
        • 2017-06-22
        • 2020-03-29
        • 1970-01-01
        • 2012-02-04
        • 2018-01-18
        相关资源
        最近更新 更多