【问题标题】:Sending request reponse message on Artemis using C#使用 C# 在 Artemis 上发送请求响应消息
【发布时间】:2021-08-10 16:05:57
【问题描述】:

我正在尝试使用 ArtemisNetClient 在 C# 中实现请求响应模式,但在实际解决方案中找出如何以更通用的方式执行此操作时遇到了一些麻烦。

基于一些 Java 示例,我能够在两个控制台应用程序中执行类似的操作:

发件人

static async System.Threading.Tasks.Task Main(string[] args)
{
    var connectionFactory = new ConnectionFactory();
    var endpoint = Endpoint.Create("localhost", 5672, "guest", "guest");
    var connection = await connectionFactory.CreateAsync(endpoint);

    string guid = new Guid().ToString();

    var requestAddress = "TRADE REQ1";
    var responseAddress = "TRADE RESP";

    Message message = new Message("BUY AMD 1000 SHARES");
    message.SetCorrelationId(guid);
    message.ReplyTo = responseAddress;

    var producer = await connection.CreateProducerAsync(requestAddress, RoutingType.Anycast);
    await producer.SendAsync(message);

    var consumer = await connection.CreateConsumerAsync(responseAddress, RoutingType.Anycast);
    var responseMessage = await consumer.ReceiveAsync();

    Console.WriteLine(responseMessage.GetBody<string>());
    
}

接收者

static async System.Threading.Tasks.Task Main(string[] args)
{
    // Create connection
    var connectionFactory = new ConnectionFactory();
    var endpoint = Endpoint.Create("localhost", 5672, "guest", "guest");
    var connection = await connectionFactory.CreateAsync(endpoint);

    var requestAddress = "TRADE REQ1";

    // Create consumer to receive trade request messages
    var consumer = await connection.CreateConsumerAsync(requestAddress, RoutingType.Anycast);
    var message = await consumer.ReceiveAsync();

    Console.WriteLine($"Received message: {message.GetBody<string>()}");

    // Confirm trade request and ssend response message
    if (!string.IsNullOrEmpty(message.ReplyTo))
    {
        Message responseMessage = new Message("Confirmed trade request");
        responseMessage.SetCorrelationId(message.CorrelationId);
        var producer = await connection.CreateProducerAsync(message.ReplyTo);
        await producer.SendAsync(responseMessage);
    }
}

这按预期工作,但我希望有更多类似于this article 中描述的内容,除了它没有任何请求响应模式的示例。

详细地说,我目前有两个服务要通过它进行通信。

服务1中,我想创建并发布一条消息,然后等待响应以丰富实例对象并将其保存到数据库中。我目前有这个,但它缺少等待响应消息。

public async Task<Instance> CreateInstance(Instance instance)
{
    await _instanceCollection.InsertOneAsync(instance);

    var @event = new InstanceCreated
    {
        Id = instance.Id,
        SiteUrl = instance.SiteUrl
    };

    await _messageProducer.PublishAsync(@event);

    return instance;
}

我想我可能需要在 PublishAsync() 中设置一个临时队列/连接或其他内容,并将其更改为例如Task&lt;Message&gt; 支持返回响应消息。但是我该怎么做呢?我是否必须像控制台应用程序示例中那样做一个新的 connectionfactory + CreateConsumerAsync 等?

public class MessageProducer
{
    private readonly IAnonymousProducer _producer;

    public MessageProducer(IAnonymousProducer producer)
    {
        _producer = producer;
    }

    public async Task PublishAsync<T>(T message, string replyTo = null, string correlationId = null)
    {
        var serialized = JsonSerializer.Serialize(message);
        var address = typeof(T).Name;
        var msg = new Message(serialized);
        if (replyTo != null && correlationId != null)
        {
            msg.CorrelationId = correlationId;
            msg.ReplyTo = replyTo;
        }
        await _producer.SendAsync(address, msg);
    }

