【问题标题】:RX IObservable as a PipelineRX IObservable 作为管道
【发布时间】:2010-02-11 09:08:38
【问题描述】:

目前,我正在使用 RX 框架来实现类似工作流的消息处理管道。本质上,我有一个消息生产者(反序列化网络消息并在主题上调用 OnNext()),并且我有几个消费者。

注意:if 和 transform 是我编写的扩展方法,它们只返回一个 IObservable。

消费者执行以下操作:

 var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x)
                              .Where(y => y.Value > 5)
                              .Select(y => y.ComplexObject)
                              .If(z => z.IsPaid, respond(z))
                              .Do(z => SendError(z));

commerceRequest 然后被另一个类似的管道消耗,并一直持续到顶部,在最后一个管道上有人调用 Subscribe()。我遇到的问题是,除非在某处直接对消息调用 subscribe,否则来自 base 的消息不会向上传播。

如何将消息推送到堆栈顶部?我知道这是一种非正统的方法,但我觉得它使代码非常容易理解消息发生了什么。如果您觉得这是一个非常糟糕的想法,任何人都可以建议另一种方法吗?

【问题讨论】:

  • 是的……在我看来,这种面向对象的一般风格中的任何东西都会看起来有点流水线式的。这是一件好事!

标签: c# .net workflow system.reactive pipeline


【解决方案1】:

如果没有订阅者,他们为什么要通过管道?如果您的中间步骤之一对它们的副作用有用(即使没有其他订阅者,您也希望它们运行),您应该将副作用操作重写为订阅者。

如果您想继续该链,您还可以将具有副作用的步骤作为传递操作(或 tee,如果您愿意的话)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-01-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多