【发布时间】:2020-07-27 16:20:32
【问题描述】:
考虑以下程序;
class Program
{
static IObservable<int> GetNumbers()
{
var observable = Observable.Empty<int>();
foreach (var i in Enumerable.Range(1, 10))
{
observable = observable.Concat(Observable.FromAsync(() => Task.Run(() =>
{
Console.WriteLine($"Producing {i}");
Thread.Sleep(1000);
return i;
})));
}
return observable;
}
static async Task LogNumbers(IObservable<int> observable)
{
var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));
await observable;
subscription.Dispose();
}
static void Main(string[] args)
{
LogNumbers(GetNumbers()).Wait();
Console.WriteLine("Finished");
Console.ReadLine();
}
}
它产生以下输出
Producing 1
Producing 1
Producing 2
Consuming 1
Producing 2
Producing 3
Consuming 2
Producing 3
Producing 4
Consuming 3
Producing 4
Producing 5
Consuming 4
Producing 5
Producing 6
Consuming 5
Producing 6
Producing 7
Consuming 6
Producing 7
Producing 8
Consuming 7
Producing 8
Producing 9
Consuming 8
Producing 9
Producing 10
Consuming 9
Producing 10
Finished
它写出每个“生产 x”语句中的两个和一个“消费 x”语句。为什么这样做?为什么它永远不会写出预期的最终“消费 10”语句?
【问题讨论】:
-
你是 concat (observable.Concat) 数据一起并没有清除 observable。
-
一个好的经验法则是“不要混合你的单子”。这适用于同时使用任务和 Rx。而不是
Observable.FromAsync(() => Task.Run(() => i)),你应该简单地使用Observable.Start((() => i)。尽量留在 Rx 世界中。它不能解决你的问题,但它会让你的 Rx 代码更健壮。 -
@Enigmativity 可以理解,谢谢。我只是想增加一些延迟来重现问题。如果速度太快,第二个问题就不会发生。我的实际代码不是这样的,但如果我发布那我会被解雇????
-
@Steztric - 使用
Observable.Delay添加延迟。 :-) -
@Steztric - 像这样:
Observable.Return(42).Delay(TimeSpan.FromSeconds(5.0)).
标签: c# observable system.reactive