【问题标题】:Observable combine multiple function calls into single ObservableObservable 将多个函数调用组合成一个 Observable
【发布时间】:2018-11-16 06:04:20
【问题描述】:

我有一个基于参数执行 http 请求的函数。我想添加某种“去抖动”功能。因此,如果函数在设定的时间窗口内被多次调用,我想将参数组合到一个请求中,而不是发出多个请求。

我想通过 Observables 和 Angular 来实现这一点。这听起来并不复杂,但是我无法让它运行,也许我错过了一些东西。

现在让我们跳过单个请求中的组合,因为这可以通过聚合去抖动或 Oberservable.buffer 来完成。我无法组合单个 Observables。

这是我迄今为止尝试过的。

我尝试使用主题,因为这似乎是本案例的正确对象 (https://stackblitz.com/edit/angular-hcn41v?file=src%2Fapp%2Fapp.component.ts)。

constructor(private http: HttpClient) {
  this.makeRequest('1').subscribe(x => console.log(x))
  this.makeRequest('2').subscribe(console.log)
  setTimeout(() => {
    this.makeRequest('3').subscribe(console.log)
  }, 1000)
}

private makeRequest(id: string) {
  this.observable = this.observable.pipe(
    merge(Observable.of(id).pipe(delay(1)))
  )
  return this.aggregateDebounce(this.observable)
}

private getUrl(value) {
  console.log('getUrl Call', value);
  return 'https://jsonplaceholder.typicode.com/posts/1';
}

private aggregateDebounce(ob$) {
  const shared$ = ob$.publishReplay(1).refCount()
  return shared$.buffer(shared$.debounceTime(75))
}

我希望每个函数调用都有一个“getUrl Call”日志和一个结果日志。但是,如果我向 this.makeRequest() 添加超过 1 个调用,我只会得到结果,结果也很奇怪。也总是返回所有以前的值。我想我不完全理解 Subject 在这种情况下是如何工作的。

另一种方法(取自这里RXJS: Aggregated debounce)是创建某种聚合去抖动(https://stackblitz.com/edit/angular-mx232d?file=src/app/app.component.ts

constructor(private http: HttpClient) {
  this.makeRequest('1').subscribe(x => console.log(x))
  this.makeRequest('2').subscribe(console.log)
  setTimeout(() => {
    this.makeRequest('3').subscribe(console.log)
  }, 1000)
}

private makeRequest(id: string) {
  this.observable = this.observable.pipe(
    merge(Observable.of(id).pipe(delay(1)))
  )
  return this.aggregateDebounce(this.observable)
}

private getUrl(value) {
  console.log('getUrl Call', value);
  return 'https://jsonplaceholder.typicode.com/posts/1';
}

private aggregateDebounce(ob$) {
  const shared$ = ob$.publishReplay(1).refCount()
  return shared$.buffer(shared$.debounceTime(75))
}

在这种情况下,我遇到了问题,我也得到了所有以前的值。

理论上(至少对我而言)这两种变体听起来都是合理的,但我似乎遗漏了一些东西。任何朝着正确方向的眨眼都会受到高度赞赏。

编辑:

根据要求,我添加了最终的现实世界目标。

想象一个从 API 请求信息的服务。在 50-75 毫秒内,您调用具有特定 ID 的服务。我想将这些 id 组合到一个请求而不是执行 3 个请求。如果 100 毫秒后再次调用该服务,则会完成一个新请求

【问题讨论】:

  • 你的意思是如果你拿到了之前执行过的参数,你就不想再执行一次了吧?多久不想重新运行?
  • 完全正确。我添加了一个示例以进一步说明。 50-75ms
  • 我已经更新了我的答案以包含一个可能的解决方案。
  • 非常感谢。如果我能把它应用到我的例子中,我会看看。

标签: javascript angular typescript rxjs observable


【解决方案1】:
this.makeRequest(1).subscribe();

private makeRequest(number: number) {
  this.values.next(number);
  return this.idObservable.pipe(

在订阅之前发出值 -> 值丢失。

private values: Subject = new Subject();
private idObservable = this.values.pipe(

private makeRequest(number: number) {
  this.values.next(number);
  return this.idObservable.pipe(    

每次调用都会基于主题创建一个新的 observable。每当您发出一个值时,所有订阅者都会收到该值。

一个可能的解决方案可能看起来像这样(我在这里使用新的 rxjs 语法):

subject: Subject<String> = null;
observable = null;
window = 100;

constructor() {
  this.subject = null;
  this.window = 100;

  this.makeRequest('1').subscribe(console.log)
  this.makeRequest('2').subscribe(console.log)
  setTimeout(() => {
    this.makeRequest('3').subscribe(console.log)
  }, 1000)
}

private makeRequest(id: string) {
  if (!this.subject) {
    this.subject = new ReplaySubject()
    this.observable = this.subject.pipe(
      takeUntil(timer(this.window).pipe(take(1))),
      reduce((url, id, index) => this.combine(url, id), baseUrl),
      flatMap(url => this.request(url)),
      tap(() => this.subject = null),
      share()
    )
  }      

  this.subject.next(id);
  return this.observable;
}  

combine 创建 URL,request 发出实际请求。

【讨论】:

  • 感谢您的回答。为什么每次通话都会创建一个新主题?主题本身是否只是在一开始就创建了一次?但是如果我为每个 id 添加一个流,我最终如何将它们结合起来?我需要某种“单流”才能做到这一点,还是我错了?
  • @NicolasGehlert 写了“基于主题的新 observable”,而不是“新主题”。我假设您想对具有相同 ID 的请求进行分组。如果不是这种情况,那么您希望每个时隙有一个可观察对象,但这仍然是每个“单元”有一个可观察对象,而不是只有一个主题。
  • 啊好的。是的,这更有意义。是的,不是同一个ID。但不同的 id 进入同一个请求。以及如何创建“每个时隙的可观察”? (不创建计时器)
  • @NicolasGehlert 好问题。 windowTime 运算符通常可以使用,但我不确定它是否适合这里。如果您不想要计时器,您可以存储上次创建新可观察对象的时间,并在必要时创建一个新的。但是计时器实际上是一个好主意,因为它是可靠地完成流的唯一方法。
  • 好的,计时器可能是有意义的。但问题仍然是我如何实现这一目标。第一次函数调用,启动计时器,第二次调用检查计时器是否正在运行,组合可观察对象,启动新计时器......而且我不希望有一个在后台运行整个时间并每隔 x 秒池化的间隔。我猜这是浪费内存
【解决方案2】:

Rxjs 非常擅长处理这种情况。你需要两个不同的主题:

  1. 一个将用于收集和合并所有请求
  2. 第二个将用于订阅结果

当一个请求发出时,该值将被推送到第一个主题,但第二个将被返回,抽象出组合请求的内部逻辑。

private values: Subject = new Subject();
private results: Subject = new Subject();

private makeRequest(number: number) {
  this.values.next(number);
  return this.results;
}

根据需要,用于合并请求的管道可以是bufferdebounceTime,如问题或其他逻辑所示。收到响应后,只需将其推送到结果主题:

constructor(private http: HttpClient) {
  this.values
    .pipe(
      buffer(this.values.pipe(debounceTime(1000))),
      switchMap(values => this.getUrl(values)),
      map(response => this.results.next(response)))
    .subscribe();
}

在将响应推送到结果之前,我使用了 switchMap 来模拟异步请求。

此处为完整示例:https://angular-8yyvku.stackblitz.io

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-10-29
    • 2019-03-05
    • 2020-06-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-09-14
    相关资源
    最近更新 更多