【发布时间】:2017-05-14 12:28:52
【问题描述】:
我将共享库作为总线,我正在尝试从 rabbitmq 接收消息,但 ConsumerOnReceived 从未被触发。
namespace Bus
{
public class MessageListener
{
private static IConnection _connection;
private static IModel _channel;
public void Start(string hostName, int port, string queueName)
{
var factory = new ConnectionFactory() { HostName = hostName, Port = port };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += ConsumerOnReceived;
channel.BasicConsume(queue: queueName,
noAck: true,
consumer: consumer);
}
}
public static void Stop()
{
_channel.Close(200, "Goodbye");
_connection.Close();
}
public virtual void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea)
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
}
}
public static class MessageSender
{
public static void Send(string hostName, int port, string queueName, string message)
{
var factory = new ConnectionFactory() { HostName = hostName, Port = port };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var body = Encoding.UTF8.GetBytes(message.ToString());
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
}
}
}
}
核心
namespace Core
{
class Program
{
static void Main(string[] args)
{
new MessageListener().Start("localhost", 5672, "MakePayment");
Console.WriteLine("Core Service");
string line = Console.ReadLine();
}
}
}
namespace Core
{
public class MessageListener : Bus.MessageListener
{
public override void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea)
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
}
}
}
【问题讨论】: