【问题标题】:Can I Pair Two Observable Sequences By a Matching Key -- With Repeating Keys?我可以通过匹配键配对两个可观察序列 - 使用重复键吗?
【发布时间】:2016-05-16 22:21:11
【问题描述】:

此问题基于同名问题here,但有两点不同:

  • 我正在匹配多个键。没问题。
  • 键可能重复。问题。

我的测试代码如下。我需要以下行为:

  • 只要观察到至少一个CoordMetrics 和一个CoordData,就会立即发布CoordBundle
  • 如果特定的 X/Y 键在任一 observable 上重复出现,则会发布一个新的 CoordBundle。

我必须做什么才能做到这一点?

public class CoordMetrics
{
    internal CoordMetrics(int x, int y, IEnumerable<IMetric> metrics)
    {
        X = x;
        Y = y;
        Metrics = metrics;
    }
    internal int X { get; private set; }
    internal int Y { get; private set; }
    internal IEnumerable<IMetric> Metrics { get; private set; }
}

public class CoordData
{
    internal CoordData(int x, int y, IEnumerable<IDatum> data)
    {
        X = x;
        Y = y;
        Data = data;
    }

    internal int X { get; private set; }
    internal int Y { get; private set; }
    internal IEnumerable<IDatum> Data { get; private set; }
}

public class CoordBundle
{
    internal CoordBundle(int x, int y, IEnumerable<IMetric> metrics, IEnumerable<IDatum> data)
    {
        X = x;
        Y = y;
        Metrics = metrics;
        Data = data;
    }

    internal int X { get; private set; }
    internal int Y { get; private set; }
    internal IEnumerable<IMetric> Metrics { get; private set; }
    internal IEnumerable<IDatum> Data { get; private set; }
}

[TestClass]
public class PairingTest
{
    [TestMethod, TestCategory("Temp")]
    public void PairedObservableTest()
    {
        Trace.Listeners.Add(new TextWriterTraceListener(Console.Out));
        var aSource = new Subject<CoordMetrics>();
        var bSource = new Subject<CoordData>();

        var paired = Observable.Merge(aSource.Select(a => new Pair(a, null)), bSource.Select(b => new Pair(null, b)))
                                .GroupBy(p => p.Item1 != null ? new { p.Item1.X, p.Item1.Y } : new { p.Item2.X, p.Item2.Y })
                                .SelectMany(g => g.Buffer(2).Take(1))
                                .Select(g => new Pair(
                                  g.ElementAt(0).Item1 ?? g.ElementAt(1).Item1,
                                  g.ElementAt(0).Item2 ?? g.ElementAt(1).Item2))
                                 .Select(t => new CoordBundle(t.Item1.X, t.Item1.Y, t.Item1.Metrics, t.Item2.Data));

        paired.Subscribe(g => Trace.WriteLine(String.Format("{0},{1}", g.X, g.Y)));

        bSource.OnNext(new CoordData(2, 1, Enumerable.Empty<IDatum>()));
        aSource.OnNext(new CoordMetrics(2, 2, Enumerable.Empty<IMetric>()));
        aSource.OnNext(new CoordMetrics(1, 1, Enumerable.Empty<IMetric>()));
        bSource.OnNext(new CoordData(1, 2, Enumerable.Empty<IDatum>()));
        bSource.OnNext(new CoordData(2, 2, Enumerable.Empty<IDatum>()));
        bSource.OnNext(new CoordData(1, 1, Enumerable.Empty<IDatum>()));
        aSource.OnNext(new CoordMetrics(1, 2, Enumerable.Empty<IMetric>()));
        aSource.OnNext(new CoordMetrics(2, 1, Enumerable.Empty<IMetric>()));
        aSource.OnNext(new CoordMetrics(2, 2, Enumerable.Empty<IMetric>()));
        bSource.OnNext(new CoordData(2,2,Enumerable.Empty<IDatum>()));
    }
}

所需的输出 - 上面的代码只输出前 4 行:

2,2
1,1 
1,2
2,1
2,2
2,2

【问题讨论】:

  • 如果您创建了一个最小完整的可验证示例stackoverflow.com/help/mcve,将会很有帮助。类型 PairIMetricIData 未定义。看来您的示例实际上也不需要它们。最后,您的测试中似乎没有断言,如果您将期望转移到一组断言而不是您的评论,那就太好了。
  • 很好的反馈,下次我会尽量让我的问题更简洁。

标签: c# system.reactive reactive-programming


【解决方案1】:

我想我有你想要的。 公平地说,这不是一个容易的问题。 一个序列是另一个序列的种子是很常见的,但这里的复杂情况是,任何一个序列都可以是另一个序列的种子。

为了获得可行的解决方案,我做的第一件事就是将其分解为可验证的单元测试。 我建议使用 TestScheduler 及其相关类型来执行此操作(而不是主题等)。

我根据您的要求创建了大理石图。 然后我可以将其映射到两个测试输入序列和预期的输出序列中。

最后一部分是实际创建查询。

