【问题标题】:Kafka with Spark 3.0.1 Structured Streaming : ClassException: org.apache.kafka.common.TopicPartition; class invalid for deserialization带有 Spark 3.0.1 结构化流的 Kafka:ClassException:org.apache.kafka.common.TopicPartition;类对反序列化无效
【发布时间】:2021-05-31 07:55:18
【问题描述】:

我正在尝试使用 google dataproc 读取 kafka 消息 pyspark - 结构化流。

版本详情如下:

  1. dataproc 映像版本是 2.0.0-RC22-debian10(要获得 delta Lake 0.7.0 的 pyspark 3.0.1 版本,因为我必须最终将此数据写入 delta 托管在谷歌存储上)
  2. pyspark 3.0.1版本,pyspark使用的python版本是3.7.3
  3. 我使用的包是 org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
    io.delta:delta-core_2.12:0.7.0
    org.apache.spark:spark-avro_2.12:3.0.1

代码片段是:

__my_dir = os.path.dirname("<directory_path>") 
jsonFormatSchema = open(os.path.join(__my_dir, 'avro_schema_file.avsc'), "r").read() 

df = spark \    
    .readStream \    
    .format("kafka") \   
    .option("kafka.bootstrap.servers", "<kafka_broker>") \   
    .option("subscribe", "<kafka_topic>") \    
    .option("startingOffsets", "latest") \    
    .load()\    
    .select(from_avro("value", jsonFormatSchema)
    .alias("element"))

df.printSchema()
 
df_output =     df.select("element.after.id","element.after.name","element.after.attribute","element.after.quantity")

StreamQuery = ( df_output.writeStream \ 
               .format("delta") \   
               .outputMode("append") \   
               .option("checkpointLocation","<check_point_location>") \   
               .trigger(once=True) \    
               .start("<target_delta_table>") \    )

我得到的错误是:

java.io.InvalidClassException: org.apache.kafka.common.TopicPartition;
class invalid for deserialization

为什么 spark 无法反序列化 TopicPartition,我该如何解决?

【问题讨论】:

  • 与您的配置相同,但具有 delta 0.8.0 且没有 spark-avro。当主人是“纱线”时,我得到了错误。如果主服务器设置为本地,这将消失。但是,我无法找到解决方案
  • 你的问题帮我解决了版本依赖问题。

标签: java pyspark apache-kafka avro spark-structured-streaming


【解决方案1】:

以下帖子有助于解决此问题: How to use Confluent Schema Registry with from_avro standard function?

另外,我们开始为kafka-client指向以下jar

kafka-clients-2.4.1.jar

【讨论】:

  • 这对我不起作用。显然,当使用纱线时会出现问题,当主服务器设置为本地时会消失
  • 检查这个stackoverflow.com/questions/48882723/… Snippet : # 神奇之处在这里: # 跳过前 5 个字节(由模式注册表编码协议保留) .selectExpr("substring(value, 6) as avro_value") \ .select(from_avro(col("avro_value"), schema).alias("data")) \ .select(col("data.my_field")) \
  • 我可能已经找到了解决方案,这与驱动程序(序列化)和执行程序(反序列化)jar 冲突之间的问题有关。 forums.aws.amazon.com/thread.jspa?messageID=953144
【解决方案2】:

当您将 master 设置为 local[*] 时,错误消失。无论如何,问题似乎与驱动程序和执行程序之间的 jar 冲突有关:它们使用不同版本的 kafka-clients 库。

为了解决您可能想要启动作业的问题

gcloud dataproc jobs submit spark \
 --class <YOUR_CLASS> \
 --jars target/scala-2.12/<COMPILED_JAR_FILE>.jar,kafka-clients-2.4.1.jar  \
 --cluster <CLUSTER_NAME> \
 --region= <YOUR_REGION> \
 --properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,spark.executor.extraClassPath=org.apache.kafka_kafka-clients-2.4.1.jar,spark.driver.extraClassPath=org.apache.kafka_kafka-clients-2.4.1.jar

这适用于我的情况。有关版本的更多详细信息,请查看https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12/3.0.1

【讨论】:

    猜你喜欢
    • 2017-08-23
    • 1970-01-01
    • 2018-07-20
    • 2021-09-30
    • 1970-01-01
    • 2019-11-25
    • 1970-01-01
    • 2018-03-31
    • 1970-01-01
    相关资源
    最近更新 更多