【问题标题】:RxJS Operator to delay handling subsequent eventsRxJS Operator 延迟处理后续事件
【发布时间】:2021-09-25 14:06:16
【问题描述】:

我正在寻找一个 RxJS 运算符(或运算符的组合),它可以让我实现这一目标:

  1. observable 触发的每个事件都应按其触发顺序进行处理(不得跳过、限制、去抖动任何值)。
  2. 我应该能够在处理 2 个后续事件之间定义一个设定的时间间隔。
  3. 如果当前没有处理以前的事件,则应立即处理下一个事件(无延迟)。

我创建了following example,我希望它会更清楚:

import { Component, OnInit } from "@angular/core";
import { Subject } from "rxjs";
import { delay } from "rxjs/operators";

@Component({
  selector: "app-root",
  templateUrl: "./app.component.html",
  styleUrls: ["./app.component.css"]
})
export class AppComponent implements OnInit {
  counter = 1;
  displayedValue = 1;

  valueEmitter = new Subject<number>();
  valueEmitted$ = this.valueEmitter.asObservable();

  ngOnInit() {
    // I want to allow each incremented value to be displayed for at least 2 seconds
    this.valueEmitted$.pipe(delay(2000)).subscribe((value) => {
      this.displayedValue = value;
    });
  }

  onClick() {
    const nextCounter = ++this.counter;
    this.counter = nextCounter;
    this.valueEmitter.next(nextCounter);
  }
}

在此示例中,我希望在 counter 中增加的每个值都显示为 displayedValue 至少 2 秒。

因此,当用户第一次点击 onClick 时,显示的值应立即从 1 变为 2,但如果用户继续快速增加 counterdisplyedValue 应以延迟的方式跟随,允许每个之前增加的数字要显示 2 秒再改变,慢慢赶上当前的计数器值。

这可以通过 RxJs 操作符实现吗?

更新

目前我想出了following solution

import { Component, OnInit } from "@angular/core";
import { Subject } from "rxjs";

@Component({
  selector: "app-root",
  templateUrl: "./app.component.html",
  styleUrls: ["./app.component.css"]
})
export class AppComponent implements OnInit {
  counter = 1;
  displayedValue = 1;
  currentDisplayedValueStartTime = 0;
  readonly DISPLAY_DURATION = 2000;
  valuesToDisplay = [];
  displayedValueSwitcherInterval: number | null = null;

  valueEmitter = new Subject<number>();
  valueEmitted$ = this.valueEmitter.asObservable();

  ngOnInit() {
    this.valueEmitted$.pipe().subscribe((value) => {
      this.handleValueDisplay(value);
    });
  }

  onClick() {
    const nextCounter = ++this.counter;
    this.counter = nextCounter;
    this.valueEmitter.next(nextCounter);
  }

  handleValueDisplay(value) {
    this.valuesToDisplay.push(value);

    if (!this.displayedValueSwitcherInterval) {
      this.displayedValue = this.valuesToDisplay.shift();

      this.displayedValueSwitcherInterval = window.setInterval(() => {
        const nextDiplayedValue = this.valuesToDisplay.shift();
        if (nextDiplayedValue) {
          this.displayedValue = nextDiplayedValue;
        } else {
          clearInterval(this.displayedValueSwitcherInterval);
          this.displayedValueSwitcherInterval = null;
        }
      }, this.DISPLAY_DURATION);
    }
  }
}

这不是我想要的,因为它依赖于组件状态和setInterval 来管理通知队列。我希望找到一些更干净、更具声明性的东西,它依赖于 RxJs 操作符来处理我的通知流和它们的间隔队列——但至少我让这个功能完全按照我的意愿工作。仍然愿意接受有关如何使其变得更好的建议。

【问题讨论】:

  • 我还是不清楚。特别是#3。给我一些具体的用例。
  • @SerejaBogolubov - 将示例中的点击视为一次(或以非常短的时间间隔)从服务器返回的多个响应,并且您希望为每个响应显示 2 秒的快餐栏消息随后。 #3 - 你不想在显示第一个小吃店之前等待 2 秒。你查看sandbox了吗?

标签: javascript angular typescript rxjs observable


【解决方案1】:

我认为这可能是一种解决方法:

this.valueEmitted$
  .pipe(
    concatMap(
      (v) => concat(
        of(v),
        timer(2000).pipe(ignoreElements())
      )
    )
  )
  .subscribe((value) => {
    this.displayedValue = value;
  });

