【发布时间】:2015-05-15 02:13:14
【问题描述】:
我写了一个风暴拓扑。我基本上想以字节数组的形式将 avro 模式中的元组发送到 kafka 主题。
这就是我设置螺栓的方式:
builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, byte[]>())
.fieldsGrouping(BOLT1, new Fields("key"));
这就是我转换为字节数组的方式
Schema schema = avroObject.getSchema();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(ping, encoder);
encoder.flush();
byte[] message = out.toByteArray();
String key = new String(message, "UTF-8");
当我以以下方式发出元组时,我在 kafka 主题中看不到任何内容(向 kafka 发送字节流):
collector.emit(tuple, new Values(Obj.hashMD5(key), message));
但是,如果我将字节数组转换为字符串,然后再转换为 kafka 主题,它可以工作:
如下所示:
builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, String>())
.fieldsGrouping(BOLT1, new Fields("key"));
collector.emit(tuple, new Values(Obj.hashMD5(key), key));
我做错了什么?如何使用storm kafka bolt将字节流发送到kafka topic?
【问题讨论】:
-
请出示您的 kafka 制作人。
-
我使用的是storm提供的Kafka bolt。参见 builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt
()) .fieldsGrouping(BOLT1, new Fields("key"));在上面的代码中 -
您正在将字节数组转换为 java 字符串以生成密钥,您可能会丢失数据,因为 java 字符串不是 C 字符串。您是否检查过您的键值是否正确?因为如果不是,您的 hashMD5 将是错误的。这可能是它不起作用的原因吗?
-
那么如何在java中将字节数组转换为字符串呢?
标签: java apache-kafka apache-storm kafka-consumer-api