20140310补充:

  rabbitmq有requeue属性,可以选择消息是否返回队列,另,本文的解决方式非常之山寨,只能应用于发送和接收方式。  

 

  这几天在折腾消息队列,在.Net环境下有基于RabbitMQ有很多有API的选择,最后选择了比较简单的EasyNetQ(http://easynetq.com/)

  在测试使用的时候发现一个问题,对于处理错误的消息,EasyNetQ默认会放到一个错误队列中并提供了一个工具可以对错误队列的消息进行重新分发。RabiitMQ是有一个事务功能的,但是貌似在EasyNetQ的实现中,没有发现相关的操作方式

  现在的需求是,在某种特定的情况下,需要将处理不成功的消息,保留在消息原队列

  做了一个变通处理,出现错误消息的时候,将其再送回原队列。从客户端上可以直接send回队列,但这样不是一个很好的方式

  研究了一下EasyNetQ的相关代码,发现其定义了一个IConsumerErrorStrategy接口,我们只要自己自行实现,并注册一个自定义方法就可以

  ComponentRegistration.cs原注册方法相关,默认错误消息处理方式在DefaultConsumerErrorStrategy.cs中

 1     public class ComponentRegistration
 2     {
 3         public static void RegisterServices(IContainer container)
 4         {
 5             Preconditions.CheckNotNull(container, "container");
 6 
 7             // Note: IConnectionConfiguration gets registered when MQContext.CreateBus(..) is run.
 8             #region DefaultConsumerErrorStrategy
 9             container
10             .Register(_ => container)
11             .Register<IMessageQueueQLogger, ConsoleLogger>()
12             .Register<ISerializer, JsonSerializerN>()
13             .Register<IConventions, Conventions>()
14             .Register<IEventBus, EventBus>()
15             .Register<ITypeNameSerializer, TypeNameSerializer>()
16             .Register<Func<string>>(x => CorrelationIdGenerator.GetCorrelationId)
17             .Register<IClusterHostSelectionStrategy<ConnectionFactoryInfo>, DefaultClusterHostSelectionStrategy<ConnectionFactoryInfo>>()
18             .Register<IConsumerDispatcherFactory, ConsumerDispatcherFactory>()
19             .Register<IPublishExchangeDeclareStrategy, PublishExchangeDeclareStrategy>()
20             .Register<IPublisherConfirms, PublisherConfirms>()
21             .Register<IConsumerErrorStrategy, DefaultConsumerErrorStrategy>()
22             .Register<IHandlerRunner, HandlerRunner>()
23             .Register<IInternalConsumerFactory, InternalConsumerFactory>()
24             .Register<IConsumerFactory, ConsumerFactory>()
25             .Register<IConnectionFactory, ConnectionFactoryWrapper>()
26             .Register<IPersistentChannelFactory, PersistentChannelFactory>()
27             .Register<IClientCommandDispatcherFactory, ClientCommandDispatcherFactory>()
28             .Register<IHandlerCollectionFactory, HandlerCollectionFactory>()
29             .Register<IAdvancedBus, RabbitAdvancedBus>()
30             .Register<IRpc, Rpc>()
31             .Register<ISendReceive, SendReceive>()
32             .Register<IBus, RabbitBus>();
33             #endregion
34         }
35     }
View Code

相关文章: