【发布时间】:2020-09-16 16:12:04
【问题描述】:
我正在尝试使用火花流从 Kafka 代理读取,但我遇到了一些问题。
def spark_streaming_from_STABLE_kafka_topic():
conf = SparkConf().setMaster("spark://antonis-dell:7077").setAppName("Kafka_Spark")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 2)
topic = "stable_topic"
kvs = KafkaUtils.createDirectStream(ssc,
[topic],
{"metadata.broker.list": "my-broker",
"auto.offset.reset": "smallest"},
keyDecoder=lambda x: x,
valueDecoder=lambda x: x
)
lines = kvs.window(2, 2).map(lambda x: x[1])
lines.pprint()
return ssc
if __name__ == "__main__":
ssc = StreamingContext.getOrCreate('/home/antonis/Desktop/tmp/checkpoint_v06', lambda: spark_streaming_from_STABLE_kafka_topic())
ssc.start()
ssc.awaitTermination()
上面的代码除了空批次外不获取任何东西:
-------------------------------------------
Time: 2020-05-29 09:32:38
-------------------------------------------
-------------------------------------------
Time: 2020-05-29 09:32:40
-------------------------------------------
主题stable_topic 包含固定大小的数据。它不会改变。
我有另一个每秒接收数据的主题。如果我使用这个主题而不是stable_topic 并删除"auto.offset.reset": "smallest",那么代码会获取数据。
我认为{"auto.offset.reset": "smallest"} 有问题,但我无法弄清楚。
现在有人做错了吗?
【问题讨论】:
标签: apache-spark pyspark apache-kafka spark-streaming kafka-consumer-api