【问题标题】:How to send an client specific answer from an server using rabbitMq如何使用rabbitMq从服务器发送客户端特定的答案
【发布时间】:2020-10-14 11:23:34
【问题描述】:

这个解决方案很慢,我想这是为我发送给一个客户的答案创建了许多队列。有没有一种方法可以创建两个队列,例如请求和响应队列,然后过滤一个唯一标识符,以便客户端可以等待自己的答案?还是我需要每个客户都有自己的队列来接收答案?

这个类是用于payLoad的,我用序列化把它转成消息。

 namespace PayLoad
    {
        public class PayLoad
        {
            public string clientQueue { get; set; } // the queue where the client listens to to receive a response
            public int x { get; set; }
            public int y { get; set; }
            public int response { get; set; }
        }
    }

这是我的通信助手类,需要一些重构,但这只是一个概念证明。

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.IO;
using System.Runtime.Serialization;
using System.Text;
using System.Threading.Tasks;

    namespace PayLoad
    {
        public class Job
        {
            static string _serverQueue = "ElvisDTS_Incomming";
    
            public void doWork(string payLoadMessage)
            {
                var ThreadId = System.Threading.Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine($"[x] Received {payLoadMessage} on thread {ThreadId}");
                var payLoad = DeserializeData<PayLoad>(payLoadMessage);
                payLoad.response = payLoad.x + payLoad.y;
                var responseMessage = SerializeData(payLoad);
                Console.WriteLine($"[x] Sending response to {payLoad.clientQueue} on thread {ThreadId}");
                RabbitSend(payLoad.clientQueue, responseMessage);           
            }
    
            public static void askServer(int x, int y)
            {
                var payload = new PayLoad();
                payload.x = x;
                payload.y = y;
                payload.clientQueue = System.Guid.NewGuid().ToString();
                payload.response = -1;
                var clientMessage = SerializeData(payload);
                RabbitSend(_serverQueue, clientMessage);
    
                var factory = new ConnectionFactory() { HostName = "localhost" };
                var connection = factory.CreateConnection();
                var channel = connection.CreateModel();
    
                channel.QueueDeclare(queue: payload.clientQueue,
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    var result = DeserializeData<PayLoad>(message);
    
                    Console.WriteLine("The result is " + result.response);
                };
                channel.BasicConsume(queue: payload.clientQueue,
                                     autoAck: true,
                                     consumer: consumer);
            }
    
            private static void RabbitSend(string queueName, string message)
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: queueName,
                                         durable: false,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
    
                    var body = Encoding.UTF8.GetBytes(message);
    
                    channel.BasicPublish(exchange: "",
                                         routingKey: queueName,
                                         basicProperties: null,
                                         body: body);
                }
            }
    
            public static void runServer(string queueName)
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                var connection = factory.CreateConnection();
                var channel = connection.CreateModel();
    
                channel.QueueDeclare(queue: queueName,
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
    
                channel.QueuePurge(queueName);
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    var job = new Job();
                    Task.Factory.StartNew(() => job.doWork(message));
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
            }
    
            private static string SerializeData<T>(T requestStruct)
            {
                var settings = new DataContractSerializerSettings();
                settings.MaxItemsInObjectGraph = Int32.MaxValue;
                var serializer = new DataContractSerializer(typeof(T), settings);
                var ms = new MemoryStream();
                serializer.WriteObject(ms, requestStruct);
                return Encoding.UTF8.GetString(ms.ToArray());
            }
    
            private static T DeserializeData<T>(string xml)
            {
                var settings = new DataContractSerializerSettings();
                settings.MaxItemsInObjectGraph = Int32.MaxValue;
                var serializer = new DataContractSerializer(typeof(T), settings);
                var ms = GenerateStreamFromString(xml);
                return ((T)(serializer.ReadObject(ms)));
            }
    
            private static MemoryStream GenerateStreamFromString(string value)
            {
                return new MemoryStream(Encoding.UTF8.GetBytes(value ?? ""));
            }
        }
    }

我的客户是这样的:

using System;

namespace RabbitSample
{
    class Program
    {
        static void Main(string[] args)
        {

            for (int i = 1; i < 100; i++) PayLoad.Job.askServer(42, i);
            Console.WriteLine("done");
            Console.ReadLine();
        }
    }
}

我的服务器是这样的:

using System;
using System.Threading.Tasks;

namespace RabbitServer
{
    class Program
    {
        static void Main(string[] args)
        {
            AppDomain.CurrentDomain.UnhandledException += new UnhandledExceptionEventHandler(CurrentDomain_UnhandledException);
            Task.Factory.StartNew(() => PayLoad.Job.runServer("ElvisDTS_Incomming"));

            Console.WriteLine("Done");
            Console.ReadLine();
        }

        private static void CurrentDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e)
        {
            Console.WriteLine(e.ToString());
        }
    }
}

【问题讨论】:

标签: c# .net rabbitmq


【解决方案1】:

问题

这个解决方案很慢,我猜是创建了很多队列。

回答

不完全是。您创建队列的方式可能会导致channel leaks(这反过来会导致内存压力,从而减慢 RabbitMQ 代理),但这不是主要问题 (document)。

对于使用多个线程/进程进行处理的应用程序,很常见的是为每个线程/进程打开一个新通道,并且它们之间不共享通道。

另一方面,您应该始终考虑长期连接 (document),以防止资源耗尽(文件句柄和 TCP 套接字)。

RabbitMQ 支持的所有协议都是基于 TCP 的,并且为了提高效率而假设连接是长寿命的(每个协议操作都不会打开一个新连接)。


问题

有没有一种方法可以创建两个队列,例如请求和响应队列,然后过滤唯一标识符,以便客户端可以等待自己的答案?

回答

不,RabbitMQ 不是这样工作的。

可以创建两个队列。在这种情况下,您的客户端将充当consumers 来使用响应队列。您的每个客户都会以几乎平等的机会收到消息(答案)(默认为循环调度)。无法保证客户端 A 一定会收到消息 A。即使客户端 A nack 消息 B 并重新排队,下次客户端 C 也可能会收到它。


问题

或者我需要每个客户都有自己的队列来接收答案吗?

回答

这完全取决于您的要求。如果您需要多个队列来处理不同的任务,那么可以。

为了防止channel leaks 和/或high channel churn,您可以

  • 只声明一次所有响应队列(如果您事先知道这些队列)
    • 这也将节省一些时间,而无需重新声明相同的队列(尽管该操作是幂等的)
  • 从频道池中获取频道 (document)。

虽然对于某些工作负载,这是系统的自然状态,但应尽可能使用长寿命通道。

【讨论】:

  • 更具体地说,我尝试将 Web 应用程序与运行在 Windows 服务中的业务逻辑分离。请求很慢,所以我尝试以异步方式等待。所以每个客户端都是一个等待特定答案的会话,所以打开两个通道可以是静态声明的,但是当消息是回答特定会话?谢谢;-)
  • @NickD - 路由发生在exchangequeue(带有绑定键)级别。假设您只有一个 response 队列。所有具有不同会话 ID 的回调都将被推送到该队列。你的客户是consumer 监听那个队列。但是路由不能在consumer 级别上完成。这就是为什么您无法过滤任何已推送到队列中的消息。
猜你喜欢
  • 2017-06-20
  • 2018-06-03
  • 1970-01-01
  • 1970-01-01
  • 2014-01-16
  • 1970-01-01
  • 2012-06-02
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多