【问题标题】:Reactive Extensions for processing continuous streams of messages用于处理连续消息流的反应式扩展
【发布时间】:2014-12-10 04:54:25
【问题描述】:

我有一个消息处理应用程序,该应用程序当前对可以轻松放入内存的小消息进行操作。我正在扩展它以处理大于内存(在 10 到 100 千兆字节中)的消息,这将需要某种流式处理方法。到目前为止,我很喜欢 Reactive Extensions(尤其是 this piece on "Rx on the server")如何建模异步推送事件流,以及 Rx 如何与 C# 中的其他构造和模式互操作,例如 System.IO.Stream(参见上一个链接)、@987654322 @、APM patternEAP pattern

例如,IObservable<MyMessage> 是 Rx 流的普通示例。因为我的应用程序对MyMessage 的实现表示数据太大而无法放入内存,所以我需要类似于IObservable<MyMessageChunk> 的东西,其中可变数量的MyMessageChunk 实例组合起来代表一个MyMessage。我需要像这样的油漆味大理石图:

在上图中,每个圆圈代表一段消息,颜色表示消息之间的中断。红色 X 表示处理绿色消息时出现错误,但即使OnError 的语义要求终止流,我仍需要处理以继续处理以下紫色消息。末尾的绿色条代表OnCompleted,在这种情况下,它本质上意味着关闭应用程序,而不是成功完成任何一条消息。

有没有办法用 Rx 对这种处理进行建模?

【问题讨论】:

  • 您能提供一些示例代码吗?
  • @MatthewFinlay 我添加了一些额外的问题描述。这有帮助吗?

标签: c# .net system.reactive reactive-programming


【解决方案1】:

根据您提供的简短问题描述,使用IObservable<T> 中的T 对您的数据进行编码似乎是正确的方法。 (The Power of T)

根据 Rx 语法,只能调用一次 OnError 然后 observable 被终止,所以这不会成为问题:

OnNext* (OnCompleted | OnError)?

详情请参阅Rx Design Guidelines

但是,当发生错误时,您可以使用 RetryCatch 等运算符以递归方式使用不同的 observable 或原始 observable 继续序列。

【讨论】:

  • 我发现您在“T 的力量”上的链接非常有帮助。我突然想到,为我的系统建模的一种可能方法是使用您描述的“通道”方法。本质上,我需要一个IObservable<IObservable<T>>。外部 observable 将呈现一个 observable 流,每个 observable 将呈现一条消息的消息块流。 OnError 可以正确处理这些流中的事件,并且语义将更符合 Rx 模型。
  • 不错!请注意,嵌套的 observables 实际上是“窗口”模型而不是“通道”模型。 “Channel”意味着某种回调机制。
【解决方案2】:

如果您有一个函数IObservable<T> ReadChunck(Chunk chunk),那么您可以使用Observable.OnErrorResumeNext() 组合它们。这将在调用 OnErrorOnCompleted 时连接块。

IObserable<T> finalStream = Observable.OnErrorResumeNext(
    ReadChunk(chunk1),
    ReadChunk(chunk2),
    ReadChunk(chunck3);

或者,您可以将ReadChunks 组合成一个IEnumerable&lt;IObservable&lt;T&gt;&gt;,这也可以。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-06-05
    • 1970-01-01
    • 2011-09-27
    相关资源
    最近更新 更多