【问题标题】:Received doesn't get triggered(Received 事件没有被触发)
【发布时间】:2017-05-14 12:28:52
【问题描述】:

我有一个用作消息总线的共享库,正在尝试从 RabbitMQ 接收消息,但 ConsumerOnReceived 从未被触发。

namespace Bus
{
    /// <summary>
    /// Declares a queue and subscribes <see cref="ConsumerOnReceived"/> to it through an
    /// <c>EventingBasicConsumer</c>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): as written, the connection and channel created in <see cref="Start"/> are
    /// disposed when its using blocks end, and the static fields below are never assigned
    /// anywhere in this class.
    /// </remarks>
    public class MessageListener
    {

    // Only read by Stop(); nothing in this class assigns them, so they stay null.
    private static IConnection _connection;
    private static IModel _channel;

    /// <summary>
    /// Connects to the broker, declares <paramref name="queueName"/> and registers a consumer on it.
    /// </summary>
    /// <param name="hostName">Broker host name passed to the connection factory.</param>
    /// <param name="port">Broker port passed to the connection factory.</param>
    /// <param name="queueName">Queue to declare and consume from.</param>
    public void Start(string hostName, int port, string queueName)
    {
        var factory = new ConnectionFactory() { HostName = hostName, Port = port };
        // Both resources are scoped to this method: they are disposed as soon as the
        // using block below is left.
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: queueName,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += ConsumerOnReceived;

            // NOTE(review): BasicConsume presumably returns without waiting for deliveries
            // (confirm against the client docs); if so, the using block ends right after this
            // call and the channel/connection are disposed before any message can arrive.
            channel.BasicConsume(queue: queueName,
                                 noAck: true,
                                 consumer: consumer);
        }
    }

    /// <summary>
    /// Closes the channel held in the static field with code 200 and text "Goodbye", then the connection.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the fields are never set by <see cref="Start"/>, so this looks like it
    /// would dereference null — confirm whether they are assigned elsewhere.
    /// </remarks>
    public static void Stop()
    {
        _channel.Close(200, "Goodbye");
        _connection.Close();
    }

    /// <summary>
    /// Handler attached to the consumer's Received event; decodes the delivery body as UTF-8.
    /// The decoded text is not used further here; subclasses may override.
    /// </summary>
    /// <param name="sender">Event source.</param>
    /// <param name="ea">Delivery data; its Body is decoded.</param>
    public virtual void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea)
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body);
    }
}

/// <summary>
/// Publishes UTF-8 text messages to a queue through the default ("") exchange.
/// </summary>
public static class MessageSender
{
    /// <summary>
    /// Opens a short-lived connection and channel, declares <paramref name="queueName"/> with the
    /// same non-durable, non-exclusive, non-auto-delete settings the listener uses, and publishes
    /// <paramref name="message"/> to it.
    /// </summary>
    /// <param name="hostName">Broker host name.</param>
    /// <param name="port">Broker port.</param>
    /// <param name="queueName">Queue to declare; also used as the routing key.</param>
    /// <param name="message">Text to send; encoded as UTF-8.</param>
    /// <exception cref="System.ArgumentNullException"><paramref name="message"/> is null.</exception>
    public static void Send(string hostName, int port, string queueName, string message)
    {
        // Fail fast, before a broker connection is opened, instead of hitting a
        // NullReferenceException halfway through the send.
        if (message == null)
        {
            throw new System.ArgumentNullException(nameof(message));
        }

        var factory = new ConnectionFactory() { HostName = hostName, Port = port };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

            // message is already a string, so no ToString() call is needed.
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
        }
    }
}

}

核心

namespace Core
{
    /// <summary>
    /// Console entry point of the Core service: starts a listener on the "MakePayment" queue
    /// and then waits for a line of console input.
    /// </summary>
    class Program
    {
        /// <summary>Starts the listener, prints a banner and blocks on console input.</summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            // Presumably resolves to Core.MessageListener, which is declared in this namespace.
            var listener = new MessageListener();
            listener.Start("localhost", 5672, "MakePayment");

            Console.WriteLine("Core Service");

            // Wait for Enter; the text that was typed is not needed.
            Console.ReadLine();
        }
    }
}

namespace Core
{
    /// <summary>
    /// Core-side listener that overrides the Received handler of <see cref="Bus.MessageListener"/>.
    /// </summary>
    public class MessageListener : Bus.MessageListener
    {
        /// <summary>
        /// Decodes the delivery body as UTF-8 text. The decoded text is not used further here.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="ea">Delivery data whose Body is decoded.</param>
        public override void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea)
        {
            var text = Encoding.UTF8.GetString(ea.Body);
        }
    }

}

