【问题标题】:Close RabbitMQ channel while in BasicAcks在 BasicAcks 中关闭 RabbitMQ 通道
【发布时间】:2020-01-20 13:55:07
【问题描述】:

我在一个连接下有一个 RabbitMQ 通道池。我正在尝试实现Publisher Confirms 功能。我以一种随着对频道的需求增加而创建新频道的方式构建了池。但现在,我也必须管理这些渠道的关闭。

我计划使用outstandingConfirms 列表实施类似RabbitMQ tutorials 的解决方案。

我遇到的问题是,当通道中的最后一条消息是ack-ednack-ed 时,我必须以某种方式关闭通道(当我高于池中对象的“软”阈值时)。

正如您在下面的代码中看到的,sender 参数实际上是通道本身,我想如果没有待处理的消息,我可以直接使用它来关闭通道。但是我面临这样一个事实,即通道不应该被多个线程同时使用。这个相同的频道将在池中可用,并且可以被应用程序拾取以供使用。

这些是事件订阅:

protected void OnBasicAcks(object sender, BasicAckEventArgs e)
{
    //sender = channel object
    //todo close channel after all pending messages (n)acked
}

protected void OnBasicNacks(object sender, BasicNackEventArgs e)
{
    //sender = channel object
    //todo close channel after all pending messages (n)acked
}

另外here 我可以读到回调处理程序中允许阻塞操作:

从 3.5.0 版开始,应用程序回调处理程序可以调用阻塞 操作(例如 IModel.QueueDeclare 或 IModel.BasicCancel)

但这是否也适用于通道本身的关闭?

简而言之,这是我的问题。我的问题是:

  1. 是否可以直接使用通过sender参数传递的通道进行IModel.Close()之类的操作?
  2. 我只是想防止在频道或其他奇怪的东西上出现死锁。针对我的问题有什么建议或最佳实践吗?
  3. 还有一些完全不同的东西:我可以在OnBasicAcks 方法中创建一个新频道吗?

【问题讨论】:

    标签: c# rabbitmq


    【解决方案1】:

    如果您使用的是最新的 RabbitMQ.Client(我认为是 v5.1),则所有回调都是使用 async 在 ThreadPool 上调用的,因此您通常可以在回调中调用这些方法。但是,我会避免劫持回调线程来执行任何长时间运行的任务,因为它们是由 RabbitMQ 客户端按通道管理的。

    所以,具体回答一下。

    1. 应该没问题,查看源代码,它不会锁定任何会阻止调用 close 的东西。请注意您的代码中会死锁的任何内容。
    2. 最好在您自己的代码中控制对会话的访问,这样您就不会从多个线程调用单个通道,期望任何事情以特定的顺序发生。
    3. 应该可以正常工作,因为您在 TPL 任务线程上,所以没有理由不应该工作。

    如果您想了解有关客户端工作原理的更多详细信息,可以查看source,尤其是ModelBase.cs 文件,以及它为回调创建的异步工作器服务。

    【讨论】:

      猜你喜欢
      • 2015-11-05
      • 2018-01-29
      • 1970-01-01
      • 1970-01-01
      • 2016-08-09
      • 2018-07-27
      • 1970-01-01
      • 1970-01-01
      • 2020-09-16
      相关资源
      最近更新 更多