【问题标题】:Publishing to multiple subscribes in RX在 RX 中发布到多个订阅
【发布时间】:2015-05-28 15:56:28
【问题描述】:

我正在研究如何为项目开发插件框架,而 Rx 似乎非常适合我想要实现的目标。最终,该项目将是一组插件(模块化功能),可以通过 xml 进行配置以执行不同的操作。要求如下

  1. 即使在插件中也能实施模块化架构。这鼓励了松散耦合并潜在地最小化了复杂性。这有望使单个插件功能更易于建模和测试
  2. 对数据实施不变性以降低复杂性并确保将模块内的状态管理保持在最低限度
  3. 尽可能提供线程池线程以在模块内工作,从而避免手动创建线程

在我看来,插件本质上是一个数据转换实体。这意味着插件要么

  • 接收一些数据并以某种方式对其进行转换以生成新数据(此处未显示)
  • 自行生成数据并将其推送给观察者
  • 在不通知外部人员的情况下获取一些数据并对数据进行一些处理

如果您进一步了解这个概念,插件可以包含上述所有三种类型。例如,在插件中,您可以有一个 IntGenerator 模块,该模块为 ConsoleWorkUnit 模块等生成一些数据。所以我想要主要功能中的模型是插件必须完成其工作的接线。

为此,我有以下使用 Microsoft 的不可变 nuget 的基类。我想要实现的是抽象出 Rx 调用,以便它们可以在模块中使用,因此最终目标是将对缓冲区等的调用包装在可用于组成复杂查询和模块的抽象类中。这样,与必须实际读取模块中的所有代码以找出它订阅了 x 类型的缓冲区或窗口等相比,代码更具自我记录性。

public abstract class OutputBase<TOutput> : SendOutputBase<TOutput>
{
    public abstract void Work();
}

public interface IBufferedBase<TOutput>
{
    void Work(IList<ImmutableList<Data<TOutput>>> list);
}

public abstract class BufferedWorkBase<TInput> : IBufferedBase<TInput>
{
    public abstract void Work(IList<ImmutableList<Data<TInput>>> input);
}
public abstract class SendOutputBase<TOutput>
{
    private readonly ReplaySubject<ImmutableList<Data<TOutput>>> _outputNotifier;
    private readonly IObservable<ImmutableList<Data<TOutput>>> _observable;

    protected SendOutputBase()
    {
        _outputNotifier = new ReplaySubject<ImmutableList<Data<TOutput>>>(10);
        _observable  =  _outputNotifier.SubscribeOn(ThreadPoolScheduler.Instance);
        _observable = _outputNotifier.ObserveOn(ThreadPoolScheduler.Instance);
    }

    protected void SetOutputTo(ImmutableList<Data<TOutput>> output)
    {
        _outputNotifier.OnNext(output);
    }

    public void ConnectOutputTo(IWorkBase<TOutput> unit)
    {
        _observable.Subscribe(unit.Work);
    }

    public void BufferOutputTo(int count, IBufferedBase<TOutput> unit)
    {
        _observable.Buffer(count).Subscribe(unit.Work);
    }
}

public abstract class WorkBase<TInput> : IWorkBase<TInput>
{
    public abstract void Work(ImmutableList<Data<TInput>> input);
}

public interface IWorkBase<TInput>
{
    void Work(ImmutableList<Data<TInput>> input);
}

public class Data<T>
{
    private readonly T _value;

    private Data(T value)
    {
        _value = value;
    }

    public static Data<TData> Create<TData>(TData value)
    {
        return new Data<TData>(value);
    }

    public T Value { get { return _value; } }

}

这些基类用于创建三个类;一种用于生成一些 int 数据,一种用于在数据出现时打印数据,最后一种用于在数据进入时缓冲数据并将值以三个为单位求和。

