【问题标题】:How to get a JavaDStream of an Object in Spark Kafka Connector?如何在 Spark Kafka 连接器中获取对象的 JavaDStream?
【发布时间】:2016-12-17 22:24:45
【问题描述】:

我正在使用 Spark Kafka 连接器从 Kafka 集群中获取数据。从中,我以JavaDStream<String> 的形式获取数据。如何以JavaDStream<EventLog> 的形式获取数据,其中EventLog 是Java bean?

public static JavaDStream<EventLog> fetchAndValidateData(String zkQuorum, String group, Map<String, Integer> topicMap) {
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });
    jssc.start();
    jssc.awaitTermination();
    return lines;
}

我的目标是将这些数据保存到 Cassandra 中,其中的表与 EventLog 具有相同的规格。 Spark Cassandra 连接器在插入语句中接受JavaRDD&lt;EventLog&gt;,如下所示:javaFunctions(rdd).writerBuilder("ks", "event", mapToRow(EventLog.class)).saveToCassandra();。我想从 Kafka 那里得到这些JavaRDD&lt;EventLog&gt;

【问题讨论】:

  • 您的意思是您仍然希望将消息作为字符串对并进行转换吗?还是您希望使用 JavaReceiverInputDStream?您究竟希望在哪里引入 EventLog 类型?您是否尝试定义一个接受 EventLog 类型并从中构建 JavaDStream 的接收器?
  • @Sunny 我的目标是将数据写入 Cassandra。 Spark Cassandra 连接器在插入语句中接受JavaRDD&lt;EventLog&gt;,如下所示:javaFunctions(rdd).writerBuilder("ks", "event", mapToRow(EventLog.class)).saveToCassandra();。我想从卡夫卡那里得到这些JavaRDD&lt;EventLog&gt;
  • 您是否也有权访问将这些事件日志写入 kafka 的代码?是否实现了自定义序列化程序,是否将 EventLogs 序列化并作为 EventLogs 写入 Kafka?
  • @Sunny 有自定义序列化程序正在发送到 Kafka。
  • 我相信实现一个自定义接收器是最简单的,它将从 org.apache.spark.streaming.receiver 扩展 Receiver。在 onStart() 方法中,您应该开始使用来自 Kafka 的这些事件并添加一个侦听器。如果你有一个活跃的接收者并且有一个 JavaStreamingContext 你可以做 jssc.receiverStream(receiver) 来获取 JavaReceiverInputDStream

标签: java apache-spark cassandra apache-kafka


【解决方案1】:

使用重载的createStream 方法,您可以在其中传递键/值类型和解码器类。

例子:

createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class,
        kafkaParams, topicsMap, StorageLevel.MEMORY_AND_DISK_SER_2());

上面应该给你JavaPairDStream&lt;String, EventLog&gt;

JavaDStream<EventLog> lines = messages.map(new Function<Tuple2<String, EventLog>, EventLog>() {
  @Override
  public EventLog call(Tuple2<String, EventLog> tuple2) {
    return tuple2._2();
  }
});

EventLogDecoder 应该实现 kafka.serializer.Decoder。下面是 json 解码器的例子。

public class EventLogDecoder implements Decoder<EventLog> {

 public EventLogDecoder(VerifiableProperties verifiableProperties) {
 }

 @Override
 public EventLog fromBytes(byte[] bytes) {
   ObjectMapper objectMapper = new ObjectMapper();
   try {
     return objectMapper.readValue(bytes, EventLog.class);
   } catch (IOException e) {
     //do something
   }
   return null;
 }
}

【讨论】:

  • 你能告诉我StringDecoder 的整个包裹吗? EventLogDecoder 应该包含哪些功能?
  • Function 中的第三个参数应该是EventLog,如Function&lt;Tuple2&lt;String, EventLog&gt;, EventLog&gt;()
  • 是更新了第三个参数。而StringDecoder的包是kafka.serializer.StringDecoder。我已更新答案以包含示例解码器。
  • 我在运行spark-submit 时得到NoClassDefFoundErrorkafka.serializer.StringDecoder。我已在 POM 中添加了此依赖项,并且在 Eclipse 中没有收到任何错误。
  • 你提供的是 kafka jar 还是 shading ?
猜你喜欢
  • 2018-03-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-04-22
  • 1970-01-01
  • 1970-01-01
  • 2020-02-25
  • 2019-04-24
相关资源
最近更新 更多