【问题标题】:Merging two Observables with one taking higher priority合并两个 Observable,其中一个具有更高的优先级
【发布时间】:2015-04-10 09:07:42
【问题描述】:

是否可以使用 ReactiveExtensions 来实现以下功能;

  • 两个 Observable,一个是“高”优先级,另一个是“低”优先级

  • 将两个 Observable 合并为一个,然后可以订阅该 Observable,目的是使生成的 Observable 始终在任何低优先级项之前发出高优先级项。

我知道这可以使用两个 ConcurrentQueue 集合和类似的东西更简单地实现;

return this.highPriorityItems.TryDequeue(out item) 
    || this.lowPriorityItems.TryDequeue(out item);

但是这种方法存在一些问题,例如不能像 Observable 那样“订阅”(因此一旦队列耗尽,处理就会结束,而无需大量额外的花言巧语将其推入任务)。

此外,我有兴趣在队列上应用一些额外的过滤,例如限制和“在更改前区分”,因此 Rx 在这里看起来很自然。

【问题讨论】:

  • 所以“低”可观察手(无论这段新代码是什么)是一个值。这段代码是什么意思?永远等待,看看“高”可观察对象是否会永远产生另一个值?我很难看到优先级如何发挥作用,只是继续使用Merge 并忽略优先级。
  • @Damien_The_Unbeliever,这个想法基本上是“合并”,但高优先级的项目会跳到前面。那有意义吗?我确实想以相同的方式处理项目的两个来源,因此合并和不知道优先级的订阅。但我关注的是它们到达的顺序 - 假设我有 20 个低优先级项目通过,一个新的高优先级项目被提交。我希望接下来发出它,而不是在其他 20 个项目之后。
  • 但是您一次只能看到一个项目。如果到目前为止“低”优先级已超过您的 5 个项目,并且打算再向您发送 15 个,那又如何呢?无论如何,您当然无法完全从可观察到的情况中发现这一事实。
  • 就像你说的,我的订阅不会知道或关心优先级或“管道中”项目的数量,但安排合并的 observable 发出下一个项目的顺序肯定是有价值的?假设我的订阅是显示通知,并且正在流式传输各种低优先级的消息。但是突然间,接下来需要显示一条非常重要的消息,因此您希望它“跳队列”并成为订阅处理的下一件事情.这就是我希望通过两个合并的 Observables 和一个订阅实现的精神。
  • 您的问题令人困惑,因为 Rx 并没有真正的项目位于队列中等待处理的概念。请画出你所期望的行为的大理石图。

标签: c# system.reactive


【解决方案1】:

您所描述的当然是优先级队列。

Rx 是关于事件的,而不是队列。当然,队列在 Rx 中被大量使用 - 但它们不是一流的概念,更多的是 Rx 概念的实现细节。

我们需要队列的一个很好的例子是处理一个缓慢的观察者。事件在 Rx 中按顺序分派,如果事件到达的速度比观察者处理它们的速度快,那么它们必须针对该观察者排队。如果有很多观察者,则必须维护多个逻辑队列,因为观察者可能会以不同的速度前进 - 而 Rx 选择不让它们保持同步。

“背压”是观察者向可观察对象提供反馈的概念,以允许机制处理更快的可观察对象的压力 - 例如合并或节流。 Rx 没有一流的引入背压的方法——可观察者监控观察者的唯一内置方法是通过OnNext 的同步特性。任何其他机制都需要带外。您的问题与背压直接相关,因为它仅在观察者缓慢的情况下才相关。

我提到这一切是为了证明我的主张,即 Rx 不是提供您正在寻找的那种优先级调度的好选择 - 实际上,一流的排队机制似乎更合适。

要解决手头的问题,您需要在自定义运算符中自己管理优先级排队。重申问题:您的意思是,如果事件在观察者处理OnNext 事件期间到达,这样就会有大量要调度的事件,而不是 Rx 使用的典型 FIFO 队列,您想要根据某个优先级进行调度。

需要注意的是,本着 Rx 不让多个观察者保持同步的精神,并发观察者可能会以不同的顺序查看事件,这对您来说可能是也可能不是问题。您可以使用Publish 之类的机制来获得订单一致性 - 但您可能不想这样做,因为在这种情况下,事件传递的时间会变得非常不可预测且效率低下。

