【问题标题】:Subscribe to a stream with RxJS and twitter-stream-api module使用 RxJS 和 twitter-stream-api 模块订阅流
【发布时间】:2017-03-05 21:25:41
【问题描述】:

好的,所以我是 Rx 的完整初学者,不幸的是,我对 js 和 js 中的流也很陌生。我使用这个https://github.com/trygve-lie/twitter-stream-api 连接到推特流API 并接收带有推文的json 对象。到目前为止我有这个代码

var Rx = require('rxjs/Rx');

var TwitterStream = require('twitter-stream-api'),
    fs = require('fs');
var filter = 'tweet';
var keys = {
    consumer_key : "key",
    consumer_secret : "secret",
    token : "token",
    token_secret : "tokensecret"
};

var Twitter = new TwitterStream(keys);
Twitter.stream('statuses/filter', {
    track: filter
});

Twitter.on('connection success', function (uri) {
    console.log('connection success', uri); 
});
Twitter.on('data', function (obj) {
    console.log(obj.text);
});

我成功地将推文写入控制台,但我真正想学习的是使用流,尤其是 RxJS。我已经尝试了所有我能想到的方法来创建一个 observable。 Rx. 可观察的。创建/从等...

我也尝试过 Twitter.resume() 因为它显然默认暂停以恢复流并观察它。我只收到诸如不能 .subscribe 不是函数之类的错误。根据我上面的内容,我如何使用 Rx.Observable 开始过滤和处理数据?

谢谢!

【问题讨论】:

    标签: javascript stream rxjs twitter-streaming-api


    【解决方案1】:

    RxJS 5 没有任何方法可以将流从 Observable 转换为 Observable,因此您需要自己完成此操作。最好是Observable.create

    const Rx = require('rxjs');
    const Observable = Rx.Observable;
    
    var TwitterStream = require('twitter-stream-api'),
    
    ...
    
    var source$ = Observable.create(observer => {
      var Twitter = new TwitterStream(keys);
      Twitter.stream('statuses/filter', {
        track: filter
      });
    
      Twitter.on('data', function (obj) {
        observer.next(obj);
      });
    
      return () => {
        Twitter.close();
      };
    });
    

    这使得只有当你订阅它时才会连接到 Twitter 的冷 Observable。 Observable.create 静态方法让我们将值推送给观察者,最后返回一个拆卸函数,然后关闭连接。当您取消订阅或 Observable 完成时调用此函数。

    然后你可以用你想要的任何东西链接这个 Observable:

    source$.filter(...).map(...)
    

    请注意,还有 Observable.bindCallback()Observable.bindNodeCallback() 方法,但在您的情况下这些方法对您没有多大帮助。

    阅读更多:

    【讨论】:

    • 非常感谢您的回答!我还没有时间正确地尝试它。当我得到'Observable is not defined'时的快速问题,它不应该是 Rx.Observable.create() 吗?还是我需要(导入)更多东西?
    • @JimJimL 现在查看代码,我还添加了require() 调用。
    【解决方案2】:

    这是一个使用 desmondmorris/node-twitter 和 rxjs 5 的示例。

    const Observable = require('rxjs').Observable;
    
    Observable
      .of(new require('twitter')({
        consumer_key: 'xxxx',
        consumer_secret: 'xxxx',
        access_token_key: 'xxxx',
        access_token_secret: 'xxxx',
      })).mergeMap(twitter =>
        Observable
        .fromEvent(twitter.stream('statuses/filter', {
            track: 'Stack Overflow'
          }),
          'data'))
      .filter(tweet => tweet.user.follow_count > 10000)
      .subscribe(console.log);

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-06-15
      • 1970-01-01
      • 2014-04-30
      相关资源
      最近更新 更多