【问题讨论】:

    标签: c# .net rabbitmq


    【解决方案1】:

    问题出在这一行:

    channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);
    

    BasicConsume 不是阻塞方法,因此当您调用 Start 时,虽然创建了连接和通道,但 using 块一结束,它们就会立即被释放。

    以下并不是解决方案,但您可以通过这样做来确认上述原因:

    channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);
    Console.ReadKey();//←Added Line
    

    这样修改后,您的程序就能收到消息了。

    这是我提出的解决方案。请注意_channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer); 将在另一个线程上启动,因此您不需要使用while(...)

    using System;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace Bus {
    
        /// <summary>
        /// Base class for queue listeners: owns one connection and one channel, and routes every
        /// delivery to <see cref="ConsumerOnReceived"/>.
        /// </summary>
        public abstract class BaseMessageListener {
            // Instance fields (not static): each listener owns its own channel and connection,
            // so starting a second listener no longer overwrites and leaks the first one's.
            private IModel _channel;
            private IConnection _connection;

            /// <summary>Called for every delivery received from the queue.</summary>
            /// <param name="sender">Event source.</param>
            /// <param name="ea">Delivery data, including the message body.</param>
            public abstract void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea);

            /// <summary>
            /// Connects to the broker, declares <paramref name="queueName"/> and starts consuming
            /// from it with automatic acknowledgement (noAck: true).
            /// </summary>
            /// <param name="hostName">Broker host name.</param>
            /// <param name="port">Broker port.</param>
            /// <param name="queueName">Queue to declare and consume from.</param>
            /// <exception cref="InvalidOperationException">
            /// The listener is already started and <see cref="Stop"/> has not been called.
            /// </exception>
            public void Start(string hostName, int port, string queueName) {
                if (_connection != null) {
                    throw new InvalidOperationException("The listener is already started; call Stop() first.");
                }

                var factory = new ConnectionFactory() { HostName = hostName, Port = port };
                _connection = factory.CreateConnection();
                try {
                    _channel = _connection.CreateModel();
                    _channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false);
                    var consumer = new EventingBasicConsumer(_channel);
                    consumer.Received += ConsumerOnReceived;
                    // BasicConsume only registers the consumer and returns; the connection and
                    // channel are kept in fields so they stay open until Stop() is called.
                    _channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);
                }
                catch {
                    // Do not leave a half-initialised listener (or an open connection) behind.
                    Stop();
                    throw;
                }
            }

            /// <summary>
            /// Closes and disposes the channel and connection. Safe to call when the listener was
            /// never started or has already been stopped.
            /// </summary>
            public void Stop() {
                // Detach the fields first so the listener counts as stopped even if closing throws.
                var channel = _channel;
                var connection = _connection;
                _channel = null;
                _connection = null;

                if (channel != null) {
                    if (channel.IsOpen) {
                        channel.Close(200, "Goodbye");
                    }
                    channel.Dispose();
                }

                if (connection != null) {
                    if (connection.IsOpen) {
                        connection.Close();
                    }
                    connection.Dispose();
                }
            }
        }
    }
    
    namespace StackOverfFLow.RabbitMQSolution {
    
        using Bus;
    
        /// <summary>
        /// Listener that prints every received message to the console.
        /// </summary>
        public class MessageListener : BaseMessageListener {

            /// <summary>Decodes the delivery body as UTF-8 and writes it to standard output.</summary>
            /// <param name="sender">Event source.</param>
            /// <param name="ea">Delivery data whose Body is decoded.</param>
            public override void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea) {
                string text = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine(text);
            }
        }
    
        /// <summary>
        /// Console host: starts the listener, waits for a key press, then stops it.
        /// </summary>
        internal class Program {

            /// <summary>Runs the listener until a key is pressed.</summary>
            /// <param name="args">Command-line arguments (not used).</param>
            private static void Main(string[] args) {
                const string host = "localhost";
                const int port = 5672;
                const string queue = "MakePayment";

                var consumerHost = new MessageListener();
                consumerHost.Start(host, port, queue);
                Console.WriteLine("Core Service Started!");

                // Keep the process alive until a key is pressed, then shut the listener down.
                Console.ReadKey();
                consumerHost.Stop();
            }
        }
    }
    

    【讨论】:

    • 这确实有效,但有没有比 ReadKey 更好的做法来处理这个问题?
    • @Mert,我不确定你把while (_channel.IsOpen) { }放在哪里,但如果你把它放在channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);这一行之后;我不认为你需要那个。请查看我的更新答案。
    猜你喜欢
    • 1970-01-01
    • 2017-05-19
    • 1970-01-01
    • 1970-01-01
    • 2017-11-22
    • 2012-06-03
    • 2019-11-07
    • 2018-09-01
    • 2018-12-03
    相关资源
    最近更新 更多