【问题标题】:Exception while accessing KafkaOffset from RDD从 RDD 访问 KafkaOffset 时出现异常
【发布时间】:2016-10-25 13:58:15
【问题描述】:

我有一个从 Kafka 流出的 Spark 消费者。 我正在尝试管理精确一次语义的偏移量。

但是,在访问偏移量时会引发以下异常:

"java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD 无法转换为 org.apache.spark.streaming.kafka.HasOffsetRanges"

执行此操作的代码部分如下:

var offsetRanges = Array[OffsetRange]()
dataStream
  .transform { 
    rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
   }
   .foreachRDD(rdd => { })

这里的 dataStream 是使用 KafkaUtils API 创建的直接流(DStream[String]),类似于:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)

如果有人可以帮助我理解我在这里做错了什么。 transform 是官方文档中提到的在数据流上执行的方法链中的第一个方法

谢谢。

【问题讨论】:

    标签: scala apache-spark apache-kafka spark-streaming rdd


    【解决方案1】:

    你的问题是:

    .map(._2)
    

    这会创建一个MapPartitionedDStream,而不是由KafkaUtils.createKafkaStream 创建的DirectKafkaInputDStream

    你需要在transform之后map

    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+""+t))
    
    kafkaStream
      .transform { 
        rdd => 
          offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd
      }
      .map(_._2)
      .foreachRDD(rdd => // stuff)
    

    【讨论】:

    • 另外,在尝试使用偏移量创建直接流时,我遇到了错误。
      val fromOffsets : (TopicAndPartition, Long)= TopicAndPartition(metrics_rs.getString(1), metrics_rs.getInt(2)) -> metrics_rs.getLong(3)
      KafkaUtils.createDirectStream[String, String, StringDecoder , StringDecoder,(String, String)](ssc,kafkaParams,fromOffsets,messageHandler)
      其中,val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd.message.length和metrics_rs是结果集我从中获取偏移量图。它说太多类型参数错误
    • 如何在下面的代码中读取 offsetRanges。我正在使用重新分区。 val numPartitionsOfInputTopic = 2 val streams = (1 to numPartitionsOfInputTopic) map { _ => KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ).map(_.value() ) } val UnifiedStream = ssc.union(streams) val sparkProcessingParallelism = 1 UnifiedStream.repartition(sparkProcessingParallelism) 更多细节在stackoverflow.com/questions/49344461/…
    猜你喜欢
    • 1970-01-01
    • 2016-09-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多