【问题标题】:pySpark Kafka Direct Streaming update Zookeeper / Kafka OffsetpySpark Kafka Direct Streaming 更新 Zookeeper / Kafka Offset
【发布时间】:2017-10-21 22:39:06
【问题描述】:

目前我正在使用 Kafka / Zookeeper 和 pySpark (1.6.0)。 我已经成功创建了一个使用 KafkaUtils.createDirectStream() 的 kafka 消费者。

所有流媒体都没有问题,但我认识到,在我消费了一些消息后,我的 Kafka 主题没有更新到当前偏移量。

由于我们需要更新主题才能在此处进行监控,这有点奇怪。

在 Spark 的文档中我发现了这条评论:

   offsetRanges = []

     def storeOffsetRanges(rdd):
         global offsetRanges
         offsetRanges = rdd.offsetRanges()
         return rdd

     def printOffsetRanges(rdd):
         for o in offsetRanges:
             print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)

     directKafkaStream\
         .transform(storeOffsetRanges)\
         .foreachRDD(printOffsetRanges)

如果您希望基于 Zookeeper 的 Kafka 监控工具显示流应用程序的进度,您可以使用它自己更新 Zookeeper。

这里是文档: http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers

我在 Scala 中找到了一个解决方案,但我找不到 python 的等价物。 这是 Scala 示例:http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/

问题

但问题是,我如何才能从那时起更新 zookeeper?

【问题讨论】:

    标签: python pyspark apache-kafka spark-streaming apache-zookeeper


    【解决方案1】:

    我用 python kazoo 库编写了一些函数来保存和读取 Kafka 偏移量。

    获取 Kazoo 客户端单例的第一个函数:

    ZOOKEEPER_SERVERS = "127.0.0.1:2181"
    
    def get_zookeeper_instance():
        from kazoo.client import KazooClient
    
        if 'KazooSingletonInstance' not in globals():
            globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
            globals()['KazooSingletonInstance'].start()
        return globals()['KazooSingletonInstance']
    

    然后函数读取和写入偏移量:

    def read_offsets(zk, topics):
        from pyspark.streaming.kafka import TopicAndPartition
    
        from_offsets = {}
        for topic in topics:
            for partition in zk.get_children(f'/consumers/{topic}'):
                topic_partion = TopicAndPartition(topic, int(partition))
                offset = int(zk.get(f'/consumers/{topic}/{partition}')[0])
                from_offsets[topic_partion] = offset
        return from_offsets
    
    def save_offsets(rdd):
        zk = get_zookeeper_instance()
        for offset in rdd.offsetRanges():
            path = f"/consumers/{offset.topic}/{offset.partition}"
            zk.ensure_path(path)
            zk.set(path, str(offset.untilOffset).encode())
    

    然后在开始流式传输之前,您可以从 zookeeper 读取偏移量并将它们传递给createDirectStream 对于fromOffsets 参数。:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    
    
    def main(brokers="127.0.0.1:9092", topics=['test1', 'test2']):
        sc = SparkContext(appName="PythonStreamingSaveOffsets")
        ssc = StreamingContext(sc, 2)
    
        zk = get_zookeeper_instance()
        from_offsets = read_offsets(zk, topics)
    
        directKafkaStream = KafkaUtils.createDirectStream(
            ssc, topics, {"metadata.broker.list": brokers},
            fromOffsets=from_offsets)
    
        directKafkaStream.foreachRDD(save_offsets)
    
    
    if __name__ == "__main__":
        main()
    

    【讨论】:

      【解决方案2】:

      我遇到了类似的问题。 你是对的,通过使用directStream,意味着直接使用kafka低级API,它没有更新阅读器偏移量。 有几个scala/java的例子,但python没有。 但是自己做很容易,你需要做的是:

      • 从开头的偏移量读取
      • 保存最后的偏移量

      例如,我通过以下方式将每个分区的偏移量保存在 redis 中:

      stream.foreachRDD(lambda rdd: save_offset(rdd))
      def save_offset(rdd):
        ranges = rdd.offsetRanges()
        for rng in ranges:
           rng.untilOffset # save offset somewhere
      

      然后在开始时,您可以使用:

      fromoffset = {}
      topic_partition = TopicAndPartition(topic, partition)
      fromoffset[topic_partition]= int(value) #the value of int read from where you store previously.
      

      对于一些使用zk跟踪偏移量的工具,最好将偏移量保存在zookeeper中。 这一页: https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html 描述如何设置偏移量,基本上zk节点是: /consumers/[consumer_name]/offsets/[topic name]/[partition id] 因为我们使用的是directStream,所以你必须组成一个消费者名称。

      【讨论】:

      • 感谢您的回答,但我仍然不确定我可以从 pySpark 使用哪个框架来更新 Kafka 分区上的偏移量。
      猜你喜欢
      • 1970-01-01
      • 2018-01-13
      • 2020-11-02
      • 2017-12-11
      • 2017-12-28
      • 1970-01-01
      • 2016-05-06
      • 2019-08-08
      • 2016-06-27
      相关资源
      最近更新 更多