【问题标题】:Execute OnNext in parallel but sync subscription with UI thread并行执行 OnNext,但与 UI 线程同步订阅
【发布时间】:2015-03-12 22:21:05
【问题描述】:

给定一个这样的主题:

var input = new Subject<int>();

和这样的订阅者:

var observer1 = input
.Subscribe(ev =>
{
    Thread.Sleep(1000);
    listBox.Items.Add("o1: " + ev.ToString());
});

var observer2 = input
.Subscribe(ev =>
{
    Thread.Sleep(1000);
    listBox.Items.Add("o2: " + ev.ToString());
});

我怎样才能调整它,以便当我这样做时

Enumerable.Range(0, 1000).ToList().ForEach((i) =>
        {
            input.OnNext(i);                    
        });

OnNext 被称为异步,但订阅与 UI 线程同步。 (这是一个 WinForms 环境)

我试过了

var observer1 = input
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(ev =>
{
    Thread.Sleep(1000);
    listBox.Items.Add("o1: " + ev.ToString());
});

我可以看到observe1 和observer2 立即开始他们的订阅操作而无需等待,因为ObserveOn。但是现在应用程序由于在非 ui 线程上调用列表框而崩溃。一旦我将订阅的动作与 UI 同步,我就失去了 OnNext 调用的并行处理。

知道如何解决吗?

【问题讨论】:

  • 你知道 Rx 会阻止 .OnNext() 的并行执行吗?

标签: c# .net winforms system.reactive reactive-programming


【解决方案1】:

您需要将检索项目的工作与更新 UI 的工作分开。在后台线程上运行工作,然后将列表传递给 UI 以更新 ListBox。

试试这个:

input.ObserveOn(Scheduler.Default) // transition to background thread
     .Select(evt => /* synchronous code returning item */)
     .ObserveOn(frm) // use rx-winforms to get this extension method
     .Subscribe(item => listBox.Items.Add(item));

ObserveOn 有许多重载,使用 rx-winforms 你可以传递一个 WinForms 控件(可以是宿主窗体或组成部分),Rx 将使用它来转换到 UI 线程。另一个重载接受SynchronzationContext 对象。

根据您拥有的项目的数量和频率,您可能需要考虑在 Select 之前或之后使用 BufferTimeSpan 将项目批处理到 List 中,然后更新 UI一次有几个项目。这减少了上下文切换的次数,并使 UI 更具响应性。

您可能会发现参考以下内容会有所帮助:ObserveOn and SubscribeOn - where the work is being done

请注意,这将一次处理每个项目 - 这可能不是您想要的。在这种情况下,您可以使用SelectMany。假设你的后台工作被封装在一个异步方法中,比如:

async Task<string> GetItem(int evt);

那么你可以这样做:

input.SelectMany(evt => GetItem(evt))
     .ObserveOn(frm) // use rx-winforms to get this extension method
     .Subscribe(item => listBox.Items.Add(item));

由于GetItem 现在是异步的,它已经转换到后台线程 - 只要这种情况发生得很快(即你应该在你的GetItem 中有一个await 用于第一个长时间运行的操作),那里不需要第一个ObserveOn

SelectMany 将每个源事件投影到可观察的流中,并将结果展平。它有一个接受异步方法的重载,并将其封装在一个返回单个结果并完成的IObservable&lt;T&gt; 中。

这意味着事件是同时处理的,现在事件可能会乱序返回 - 所以请确保这是可以接受的。有办法解决这个问题,但它确实超出了问题的范围。 (提示:Select+Concat

【讨论】:

  • 我不得不考虑一下答案,直到它击中我 :-) 我的想法太僵硬了,我需要在 onNext 操作中完成所有工作,但当然不是唯一的办法。感谢您的广泛回答,谢谢!
【解决方案2】:

刚刚发现如果我这样做了

var observer1 = input
.Subscribe(ev =>
{
    await Task.Yield(); 
    Thread.Sleep(1000);
    listBox.Items.Add("o1: " + ev.ToString());
});

var observer2 = input
.Subscribe(ev =>
{
    await Task.Yield(); 
    Thread.Sleep(1000);
    listBox.Items.Add("o2: " + ev.ToString());
});

它有效。但是没有更清洁的、基于 Rx 的解决方案吗? 基本上我想要一个可以访问 UI 线程的订阅中的即发即弃操作。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-09-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-02-19
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多