【问题标题】:Unable to send image via RabbitMQ C master(无法通过 RabbitMQ C master 发送图像)
【发布时间】:2017-09-08 06:08:27
【问题描述】:

我正在尝试通过 RabbitMQ C master 发送图像,方法是将其二进制数据用作消息,然后将其传输并将其复制到另一个图像文件中。 我的代码如下:

客户端/发送端:

#include "amqp_wrapper_library.h"

/*
 * Sender: loads a fixed image file into memory and publishes it as a single
 * message through the messagebus wrapper.
 *
 * Arguments: host port exchange routingkey username password
 * Returns 0 on success, 1 on a usage, file or allocation error.
 *
 * Limitation: send_message_on_messagebus() receives only a char pointer and
 * no length, so the payload has to be handed over as a NUL-terminated
 * string. Image data that contains zero bytes is therefore cut at the first
 * one; sending the whole file needs a length-aware publish call.
 */
int main(int argc, char const *const *argv)
{
  char const *hostname;
  int port;
  char const *exchange;
  char const *routingkey;
  char const *username;
  char const *password;
  char *messagebody = NULL;
  amqp_connection_state_t conn;
  FILE *image1;
  long length;
  int rc = 1;

  if (argc < 7) {
    /* The message body comes from the image file, not the command line. */
    fprintf(stderr, "Usage: amqp_sendstring host port exchange routingkey username password\n");
    return 1;
  }

  hostname = argv[1];
  port = atoi(argv[2]);
  exchange = argv[3];
  routingkey = argv[4];
  username = argv[5];
  password = argv[6];

  image1 = fopen("/home/sneha/Desktop/Sender/image1.jpg", "rb");
  if (image1 == NULL) {
    fprintf(stderr, "Cannot open input image\n");
    return 1;
  }

  /* Determine the file size by seeking to the end. */
  if (fseek(image1, 0, SEEK_END) != 0) {
    fprintf(stderr, "Cannot seek in input image\n");
    goto close_file;
  }
  length = ftell(image1);
  if (length < 0) {
    fprintf(stderr, "Cannot determine input image size\n");
    goto close_file;
  }
  rewind(image1);

  /* One extra byte for the terminator the wrapper relies on. */
  messagebody = malloc((size_t)length + 1);
  if (messagebody == NULL) {
    fprintf(stderr, "Out of memory while reading input image\n");
    goto close_file;
  }

  if (length > 0 && fread(messagebody, 1, (size_t)length, image1) != (size_t)length) {
    fprintf(stderr, "Short read on input image\n");
    goto close_file;
  }
  messagebody[length] = '\0';

  /* Debug echo; stops at the first zero byte in the data. */
  printf("%s",messagebody);

  rc = 0;

close_file:
  fclose(image1);
  if (rc != 0) {
    free(messagebody);
    return rc;
  }

  conn = create_messagebus_context(hostname,port,username,password);
  send_message_on_messagebus(conn,exchange,routingkey,messagebody);
  close_messagebus_context(conn);

  free(messagebody);
  return 0;
}

服务器/接收端:

#include "amqp_wrapper_library.h"

/*
 * Receiver: waits for one message through the messagebus wrapper and writes
 * its body to a fixed output file.
 *
 * Arguments: host port exchange bindingkey username password
 * Returns 0 on success, 1 on a usage, receive or file error.
 *
 * Limitation: receive_message_on_messagebus() returns only a char pointer
 * and no length, so the body is treated as a NUL-terminated string. A
 * payload that contains zero bytes is written only up to the first one.
 */
