【问题标题】:RabbitMQ Missing data in body after dequeueRabbitMQ 出队后正文中缺少数据
【发布时间】:2012-08-18 00:30:00
【问题描述】:

我正在使用 RabbitMQ 处理其他人的项目,并且遇到了出队和丢失数据的问题。

当我发布时,数据都以字符串形式存在,并且它也正确地存在于 RabbitMQ 队列中。当我拉出数据时,数据的但像用户 ID 一样在那里,但其余的都消失了。我查看了整个代码,我相当肯定它发生在 RabbitMQ 上,并且在我出队时发生。任何帮助将不胜感激。谢谢。 这是发布之前的代码。

        private bool sendJobToMQ(EncodeJobModel job, string p_correlation_id, string p_request_routing_key)
    {
        JavaScriptSerializer ser = new JavaScriptSerializer();
        StringBuilder sb_job = new StringBuilder();
        ser.Serialize(job, sb_job);
        string rpc_reply_queue;

        ConnectionFactory factory = new ConnectionFactory();
        factory.HostName = HOST_NAME;
        factory.VirtualHost = VHOST_NAME;
        factory.UserName = USERNAME;
        factory.Password = PASSWORD;
        IConnection rabconn = factory.CreateConnection();
        IModel sender_channel = rabconn.CreateModel();
        try
        {
            sender_channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Direct, true, false, null);
        }
        catch (Exception err)
        {
            logger.Error("Error Declaring Exchange " + EXCHANGE_NAME + ": " + err.ToString());
            return false;
        }
        try
        {
            sender_channel.QueueDeclare(REQUEST_QUEUE, true, false, false, null);
        }
        catch (Exception err)
        {
            logger.Error("Error QueueDeclare (" + REQUEST_QUEUE + " true, false, false, null): " + err.ToString());
            return false;
        }
        try
        {
            sender_channel.QueueBind(REQUEST_QUEUE, EXCHANGE_NAME, REQUEST_ROUTING_KEY, null);
        }
        catch (Exception err)
        {
            logger.Error("Error QueueBind (" + REQUEST_QUEUE + " -> " + EXCHANGE_NAME + " " + REQUEST_ROUTING_KEY + ", null): " + err.ToString());
            return false;
        }

        //rpc_reply_queue = sender_channel.QueueDeclare("rq_" + job.encodejob_id.ToString(), false, false, true, null);
        //////bind the rpc reply queue to the exchange via a routing key (I appended _routingkey to signify this)
        //sender_channel.QueueBind(rpc_reply_queue, EXCHANGE_NAME, rpc_reply_queue + "_routingkey");

        //// Not sure what the props object is for yet but you can try to pass null in the mean time - Steve "Apeshit" Han
        BasicProperties props = new BasicProperties();
        props.CorrelationId = p_correlation_id;
        //props.ReplyTo = rpc_reply_queue;

        try
        {
            sender_channel.BasicPublish(EXCHANGE_NAME, REQUEST_ROUTING_KEY, props, Encoding.UTF8.GetBytes(sb_job.ToString()));

        }

以及出队的代码。

 QueueingBasicConsumer consumer = new QueueingBasicConsumer(p_channel);
        string consumerTag = p_channel.BasicConsume(p_queue, false, consumer);
        if (_is_console && Environment.UserInteractive)
            Console.WriteLine("Listening...");
        while (m_Listen)
        {
            try
            {
                //get the properties of the message, including the ReplyTo queue, to which we can append '_routingkey' (designated by me), to reply with messages
                BasicDeliverEventArgs e;
                Object message;
                if (!consumer.Queue.Dequeue(4000, out message)) {
                    // we do not wait to indefinitely block on waiting for the queue
                    // if nothing in queue continue loop iteration and wait again
                    continue;
                }

                // cast as necessary back to BasicDeliverEventArgs
                e = (BasicDeliverEventArgs)message;
                IBasicProperties props = e.BasicProperties;
                //get the Correlation ID sent by the client to track the job
                string client_correlation_id = props.CorrelationId;
                // I left out the reply_to field in the wizard, it can be set back in ApiEncodeServiceDefault - Steve "Smurfing Smurf" Han
                //string reply_to = props.ReplyTo;

                //get the body of the request
                byte[] body = e.Body;
                string body_result = Encoding.UTF8.GetString(body);
                bool redelivered = e.Redelivered;

e.Body 字符串缺少数据。

【问题讨论】:

  • 几个月前看到这个使用较新版本的 RabbitMQ 服务器。我们回滚了它,直到我们有时间调查并且还没有深入研究它。在我们的例子中,语言是 Perl,但症状与您描述的相同。你在客户端和服务器端都运行什么版本的 RabbitMQ?
  • 10 年后,我看到了类似的情况。这有没有弄清楚?

标签: c# asp-classic rabbitmq


【解决方案1】:

如果您没有任何消息,为什么要继续 最好在收到消息之前阻止,否则该过程不有趣(没有数据工作?) 试试这样

QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, null, consumer);
while (m_Listen) {
try {
RabbitMQ.Client.Events.BasicDeliverEventArgs e =
(RabbitMQ.Client.Events.BasicDeliverEventArgs)
consumer.Queue.Dequeue();
IBasicProperties props = e.BasicProperties;
byte[] body = e.Body;
// ... process the message
channel.BasicAck(e.DeliveryTag, false);
} catch (OperationInterruptedException ex) {
// The consumer was removed, either through
// channel or connection closure, or through the
// action of IModel.BasicCancel().
break;
}

}

【讨论】:

  • 谢谢,但这并不能完全回答我的问题。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-02-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多