【问题标题】:Converting Spark-kafka InputDStream to Array[Bytes]将 Spark-kafka InputDStream 转换为数组[字节]
【发布时间】:2017-07-02 00:16:29
【问题描述】:

我正在使用 scala 并使用以下 Spark Streaming 方法使用来自 Kafka 的数据:

val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

Above 变量返回 InputDStream,通过它我可以使用以下代码查看原始/二进制格式的数据: println(行)

但我需要在原始/二进制格式上应用 avro 格式(可用模式),以便以预期的 json 格式查看数据。为了应用avro格式,我需要将上面的InputDStream转换为avro使用的Array[Bytes]。

有人可以告诉我将 InputDStream 转换为 Array[Bytes] 吗?

或者

如果您知道在 InputDStream(of spark Streaming) 上应用 avro 模式的更好方法,请分享。

【问题讨论】:

    标签: scala apache-spark apache-kafka spark-streaming avro


    【解决方案1】:

    你需要做两件事。第一种是使用 DefaultDecoder 来表示 Kafka,它会为值类型提供 Array[Byte]

    val lines: DStream[(String, Array[Byte])] = 
      KafkaUtils
       .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)
    

    然后您需要通过额外的map 应用您的 Avro 反序列化逻辑:

    lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes) }
    

    avroDeserializer 是你的任意类,它知道如何从 Avro 字节创建你的类型。

    我个人使用avro4s通过宏来反序列化案例类。

    【讨论】:

    • 太棒了,非常感谢!我只需要 DStream 的值作为 Array[Byte],所以我使用以下方法获取它: val lines: DStream[(Array[Byte])] = KafkaUtils.createDirectStream[ String, Array[Byte], StringDecoder, DefaultDecoder]( ssc, kafkaParams , 主题).map(_._2)
    猜你喜欢
    • 2018-07-31
    • 2021-06-09
    • 2018-07-29
    • 1970-01-01
    • 2018-07-16
    • 2014-08-08
    • 2019-11-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多