【问题标题】:Rx combining many streams by joining on a propertyRx 通过加入一个属性来组合许多流
【发布时间】:2016-05-17 10:07:06
【问题描述】:

我正在尝试在 Rx 中“压缩”任意数量的流,其中元素对应但可能被无序处理。每个流的元素都有一个标识符,可用于将它们匹配在一起。例如。元素看起来像:

public class Element
{
    public string Key {get; set;}
}

通常,zip 只会按出现的索引组合元素:

|-A-----------A
|--B---------B-
|-----C------C-
|-----ABC-----ABC  <- zip

但是如果我们只想匹配共享相同 Key 的元素呢?我正在寻找一个更像这样的序列:

(在本例中,键为 1 或 2)

|--2A-------1A----------
|----1B----------2B-----
|------1C-----------2C--
|-----------1ABC----2ABC   <- zipped by key 1 & 2 respectively

我觉得 GroupJoin 适合这种情况,但它只服务于两个 Observable,并且链接它们很快就失控了。

我还查看了 And/Then/When,但并不真正了解如何为这种场景构建它。

理想情况下,我想要一个我可以调用的扩展方法并为其提供一个结果选择器,其中结果选择器的输入保证具有相同的 Key。

你会如何解决这个问题?

【问题讨论】:

    标签: c# system.reactive reactive-programming


    【解决方案1】:

    这是我在 LinqPad 中碰到的一些东西。它满足您对大理石图的要求。然而,它比我想要的更混乱。

    Nuget 对 Rx-Testing 的依赖

    void Main()
    {
        TestScheduler scheduler = new TestScheduler();
        /*
    |--2A-------1A----------
    |----1B----------2B-----
    |------1C-----------2C--
    |-----------1ABC----2ABC   <- zipped by key 1 & 2 respectively
    
        */
        var sourceA = scheduler.CreateColdObservable(
            ReactiveTest.OnNext(3, "2A"),
            ReactiveTest.OnNext(12, "1A"));
        var sourceB = scheduler.CreateColdObservable(
            ReactiveTest.OnNext(5, "1B"),
            ReactiveTest.OnNext(17, "2B"));
        var sourceC= scheduler.CreateColdObservable(
            ReactiveTest.OnNext(7, "1C"),
            ReactiveTest.OnNext(20, "2C"));
    
        var observer = scheduler.CreateObserver<string>();
    
    
        var query = Observable.Merge(sourceA, sourceB, sourceC)
            .GroupBy(x => GetKey(x))
            .SelectMany(grp => grp.Select(x=>GetValue(x))
                                  .Take(3)
                                  .Aggregate(new List<string>(), 
                                            (accumulator, current) => { 
                                                accumulator.Add(current); 
                                                return accumulator;
                                            })
                                .Select(acc=>CreateGroupResult(grp.Key, acc)));
    
        query.Subscribe(observer);
        scheduler.Start();
    
        ReactiveAssert.AreElementsEqual(
            new[]{
                ReactiveTest.OnNext(12, "1ABC"),
                ReactiveTest.OnNext(20, "2ABC")
            },
            observer.Messages
        );
    
    }
    
    // Define other methods and classes here
    private static string CreateGroupResult(string key, IEnumerable<string> values)
    {
        var combinedOrderedValues = string.Join(string.Empty, values.OrderBy(v => v));
        return string.Format("{0}{1}", key, combinedOrderedValues);
    }
    
    private static string GetKey(string message)
    {
        return message.Substring(0, 1);
    }
    
    private static string GetValue(string message)
    {
        return message.Substring(1);
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-10-03
      • 1970-01-01
      • 1970-01-01
      • 2017-10-27
      • 1970-01-01
      • 2022-06-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多