因为timer 也会发出一些值(0、1、2 等等),所以我们使用ignoreElements,因为我们对这些值不感兴趣,而我们想要从timer 得到的只是 complete通知 这将指示可以订阅下一个内部可观察对象的时刻。请注意,前面提到的内部 observable 是通过以v 为参数调用concatMap 的回调函数创建的。

Working demo.


observable 触发的每个事件都应按触发顺序处理(不跳过、限制、去抖动任何值)。

这在concatMap 的帮助下得到保证。

我应该能够在处理 2 个后续事件之间定义一个设定的时间间隔。

我们可以通过以下方式实现:

concat(
  // First, we send the value.
  of(v),
  // Then we set up a timer so that in case a new notification is emitted, it will have to
  // wait for the timer to end.
  timer(2000).pipe(ignoreElements())
)

如果当前没有处理之前的事件,则应立即处理下一个事件(无延迟)。

如果concatMap 没有创建活动的内部可观察对象,则值v 将立即发送给消费者。


这也可以:

this.valueEmitted$
  .pipe(
    concatMap(
      (v) => timer(2000).pipe(
        ignoreElements(), startWith(v)
      )
    )
  )
  .subscribe((value) => {
    this.displayedValue = value;
  });

【讨论】:

  • 太棒了!看起来这是我正在寻找的解决方案。谢谢。我认为我更喜欢第二种选择 - 更简洁、更具声明性,但两者都很棒。
【解决方案2】:

所以你的评论让它更清楚了。

显然,您需要一些状态。因为您不能一次显示所有内容,所以用户的屏幕不是无限的。它一次不能处理超过.. 3.. 4.. 5 个通知。

scan 是您在发出每个事件的rxjs 中处理状态的方式。

您的scan 可能包含两个列表:一个用于当前呈现的通知,另一个用于未决通知。列表确实与scan 一样保持顺序,因此不会违反顺序。

然后,您需要能够根据您的事件采取不同的行动。我能想象的最简单的事情是元组(event_type_as_sting_or_number, event_specific_params)。有了它,您将能够正确计算您的下一个状态(将由 scan 发出),然后优雅地处理它(如重新呈现通知)。

看来你需要三个事件:

  1. 新的事件来自外太空。
  2. 某些通知已过期,必须从屏幕上移除。
  3. 用户确认了一些通知,但保留它没有任何意义。

所以整个管道是:merge 你所有的事件到一个单一的流中,用event_type“标记”它们; scan 在该流上,[], [] 作为您的初始状态(没有待处理,也没有活动通知); if else if else 基于 event_typescan 中正确计算下一个状态;获取结果并呈现所有活动通知。


来自cmets:

我仍然看不到我如何管理此解决方案中的时间间隔。我的意思是,只有在显示当前活动通知的设定间隔之后,我才能将通知从待处理数组移动到活动数组。

基本上,有两种选择。由于您没有严格限制显示某些通知的给定限制,您可以简单地让全局interval 发出,比方说,每个your_limit / 2 时间单位。当您将其移动到活动通知列表时,您拥有的每个通知都可以使用简单的时间戳进行扩展。下次触发间隔时,您会遍历该列表并过滤掉时间增量为&gt;= your_limit 的所有通知。这有点不准确,但可以轻松配置和调整为对您的用户透明。我投票赞成这个解决方案,因为它很简单。

另一种解决方案是在每个通知放在活动列表中后触发timer。计时器完成后,您发送一个事件(notification_must_die, { notification_id: ... }),然后您的scan 处理该事件;必须准备好这样的notification_id 不再存在(例如,它可以在之前被取消)。它很复杂,但要准确得多。

【讨论】:

  • 感谢@SerejaBogolubov,为简单起见,您是否有机会展示如何将其应用于我提供的沙箱中的示例? (将pipe 中的当前delay 替换为scan 实现)
  • @itaydafna 这需要一些时间来实现,我建议您按照我的解释自己尝试一下,如果您发现任何复杂情况,请使用最新代码更新您的答案,我们看看如何解决你的问题。
  • 我仍然看不到如何管理此解决方案中的间隔。我的意思是,如何将通知从 pending 移动到 active 数组,仅在显示当前活动通知的设置间隔之后。
  • 我已经用我想出的不使用scan 的替代解决方案更新了我的问题。所以现在的问题包括我试图实现的working example。如果您认为使用scan 可以更好地处理此示例,我将很高兴听到。谢谢。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-10-05
  • 1970-01-01
  • 1970-01-01
  • 2018-10-17
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多