【发布时间】:2012-05-16 15:28:56
【问题描述】:
使用 MassTransit 和 RabbitMQ 处理请求/响应场景。 当做一个简单的请求/回复时,它会工作多次。 如果我在请求处理程序中发布一条消息,它适用于第一个请求,但请求处理程序永远不会在第二个请求上被调用并最终超时并且消息保留在服务器队列中。
我好像错过了什么;可以配置吗?
项目位于https://bitbucket.org/joeyoung/enterprise-rabbitmq
客户端配置:
ObjectFactory.Configure(cfg =>
{
cfg.AddRegistry<WebRegistry>();
cfg.For<IServiceBus>().Singleton().Use(o => ServiceBusFactory.New(sbc =>
{
// configure the bus
sbc.UseRabbitMqRouting();
sbc.ReceiveFrom("rabbitmq://localhost/entrprise_client");
// finds all the consumers in the container and register them with the bus
sbc.Subscribe(x => x.LoadFrom(ObjectFactory.Container));
}));
});
服务器配置:
var container = new Container(cfg =>
{
cfg.Scan(scan =>
{
scan.Assembly("Server.MessageHandlers");
scan.AddAllTypesOf<IConsumer>();
});
});
var bus = ServiceBusFactory.New(sbc =>
{
// configure the bus
sbc.UseRabbitMqRouting();
sbc.ReceiveFrom("rabbitmq://localhost/enterprise_server");
// finds all the consumers in the container and register them with the bus
sbc.Subscribe(x => x.LoadFrom(container));
});
// finally inject the bus into the container
container.Inject(bus);
发送请求:
bus.PublishRequest(new CreateProductCommand(correlationId, model.Name, model.Description, model.Price), x =>
{
x.HandleTimeout(10.Seconds(), () => { timedOut = true; });
x.Handle<CreateProductCommand.Response>(response => { productId = response.Id; });
});
消费请求:
public void Consume(IConsumeContext<CreateProductCommand> context)
{
Console.Out.WriteLine("Consuming Create Product");
// simulate creating a product
var productId = "products/1234";
bus.Publish(new ProductCreatedEvent(productId));
context.Respond(new CreateProductCommand.Response(context.Message.CorrelationId) { Id = productId});
}
消息:
public class CreateProductCommand : CorrelatedBy<Guid>
{
public Guid CorrelationId { get; private set; }
public string Name { get; set; }
public string Description { get; set; }
public decimal Price { get; set; }
public CreateProductCommand(Guid correlationId, string name, string description, decimal price)
{
CorrelationId = correlationId;
Name = name;
Description = description;
Price = price;
}
public class Response : CorrelatedBy<Guid>
{
public Guid CorrelationId { get; private set; }
public string Id { get; set; }
public Response(Guid correlationId)
{
CorrelationId = correlationId;
}
}
}
【问题讨论】:
-
我不确定分辨率只是快速查看。我建议考虑加入我们的邮件列表,以便其他人可以站出来看看。 groups.google.com/forum/#!forum/masstransit-discuss 还包括您使用的 MT 版本。
-
另外,打开日志并验证没有抛出异常。
-
我想知道在消费第一条消息后容器是否正在处理总线实例。您可以将总线作为 IConsumeContext 上的属性访问,因此无需在使用者构造函数中对其进行依赖。
-
启用了日志记录,除了在第二次发布时引发的超时之外,没有看到任何其他错误。在consumeContext上使用了总线,这似乎可以解决它。
标签: rabbitmq masstransit