【Question title】: Manually commit offset in Kafka Direct Stream in Python
【Posted】: 2019-01-07 04:28:10
【Question description】:

I am porting a streaming application written in Scala to Python. I want to manually commit the offsets for a DStream. In Scala this is done as follows:

val stream = KafkaUtils.createDirectStream(someConfigs)
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

But I cannot find a similar API in Python. Could you please guide me on how to manually commit offsets using the Python client?

【Question discussion】:

    Tags: python apache-spark pyspark spark-streaming-kafka


    【Solution 1】:

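    I solved this by going back to the pyspark 2.2 library, since it has an API for getting offsetRanges, and by storing the offsets in Redis. I had to go back to Python 2.7 because there is no "long" support in Python 3.6.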

    import redis
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition, KafkaRDD
    
    
    def get_offset_ranges(topic):
        ranges = None
    
        rk = '{topic}:offsets'.format(topic=topic)
        cache = redis.Redis()
        if cache.exists(rk):
            mapping = cache.hgetall(rk)
            ranges = dict()
            for k, v in mapping.items():
                tp = TopicAndPartition(topic, int(k))
                ranges[tp] = long(v)  # `long` is Python 2 only, hence Python 2.7 for this answer
    
        return ranges
    
    
    def update_offset_ranges(offset_ranges):
        cache = redis.Redis()
        for rng in offset_ranges:
            rk = '{rng.topic}:offsets'.format(rng=rng)
            print("updating redis_key: {}, partition: {}, lastOffset: {}".format(rk, rng.partition, rng.untilOffset))
            cache.hset(rk, rng.partition, rng.untilOffset)
    
    
    def do_some_work(partition):
        # Placeholder: `partition` is an iterator over the records of one RDD partition
        pass
    
    
    def process_dstream(rdd):
        rdd.foreachPartition(do_some_work)
    
        krdd = KafkaRDD(rdd._jrdd, sc, rdd._jrdd_deserializer)
        off_ranges = krdd.offsetRanges()
        for o in off_ranges:
            print(str(o))
        update_offset_ranges(off_ranges)
    
    
    sc = SparkContext(appName="mytstApp")
    ssc = StreamingContext(sc, 1)
    
    kafka_params = {
        "bootstrap.servers": "localhost:9092",
        "group.id": "myUserGroup",
        "enable.auto.commit": "false",
        "auto.offset.reset": "smallest"
    }
    
    topic = "mytopic"
    offset_ranges = get_offset_ranges(topic)
    # `topics` must be a list of topic names, not a bare string
    dstream = KafkaUtils.createDirectStream(ssc, [topic], kafka_params, fromOffsets=offset_ranges)
    dstream.foreachRDD(process_dstream)
    # Start our streaming context and wait for it to 'finish'
    ssc.start()
    
    # Wait for the job to finish
    try:
        ssc.awaitTermination()
    except Exception as e:
        ssc.stop()
        raise e  # to exit with error condition
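
The two Redis helpers above boil down to a small mapping between a hash of partition -> offset and the values passed as fromOffsets. The sketch below isolates that mapping in plain Python so it can be tried without Spark or Redis. Note that `decode_offsets` and `latest_offsets` are hypothetical helper names, not part of pyspark or redis-py, and the byte-string keys assume redis-py's default (non-decoded) responses under Python 3.

```python
def decode_offsets(mapping):
    """Convert a Redis-style hash {partition: offset} into {int: int}.

    Keys and values may arrive as bytes (redis-py default) or as str.
    """
    return {int(k): int(v) for k, v in mapping.items()}


def latest_offsets(offset_ranges):
    """Collapse (topic, partition, until_offset) tuples into what to store.

    Returns {topic: {partition: until_offset}}, i.e. the offset at which
    the next batch should start for each partition. Later tuples for the
    same partition overwrite earlier ones.
    """
    result = {}
    for topic, partition, until_offset in offset_ranges:
        result.setdefault(topic, {})[partition] = until_offset
    return result


if __name__ == "__main__":
    # Stand-in for what cache.hgetall('mytopic:offsets') might return
    stored = {b"0": b"42", b"1": b"7"}
    print(decode_offsets(stored))

    # Stand-in for the (topic, partition, untilOffset) fields of one batch
    batch = [("mytopic", 0, 50), ("mytopic", 1, 9)]
    print(latest_offsets(batch))
```

Storing untilOffset only after the batch's work has finished, as process_dstream does, means a crash in the middle of a batch replays that batch on restart rather than skipping it.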
    

    【Comments】:

    • What do you mean by no "long" support?
    • If you look at the code, I cast the value to long: ranges[tp] = long(v). That is not available in Python 3.6.
    • I have no experience with redis, but longs exist in both Python 2 and Python 3.
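
On the "long" question raised in these comments: Python 3 has no `long` builtin, but its `int` is arbitrary-precision and covers the same range of values. The short check below illustrates this with a common compatibility idiom; `integer_type` and `to_offset` are hypothetical names chosen for the example. It does not verify whether a plain `int` is accepted by the pyspark 2.2 Kafka API under Python 3.

```python
try:
    integer_type = long  # Python 2: separate arbitrary-precision integer type
except NameError:
    integer_type = int  # Python 3: `long` is gone, int is arbitrary-precision


def to_offset(value):
    """Parse an offset with whichever wide integer type this interpreter has."""
    return integer_type(value)


if __name__ == "__main__":
    # Largest signed 64-bit value, the upper bound for a Kafka offset
    print(to_offset("9223372036854775807") == 2 ** 63 - 1)
```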