int main(int argc, char const *const *argv)
{
  char const *hostname;
  int port;
  char const *exchange;
  char const *bindingkey;
  char const *username;
  char const *password;
  amqp_connection_state_t conn;
  FILE *image2;
  char *messagebody;
  int rc = 0;

  if (argc < 7) {
    fprintf(stderr, "Usage: amqp_listen host port exchange bindingkey username password\n");
    return 1;
  }

  hostname = argv[1];
  port = atoi(argv[2]);
  exchange = argv[3];
  bindingkey = argv[4];
  username = argv[5];
  password = argv[6];

  conn = create_messagebus_context(hostname, port, username, password);

  image2 = fopen ("/home/sneha/Desktop/Receiver/image2.jpg", "wb");
  if (image2 == NULL) {
    fprintf(stderr, "Cannot open output image\n");
    close_messagebus_context(conn);
    return 1;
  }

  messagebody = receive_message_on_messagebus(conn,exchange,bindingkey);
  if (messagebody == NULL) {
    fprintf(stderr, "No message received\n");
    rc = 1;
  } else {
    /*
     * Write the whole string. sizeof(messagebody) would only be the size of
     * the pointer, not the length of the data it points to.
     */
    if (fputs(messagebody, image2) == EOF) {
      fprintf(stderr, "Cannot write output image\n");
      rc = 1;
    }
    /* Debug echo of the received body. */
    printf("%s",messagebody);
  }

  /* fclose flushes buffered data, so a failure here means lost output. */
  if (fclose(image2) != 0) {
    fprintf(stderr, "Cannot close output image\n");
    rc = 1;
  }

  /* The wrapper hands back a malloc'd buffer; free(NULL) is a no-op. */
  free(messagebody);

  close_messagebus_context(conn);

  return rc;
}

amqp_wrapper_library.c

#include "amqp_wrapper_library.h"

/*
 * Opens a connection for the messagebus: creates a TCP socket for a new
 * connection object, connects it to hostname:port, logs in with the given
 * credentials using the PLAIN SASL method, and opens channel 1.
 *
 * Every failure is reported through die()/die_on_amqp_error().
 * Returns the connection state, to be passed to close_messagebus_context()
 * when the caller is done with it.
 */
amqp_connection_state_t create_messagebus_context(char const *hostname,int port,char const *username,char const *password)
{
  amqp_connection_state_t connection = amqp_new_connection();
  amqp_socket_t *tcp_socket = amqp_tcp_socket_new(connection);

  if (tcp_socket == NULL) {
    die("creating TCP socket");
  }

  if (amqp_socket_open(tcp_socket, hostname, port) != 0) {
    die("opening TCP socket");
  }

  die_on_amqp_error(
      amqp_login(connection, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, username, password),
      "Logging in");

  /* The outcome of the open request is picked up from the RPC reply. */
  amqp_channel_open(connection, 1);
  die_on_amqp_error(amqp_get_rpc_reply(connection), "Opening channel");

  return connection;
}

/*
 * Tears down a connection made by create_messagebus_context(): closes
 * channel 1, closes the connection, then destroys the connection object.
 * Each step reports failure through die_on_amqp_error()/die_on_error().
 */
void close_messagebus_context(amqp_connection_state_t conn)
{
  /* Channel first, then the connection that carries it. */
  die_on_amqp_error(
      amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
      "Closing channel");

  die_on_amqp_error(
      amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
      "Closing connection");

  /* conn must not be used after this call. */
  die_on_error(
      amqp_destroy_connection(conn),
      "Ending connection");
}

/*
 * Publishes messagebody on channel 1 of conn to the given exchange with the
 * given routing key. The message is tagged with content type "text/plain"
 * and delivery mode 2.
 *
 * exchange, routingkey and messagebody are all converted with
 * amqp_cstring_bytes(), so each must be a NUL-terminated string; the body
 * cannot carry embedded zero bytes through this function.
 * A publish failure is reported through die_on_error().
 */
