【问题标题】:How to properly submit kafka streaming pyspark job to Google Dataproc如何将 kafka 流式处理 pyspark 作业正确提交到 Google Dataproc
【发布时间】:2018-03-11 19:03:30
【问题描述】:

我正在尝试通过 Dataproc UI 提交 pyspark 作业并不断收到错误消息,它似乎没有加载 kafka 流包。

这是我工作中 UI 提供的 REST 命令: POST /v1/projects/projectname/regions/global/jobs:submit/ { "projectId": "projectname", "job": { "placement": { "clusterName": "cluster-main" }, "reference": { "jobId": "job-33ab811a" }, "pysparkJob": { "mainPythonFileUri": "gs://projectname/streaming.py", "args": [ "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0" ], "jarFileUris": [ "gs://projectname/spark-streaming-kafka-0-10_2.11-2.2.0.jar" ] } } }

我尝试将 kafka 包作为 args 和 jar 文件传递​​。

这是我的代码 (streaming.py):

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json


sc = SparkContext()

spark = SparkSession.builder.master("local").appName("Spark-Kafka-Integration").getOrCreate()

# < ip > is masked
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "<ip>:9092") \
    .option("subscribe", "rsvps") \
    .option("startingOffsets", "earliest") \
    .load()
df.printSchema()

错误: :java.lang.ClassNotFoundException:找不到数据源:kafka。请在http://spark.apache.org/third-party-projects.html查找包

完整跟踪:https://pastebin.com/Uz3iGy2N

【问题讨论】:

    标签: python pyspark google-cloud-platform google-cloud-dataproc


    【解决方案1】:

    您可能会遇到“--packages”是spark-submit 中的语法糖的问题,当高级工具 (Dataproc) 以编程方式调用 Spark 提交时,它的交互效果很差,我的回复中描述了另一种语法这里:use an external library in pyspark job in a Spark cluster from google-dataproc

    长话短说,您可以在 Dataproc 请求中使用properties 指定等效的spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,而不是在作业参数中传递--properties

    【讨论】:

      猜你喜欢
      • 2016-08-15
      • 2020-08-23
      • 1970-01-01
      • 2020-08-06
      • 2018-11-28
      • 1970-01-01
      • 2018-05-31
      • 1970-01-01
      • 2018-05-12
      相关资源
      最近更新 更多