【问题标题】:Combining latest from an observable of observables结合来自可观察对象的最新数据
【发布时间】:2014-02-06 05:09:43
【问题描述】:

假设我有一组 URI,我正在监控它们的可用性。每个 URI 要么是“up”,要么是“down”,并且可以随时将要监控的新 URI 添加到系统中:

public enum ConnectionStatus
{
    Up,
    Down
}

public class WebsiteStatus
{
    public string Uri
    {
        get;
        set;
    }

    public ConnectionStatus Status
    {
        get;
        set;
    }
}

public class Program
{
    static void Main(string[] args)
    {
        var statusStream = new Subject<WebsiteStatus>();
        Test(statusStream);

        Console.WriteLine("Done");
        Console.ReadKey();
    }

    private static void Test(IObservable<WebsiteStatus> statusStream)
    {
    }
}

现在假设在Test() 我想被动地确定:

  • 是否所有 URI 都已关闭(作为 bool
  • 哪些 URI 已关闭(如 IEnumerable&lt;string&gt;

所以Test 最终会创建像IObservable&lt;Tuple&lt;bool, IEnumerable&lt;string&gt;&gt;&gt; 这样的可观察对象,其中bool 指示是否所有URI 都已关闭,而IEnumerable&lt;string&gt; 包含那些已关闭的URI。

我该怎么做?我最初的想法是我需要按 URI 进行分组,然后将每个组中的最新信息组合到一个列表中,然后我可以针对该列表执行Select。但是,由于 CombineLatest 的工作方式,这并没有奏效。

编辑:感谢 Matthew 的回答,我查看了 rxx 并发现它实现了 CombineLatest 重载,这正是我在 rx 开箱即用时所期望的方式,除了我需要更改它,以便即使只有一个源流被组合,它也会发布(默认情况下它正在等待至少两个源流)。此外,我不能证明为了一种方法而额外提取 2MB 的二进制文件是合理的,所以我已将其复制/粘贴到我的项目中。这样做,我能够解决如下:

private static void Test(IObservable<WebsiteStatus> statusStream)
{
    statusStream
        .GroupBy(x => x.Uri)
        .CombineLatest()
        .Select(
            x =>
            {
                var down = x.Where(y => y.Status == ConnectionStatus.Down);
                var downCount = down.Count();
                var downUris = down.Select(y => y.Uri).ToList();

                return new
                {
                    AllDown = x.Count == downCount,
                    DownUris = downUris
                };
            })
        .Subscribe(x =>
        {
            Console.WriteLine("    Sources down ({0}): {1}", x.AllDown ? "that's all of them" : "some are still up", x.DownUris.Aggregate("", (y, z) => y += (z + " | ")));
        });
}

【问题讨论】:

    标签: c# .net system.reactive reactive-programming


    【解决方案1】:

    最简洁的方法是在this answer 中使用Rxx 扩展。下面是另一种选择,它只保留一个已关闭/已启动的网站列表。

    var downStream = statusStream
        .Aggregate<WebsiteStatus, IEnumerable<string>>(new string[0], (down, newStatus) =>
        {
            if (newStatus.IsUp)
                return down.Where(uri => uri != newStatus.Uri);
            else if (!down.Contains(newStatus.Uri))
                return down.Concat(new string[] { newStatus.Uri });
            else
                return down;
        });
    
    var upStream = statusStream
        .Aggregate<WebsiteStatus, IEnumerable<string>>(new string[0], (up, newStatus) =>
        {
            if (!newStatus.IsUp)
                return up.Where(uri => uri != newStatus.Uri);
            else if (!up.Contains(newStatus.Uri))
                return down.Concat(new string[] { newStatus.Uri });
            else
                return up;
        });
    
    var allDown = upStream.Select(up => !up.Any());
    

    【讨论】:

    • 感谢您拯救我的理智。用我最终确定的实施更新了我的问题。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-07-16
    • 2019-03-15
    • 2018-01-12
    相关资源
    最近更新 更多