【发布时间】:2014-02-06 05:09:43
【问题描述】:
假设我有一组 URI,我正在监控它们的可用性。每个 URI 要么是“up”,要么是“down”,并且可以随时将要监控的新 URI 添加到系统中:
public enum ConnectionStatus
{
Up,
Down
}
public class WebsiteStatus
{
public string Uri
{
get;
set;
}
public ConnectionStatus Status
{
get;
set;
}
}
public class Program
{
static void Main(string[] args)
{
var statusStream = new Subject<WebsiteStatus>();
Test(statusStream);
Console.WriteLine("Done");
Console.ReadKey();
}
private static void Test(IObservable<WebsiteStatus> statusStream)
{
}
}
现在假设在Test() 我想被动地确定:
- 是否所有 URI 都已关闭(作为
bool) - 哪些 URI 已关闭(如
IEnumerable<string>)
所以Test 最终会创建像IObservable<Tuple<bool, IEnumerable<string>>> 这样的可观察对象,其中bool 指示是否所有URI 都已关闭,而IEnumerable<string> 包含那些已关闭的URI。
我该怎么做?我最初的想法是我需要按 URI 进行分组,然后将每个组中的最新信息组合到一个列表中,然后我可以针对该列表执行Select。但是,由于 CombineLatest 的工作方式,这并没有奏效。
编辑:感谢 Matthew 的回答,我查看了 rxx 并发现它实现了 CombineLatest 重载,这正是我在 rx 开箱即用时所期望的方式,除了我需要更改它,以便即使只有一个源流被组合,它也会发布(默认情况下它正在等待至少两个源流)。此外,我不能证明为了一种方法而额外提取 2MB 的二进制文件是合理的,所以我已将其复制/粘贴到我的项目中。这样做,我能够解决如下:
private static void Test(IObservable<WebsiteStatus> statusStream)
{
statusStream
.GroupBy(x => x.Uri)
.CombineLatest()
.Select(
x =>
{
var down = x.Where(y => y.Status == ConnectionStatus.Down);
var downCount = down.Count();
var downUris = down.Select(y => y.Uri).ToList();
return new
{
AllDown = x.Count == downCount,
DownUris = downUris
};
})
.Subscribe(x =>
{
Console.WriteLine(" Sources down ({0}): {1}", x.AllDown ? "that's all of them" : "some are still up", x.DownUris.Aggregate("", (y, z) => y += (z + " | ")));
});
}
【问题讨论】:
标签: c# .net system.reactive reactive-programming