【问题标题】:Spark Context on Bluemix adds null to json payloadBluemix 上的 Spark Context 将 null 添加到 json 有效负载
【发布时间】:2016-12-29 04:11:13
【问题描述】:

我正在将消息从 Message Hub 流式传输到 Bluemix 中的 Spark 实例。我正在使用 java 客户端将简单的 json 消息发送到 Message Hub。

JSON 消息 -

{"country":"Netherlands","dma_code":"0","timezone":"Europe\/Amsterdam","area_code":"0","ip":"46.19.37.108","asn":"AS196752","continent_code":"EU","isp":"Tilaa V.O.F.","longitude":5.75,"latitude":52.5,"country_code":"NL","country_code3":"NLD"}

当我开始在 Spark 中流式传输时,我收到的消息的开头有一个额外的 null。

(null,{"country":"Netherlands","dma_code":"0","timezone":"Europe\/Amsterdam","area_code":"0","ip":"46.19.37.108","asn":"AS196752","continent_code":"EU","isp":"Tilaa V.O.F.","longitude":5.75,"latitude":52.5,"country_code":"NL","country_code3":"NLD"})

请告诉我为什么 Spark 上下文将这个 null 放在前面。如何删除它?

KafkaSender 代码 -

  KafkaProducer<String, String> kafkaProducer;
  kafkaProducer = new KafkaProducer<String, String>(props);
  ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message);

  RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
  //getting RecordMetadata is possible to validate topic, partition and offset
  System.out.println("topic where message is published : " + recordMetadata.topic());
  System.out.println("partition where message is published : " + recordMetadata.partition());
  System.out.println("message offset # : " + recordMetadata.offset());
  kafkaProducer.close();

谢谢 拉杰

【问题讨论】:

标签: apache-spark ibm-cloud message-hub


【解决方案1】:

您的密钥为空 - 第一个值是您的密钥,第二个当然是您的值。

我建议您发布将消息发布到 Kafka/MessageHub 的代码以获得更好的答案。

要解决您的问题 - 如果您的目标只是将其打印出来,您可以执行类似的操作,这会将数据打印到标准输出并忽略 null 键。

stream.foreachRDD(recordRDD => {
  recordRDD.foreach(record => print(record._2))
})

【讨论】:

  • 同样,有问题的生产者代码没有发送密钥。我不确定你的问题是什么?你得到一个 String 的元组,String back.. 只需使用元组的 _2 来获取你的值。
猜你喜欢
  • 1970-01-01
  • 2021-07-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-04-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多