我最终得到的方法*是创建两个序列,尝试从一个主序列和一个子序列匹配 -> SelectMany + Where。 然而,由于两个输入都可以扮演主序列的角色,我需要这样做两次。 因为我要订阅两次,所以我需要分享这些序列 -> Publish()。 此外,由于每个序列都可能产生多个值,我需要在重复到达时取消先前匹配的匹配 -> TakeUntil。最后我只是将两个结果集合并在一起 -> Merge.

*我考虑过GroupJoinCombineLatest,但它们似乎不适合我。

[TestClass]
public class PairingTest
{
    [TestMethod, TestCategory("Temp")]
    public void PairedObservableTest()
    {
        var scheduer = new TestScheduler();

        /*
        Legend 
            a = aSource (CoordMetrics)
            b = bSource (CoordData)
            r = expected result


        a   ----2--1-----------1--2--2-----
                2  1           2  1  2

        b   -2--------1--2--1-----------2--
             1        2  2  1           2

        r   -------------2--1--1--2--2--2--
                         2  1  2  1  2  2
        */
        var aSource = scheduer.CreateColdObservable<CoordMetrics>(
            ReactiveTest.OnNext(5, new CoordMetrics(2, 2)),
            ReactiveTest.OnNext(8, new CoordMetrics(1, 1)),
            ReactiveTest.OnNext(20, new CoordMetrics(1, 2)),
            ReactiveTest.OnNext(23, new CoordMetrics(2, 1)),
            ReactiveTest.OnNext(26, new CoordMetrics(2, 2))
        );
        var bSource = scheduer.CreateColdObservable<CoordData>(
            ReactiveTest.OnNext(2, new CoordData(2, 1)),
            ReactiveTest.OnNext(11, new CoordData(1, 2)),
            ReactiveTest.OnNext(14, new CoordData(2, 2)),
            ReactiveTest.OnNext(17, new CoordData(1, 1)),
            ReactiveTest.OnNext(29, new CoordData(2, 2))
        );

        var testObserver = scheduer.CreateObserver<string>();
        Implementation(aSource, bSource)
            .Subscribe(testObserver);



        scheduer.Start();

        ReactiveAssert.AreElementsEqual(
            new[] {
                    ReactiveTest.OnNext(14, "2,2"),
                    ReactiveTest.OnNext(17, "1,1"),
                    ReactiveTest.OnNext(20, "1,2"),
                    ReactiveTest.OnNext(23, "2,1"),
                    ReactiveTest.OnNext(26, "2,2"),
                    ReactiveTest.OnNext(29, "2,2")
                },
            testObserver.Messages
        );
    }

    private static IObservable<string> Implementation(IObservable<CoordMetrics> aSource, IObservable<CoordData> bSource)
    {
        return Observable.Create<string>(observer =>
        {
            var aShared = aSource.Publish();
            var bShared = bSource.Publish();

            var fromA = aShared.SelectMany(a => bShared
                    //Find matching values from B's
                    .Where(b => a.X == b.X && a.Y == b.Y)
                    //Only run until another matching A is produced
                    .TakeUntil(aShared.Where(a2 => a2.X == a.X && a2.Y == a.Y))
                    //Project/Map to required type.
                    .Select(b => new CoordBundle(a.X, a.Y /*,  a.Metrics, b.Data*/ ))
                );

            var fromB = bShared.SelectMany(b => aShared
                    //Find matching values from A's
                    .Where(a => a.X == b.X && a.Y == b.Y)
                    //Only run until another matching B is produced
                    .TakeUntil(bShared.Where(b2 => b2.X == b.X && b2.Y == b.Y))
                    //Project/Map to required type.
                    .Select(a => new CoordBundle(a.X, a.Y /*,  a.Metrics, b.Data*/ ))
                );

            var paired = Observable.Merge(fromA, fromB);

            paired
                .Select(g => String.Format("{0},{1}", g.X, g.Y))
                .Subscribe(observer);

            return new CompositeDisposable(aShared.Connect(), bShared.Connect());
        });
    }
}

// Define other methods and classes here
public class CoordMetrics
{
    internal CoordMetrics(int x, int y)
    {
        X = x;
        Y = y;
    }
    internal int X { get; private set; }
    internal int Y { get; private set; }
}

public class CoordData
{
    internal CoordData(int x, int y)
    {
        X = x;
        Y = y;
    }

    internal int X { get; private set; }
    internal int Y { get; private set; }
}

public class CoordBundle
{
    internal CoordBundle(int x, int y)
    {
        X = x;
        Y = y;
    }

    internal int X { get; private set; }
    internal int Y { get; private set; }
}
public class Pair
{
    public Pair(CoordMetrics x, CoordData y)
    {
        Item1 = x;
        Item2 = y;
    }
    public CoordMetrics Item1 { get; set; }
    public CoordData Item2 { get; set; }
}

【讨论】:

  • 这很完美,并且对于如何实际测试 observables 也非常有用。谢谢!
猜你喜欢
  • 1970-01-01
  • 2022-08-17
  • 1970-01-01
  • 2011-06-17
  • 1970-01-01
  • 2020-11-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多