【问题标题】:Serializing a serialized Thrift struct to Kafka in C++在 C++ 中将序列化的 Thrift 结构序列化为 Kafka
【发布时间】:2017-08-09 19:33:57
【问题描述】:

我在Thrift 中定义了一组structs,如下所示:

struct Foo {
  1: i32 a,
  2: i64 b
}

我需要在C++中执行以下操作:

(a) 将 Foo 的实例序列化为 Thrift 兼容的字节(使用 BinaryCompact Thrift 协议)

(b) 将字节序列化的实例发送到Kafka 主题

问题

如何将Thrift 序列化实例发送到Kafka 集群?

提前致谢

【问题讨论】:

标签: c++ serialization apache-kafka thrift


【解决方案1】:

想出了我自己问题的答案。

序列化

下面的代码 sn-p 说明了如何将 Foo 的实例序列化为与 Thrift 兼容的字节(使用 Thrift Compact 协议)。要使用Binary 协议,请将TCompactProtocol 替换为TBinaryProtocol

#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TCompactProtocol.h>

using apache::thrift::protocol::TCompactProtocol;
using apache::thrift::transport::TMemoryBuffer;

...
...
boost::shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer());
boost::shared_ptr<TCompactProtocol> protocol(new TCompactProtocol(buffer));
uint8_t **serialized_bytes = reinterpret_cast<uint8_t **>(malloc(sizeof(uint8_t *)));
uint32_t num_bytes = 0;

// 'foo' is an instance of Foo
foo->write(protocol.get());
buffer->getBuffer(serialized_bytes, &num_bytes);

发送到 Kafka 集群

以下代码 sn-p 说明了如何将 Thrift 兼容字节发送到 Kafka 集群。

注意:下面使用的kafka客户端库是librdkafka

#include "rdkafkacpp.h"

std::string errstr;

// Create global configuration
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("metadata.broker.list", "localhost:9092", errstr);
conf->set("api.version.request", "true", errstr);

// Create kafka producer
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);

// Create topic-specific configuration
RdKafka::Topic *topic = RdKafka::Topic::create(producer, "topic_name", nullptr, errstr);

auto partition = 1;

// Sending the serialized bytes to Kafka cluster
auto res = producer->produce(
    topic, partition,
    RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
    serialized_bytes, num_bytes,
    NULL, NULL);

  if (res != RdKafka::ERR_NO_ERROR) {
    std::cerr << "Failed to publish message" << RdKafka::err2str(res) << std::endl;
  } else {
    std::cout << "Published message of " << num_bytes << " bytes" << std::endl;
  }

producer->flush(10000);

【讨论】:

    猜你喜欢
    • 2019-08-11
    • 2010-12-11
    • 2018-03-31
    • 1970-01-01
    • 2016-02-21
    • 1970-01-01
    • 1970-01-01
    • 2017-02-21
    • 2010-10-18
    相关资源
    最近更新 更多