我确信有更好的方法可以做到这一点,但这里有一个基于优先级队列的交付示例 - 您可以使用更好的方法将其扩展为适用于多个流和优先级(甚至是每个事件的优先级)队列实现(例如基于 b 树的优先级队列),但我选择保持这个相当简单。即便如此,请注意代码必须解决的大量问题,包括错误处理、完成等。我已经做出了选择,当这些问题发出信号时,肯定还有很多其他有效的选择。

总而言之,这个实现确实让 me 放弃了使用 Rx 的想法。它足够复杂,无论如何这里可能存在错误。正如我所说,可能会有更简洁的代码(尤其是考虑到我已经付出了最小的努力!),但是从概念上,无论实现如何,我都对这个想法感到不舒服:

public static class ObservableExtensions
{
    public static IObservable<TSource> MergeWithLowPriorityStream<TSource>(
        this IObservable<TSource> source,
        IObservable<TSource> lowPriority,
        IScheduler scheduler = null)
    {    
        scheduler = scheduler ?? Scheduler.Default;
        return Observable.Create<TSource>(o => {    
            // BufferBlock from TPL dataflow is used as it is
            // handily awaitable. package: Microsoft.Tpl.Dataflow        
            var loQueue = new BufferBlock<TSource>();
            var hiQueue = new BufferBlock<TSource>();
            var errorQueue = new BufferBlock<Exception>();
            var done = new TaskCompletionSource<int>();
            int doneCount = 0;
            Action incDone = () => {
                var dc = Interlocked.Increment(ref doneCount);
                if(dc == 2)
                    done.SetResult(0);
            };
            source.Subscribe(
                x => hiQueue.Post(x),
                e => errorQueue.Post(e),
                incDone);
            lowPriority.Subscribe(
                x => loQueue.Post(x),
                e => errorQueue.Post(e),
                incDone);
            return scheduler.ScheduleAsync(async(ctrl, ct) => {
                while(!ct.IsCancellationRequested)
                {
                    TSource nextItem;
                    if(hiQueue.TryReceive(out nextItem)
                      || loQueue.TryReceive(out nextItem))
                        o.OnNext(nextItem);

                    else if(done.Task.IsCompleted)
                    {
                        o.OnCompleted();
                        return;
                    }

                    Exception error;                        
                    if(errorQueue.TryReceive(out error))
                    {
                        o.OnError(error);
                        return;
                    }

                    var hiAvailableAsync = hiQueue.OutputAvailableAsync(ct);    
                    var loAvailableAsync = loQueue.OutputAvailableAsync(ct);                    
                    var errAvailableAsync =
                        errorQueue.OutputAvailableAsync(ct);
                    await Task.WhenAny(
                        hiAvailableAsync,
                        loAvailableAsync,
                        errAvailableAsync,
                        done.Task);
                }
            });
        });
    }
}

以及示例用法:

void static Main()
{
    var xs = Observable.Range(0, 3);
    var ys = Observable.Range(10, 3);

    var source = ys.MergeWithLowPriorityStream(xs);

    source.Subscribe(Console.WriteLine, () => Console.WriteLine("Done"));
}

这将首先打印出ys的元素,表明它们的优先级更高。

【讨论】:

  • 谢谢詹姆斯,你的解释让我对 Rx 的实际目的和能力有了更多的了解。当然,您是对的,Rx 解决方案试图将方形钉子安装在圆孔中。我突然觉得有必要从头到尾重新阅读 IntroToRx!再次感谢您抽出宝贵时间提供此答案,非常感谢。
【解决方案2】:

您需要花时间来解决此类问题。在上面的评论中,您谈到了用户通知。在我看来,您想要的说法是这样的:显示最近的通知,除非有高优先级的通知,在这种情况下显示。

气泡图可以更容易地对此进行推理。一个字符是一秒:

High  : ---------3---5-6
Low   : 1--2-------4----
Result: 1--2-----3---5-6

这就是你的想法吗?您想缓冲消息并在以后显示它们吗?就像在这种情况下,消息 5 只能显示 2 秒可以吗?

【讨论】:

  • (1) 这是评论,不是回答,但我接受这样一个事实,即这是一个重要的问题,很难将其表达为评论并将其格式化,因此 LGTM对我来说,(2) 5 可用的时间跨度较短的问题是一回事,但还有一个重要的其他问题:如何处理 4?它应该在6点后出现吗? 6点后多晚? :) 在这里我们再次问自己,我们真的需要在事件流中那样驱动它吗,也许只是按原样推送它,让 GUI 组件根据优先级或其他任何方式对它们进行排序跨度>
猜你喜欢
  • 1970-01-01
  • 2023-04-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-11-15
  • 1970-01-01
相关资源
最近更新 更多