对应出异常的代码是:val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

做foreachRDD的DStream必须是直接从KafkaUtils.createDirectStream拿到的,才能转换为kafkaRDD. 后面做其他操作的时候会把kafkaRDD转换为非kafkaRDD。也就是说这个HasOffsetRanges接口,只有kafkaRDD这个实现类。所以如果从kafka拿到DStream,后面需要使用foreachRDD,那么这个DStream必须是直接从KafkaUtils.createDirectStream拿到,中间不能再做其他的操作。

相关文章:

  • 2022-12-23
  • 2022-12-23
  • 2021-11-15
  • 2021-09-17
  • 2022-12-23
  • 2021-12-01
  • 2022-01-03
  • 2021-12-01
猜你喜欢
  • 2021-07-20
  • 2021-07-09
  • 2021-10-29
  • 2022-12-23
  • 2022-12-23
  • 2022-01-27
相关资源
相似解决方案