【发布时间】:2020-10-14 11:23:34
【问题描述】:
这个解决方案很慢,我想这是为我发送给一个客户的答案创建了许多队列。有没有一种方法可以创建两个队列,例如请求和响应队列,然后过滤一个唯一标识符,以便客户端可以等待自己的答案?还是我需要每个客户都有自己的队列来接收答案?
这个类是用于payLoad的,我用序列化把它转成消息。
namespace PayLoad
{
public class PayLoad
{
public string clientQueue { get; set; } // the queue where the client listens to to receive a response
public int x { get; set; }
public int y { get; set; }
public int response { get; set; }
}
}
这是我的通信助手类,需要一些重构,但这只是一个概念证明。
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.IO;
using System.Runtime.Serialization;
using System.Text;
using System.Threading.Tasks;
namespace PayLoad
{
public class Job
{
static string _serverQueue = "ElvisDTS_Incomming";
public void doWork(string payLoadMessage)
{
var ThreadId = System.Threading.Thread.CurrentThread.ManagedThreadId;
Console.WriteLine($"[x] Received {payLoadMessage} on thread {ThreadId}");
var payLoad = DeserializeData<PayLoad>(payLoadMessage);
payLoad.response = payLoad.x + payLoad.y;
var responseMessage = SerializeData(payLoad);
Console.WriteLine($"[x] Sending response to {payLoad.clientQueue} on thread {ThreadId}");
RabbitSend(payLoad.clientQueue, responseMessage);
}
public static void askServer(int x, int y)
{
var payload = new PayLoad();
payload.x = x;
payload.y = y;
payload.clientQueue = System.Guid.NewGuid().ToString();
payload.response = -1;
var clientMessage = SerializeData(payload);
RabbitSend(_serverQueue, clientMessage);
var factory = new ConnectionFactory() { HostName = "localhost" };
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare(queue: payload.clientQueue,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var result = DeserializeData<PayLoad>(message);
Console.WriteLine("The result is " + result.response);
};
channel.BasicConsume(queue: payload.clientQueue,
autoAck: true,
consumer: consumer);
}
private static void RabbitSend(string queueName, string message)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: queueName,
basicProperties: null,
body: body);
}
}
public static void runServer(string queueName)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.QueueDeclare(queue: queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueuePurge(queueName);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var job = new Job();
Task.Factory.StartNew(() => job.doWork(message));
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
}
private static string SerializeData<T>(T requestStruct)
{
var settings = new DataContractSerializerSettings();
settings.MaxItemsInObjectGraph = Int32.MaxValue;
var serializer = new DataContractSerializer(typeof(T), settings);
var ms = new MemoryStream();
serializer.WriteObject(ms, requestStruct);
return Encoding.UTF8.GetString(ms.ToArray());
}
private static T DeserializeData<T>(string xml)
{
var settings = new DataContractSerializerSettings();
settings.MaxItemsInObjectGraph = Int32.MaxValue;
var serializer = new DataContractSerializer(typeof(T), settings);
var ms = GenerateStreamFromString(xml);
return ((T)(serializer.ReadObject(ms)));
}
private static MemoryStream GenerateStreamFromString(string value)
{
return new MemoryStream(Encoding.UTF8.GetBytes(value ?? ""));
}
}
}
我的客户是这样的:
using System;
namespace RabbitSample
{
class Program
{
static void Main(string[] args)
{
for (int i = 1; i < 100; i++) PayLoad.Job.askServer(42, i);
Console.WriteLine("done");
Console.ReadLine();
}
}
}
我的服务器是这样的:
using System;
using System.Threading.Tasks;
namespace RabbitServer
{
class Program
{
static void Main(string[] args)
{
AppDomain.CurrentDomain.UnhandledException += new UnhandledExceptionEventHandler(CurrentDomain_UnhandledException);
Task.Factory.StartNew(() => PayLoad.Job.runServer("ElvisDTS_Incomming"));
Console.WriteLine("Done");
Console.ReadLine();
}
private static void CurrentDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e)
{
Console.WriteLine(e.ToString());
}
}
}
【问题讨论】:
-
你可以在 gitHub 上找到完整的运行示例项目。 github.com/NickDinges/RabbitSample