public class IntGenerator : OutputBase<int>
{
    public override void Work()
    {
        var list = ImmutableList<Data<int>>.Empty;
        var builder = list.ToBuilder();
        for (var i = 0; i < 1000; i++)
        {
            builder.Add(Data<int>.Create(i));
        }

        SetOutputTo(builder.ToImmutable());
    }
}

public class ConsoleWorkUnit : WorkBase<int>
{
    public override void Work(ImmutableList<Data<int>> input)
    {
        foreach (var data in input)
        {
            Console.WriteLine("ConsoleWorkUnit printing {0}", data.Value);
        }
    }
}

public class SumPrinter : WorkBase<int>
{

    public override void Work(ImmutableList<Data<int>> input)
    {
        input.ToObservable().Buffer(2).Subscribe(PrintSum);
    }

    private void PrintSum(IList<Data<int>> obj)
    {
      Console.WriteLine("Sum of {0}, {1} is {2} ", obj.First().Value,obj.Last().Value ,obj.Sum(x=>x.Value) );
    }
}

这些都是在这样的主目录中运行的

        var intgen = new IntGenerator();
        var cons = new ConsoleWorkUnit();
        var sumPrinter = new SumPrinter();

        intgen.ConnectOutputTo(cons);
        intgen.BufferOutputTo(3,sumPrinter);

        Task.Factory.StartNew(intgen.Work);

        Console.ReadLine();

这个架构合理吗?

【问题讨论】:

  • 我尝试运行您的代码,但找不到IBufferedBase&lt;TOutput&gt;BufferedWorkBase&lt;int&gt; 的定义。可以加一下吗?
  • 另外,Rx 调用几乎总是流畅的——所以调用这个_outputNotifier.SubscribeOn(ThreadPoolScheduler.Instance) 只会返回一个新的IObservable&lt;ImmutableList&lt;Data&lt;TOutput&gt;&gt;&gt;,你会扔掉它。与_outputNotifier.ObserveOn(ThreadPoolScheduler.Instance) 相同。您可能想先尝试修复这些问题。
  • @Enigmativity 感谢您的回复,更新了界面并按照建议更改了代码。干杯伯纳德
  • 你还是没有掌握流畅的界面使用方法。当您分配 ObserveOn 时,您正在清除 SubscribeOn
  • 感谢您发布代码。看来您正在努力使 Rx 尽可能地努力。 Rx 是一个非常实用的功能(就像在函数式编程中一样),但您正试图使其面向对象。你正在失去很多力量。您能从需求 POV 解释您在这里尝试做什么吗?我想我可以让这一切变得更容易?

标签: c# .net system.reactive


【解决方案1】:

您正在缓冲您的 observable (.Buffer(count)),以便它仅在 count 通知到达后发出信号。

但是,您的 IntGenerator.DoWork 只会产生一个值。因此,您永远不会“填充”缓冲区并触发下游通知。

要么更改DoWork 以使其最终产生更多值,要么在完成工作时让其完成可观察流。 Buffer 将在流完成时释放剩余的缓冲值。为此,这意味着IntGenerator.DoWork 需要在某处引起对_outputNotifier.OnCompleted() 的调用

【讨论】:

  • 谢谢!你是绝对正确的。我已经更新了代码,它按预期工作。在我当前的架构中是否有可能在创建时缓冲可枚举,这是我的初衷?
  • 我认为您的回答解决了问题的最初主旨,因此我将其标记为已回答并创建一个新问题。再次感谢您的帮助
  • 对不起,我离开了。无需创建Subject&lt;ImmutableList&lt;T&gt;&gt;,只需创建Subject&lt;T&gt; 并在DoWork 方法中将单个项目推入其中。生成的 observable 将是 IObservable&lt;T&gt;。此时Buffer 将具有生成count 项目数组的预期效果:IObservable&lt;T[]&gt;。如果您真的想要ImmutableList,请关注Buffer.Select(b =&gt; ImmutableList.CreateRange(b))
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多