【问题标题】:Rxjs angular 6/7 mergeMap delay http requestRxjs angular 6/7 mergeMap 延迟 http 请求
【发布时间】:2018-11-27 20:34:22
【问题描述】:

我想使用此代码发送请求(我也尝试过 forkJoin),但调用之间有延迟:

duplicateElement(id: string): Observable<any> {
    return this.http.get({ routeName: 'route_name', params: { id } });
}

duplicateElements(ids: string[]): Observable<any> {
    return from(ids)
    .pipe(
        mergeMap(id => this.duplicateElement(id).pipe(delay(1000))
    ));
}

但是 .pipe(delay(1000) 没有按我的预期工作:在 1000 毫升之后发送每个 http 请求。

【问题讨论】:

  • 你希望它如何工作?
  • 我想以 1000 毫升的延迟发送每个 http 请求

标签: javascript angular rxjs6


【解决方案1】:

现在有两种选择!

基本设置

import * as rx from "rxjs";
import * as rxop from "rxjs/operators";

const startTime = new Date();

function getTimestamp() {
  return (new Date().getTime() - startTime.getTime()) / 1000;
}


const desiredDelay = 750;
const serviceDelay = 500;

// simulating service, you can ignore what's inside
var myService = (b: any) => {
  return rx.of(Math.random()).pipe(
    // To simulate long running service
    rxop.delay(serviceDelay),
    // Log the timestap after execution, should be ~ desiredDelay + serviceDelay, so by default 1250ms each emitted value
    rxop.tap(() => console.log(`${b} after service result, ${getTimestamp()}`)),
    // simulating the result
    rxop.map(a => "result" + b)
  );
};

延迟后一个一个发出值,尽快执行服务并收集结果

of([1, 2, 3, 4, 5])
  .pipe(
    // See the initial values
    tap(console.log),
    // Split array into single values during emit
    // Collect observables and subscribe to next when previous completes
    concatAll(),
    // Emit each value as a sequence of observables with a desired delay
    concatMap(a => of(a).pipe(delay(desiredDelay))),
    // Call service on each value as soon as possible, do not care about the order
    mergeMap(a => myService(a)),
    // Reduce single values back into array
    // Reduces the values from source observable to a single value that's emitted when the source completes
    reduce<any>((acc, val) => [...acc, val], []),
    // See the result, not necessary
    tap(console.log)
  )
  .subscribe();

在一个值上调用服务,等待延迟,然后用另一个值调用服务

of([1, 2, 3, 4, 5])
  .pipe(
    // See the initial values
    tap(console.log),
    // Split array into single values during emit
    // Collect observables and subscribe to next when previous completes
    concatAll(),
    // Call the service
    // Map values to inner observable, subscribe and emit in order
    concatMap(a => myService(a).pipe(delay(desiredDelay))),
    // Reduce single values back into array
    // Reduces the values from source observable to a single value that's emitted when the source completes
    reduce<any>((acc, val) => [...acc, val], []),
    // See the result, not necessary
    tap(console.log)
  )
  .subscribe();

【讨论】:

  • 谢谢Jacek,但是这种方式在1000mls之后发送所有请求,但我想每1000mls发送一个id请求
  • 试试这个。如果这不起作用,当我真正在我的电脑前时,我将能够给你更好的结果:) return from(unitIds).pipe( switchMap(id =&gt; this.duplicateElement(id).pipe( delay(1000))), concatAll() );
  • 谢谢雅克!很好的解释和例子。但是我们需要使用mergeMap,因为我们希望请求是“伪”(有延迟)并行的,使用concatMap,我认为它们是顺序的。
  • 已更新。请注意,当同时触发时,实际保留调用顺序更加困难。如果您需要特定的调用顺序,我宁愿提出稍微不同的想法
  • 很棒的例子!在我们的例子中,顺序并不重要,所以效果很好......非常感谢 Jacek!
【解决方案2】:

这个怎么样:

duplicateElements(ids: string[]): Observable<any> {
    return interval(1000).pipe( // start emitting every 1000ms
        take(ids.length),  // limit emissions to length of array
        map(i => ids[i]),  // map (change) emission to the array item @ index i 
        mergeMap(id => this.duplicateElement(id)) // add the http request
    )
}

【讨论】:

  • 嗯......它不起作用怎么办?我确实想到了需要添加的一项......我将使用take(ids.length) 更新答案,以便在正确的尝试次数后间隔停止发出。此外,您是否希望每个后续的 http 调用都替换前一个 http 调用,还是希望它添加另一个 http 调用? mergeMap 将继续构建它们,但如果您想替换它们,那么 switchMap 可能是更好的选择。
  • 谢谢,太好了!,是的,你是对的......在正确的尝试次数后发送未定义
猜你喜欢
  • 2017-08-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-11-24
  • 2018-01-02
  • 1970-01-01
相关资源
最近更新 更多