【发布时间】:2021-05-31 07:55:18
【问题描述】:
我正在尝试使用 google dataproc 读取 kafka 消息 pyspark - 结构化流。
版本详情如下:
- dataproc 映像版本是 2.0.0-RC22-debian10(要获得 delta Lake 0.7.0 的 pyspark 3.0.1 版本,因为我必须最终将此数据写入 delta 托管在谷歌存储上)
- pyspark 3.0.1版本,pyspark使用的python版本是3.7.3
- 我使用的包是 org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
io.delta:delta-core_2.12:0.7.0
org.apache.spark:spark-avro_2.12:3.0.1代码片段是:
__my_dir = os.path.dirname("<directory_path>")
jsonFormatSchema = open(os.path.join(__my_dir, 'avro_schema_file.avsc'), "r").read()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "<kafka_broker>") \
.option("subscribe", "<kafka_topic>") \
.option("startingOffsets", "latest") \
.load()\
.select(from_avro("value", jsonFormatSchema)
.alias("element"))
df.printSchema()
df_output = df.select("element.after.id","element.after.name","element.after.attribute","element.after.quantity")
StreamQuery = ( df_output.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation","<check_point_location>") \
.trigger(once=True) \
.start("<target_delta_table>") \ )
我得到的错误是:
java.io.InvalidClassException: org.apache.kafka.common.TopicPartition;
class invalid for deserialization
为什么 spark 无法反序列化 TopicPartition,我该如何解决?
【问题讨论】:
-
与您的配置相同,但具有 delta 0.8.0 且没有 spark-avro。当主人是“纱线”时,我得到了错误。如果主服务器设置为本地,这将消失。但是,我无法找到解决方案
-
你的问题帮我解决了版本依赖问题。
标签: java pyspark apache-kafka avro spark-structured-streaming