    public async Task PublishAsync<T>(T message, string routeName, string replyTo = null, string correlationId = null)
    {
        var serialized = JsonSerializer.Serialize(message);
        var address = routeName;
        var msg = new Message(serialized);
        if(replyTo != null && correlationId != null)
        {
            msg.CorrelationId = correlationId;
            msg.ReplyTo = replyTo;
        }
        await _producer.SendAsync(address, msg);
    }
}

Service 2中,我有一个InstanceCreatedConsumer,它接收消息,但它再次缺少返回响应消息的方法。

public class InstanceCreatedConsumer : ITypedConsumer<InstanceCreated>
{
    private readonly MessageProducer _messageProducer;
    public InstanceCreatedConsumer(MessageProducer messageProducer)
    {
        _messageProducer = messageProducer;
    }
    public async Task ConsumeAsync(InstanceCreated message, CancellationToken cancellationToken)
    {
        // consume message and return response
    }
}

我想我也许可以使用 ConsumeAsyncHandleMessage 扩展 ActiveMqExtensions 类,以处理带有返回值的响应消息,但我还没有做到这一点。

public static IActiveMqBuilder AddTypedConsumer<TMessage, TConsumer>(this IActiveMqBuilder builder,
    RoutingType routingType)
    where TConsumer : class, ITypedConsumer<TMessage>
{
    builder.Services.TryAddScoped<TConsumer>();
    builder.AddConsumer(typeof(TMessage).Name, routingType, HandleMessage<TMessage, TConsumer>);
    return builder;
}

private static async Task HandleMessage<TMessage, TConsumer>(Message message, IConsumer consumer, IServiceProvider serviceProvider, CancellationToken token)
    where TConsumer : class, ITypedConsumer<TMessage>
{
    try
    {
        var msg = JsonConvert.DeserializeObject<TMessage>(message.GetBody<string>());
        using var scope = serviceProvider.CreateScope();
        var typedConsumer = scope.ServiceProvider.GetService<TConsumer>();
        await typedConsumer.ConsumeAsync(msg, token);
        await consumer.AcceptAsync(message);
    }
    catch(Exception ex)
    {
        // todo
    }
}

我在这里尝试实现的目标是完全错误的,还是使用 ArtemisNetClient 是不可能的?

也许有人有一个例子或可以确认我是否走在正确的道路上,或者我应该使用不同的框架。

我不熟悉这种通过 ActiveMQ Artemis 等消息进行的通信,因此非常感谢任何指导。

【问题讨论】:

    标签: c# activemq-artemis request-response


    【解决方案1】:

    从您的应用程序的角度来看,我在 ArtemisNetClient 中看不到任何可以简化请求/响应模式的内容。有人可能会期待类似于 JMS 的 QueueRequestor 的东西,但我在代码中没有看到类似的内容,也没有在文档中看到类似的内容。

    我建议您简单地在应用程序中执行您在示例中所做的操作(即手动创建消费者和生产者以分别处理每一端的响应)。我建议的唯一更改是重用连接,以便您创建尽可能少的连接。连接池在这里是理想的选择。


    就其价值而言,在我看来,第一个 release of ArtemisNetClient 只是在 3 个月前,而 according to GitHub 除了 2 个对代码库的提交之外,其他所有提交都来自一位开发人员。 ArtemisNetClient 可能会成长为一个非常成功的 C# 客户端实现,但在这一点上它似乎相对不成熟。即使如果现有代码是高质量的,如果客户端周围没有稳固的社区,那么它很可能没有必要的支持来获得及时的错误修复、新功能等。只有时间会告诉你的。

    【讨论】:

    • 这正是我一直在寻找的,因为我已经看到了许多创建临时会话队列的 Java 示例,但我在 ArtemisNetClient 中找不到类似的东西。感谢您的输入。
    猜你喜欢
    • 2020-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-06-20
    • 2015-02-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-24
    相关资源
    最近更新 更多