【发布时间】:2016-12-29 04:11:13
【问题描述】:
我正在将消息从 Message Hub 流式传输到 Bluemix 中的 Spark 实例。我正在使用 java 客户端将简单的 json 消息发送到 Message Hub。
JSON 消息 -
{"country":"Netherlands","dma_code":"0","timezone":"Europe\/Amsterdam","area_code":"0","ip":"46.19.37.108","asn":"AS196752","continent_code":"EU","isp":"Tilaa V.O.F.","longitude":5.75,"latitude":52.5,"country_code":"NL","country_code3":"NLD"}
当我开始在 Spark 中流式传输时,我收到的消息的开头有一个额外的 null。
(null,{"country":"Netherlands","dma_code":"0","timezone":"Europe\/Amsterdam","area_code":"0","ip":"46.19.37.108","asn":"AS196752","continent_code":"EU","isp":"Tilaa V.O.F.","longitude":5.75,"latitude":52.5,"country_code":"NL","country_code3":"NLD"})
请告诉我为什么 Spark 上下文将这个 null 放在前面。如何删除它?
KafkaSender 代码 -
KafkaProducer<String, String> kafkaProducer;
kafkaProducer = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message);
RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
//getting RecordMetadata is possible to validate topic, partition and offset
System.out.println("topic where message is published : " + recordMetadata.topic());
System.out.println("partition where message is published : " + recordMetadata.partition());
System.out.println("message offset # : " + recordMetadata.offset());
kafkaProducer.close();
谢谢 拉杰
【问题讨论】:
-
您的问题似乎与stackoverflow.com/questions/36888224/… 类似,您的密钥丢失,因此消息中心(kafka)为您添加了 null,我怀疑 spark 上下文添加了这一点。
标签: apache-spark ibm-cloud message-hub