【问题标题】:C# Rx Observable producing random resultsC# Rx Observable 产生随机结果
【发布时间】:2020-07-27 16:20:32
【问题描述】:

考虑以下程序;

class Program
{
    static IObservable<int> GetNumbers()
    {
        var observable = Observable.Empty<int>();
        foreach (var i in Enumerable.Range(1, 10))
        {
            observable = observable.Concat(Observable.FromAsync(() => Task.Run(() =>
            {
                Console.WriteLine($"Producing {i}");
                Thread.Sleep(1000);
                return i;
            })));
        }

        return observable;
    }

    static async Task LogNumbers(IObservable<int> observable)
    {
        var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));
        await observable;
        subscription.Dispose();
    }

    static void Main(string[] args)
    {
        LogNumbers(GetNumbers()).Wait();
        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}

它产生以下输出

Producing 1
Producing 1
Producing 2
Consuming 1
Producing 2
Producing 3
Consuming 2
Producing 3
Producing 4
Consuming 3
Producing 4
Producing 5
Consuming 4
Producing 5
Producing 6
Consuming 5
Producing 6
Producing 7
Consuming 6
Producing 7
Producing 8
Consuming 7
Producing 8
Producing 9
Consuming 8
Producing 9
Producing 10
Consuming 9
Producing 10
Finished

它写出每个“生产 x”语句中的两个和一个“消费 x”语句。为什么这样做?为什么它永远不会写出预期的最终“消费 10”语句?

【问题讨论】:

  • 你是 concat (observable.Concat) 数据一起并没有清除 observable。
  • 一个好的经验法则是“不要混合你的单子”。这适用于同时使用任务和 Rx。而不是Observable.FromAsync(() =&gt; Task.Run(() =&gt; i)),你应该简单地使用Observable.Start((() =&gt; i)。尽量留在 Rx 世界中。它不能解决你的问题,但它会让你的 Rx 代码更健壮。
  • @Enigmativity 可以理解,谢谢。我只是想增加一些延迟来重现问题。如果速度太快,第二个问题就不会发生。我的实际代码不是这样的,但如果我发布那我会被解雇????
  • @Steztric - 使用Observable.Delay 添加延迟。 :-)
  • @Steztric - 像这样:Observable.Return(42).Delay(TimeSpan.FromSeconds(5.0)).

标签: c# observable system.reactive


【解决方案1】:

因为您订阅了两次,所以您将获得两份生产行。最有可能的是,您没有得到消耗的 10,因为当第二个订阅结束时,第一个订阅被取消。如果您有时确实获得了 Consuming 10,我不会感到惊讶,只是因为当时任务以不同的顺序运行。

static async Task LogNumbers(IObservable<int> observable)
{
    //This is the first subscription
    var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));

    //This is the second subscription
    await observable;

    subscription.Dispose();
}

GetNumbers 函数的编写方式是,每次订阅 observable 都会触发它自己的 10 个任务集来运行,从而触发它自己的输出集。第一个订阅还监视生成的值并输出一条消耗线。第二个订阅对生成的值没有任何作用,因为您没有使用 await observable 的值,但确实会导致第二组任务运行。

您可以通过在 LogNumbers 的参数上使用 Publish().RefCount() 来消除第二次订阅,或者使用 TaskCompletionSource 并使用您当前未在第一次订阅中使用的 OnError 和 OnComplete 函数将其标记为完成。这些看起来像这样:

static async Task LogNumbersWithRefCount(IObservable<int> observable)
{
    observable = observable.Publish().RefCount();
    var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));
    await observable;
    subscription.Dispose();
}

static async Task LogNumbersTCS(IObservable<int> observable)
{
    var t = new TaskCompletionSource<object>()
    var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"),
                       ex => t.TrySetException(ex),
                       () => t.TrySetResult(null));
    return t.Task;
}

【讨论】:

    【解决方案2】:

    Gideon 为您解决了这个问题,但是当我开始在 cmets 中添加一些提示时,我认为发布一个完整的解决方案可能会很好。试试这个:

    static IObservable<int> GetNumbers() =>
        Observable
            .Interval(TimeSpan.FromSeconds(1.0))
            .Select(i => (int)i + 1)
            .Do(i => Console.WriteLine($"Producing {i}"))
            .Take(10);
    
    static Task LogNumbers(IObservable<int> observable) =>
        observable
            .Do(i => Console.WriteLine($"Consuming {i}"))
            .ToArray()
            .ToTask();
    
    static void Main(string[] args)
    {
        LogNumbers(GetNumbers()).Wait();
        Console.WriteLine("Finished");
        Console.ReadLine();
    }
    

    或者,更简洁:

    static IObservable<int> GetNumbers() =>
        Observable
            .Interval(TimeSpan.FromSeconds(1.0))
            .Select(i => (int)i + 1)
            .Do(i => Console.WriteLine($"Producing {i}"))
            .Take(10);
    
    static IObservable<int> LogNumbers(IObservable<int> observable) =>
        observable
            .Do(i => Console.WriteLine($"Consuming {i}"));
    
    static async Task Main(string[] args)
    {
        await LogNumbers(GetNumbers());
        Console.WriteLine("Finished");
        Console.ReadLine();
    }
    

    你可以直接await observables。

    【讨论】:

    • 对于方法LogNumbers 而不是Task 可能更好的返回类型是Task&lt;int[]&gt;(关于第一个示例)。
    • @TheodorZoulias - 是的,同意。我试图保留 OP 的签名,但你在现场。
    • 感谢@Enigmativity,这真的很有用。我想这个Observable.Interval(...) 可用于定期发布指标或对网络服务进行某种轮询等,对吧?
    • @Steztric - 是的,它相当于Timer。它定期发送一个值。
    猜你喜欢
    • 2012-05-29
    • 1970-01-01
    • 2013-02-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-02-24
    相关资源
    最近更新 更多