【问题标题】:External packages (jars) in pyspark shell - How Topyspark shell 中的外部包(jar) - 如何
【发布时间】:2020-04-05 00:02:08
【问题描述】:

在 pyspark shell 中包含外部包(jar)的正确方法是什么?

我正在使用 jupyter notebook 中的 pyspark。

我想通过 spark-sql-kafka 库使用 spark 阅读 kafka,如下所述:https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#deploying

我正在尝试通过在环境变量PYSPARK_SUBMIT_ARGS 中设置的--packages 选项导入库。

但是

  • 我不确定要使用的软件包的确切版本和名称,
  • 我不知道我是否还需要包含 spark-streaming,是否必须使用 --repositories 指定一些存储库,
  • 我不知道下载jar并指定本地路径是否更好(它们必须在运行jupyter的机器上,还是在运行yarn的机器上?我正在使用--master yarn--deploy-mode client) 或依赖--packages
  • 我不知道PYSPARK_SUBMIT_ARGSpyspark-shell之后指定的选项是否被遗漏(如果我尝试在pyspark-shell之前指定--packages选项,我根本无法实例化火花上下文)
  • 如何检查某些包是否已正确下载并可供使用
  • 我不知道这种下载的jar(或一般的jar)的路径是什么。它们被复制了多少次?他们通过司机吗?如果我使用集群管理器作为 YARN,这些事情会改变吗?如果我在 jupyter notebook 中使用 spark-shell 中的所有内容,它们会改变吗?

目前我阅读的资源:

【问题讨论】:

    标签: apache-spark pyspark


    【解决方案1】:

    当你想在 Pyspark shell 中导入外部包时,在启动过程中我们可以像我们做 spark-submit 一样调用它。

    > ./bin/pyspark --packages
    > org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,com.databricks:spark-avro_2.11:3.2.0
    > --conf spark.ui.port=4055 --files /home/bdpda/spark_jaas,/home/bdpda/bdpda.headless.keytab --conf
    > "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/bdpda/spark_jaas"
    > --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/bdpda/spark_jaas
    

    注意:此 pyspark 提交用于将 Pyspark 与 Kafka 结构化流连接的相同用例。

    【讨论】:

      【解决方案2】:

      为简单起见,我会先尝试在 Jupyter 之外工作


      要使用的包的确切版本和名称

      版本需要与您的 Spark 版本相匹配。使用您要下载的包。

      我不知道我是否还需要包含火花流

      不要。它已经在您的 Spark 工作者的类路径中。

      我是否必须使用 --repositories 指定一些存储库,

      如果您可以直接从 Maven Central 下载文件,那么不可以。

      是否最好下载jar并指定本地路径

      您可能应该使用--packages,它将为您下载文件。部署模式和集群不会干扰这一点。

      PYSPARK_SUBMIT_ARGS 中 pyspark-shell 之后指定的选项是否被忽略

      不应该,尽管我通常将pyspark-shell 视为最后一个选项。

      import os
      os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages ... pyspark-shell'
      
      import pyspark
      from pyspark.streaming.kafka import KafkaUtils
      from pyspark.streaming import StreamingContext
      
      sc = pyspark.SparkContext()
      ssc = StreamingContext(sc,1)
      

      如何检查某些包是否下载正确

      你会得到一个 NoClassDefFound,例如在运行时如果它没有被下载。

      此类下载的 jars(或一般的 jars)采取的路线是什么

      $SPARK_HOME/jars,但任何--jars--packages 都缓存在每台机器上的~/.m2 文件夹中,供运行作业的用户使用,通常,然后符号链接到正在运行的YARN 容器/Spark 执行器中。

      【讨论】:

      • 嗨,我在从 Jupyter 运行代码时尝试将 Spark Cassandra 连接器与 PySpark 一起使用,当我通过添加我的包运行上述 sn-p 代码时,它运行良好(不到 1 秒)但是当我运行代码时,它没有给我找到类错误。
      • 它对我有用,我使用的是 spark.session.getOrCreate() 所以它选择了以前创建的 spark 会话,新的更改不起作用。感谢您的解决方案。
      猜你喜欢
      • 1970-01-01
      • 2017-03-08
      • 2014-09-27
      • 2014-04-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-10-03
      相关资源
      最近更新 更多