【问题标题】:Sending data through ZeroMQ (zmqpp) using MsgPack gives 'msgpack::v1::insufficient_bytes' error使用 MsgPack 通过 ZeroMQ (zmqpp) 发送数据会出现“msgpack::v1::insufficient_bytes”错误
【发布时间】:2015-08-22 16:17:30
【问题描述】:

我使用zmqpp 建立了一个 PUB/SUB 连接,现在我想使用 msgpack-c 的仅标头的 C++11 版本将数据从发布者发送到订阅者。

发布者必须发送 2 个 int64_t 号码 -- header_1header_2 -- 后跟一个 std::vector<T> -- data --,其中 T(header_1, header_2) 组合确定.

虽然没有太多关于如何结合 msgpack 和 zmqpp 的示例,但我想出的想法是使用 zmqpp::message::add/add_raw 发送 3 部分消息。使用 msgpack 每个部分将是 packed/unpacked

发布者打包单个数据部分如下:

zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());

接收方这样解包:

zmqpp::message msg;
subscriberSock.receive(msg);

int64_t header_1;
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
                static_cast<const char*>(msg.raw_data(0)),
                msg.size(0));
unpackedData.get().convert(&header_1);

当我运行代码时,我在订阅者端收到以下错误:

terminate called after throwing an instance of 'msgpack::v1::insufficient_bytes'
  what():  insufficient bytes
Aborted

此外,zmqpp 似乎生成了一条由 5 部分组成的消息,尽管我只调用了 3 次 add()

Q1:我是否正确打包/解包数据?

Q2:这是使用 zmqpp 发送 msgpack 缓冲区的正确方法吗?

以下是代码的重要部分:

出版商

zmqpp::socket publisherSock;
/* connection setup stuff ...*/

// forever send data to the subscribers
while(true)
{
    zmqpp::message msg;

    // meta info about the data
    int64_t header_1 = 1234567;
    int64_t header_2 = 89;
    // sample data
    std::vector<double> data;
    data.push_back(1.2);
    data.push_back(3.4);
    data.push_back(5.6);


    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, header_1);
        msg.add(buffer.data(), buffer.size());
        cout << "header_1:" << header_1 << endl;  // header_1:1234567
    }

    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, header_2);
        msg.add(buffer.data(), buffer.size());
        cout << "header_2:" << header_2 << endl;  // header_2:89
    }

    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, data);
        msg.add_raw(buffer.data(), buffer.size());
        std::cout << "data: " << data << std::endl;  // data:[1.2 3.4 5.6]
    }

    std::cout << msg.parts() << " parts" << std::endl;  // prints "5 parts"... why ?
    publisherSock.send(msg);

    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

订阅者

zmqpp::socket subscriberSock;
/* connection setup stuff ...*/

zmqpp::message msg;
subscriberSock.receive(msg);

int64_t header_1;
int64_t header_2;
std::vector<double> data;

std::cout << msg.parts() << " parts" << std::endl;  // prints "5 parts"
{
    // header 1
    {
        msgpack::unpacked unpackedData;
        // crash !
        msgpack::unpack(unpackedData,
                        static_cast<const char*>(msg.raw_data(0)),
                        msg.size(0));
        unpackedData.get().convert(&header_1);
        cout << "header_1:" << header_1 << endl;
    }
    // header 2
    {
        msgpack::unpacked unpackedData;
        msgpack::unpack(unpackedData,
                        static_cast<const char*>(msg.raw_data(1)),
                        msg.size(1));
        unpackedData.get().convert(&header_2);
        cout << "header_2:" << header_2 << endl;
    }
    // data
    {
        msgpack::unpacked unpacked_data;
        msgpack::unpack(unpacked_data,
                        static_cast<const char*>(msg.raw_data(2)),
                        msg.size(2));
        unpacked_data.get().convert(&data);
        std::cout << "data:" << data << std::endl;
    }

}

编辑:问题已解决:正如@Jens 所指出的,打包/发送数据的正确方法是使用zmqpp::message::add_raw()

zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add_raw(buffer.data(), buffer.size());

【问题讨论】:

    标签: c++ c++11 zeromq msgpack messagepack


    【解决方案1】:

    我认为对msg.add(buffer.data(), buffer.size()的调用不会添加buffer.size()字节数组,而是调用message::add(Type const&amp; part, Args &amp;&amp;...args),这

    1. msg &lt;&lt; buffer.data(),它可能调用 message::operator&lt;&lt;(bool),因为指针转换为 bool
    2. add(buffer.size()) 然后调用 msg &lt;&lt; buffer.size(),它添加一个 size_t 值作为下一部分。

    查看 zmqpp::message 类,使用 message::add_raw 应该可以解决问题。

    PS:这一切都没有任何保证,因为我从未使用过zmqpp或msgpack。

    【讨论】:

    • 谢谢,但即使使用message::add_raw(),订阅者仍然会遇到同样的崩溃。此外,zmqpp 现在生成 5 个部分,而不是 6 个……我不明白为什么……
    • @865719 您必须将所有对 message::add 的调用替换为 message::add_raw。添加指针 buffer.data() 和 buffer.size() ,而不是为两个标头字段添加 int64。你能在其他 couts 之后添加cout &lt;&lt; msg.parts() &lt;&lt; endl 吗?
    • 是的!现在 zmqpp 按预期创建一个由 3 部分组成的消息。另外,在第一个示例中,我在解压接收端的向量时错误地使用了索引3 而不是2
    • 是的,我可以确认在这里使用add_raw 是正确的做法。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多