【发布时间】:2017-10-21 22:39:06
【问题描述】:
目前我正在使用 Kafka / Zookeeper 和 pySpark (1.6.0)。
我已经成功创建了一个使用 KafkaUtils.createDirectStream() 的 kafka 消费者。
所有流媒体都没有问题,但我认识到,在我消费了一些消息后,我的 Kafka 主题没有更新到当前偏移量。
由于我们需要更新主题才能在此处进行监控,这有点奇怪。
在 Spark 的文档中我发现了这条评论:
offsetRanges = []
def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
return rdd
def printOffsetRanges(rdd):
for o in offsetRanges:
print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
directKafkaStream\
.transform(storeOffsetRanges)\
.foreachRDD(printOffsetRanges)
如果您希望基于 Zookeeper 的 Kafka 监控工具显示流应用程序的进度,您可以使用它自己更新 Zookeeper。
我在 Scala 中找到了一个解决方案,但我找不到 python 的等价物。 这是 Scala 示例:http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/
问题
但问题是,我如何才能从那时起更新 zookeeper?
【问题讨论】:
标签: python pyspark apache-kafka spark-streaming apache-zookeeper