【问题标题】:Pyspark Kafka offset range unitsPyspark Kafka 偏移范围单位
【发布时间】:2017-06-13 10:49:25
【问题描述】:

我使用 Spark 作为批处理来处理来自 kafka 的日志。 在每个周期中,我的代码应该得到任何到达 kafka 消费者的东西。但是,我想限制每个周期从 kafka 获取的数据量。假设 5 GB 或 500000 条日志行..

offsetRanges = []
def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    WRITE OFFSETS TO DISK
    return rdd

while True:
    host = "localhost:9092"
    offset = OffsetRange(topic, 0, fromOffset, untilOffset)
    kafka_content = KafkaUtils.createRDD(sc, {"metadata.broker.list": host}, [offset])
    kafka_content.transform(storeOffsetRanges)
    RDD TRANSFORMATIONS..

我会将偏移量存储在内存和磁盘中,以防驱动程序发生故障。但是我怎样才能强加这些 kafka 偏移量来限制每个周期的最大数据量呢? kafka偏移范围的单位是什么?

提前致谢!

【问题讨论】:

    标签: apache-spark pyspark apache-kafka kafka-consumer-api kafka-python


    【解决方案1】:

    Kafka 偏移量单位是消息。在每个周期中,您最多会收到来自 Kafka 的 untilOffest - fromOffset 消息。但是数据只会从一个主题分区中读取,因此如果您的主题有更多分区,那么应用程序会丢失一些日志行。

    您也可以尝试spark streaming with kafka direct approach。使用这种方法,您将摆脱while True,您将使用可选的背压机制基于时间(不是固定偏移量)在微批处理中处理日志行。然后您可以省略在内存中保存偏移量(流式处理会处理它),但在驱动程序重新启动的情况下仍然需要将它们保存到磁盘(参见 KafkaUtils.createDirectStream 中的 fromOffsets)。

    【讨论】:

    • 我考虑过使用火花流,但我认为在我的情况下使用火花作为批处理会更好。我需要每小时获取指标并开始使用 1h 窗口的流式传输。问题是:在流式传输中,如果驱动程序在数据处理期间死亡,从 kafka 消耗的数据将丢失。使用批处理它将写入磁盘,直到所有处理完成然后我删除。此外,我发现流式传输非常难以调试..
    猜你喜欢
    • 1970-01-01
    • 2019-03-11
    • 2017-10-18
    • 2018-11-28
    • 2016-05-02
    • 2011-01-12
    • 1970-01-01
    • 2012-04-18
    • 2020-11-06
    相关资源
    最近更新 更多