【问题标题】:Kafka with spark streaming issue: Cannot read data from topic with existing data带有火花流问题的 Kafka:无法使用现有数据从主题中读取数据
【发布时间】:2020-09-16 16:12:04
【问题描述】:

我正在尝试使用火花流从 Kafka 代理读取,但我遇到了一些问题。

def spark_streaming_from_STABLE_kafka_topic():
    conf = SparkConf().setMaster("spark://antonis-dell:7077").setAppName("Kafka_Spark")
    sc = SparkContext(conf=conf) 
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 2)

    topic = "stable_topic"
    kvs = KafkaUtils.createDirectStream(ssc,
                                    [topic],
                                    {"metadata.broker.list": "my-broker",
                                    "auto.offset.reset": "smallest"},
                                    keyDecoder=lambda x: x,
                                    valueDecoder=lambda x: x
                                    )

    lines = kvs.window(2, 2).map(lambda x: x[1])
    lines.pprint()
    return ssc


if __name__ == "__main__":
    ssc = StreamingContext.getOrCreate('/home/antonis/Desktop/tmp/checkpoint_v06', lambda: spark_streaming_from_STABLE_kafka_topic())
    ssc.start()
    ssc.awaitTermination()

上面的代码除了空批次外不获取任何东西:

-------------------------------------------
Time: 2020-05-29 09:32:38
-------------------------------------------

-------------------------------------------
Time: 2020-05-29 09:32:40
-------------------------------------------

主题stable_topic 包含固定大小的数据。它不会改变。 我有另一个每秒接收数据的主题。如果我使用这个主题而不是stable_topic 并删除"auto.offset.reset": "smallest",那么代码会获取数据。

我认为{"auto.offset.reset": "smallest"} 有问题,但我无法弄清楚。

现在有人做错了吗?

【问题讨论】:

    标签: apache-spark pyspark apache-kafka spark-streaming kafka-consumer-api


    【解决方案1】:

    在以后的版本中,smallestearliest 取代。请务必检查您正在使用的版本的文档。

    另外,auto.offset.reset 配置不会生效,如果 Consumer Group 已经在消费来自主题 stable_topic 的一些数据。因此,您可以考虑在流式传输作业中更改group.id

    如果您要分配新的group.id,请确保将auto.offset.reset 设置为smalles(或在较新版本中为earliest)。

    【讨论】:

    • 它说“ConsumerConfig 中 auto.offset.reset 的最早值错误;有效值最小和最大”
    • 你的意思是这样的:kafkaParams["group.id"] = "mygroup" ?
    • 使用 "auto.offset.reset": "largest" 它可以工作,最小的它不能。我不明白为什么
    • 它适用于 "auto.offset.reset": "smallest" 和分配的 group.id
    猜你喜欢
    • 2023-03-18
    • 2021-09-12
    • 2018-12-30
    • 2020-02-24
    • 1970-01-01
    • 2020-09-18
    • 2019-10-11
    • 2020-04-11
    • 2017-04-16
    相关资源
    最近更新 更多