【问题标题】:RxJS Filtered/Grouped DebounceRxJS 过滤/分组去抖动
【发布时间】:2021-04-14 08:51:41
【问题描述】:

我正在尝试使用 RxJS debounce 运算符,但我想自定义源的排放何时被去抖。

默认情况下,来自去抖窗口内的源的任何发射都会导致之前的发射被丢弃。我希望根据源排放的值,只有某些源排放计入去抖操作。

假设我有一个如下所示的可观察对象:

{
  priority: 'low'    //can be 'low' or 'medium' or 'high
}

我希望去抖动按对象的优先级分组。这意味着只有当一个发射具有相同的优先级时,它才会被另一个发射去抖动。

即只有'low' 排放可以去抖动'low' 排放,并且只有'high' 排放可以去抖动'high' 排放。如果'medium' 发射在'low' 发射等待时出现,则不会导致'low' 发射被丢弃。

这意味着如果我有一个'low' 发射和一个'medium' 发射快速连续,两者都会通过。如果我快速连续发出两个'low',则只有最后一个会通过。

这是我想出的:

const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 1000

$source.pipe(
    mergeMap(value => {
       
        // We start a race of the value with a delay versus any other emissions from the source with the same priority
        return race(
            timer(delay).pipe(mapTo(value)),
            $source.pipe(
                filter(v => v.priority === value.priority),
            )
        ).pipe(
            take(1),
            // If another emission with the same priority comes before the delay, the second racer it will win the race.
            // If no emission with the same priority comes, the first racer will win.
            //
            // If the first racer wins, this equality check is satisfied and the value is passed through.
            // If the second racer wins, the equality check fails and no value is emitted. Since this is a mergeMap, this whole process will start again for that emission.
            filter(v => v === value),
        )
    })
)

认为以上是正确的,但我想知道我是否遗漏了某些东西或使这种方式比需要的更复杂?上面的代码应该像合并$low.pipe(debounceTime(delay)) $medium.pipe(debounceTime(delay))$high.pipe(debounceTime(delay)) 的三个独立流一样运行。

谢谢!!

【问题讨论】:

    标签: javascript rxjs reactive-programming redux-observable


    【解决方案1】:

    我认为你的答案有效。这也很清楚。但是,您必须确保您的 $source 已被多播。

    我认为您的方法有一个缺点:

    你做了很多额外的计算。如果您每秒对 1000 个值进行去抖动处理,它可能会明显变慢,具体取决于运行的位置。

    每个流式传输的值都可以参加任意数量的比赛。来自不同优先级的输入仍然相互竞争,并且当下一个值开始竞争时,前一个竞争不会停止,因此如果大量值同时到达,您可能会出现定时器/竞争的爆炸式增长。

    设置和删除了很多额外的计时器。在您的情况下,您最多需要三个计时器,每个计时器都会在相同优先级的新值到达时重置。

    如果您的代码不在可能不是问题的关键路径上。否则,还有其他方法。不过,我想出的那个在代码方面有点笨重。

    对流进行分区

    这是我的大脑如何解决这个问题的。我创建了一个操作符,它执行 RxJS partition 操作符的功能,但允许您划分为两个以上的流。

    我的方法在内部处理多播,因此源可以是任何东西(热、冷、多播或不)。它(在内部)为每个流设置一个主题,然后您可以像往常一样使用 RxJS 的 debounceTime。

    虽然有一个缺点。在您的方法中,您可以随意添加一个新的优先级字符串,它应该会继续工作。 {priority: "DucksSayQuack"} 的对象会相互去抖动,不会影响其他优先级。这甚至可以即时完成。

    下面的partitionOn 运算符需要提前知道分区。对于您描述的情况,它应该具有相同的输出并且启动效率更高。

    这样更好吗?我不知道,这是解决同一问题的一种有趣且不同的方法。另外,我认为partitionOn 运算符的用途比分区去抖动要多。

    操作员

    /***
     * Create a partitioned stream for each value where a passed 
     * predicate returns true
     ***/
    function partitionOn<T>(
      input$: Observable<T>, 
      predicates: ((v:T) => boolean)[]
    ): Observable<T>[] {
      const partitions = predicates.map(predicate => ({
        predicate,
        stream: new Subject<T>()
      }));
    
      input$.subscribe({
        next: (v:T) => partitions.forEach(prt => {
          if(prt.predicate(v)){
            prt.stream.next(v);
          } 
        }),
        complete: () => partitions.forEach(prt => prt.stream.complete()),
        error: err => partitions.forEach(prt => prt.stream.error(err))
      });
    
      return partitions.map(prt => prt.stream.asObservable());
    }
    

    使用 partitionOn 进行优先去抖动

    const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
    const delay = 1000;
    
    const priorityEquals = a => b => a === b?.priority;
    
    merge(
      ...partitionOn(
        $source,
        [priorityEquals('low'),
        priorityEquals('medium'),
        priorityEquals('high')]
      ).map(s => s.pipe(
        debounceTime(1000)
      ))
    );
    

    为您的流添加时间戳

    这种方法与您的方法非常相似,可让您再次随意使用优先级字符串。这有一个类似的问题,每个值都被扔到一个计时器中,并且在新值到达时计时器不会被取消。

    但是,使用这种方法,取消不必要的计时器的路径会更加清晰。您可以将订阅对象与时间戳一起存储在 priorityTimeStamp 映射中,并确保在新值到达时取消订阅。

    我真的不知道这可能会对性能造成什么影响,我认为 JavaScript 的事件循环非常健壮/高效。这种方法的好处是您无需支付多播的成本。这实际上只是一个使用查找映射来决定过滤哪些内容和不过滤哪些内容的流。

    priorityDebounceTime 运算符

    function priorityDebounceTime<T>(
      dbTime: number, 
      priorityStr = "priority"
    ): MonoTypeOperatorFunction<T> {
    
      return s => defer(() => {
        const priorityTimeStamp = new Map<string, number>();
        return s.pipe(
          mergeMap(v => {
            priorityTimeStamp.set(v[priorityStr], Date.now());
            return timer(dbTime).pipe(
              timestamp(),
              filter(({timestamp}) => 
                timestamp - priorityTimeStamp.get(v[priorityStr]) >= dbTime
              ),
              mapTo(v)
            )
          })
        )
      });
    
    }
    

    使用 priorityDebounceTime 进行优先级去抖动

    这显然有点简单:

    const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
    const delay = 5000;
    
    $source.pipe(
      priorityDebounceTime(delay)
    ).subscribe(console.log);
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-07-22
      • 1970-01-01
      • 2018-03-08
      • 2021-02-20
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多