【发布时间】:2014-12-10 04:54:25
【问题描述】:
我有一个消息处理应用程序,该应用程序当前对可以轻松放入内存的小消息进行操作。我正在扩展它以处理大于内存(在 10 到 100 千兆字节中)的消息,这将需要某种流式处理方法。到目前为止,我很喜欢 Reactive Extensions(尤其是 this piece on "Rx on the server")如何建模异步推送事件流,以及 Rx 如何与 C# 中的其他构造和模式互操作,例如 System.IO.Stream(参见上一个链接)、@987654322 @、APM pattern 和 EAP pattern。
例如,IObservable<MyMessage> 是 Rx 流的普通示例。因为我的应用程序对MyMessage 的实现表示数据太大而无法放入内存,所以我需要类似于IObservable<MyMessageChunk> 的东西,其中可变数量的MyMessageChunk 实例组合起来代表一个MyMessage。我需要像这样的油漆味大理石图:
在上图中,每个圆圈代表一段消息,颜色表示消息之间的中断。红色 X 表示处理绿色消息时出现错误,但即使OnError 的语义要求终止流,我仍需要处理以继续处理以下紫色消息。末尾的绿色条代表OnCompleted,在这种情况下,它本质上意味着关闭应用程序,而不是成功完成任何一条消息。
有没有办法用 Rx 对这种处理进行建模?
【问题讨论】:
-
您能提供一些示例代码吗?
-
@MatthewFinlay 我添加了一些额外的问题描述。这有帮助吗?
标签: c# .net system.reactive reactive-programming