【问题标题】:Spark Structured Streaming - cannot resolve "Kafka" format (Jupyter notebook on Cloudera cluster)Spark Structured Streaming - 无法解析“Kafka”格式(Cloudera 集群上的 Jupyter notebook)
【发布时间】:2019-11-24 07:52:13
【问题描述】:

我正在尝试在我的 Cloudera 上使用 Juputer notebookPySpark 内核)运行示例 Spark Structured Streaming 应用程序strong> cluster,但似乎我无法让它使用所需的包。

dsraw = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafkaBroker:9092") \
  .option("subscribe", "test") \
  .option("startingOffsets", "earliest") \
  .load()

我得到的错误:

Py4JJavaError: An error occurred while calling o113.load.
: org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

我的设置:

  • Spark 版本:'2.4.0.cloudera2'

  • Scala 版本:2.11.12

  • Kafka 版本:2.1.0-kafka-4.0.0

我的尝试:

1) 编辑 kernel.json

"PYSPARK_SUBMIT_ARGS": " --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 --master yarn --deploy-mode client pyspark-shell"

2) 在代码中传递环境变量

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 pyspark-shell'

当我从命令行运行 pyspark 时,它似乎可以工作,但在尝试查询流时​​我遇到了不同的问题(无论如何我都需要让它从 Jupyter 工作)

pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0

我熟悉integration guide,但就我而言,这似乎还不够。

我也在尝试使用不同版本的包,还尝试将它作为 jar 传递。你有什么想法可能是错的吗?是 Cloudera 发行版还是特定于 Jupyter 的问题?

【问题讨论】:

    标签: pyspark apache-kafka jupyter-notebook cloudera spark-structured-streaming


    【解决方案1】:

    晚安朋友,我有一个和你类似的问题......但是我解决了这个包的步骤:

    pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
    

    这是由于 spark/pyspark 的版本,所以我设法让 readStream 工作。

    我现在的问题是我无法使用 Registry 模式写入或打印 data.values,你能继续这样做吗?如果可以的话,会有很大帮助的。

    我的代码示例:

    schemaRegistryUrl = "http://localhost:8081"
    df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("startingOffsets", "earliest") \
    .option("subscribe", "topic-a") \
    .load() \
    .select(from_avro(col("value"), schemaRegistryUrl).alias("values"))
    

    【讨论】:

    • 感谢您的评论 - 不幸的是,从那以后我再也没有回到这个话题,祝你好运!
    猜你喜欢
    • 1970-01-01
    • 2021-12-05
    • 1970-01-01
    • 1970-01-01
    • 2021-05-22
    • 2020-07-25
    • 2019-09-14
    • 2019-02-19
    • 1970-01-01
    相关资源
    最近更新 更多