【问题标题】:ObserveOn(Scheduler.CurrentThread) doesn't cause subscribed Action to be run on original threadObserveOn(Scheduler.CurrentThread) 不会导致订阅的 Action 在原始线程上运行
【发布时间】:2012-11-15 23:56:43
【问题描述】:

我有一个操作,它接受一个回调,一旦完成,它将使用通用参数调用,即Action<Action<T>>。我想在动作开始时显示一个忙碌的微调器,然后在它调用回调时将其带走,因此我创建了一个简单的实用程序来执行此操作。我遇到的问题是用户希望他们的回调在原始调用线程上运行,但它并不总是这样做。它几乎总是在单元测试 (nUnit) 中完美运行,但在应用程序实际运行时对某些调用不起作用(WPF、.Net 4)。

以下是我所拥有的相关信息

void WrapAsyncCallbackPattern<T>(Action<T> callback, Action<Action<T>> actionToRun)
{
    var subject = new AsyncSubject<T>();
    try
    {
        actionToRun(
            result =>
            {
                subject.OnNext(result);
                subject.OnCompleted();
            });
    }
    catch (Exception ex)
    {
        subject.OnError(ex);
    }

    subject
        .ObserveOn(Scheduler.CurrentThread)
        .Subscribe(callback, OnError);
}

我希望callback 在我订阅的线程上运行(并且subject 被声明),但它似乎并不可靠。我假设我在做一些愚蠢的事情。这是什么?

编辑:添加单元测试代码

private readonly TimeSpan m_WaitTime = TimeSpan.FromSeconds(1);

[Test]
public void WrapAsyncCallbackPattern_WithActionOnDifferentThread_CallsCallbackOnSameThread()
{
    var awaiter = new AutoResetEvent(false);

    bool callbackRan = false;
    int callingThreadId = Thread.CurrentThread.ManagedThreadId;
    int callbackThreadId = int.MinValue;
    int actionThreadId = int.MinValue;

    BackgroundOperation.WrapAsyncCallbackPattern<int>(
        _ =>
        {
            callbackRan = true;
            callbackThreadId = Thread.CurrentThread.ManagedThreadId;

            awaiter.Set();
        },
        cb => ThreadPool.QueueUserWorkItem(
            _ =>
            {
                actionThreadId = Thread.CurrentThread.ManagedThreadId;
                cb(0);
            }));

    var errorInfo = string.Format("\r\nCalling thread = {0}; Action thread = {1}; Callback thread = {2}", callingThreadId, actionThreadId, callbackThreadId);

    Assert.IsTrue(awaiter.WaitOne(m_WaitTime));
    Assert.IsTrue(callbackRan);
    Assert.AreNotEqual(callingThreadId, actionThreadId, "Action needs to be run on a different thread for this test." + errorInfo);
    Assert.AreNotEqual(actionThreadId, callbackThreadId, "Callback should not be run on action thread." + errorInfo);
    Assert.AreEqual(callingThreadId, callbackThreadId, "Callback should be run on calling thread." + errorInfo);
}

【问题讨论】:

    标签: c#-4.0 system.reactive


    【解决方案1】:

    您可能对Scheduler.CurrentThread 的作用有错误的理解。我想每个人都会犯这个错误。

    CurrentThread 调度程序与正在执行的 observable 相关,而不是在定义(或订阅)时。考虑延迟或延迟执行。这应该是有道理的,因为每当您跳转到不同的线程时,您都需要某种方式来编组调用。

    所以你真正追求的是这样的:

    var synchContext = new SynchronizationContextScheduler(
        System.Threading.SynchronizationContext.Current) 
    
    subject
        .ObserveOn(synchContext)
        .Subscribe(callback, OnError);
    

    或许:

    subject
        .ObserveOn(this) /* this is my current form */
        .Subscribe(callback, OnError);
    

    如果你这样做,你应该能够控制你的回调在哪个线程上运行。

    您的测试可能有效,因为它们最终同步执行。

    【讨论】:

    • 单元测试实际上是为了验证它们是否在不同的线程上运行,作为最后断言的一部分。如果不是这样,测试的结果几乎不会是有效的。有趣的是,此更改在应用程序运行时修复了它,但现在测试失败,调用线程、操作线程和回调线程都不同。如果您有任何想法,我会添加我的测试代码。
    • @BryanAnderson - 您发布的测试代码中没有 Rx。您是否发布了正确的测试方法?
    • 是的,我更新了我的方法来显示签名,所以它更清晰一点。它在 BackgroundOperation 类中。 Rx 是类/函数的一种实现细节,因此它对消费者隐藏(因此是测试)。
    猜你喜欢
    • 1970-01-01
    • 2018-06-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-06-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多