【问题标题】:Using Reactive extension (Rx) for MSMQ message receive using async pattern (queue.BeginReceive,queue.EndReceive)使用异步模式 (queue.BeginReceive,queue.EndReceive) 对 MSMQ 消息接收使用响应式扩展 (Rx)
【发布时间】:2011-11-04 07:06:32
【问题描述】:

我在我的项目中使用 Rx 已经有一段时间了,专门用于 Socket 编程,好的部分是它做得很好。管理我的代码、性能优势以及更好的执行和解释。

最近我必须修改我的项目的流程,我需要将所有传入数据(来自套接字操作)转储到队列中(使用 MSMQ 实现作为排队的决定)。

由于 MSMQ 提供异步调用以从队列中取出消息(但以一种奇怪的模式)。 我现在一直在努力将 Rx 用于此目的,但可以这样做。

问题:谁能给我一个干净的代码示例来实现 Rx 以使用异步模式从队列接收消息。

我需要类似于这样的 MSMQ 的异步运算符实现

var data = Observable.FromAsyncPattern<byte[]>(
                        this.receiverSocket.BeginReceive,
                        this.receiverSocket.EndReceive(some parameters);

提前致谢。 *干杯* Rx 和 .NET

【问题讨论】:

  • 你能告诉我有EndReceive的类吗?我找不到它...
  • 我不确定你在问什么,但我正在回答我所理解的。我需要一个用于 BeginReceive 和 EndReceive 的 Rx 实现(FromAsyncPattern),它返回“System.Messaging.Message”对象。异步方法用于 System.Messaging.MessageQueue 对象方法。希望你现在有了更好的想法。我很高兴,并且确信您会在 Rx 问题上快速回复:D
  • MessageQueue 类中的所有EndReceive 方法都没有IAsyncResult 以外的任何参数。你的receiverSocket是什么类型的?
  • 哦,不要考虑 MSMQ 上下文中的“receiverSocket”对象。这只是我想用 MSMQ BeginReceive - EndReceive 做的类似的例子。顺便说一句,“receiverSocket”是 Socket 类型的。你说的实施障碍我也面临着同样的问题。在一行代码上敲了我近 5 个小时。我想问问社区。希望有人找到解决方法,以便我可以在 Rx for MSMQ 中使用工具并利用其优势。

标签: system.reactive reactive-programming


【解决方案1】:

这很简单:

var queue = new System.Messaging.MessageQueue("test");
var fun = Observable.FromAsyncPattern((cb, obj) => queue.BeginReceive(TimeSpan.FromMinutes(10),obj,cb), a => queue.EndReceive(a));
var obs = fun();

【讨论】:

  • 我认为你是对的。整个“EndReceive 上的参数”完全让我震惊!
  • 是的。此代码完美运行。但问题在于超时。当 time : case 在这里 10 分钟结束并且队列中没有消息时,它会抛出 MessageQueueException。第二个问题是 var obs = fun(); subscribe 只会调用该函数一次我怎样才能使它与前一个请求的 endreceive 同步递归。
  • Google 员工请注意 - 虽然当时的答案很好,但 FromAsyncPattern 现在已过时
  • @Micky 那么现在类似的解决方案是什么?
猜你喜欢
  • 1970-01-01
  • 2014-03-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-12-04
相关资源
最近更新 更多