【问题标题】:Angular 2 / RXJS - need some help batching requestsAngular 2 / RXJS - 需要一些帮助批处理请求
【发布时间】:2016-10-27 23:25:03
【问题描述】:

我一直在阅读 rxjs 文档,但却迷失在所有的运算符中..

这是我目前得到的

  let obs = Observable.from([1, 3, 5])   

所以我需要做的是take() 数组中的一些设定数量。在发布请求中使用结果,当结果成功时,我需要重新启动该过程。我想收集所有结果,并在过程中保持进度(用于进度条)

我不需要所有这些的代码。我真正需要知道的是如何使用 rxjs 来拆分这个数组。发送它的一部分,然后重新启动该过程,直到没有任何东西可以发送。

最终解决方案

  var _this = this

  function productsRequest(arr) {
    return _this.chainableRequest('post', `reports/${clientId}/${retailerId}/`, loadedProductsReport, {
        'identifiers': arr,
        'realTime': true
      })    
  }

  let arrayCount = Math.ceil(identifiers.length/10)
  let obs = Observable.from(identifiers)            
    .bufferCount(10)
    .concatMap(arr => {
      arrayCount--
      return arrayCount > 0 ? productsRequest(arr) : Observable.empty()
    })


  let subscriber = obs.subscribe(
    value => console.log(value)
  )

父级中的可链接请求方法

  chainableRequest(method: string, endpoint: string, action: Function, data = {}, callback?: Function){
let body = (<any>Object).assign({}, {
  headers: this.headers
}, data)


return this._http[method.toLowerCase()](`${this.baseUri}/${endpoint}`, body, body)
          .map((res: Response) => res.json())
  }

【问题讨论】:

    标签: angular rxjs observable rxjs5


    【解决方案1】:

    这在很大程度上取决于您要达到的目标。

    如果您想基于之前的某个 Observable 递归调用 Observable,并且您不知道要调用多少次,请使用 expand() 运算符。

    例如,此演示基于上一次调用的响应(count 属性)递归创建 5 个请求:

    import { Observable } from 'rxjs/Observable';
    
    function mockPostRequest(count) {
        return Observable.of(`{"count":${count},"data":"response"}`)
            .map(val => JSON.parse(val));
    }
    
    Observable.of({count: 0})
        .expand(response => {
            console.log('Response:', response.count);
            return response.count < 5 ? mockPostRequest(response.count + 1) : Observable.empty();
        })
        .subscribe(undefined, undefined, val => console.log('Completed'));
    

    打印到控制台:

    Response: 0
    Response: 1
    Response: 2
    Response: 3
    Response: 4
    Response: 5
    Completed
    

    观看现场演示:http://plnkr.co/edit/lKNdR8oeOuB2mrnR3ahQ?p=preview

    或者,如果您只想依次调用一堆 HTTP 请求(concatMap() 运算符)或一次调用所有请求并在它们到达时使用它们(mergeMap() 运算符):

    Observable.from([
        'https://httpbin.org/get?1',
        'https://httpbin.org/get?2',
        'https://httpbin.org/get?3',
      ])
      .concatMap(url => Observable.of(url))
      .subscribe(response => console.log(response));
    

    打印到控制台:

    https://httpbin.org/get?1
    https://httpbin.org/get?2
    https://httpbin.org/get?3
    

    观看现场演示:http://plnkr.co/edit/JwZ3rtkiSNB1cwX5gCA5?p=preview

    【讨论】:

    • 我试图完成的是从数组中获取 x 个项目 -> 将其传递给 http 请求。等待http请求完成,然后从头开始再次传递项目。我已经尝试了你所有的例子,但没有看到如何让它适用于我想要做的事情..
    • @hamobi 一次运行一个 Observables 可以使用 concatMap() 完成。然后你想从同一个数组重新启动进程还是什么?
    • 是的,我想从同一个阵列重新启动进程。我想从数组中发送十个项目,然后是接下来的十个项目,依此类推..直到我到达数组的末尾。谢谢:)
    • @hamobi 然后使用expand() 而不是concatMap()
    • 我已经更新了我的工作......现在使用扩展。我的请求仍在一次全部触发..你能看到我做错了什么吗?
    猜你喜欢
    • 1970-01-01
    • 2017-07-23
    • 1970-01-01
    • 1970-01-01
    • 2014-10-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多