【发布时间】:2020-03-03 16:43:45
【问题描述】:
我正在 pyspark 中执行批处理作业,其中 spark 将每 5 分钟从 kafka 主题读取数据。
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1") \
.option("subscribePattern", "test") \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest") \
.load()
每当 spark 从 kafka 读取数据时,它都会读取所有数据,包括以前的批次。 我想读取当前未读取的当前批次或最新记录的数据。 请推荐!!谢谢。
【问题讨论】: