【问题标题】:Set rate using RxJS5使用 RxJS5 设置速率
【发布时间】:2017-02-02 06:51:37
【问题描述】:

我有这段代码,它只是从 .csv 文件中读取数据并将其转换为 json 并记录数据:

const fs = require('fs');
const path = require('path');

const sd = path.resolve(__dirname + '/fixtures/SampleData.csv');
const strm = fs.createReadStream(sd).setEncoding('utf8');

const Rx = require('rxjs/Rx');
const csv2json = require('csv2json');


const dest = strm
  .pipe(csv2json({
    separator: ','
  }));

dest.on('error', function(e){
    console.error(e.stack || e);
})

const obs = Rx.Observable.fromEvent(dest, 'data')
          .flatMap(d => Rx.Observable.timer(100).mapTo(d))

obs.subscribe(v => {
    console.log(String(v));
})

代码所做的是在 100 毫秒延迟后记录所有数据。 我其实是想延迟每一行数据,并在一小段延迟后记录每一行。

上面的代码没有做到这一点 - 控制数据记录速率的最佳方法是什么?

假设:所有数据行几乎同时进入,因此所有数据都延迟了 100 毫秒,因此它们最终几乎同时打印。我只需要在记录前一行之后开始延迟下一行。

下面的代码似乎和上面的计时器做同样的事情:

const obs = Rx.Observable.fromEvent(dest, 'data')
      .delay(100)

【问题讨论】:

    标签: node.js rxjs5


    【解决方案1】:

    假设:所有数据行大致相同 时间,所以所有的都延迟了 100 毫秒,所以它们最终被打印在 几乎同时。我只需要开始延迟下一行 在前一个被记录之后。

    你的假设是正确的

    解决方案

    将原始解决方案中的.flatMap() 替换为.concatMap()

    Rx.Observable.from([1,2,3,4])
      .mergeMap(i => Rx.Observable.timer(500).mapTo(i))
      .subscribe(val => console.log('mergeMap value: ' + val));
    
    Rx.Observable.from([1,2,3,4])
      .concatMap(i => Rx.Observable.timer(500).mapTo(i))
      .subscribe(val => console.log('concatMap value: ' + val));
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

    这将确保每个发射在订阅下一个发射之前完成并开始延迟其值。

    【讨论】:

    • 谢谢马克!所以你说 mergeMap 和 concatMap 解决方案都应该工作。你能简单谈谈两者之间的区别吗?为什么它们都有效?
    • 我已经在另一个答案stackoverflow.com/a/42006671/106909中解释了flatMap concatMap 和switchMap 之间的区别@
    • 在您的情况下,您需要使用 concatMap,在我的答案中运行 sn-p 并观察值在订阅中到达方式的差异
    【解决方案2】:

    我在 RxJS 库中找不到我需要的功能(虽然它可能在那里,但我只是找不到它,如果有更好、更惯用的方法,请告诉我)。

    所以我写了这个,这似乎可以完成这项工作:

    const fs = require('fs');
    const path = require('path');
    
    const sd = path.resolve(__dirname + '/fixtures/SampleData.csv');
    const strm = fs.createReadStream(sd).setEncoding('utf8');
    
    const Rx = require('rxjs/Rx');
    const csv2json = require('csv2json');
    
    const p = Rx.Observable.prototype;
    
    p.eachWait = function(timeout){
    
        const source = this;
        const values = [];
        let flipped = true;
    
        const onNext = function (sub){
    
              flipped = false;
    
              setTimeout(() => {
    
                var c = values.pop();
                if(c)  sub.next(c);
    
                if(values.length > 0){
                   onNext(sub);
                }
                else{
                   flipped = true;
                }
    
             }, timeout);
        }
    
          return Rx.Observable.create(sub => {
    
              return source.subscribe(
    
                    function next(v){
    
                             values.unshift(v);
    
                             if(flipped){
                                 onNext(sub);
                             }
    
                     },
                  sub.error.bind(sub),
                  sub.complete.bind(sub)
              );
    
          });
    
    }
    
    
    const dest = strm
      .pipe(csv2json({
        separator: ','
      }));
    
    dest.on('error', function(e){
        console.error(e.stack || e);
    });
    
    const obs = Rx.Observable.fromEvent(dest, 'data')
          .eachWait(1000)
    
    obs.subscribe(v => {
      console.log(String(v));
    });
    

    我认为这是您可以做到的最佳性能 - 在任何给定时刻都应该只运行一个计时器。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-09-08
      • 2019-08-02
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多