【问题标题】:Read latest records from Kafka using pyspark batch job使用 pyspark 批处理作业从 Kafka 读取最新记录
【发布时间】:2020-03-03 16:43:45
【问题描述】:

我正在 pyspark 中执行批处理作业,其中 spark 将每 5 分钟从 kafka 主题读取数据。

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1") \
  .option("subscribePattern", "test") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()

每当 spark 从 kafka 读取数据时,它都会读取所有数据,包括以前的批次。 我想读取当前未读取的当前批次或最新记录的数据。 请推荐!!谢谢。

【问题讨论】:

    标签: apache-spark apache-kafka


    【解决方案1】:

    来自https://spark.apache.org/docs/2.4.5/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries

    对于批量查询,最新(隐式或在 json 中使用 -1) 不允许。

    使用最早意味着再次获取所有数据。

    每次运行时都需要明确定义偏移量,例如:

    .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
    

    这意味着您需要保存每个分区处理的偏移量。我正在为一个项目在不久的将来自己研究这个。下面的一些项目有帮助:

    https://medium.com/datakaresolutions/structured-streaming-kafka-integration-6ab1b6a56dd1 陈述你的观察:

    创建一个Kafka批量查询

    • Spark 还提供了一个功能来获取 以批处理模式从 Kafka 获取数据。在批处理模式下,Spark 将消耗所有 消息。批处理模式下的 Kafka 需要两个重要的 参数 起始偏移量和结束偏移量,如果未指定 spark 将考虑默认配置,即,

      • startingOffsets — 最早
      • endingOffsets — 最新

    https://dzone.com/articles/kafka-gt-hdfss3-batch-ingestion-through-spark 也暗示了你应该做什么,如下:

    1. 最后,将这些 Kafka 主题 endOffset 保存到文件系统 - 本地或 HDFS(或将它们提交到 ZooKeeper)。这将用于 下一次运行 Kafka 主题的偏移量。我们在这里制作 确保作业的下一次运行将从上一次运行的偏移量中读取 从左边跑。

    这个博客https://dataengi.com/2019/06/06/spark-structured-streaming/我认为有保存偏移量的答案。

    【讨论】:

      【解决方案2】:

      你在写流数据时是否使用了检查点位置

      【讨论】:

        猜你喜欢
        • 2021-04-22
        • 1970-01-01
        • 2016-10-27
        • 2020-06-01
        • 2021-11-27
        • 2021-10-18
        • 1970-01-01
        • 2014-02-13
        • 2017-08-11
        相关资源
        最近更新 更多