void send_message_on_messagebus(amqp_connection_state_t conn,char const *exchange,char const *routingkey,char const *messagebody)
{
    amqp_basic_properties_t properties;

    /* Only the fields named in _flags are filled in below. */
    properties._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
    properties.content_type = amqp_cstring_bytes("text/plain");
    properties.delivery_mode = 2; /* persistent delivery mode */

    die_on_error(
        amqp_basic_publish(conn,
                           1,
                           amqp_cstring_bytes(exchange),
                           amqp_cstring_bytes(routingkey),
                           0,
                           0,
                           &properties,
                           amqp_cstring_bytes(messagebody)),
        "Publishing");
}

/*
 * Declares a queue whose name is taken from the broker's reply, binds it to
 * exchange with bindingkey, starts consuming on channel 1 and waits for one
 * message.
 *
 * Returns a newly malloc'd, NUL-terminated copy of the message body that the
 * caller must release with free(), or NULL when memory runs out or the
 * consume call does not complete normally. Because the result is a plain
 * C string with no separate length, a body containing zero bytes is only
 * copied up to the first one.
 * Broker-side failures are reported through die_on_amqp_error().
 */
char* receive_message_on_messagebus(amqp_connection_state_t conn,char const *exchange,char const *bindingkey)
{
  amqp_bytes_t queuename;
  {
    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
    /* Keep a private copy of the queue name from the declare reply. */
    queuename = amqp_bytes_malloc_dup(r->queue);
    if (queuename.bytes == NULL) 
    {
      fprintf(stderr, "Out of memory while copying queue name");
      return NULL;
    }
  }
  /* NOTE(review): the queuename copy is never released in this function. */

  amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey), amqp_empty_table);
  die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");

  amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
  die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");

  {
    amqp_rpc_reply_t res;
    amqp_envelope_t envelope;

    amqp_maybe_release_buffers(conn);

    /* NULL timeout: presumably waits until a message arrives - confirm. */
    res = amqp_consume_message(conn, &envelope, NULL, 0);

    if (AMQP_RESPONSE_NORMAL != res.reply_type) 
    {
      /* The function returns char*, so failure must yield a real NULL. */
      return NULL;
    }

    size_t body_len = envelope.message.body.len;

    /* One extra byte: the formatted copy appends a terminating NUL. */
    char *result = malloc(body_len + 1);
    if (result != NULL)
    {
      snprintf(result, body_len + 1, "%.*s", (int) body_len, (char *) envelope.message.body.bytes);
    }

    /* The envelope is released on both the success and the OOM path. */
    amqp_destroy_envelope(&envelope);
    return result;
  }
}

但是,图像 2 并未重建。我检查了图像大小,两个图像的大小不匹配。这里有什么问题?

【问题讨论】:

    标签: c image rabbitmq rabbitmq-c


    【解决方案1】:

    来自 JPEG 文件的数据不是 C 字符串(以 null 结尾的字节串),因此不能用 printf、amqp_cstring_bytes 这类期望以 null 结尾的字符串的函数来处理图像数据。

    不要使用 const char* 来保存 jpg 数据,而是在代码中使用 amqp_bytes_t 结构。

    从文件读入 struct amqp_bytes_t:

    amqp_bytes_t msg = amqp_bytes_alloc(length);
    /* Make sure that allocation succeeded */
    assert(msg.bytes != NULL);
    
    fread(msg.bytes, msg.len, 1, image1);
    ...
    /* Free the memory when you're done with it */
    amqp_bytes_free(msg);
    

    反向写回:

    /* if you wish to create a copy of the message body
     * free it using amqp_bytes_free */
    amqp_bytes_t msg = amqp_bytes_malloc_dup(envelope.message.body);
    
    fwrite(msg.bytes, msg.len, 1, image2);
    

    编辑:添加示例代码。

    【讨论】:

    • 但是我如何将 amqp_bytes_t 消息复制到文件中?我应该使用哪种格式说明符?
    • 你能解释一下 amqp_bytes_t 结构的用法吗?与 amqp_cstring_bytes 一起使用时,如何修改代码?
    猜你喜欢
    • 2012-05-26
    • 2020-09-29
    • 2021-09-14
    • 1970-01-01
    • 2011-05-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多