【问题标题】:Can I generate a sequence of Rx Observables from chained AJAX calls?我可以从链接的 AJAX 调用中生成一系列 Rx Observables 吗?
【发布时间】:2016-03-12 04:07:17
【问题描述】:

技术背景

我在 Web 浏览器中使用 jQuery 来调用从日志中返回一组条目的 API。

API 请求有两个参数:

  • offset_timestamp: 一个整数,指定我想要的最早的可能条目
  • limit:一个整数,指定要返回的记录数

请求和响应示例

Request with parameters:

- offset_timestamp = 100
- limit = 50


curl "https://example.com/log?offset_timestamp=100&limit=5"


Resulting JSON Response:

{
    next_timestamp: 222,
    end_of_log: false,
    entries: [
        {
            timestamp: 111,
            data: { ... }
        },
        {
            timestamp: 112,
            data: { ... }
        },

        ...

        {
            timestamp: 160,
            data: { ... }
        }
    ]
}

如果我使用普通的 jQuery + 回调,我想我必须递归地链接 AJAX 调用。大致如下:

// NOTE: I have NOT tested this code.
//       I just wrote it for illustration purposes

function getBatch(offset, limit, callback) {
    var url = "https://example.com/logs?offset_timestamp=" + offset + "&limit=" + limit;
    var ajaxParams = {
        method: "GET",
        url: url
    };

    jQuery.ajax(ajaxParams))
        .done(function(data) {
            if (data.end_of_log) {
                return callback(data.entries);
            }
            return getBatch(data.next_timestamp, limit, function(entries) { 
                return callback(entries.concat(data.entires));
            });
        });
}

function processEntries(entries) {
    for (var i = 0; i < entries.length; i++) {
        console.log(i.data);
    }
}

getBatch(0, 50, processEntries);

当然,我宁愿拥有一系列 Observable(每个都持有一批条目),这样我就可以使用 flatMap() 来获取所有条目的序列。

问题

如果从 jQuery 调用创建 Observable,例如Rx.Observable.fromPromise(jQuery.ajax({...})),是否可以使用 RxJS 将任意数量的这些 Observable 链接在一起,使用后续调用的参数中先前调用的 response.next_timestamp 的值?

【问题讨论】:

    标签: javascript jquery ajax rxjs reactivex


    【解决方案1】:

    我会尝试使用expand 运算符。您还可以在此处查看使用示例:RxJs: How to loop based on state of the observable?,以及此处:RxJS, how to poll an API to continuously check for updated records using a dynamic timestamp

    简而言之,它允许您使用可观察对象进行递归,通过返回 Rx.Observable.empty() 来表示递归结束。

    【讨论】:

      【解决方案2】:

      Rx.Subject 允许我们写入(推送数据)以及从(订阅)流中读取。 Rx.Subject 的行为类似于 Rx.Observable,但是,实例有一个 onNext 方法,允许我们推送数据。

      我们可以将我们的请求建模为Rx.Subject,并让我们的响应流依赖于它。然后我们可以订阅响应并将下一个请求推送到请求流:

      // Represents our stream of requests
      const request$ = new Rx.Subject();
      
      // Represents our AJAX responses
      const response$ = request$
          .startWith(initialRequest)
          .flatMap(makeAjaxCall)
      
          // Convert to a hot stream so we don't end up making an AJAX
          // request for every subscriber of this stream.
          .share();
      
      // And we can map over response$ to get the updated timestamp
      const timestamp$ = response$.map(getTimeStamp);
      
      // And we can subscribe to the timestamp$ stream to push new requests
      timestamp$.subscribe(timestamp => {
          const request = makeRequestFromTimestamp(timestamp);
      
          // Call onNext to push a new item onto our request$ stream
          request$.onNext(request);
      });
      
      // And we can subscribe to our responses. Note that our response$
      // stream will continue indefinitely.
      response$.subscribe(x => console.log(x));
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-03-18
        • 2012-10-13
        • 2011-06-17
        • 2023-03-16
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多