【问题标题】:Reactive Extensions Subject<IEnumerable<Obj>> to Subject<IEnumerable<AggregatedObj>>响应式扩展 Subject<IEnumerable<Obj>> 到 Subject<IEnumerable<AggregatedObj>>
【发布时间】:2011-06-06 21:39:42
【问题描述】:

我在选题时遇到了困难

public Subject<IEnumerable<Person>> PersonDataSubject;

并将其转换为:

public Subject<IEnumerable<BornInYear>> BornInYearSubject;

...使用一些 linq 聚合。

下面的示例将其置于更多上下文中,而我正在努力研究如何通过订阅 PersonDataSubject 将 IEnumerable 放入 BornInYearSubject 中。

无论我尝试什么,最终都会得到IObservable&lt;BornInYear&gt;,而不是IObservable&lt;IEnumerable&lt;BornInYear&gt;&gt;

目标是让该类的客户能够订阅两个主题并在每个“下一个”通知中获得相应类型的 IEnumerable。

public class ReactiveTest
{
    public class Person
    {
        public string name;
        public DateTime dob;
    };

    public class BornInYear
    {
        public int Year;
        public int Count;
    }

    public Subject<IEnumerable<Person>> PersonDataSubject = new Subject<IEnumerable<Person>>();
    public Subject<IEnumerable<BornInYear>> BornInYearSubject= new Subject<IEnumerable<BornInYear>>();

    public void LoadData()
    {
        // Go to hypotheritical web service and get batch of people.
        IEnumerable<Person> people = WebService.Fetch();

        // Notify subscribers we have a fresh batch of data.
        PersonDataSubject.OnNext(people);
    }

    public ReactiveTest()
    {
        // Hookup BornInYearSubject to listen to PersonDataSubject and publish the summarised data.
        PersonDataSubject.Subscribe(pd => pd.GroupBy(p => p.dob.Year)
                                            .Select(ps => new BornInYear { Year = ps.Key, Count = ps.Count()})
                                            .AsParallel()
            );

        // How do I get the results of this out and published onto BornInYearSubject?
    }
}

现在我知道我可以使用 Task.Factory.StartNew(...)... 作为我对 PersonDataSubject 的 OnNext 订阅来实现这一点,但我相信它一定有可能保持更多的反应性?

【问题讨论】:

  • 使用 IObservable> 有什么问题?

标签: ienumerable system.reactive task-parallel-library


【解决方案1】:

怎么样:

PersonDataSubject
    .GroupBy(x => x.Dob.Year)
    .Select(x => x.Aggregate(new List<BornInYear>(), (acc, x) => { acc.Add(new BornInYear { Year = ps.Key }); return acc; }))

【讨论】:

  • 如果没有 OnCompleted,聚合会返回任何内容吗?
  • @Scott 你说得对,我假设 PersonDataSubject 最终会完成。如果没有,请将“聚合”替换为“扫描”
  • 这不起作用,因为 PersonDataSubject 是 IEnumerable 所以你不能直接调用 GroupBy ,它是需要分组的 IEnumerable 的内容。虽然给了我一些想法,所以会继续玩。谢谢。
【解决方案2】:

好的,这行得通。感谢您的想法 - 事后看来,答案似乎非常明显!

using System;
using System.Collections.Generic;
using System.Linq;

namespace TestReactive
{
    public class ReactiveTest
    {
        public class Person
        {
            public string name;
            public DateTime dob;
        };

        public class BornInYear
        {
            public int Year;
            public int Count;
        }

        public Subject<IEnumerable<Person>> PersonDataSubject = new Subject<IEnumerable<Person>>();
        public Subject<IEnumerable<BornInYear>> BornInYearSubject = new Subject<IEnumerable<BornInYear>>();

        public void LoadData()
        {
            IEnumerable<Person> people = new List<Person> {
                new Person() {name = "Bill", dob = DateTime.Now.AddYears(-10)},
                new Person() {name = "Pete", dob = DateTime.Now.AddYears(-5)},
                new Person() {name = "Judy", dob = DateTime.Now.AddYears(-1)},
                new Person() {name = "Mike", dob = DateTime.Now.AddYears(-5)},
                new Person() {name = "Jake", dob = DateTime.Now.AddYears(-5)},
                new Person() {name = "Fred", dob = DateTime.Now.AddYears(-13)},
            };

            // Notify subscribers we have a fresh batch of data.
            PersonDataSubject.OnNext(people);
        }

        public ReactiveTest()
        {
            var subj = PersonDataSubject.Select(pds => pds.GroupBy(pd => pd.dob.Year)
                                                          .Select(p => new BornInYear {
                                                              Year = p.Key, Count = p.Count()
                                                          }).AsParallel());
            subj.Subscribe(BornInYearSubject);  

            BornInYearSubject.Subscribe( x=> Console.WriteLine("{0}", x.Count()));
            LoadData();
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            ReactiveTest rt = new ReactiveTest();
        }
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-03-17
    • 2012-09-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多