【Question title】: Is it possible to configure the Beam portable runner with Spark configurations?
【Posted】: 2021-02-22 17:48:41
【Question】:

TLDR;

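Is it possible to configure the Beam portable runner with Spark configurations? More precisely, is it possible to configure spark.driver.host in the Portable Runner?

Motivation

Currently, we have Airflow running in a Kubernetes cluster, and to use TensorFlow Extended we need Apache Beam. For our use case Spark would be the appropriate runner, and since Airflow and TensorFlow are written in Python, we need to use Apache Beam's Portable Runner (https://beam.apache.org/documentation/runners/spark/#portability).

Problem

The portable runner creates the Spark context inside its own container and leaves no room for configuring the driver DNS, so the executors inside the worker pods cannot communicate with the driver (the job server).

Setup

  1. Following the Beam documentation, the job server runs in the same pod as Airflow so that the two containers can use the local network between them. Job server configuration: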
- name: beam-spark-job-server
  image: apache/beam_spark_job_server:2.27.0
  args: ["--spark-master-url=spark://spark-master:7077"]

Job server/Airflow service:

apiVersion: v1
kind: Service
metadata:
  name: airflow-scheduler
  labels:
    app: airflow-k8s
spec:
  type: ClusterIP
  selector:
    app: airflow-scheduler
  ports:
    - port: 8793
      protocol: TCP
      targetPort: 8793
      name: scheduler
    - port: 8099
      protocol: TCP
      targetPort: 8099
      name: job-server
    - port: 7077
      protocol: TCP
      targetPort: 7077
      name: spark-master
    - port: 8098
      protocol: TCP
      targetPort: 8098
      name: artifact
    - port: 8097
      protocol: TCP
      targetPort: 8097
      name: java-expansion

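Ports 8097, 8098, and 8099 belong to the job server, 8793 to Airflow, and 7077 to the Spark master.

Development/Errors

  1. When testing a simple Beam example from the Airflow container with python -m apache_beam.examples.wordcount --output ./data_test/ --runner=PortableRunner --job_endpoint=localhost:8099 --environment_type=LOOPBACK, I get the following response on the Airflow pod: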
Defaulting container name to airflow-scheduler.
Use 'kubectl describe pod/airflow-scheduler-local-f685b5bc7-9d7r6 -n airflow-main-local' to see all of the containers in this pod.
airflow@airflow-scheduler-local-f685b5bc7-9d7r6:/opt/airflow$ python -m apache_beam.examples.wordcount --output ./data_test/ --runner=PortableRunner --job_endpoint=localhost:8099 --environment_type=LOOPBACK
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.client:Timeout attempting to reach GCE metadata service.
WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
Connecting anonymously.
INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:35837
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.27.0
INFO:apache_beam.runners.portability.portable_runner:Environment "LOOPBACK" has started a component necessary for the execution. Be sure to run the pipeline using
  with Pipeline() as p:
    p.apply(..)
This ensures that the pipeline finishes before this program exits.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING

And the worker logs:

21/02/19 19:50:00 INFO Worker: Asked to launch executor app-20210219194804-0000/47 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:00 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:00 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:00 INFO SecurityManager: Changing view acls groups to: 
21/02/19 19:50:00 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 19:50:00 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:00 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "47" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
21/02/19 19:50:02 INFO Worker: Executor app-20210219194804-0000/47 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 19:50:02 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 47
21/02/19 19:50:02 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219194804-0000, execId=47)
21/02/19 19:50:02 INFO Worker: Asked to launch executor app-20210219194804-0000/48 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:02 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:02 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:02 INFO SecurityManager: Changing view acls groups to: 
21/02/19 19:50:02 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 19:50:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:02 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "48" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
21/02/19 19:50:04 INFO Worker: Executor app-20210219194804-0000/48 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 19:50:04 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 48
21/02/19 19:50:04 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219194804-0000, execId=48)
21/02/19 19:50:04 INFO Worker: Asked to launch executor app-20210219194804-0000/49 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:04 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:04 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:04 INFO SecurityManager: Changing view acls groups to: 
21/02/19 19:50:04 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 19:50:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:04 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "49" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
.
.
.

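As we can see, the executors keep exiting, and as far as I know this is caused by missing communication between the executors and the driver (the job server in this case). Also, the "--driver-url" is resolved to the driver pod name with the random port from "-Dspark.driver.port". Since we cannot define the name of the service, the workers try to use the driver's original name together with a randomly generated port. Because this configuration comes from the driver, changing the default conf files in the workers/master has no effect. Following this answer as an example, I tried setting the environment variable SPARK_PUBLIC_DNS in the job server, but it did not change anything in the worker logs.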
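To see which host and port the executors are being told to dial, the driver URL can be pulled out of an ExecutorRunner launch line. This is only a minimal sketch: the function name parse_driver_url and the regular expression are illustrative assumptions, and the sample line is an abbreviated copy of the worker log above.

```python
import re

# Matches the "--driver-url" argument in a Spark ExecutorRunner launch command.
DRIVER_URL_RE = re.compile(
    r'"--driver-url"\s+"spark://CoarseGrainedScheduler@(?P<host>[^:"]+):(?P<port>\d+)"'
)


def parse_driver_url(log_line):
    """Return (host, port) from an ExecutorRunner launch line, or None if absent."""
    match = DRIVER_URL_RE.search(log_line)
    if match is None:
        return None
    return match.group("host"), int(match.group("port"))


# Abbreviated copy of one launch line from the worker log (trailing arguments dropped).
line = (
    '21/02/19 19:50:00 INFO ExecutorRunner: Launch command: '
    '"/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" '
    '"-Xmx1024M" "-Dspark.driver.port=44447" '
    '"org.apache.spark.executor.CoarseGrainedExecutorBackend" '
    '"--driver-url" '
    '"spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" '
    '"--executor-id" "47"'
)

print(parse_driver_url(line))
```

Here the host is the pod name rather than a Service name, which matches the symptom described: the worker pods have no DNS entry for that name.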

Observation

Running a Spark job directly in Kubernetes with kubectl run spark-base --rm -it --labels="app=spark-client" --image bde2020/spark-base:2.4.5-hadoop2.7 -- bash ./spark/bin/pyspark --master spark://spark-master:7077 --conf spark.driver.host=spark-client together with this service:

apiVersion: v1
kind: Service
metadata:
  name: spark-client
spec:
  selector:
    app: spark-client
  clusterIP: None

I get a fully working pyspark shell. If I omit the --conf parameter, I get the same behavior as in the first setup (executors exiting indefinitely):

21/02/19 20:21:02 INFO Worker: Executor app-20210219202050-0002/4 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 20:21:02 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 4
21/02/19 20:21:02 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219202050-0002, execId=4)
21/02/19 20:21:02 INFO Worker: Asked to launch executor app-20210219202050-0002/5 for Spark shell
21/02/19 20:21:02 INFO SecurityManager: Changing view acls to: root
21/02/19 20:21:02 INFO SecurityManager: Changing modify acls to: root
21/02/19 20:21:02 INFO SecurityManager: Changing view acls groups to: 
21/02/19 20:21:02 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 20:21:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 20:21:02 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=46161" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-base:46161" "--executor-id" "5" "--hostname" "172.18.0.20" "--cores" "1" "--app-id" "app-20210219202050-0002" "--worker-url" "spark://Worker@172.18.0.20:45151"

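【Question discussion】:

    Tags: apache-spark kubernetes apache-beam tfx


    【Solution 1】:

    I have three solutions to choose from, depending on your deployment requirements. In order of difficulty:

    1. Use the Spark "uber jar" job server. This starts an embedded job server inside the Spark master, instead of using a standalone job server in a container. This would simplify your deployment a lot, since you would not need to start the beam_spark_job_server container at all.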
    python -m apache_beam.examples.wordcount \
    --output ./data_test/ \
    --runner=SparkRunner \
    --spark_submit_uber_jar \
    --spark_master_url=spark://spark-master:7077 \
    --environment_type=LOOPBACK
    
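    2. You can pass the properties through a Spark configuration file. Create the Spark configuration file and add spark.driver.host and whatever other properties you need. In the docker run command for the job server, mount that configuration file into the container and set the SPARK_CONF_DIR environment variable to point to that directory.

    3. If neither of those works for you, you can alternatively build your own customized version of the job server container. Pull the Beam source from GitHub. Check out the release branch you want to use (e.g. git checkout origin/release-2.28.0). Modify the entrypoint spark-job-server.sh and set -Dspark.driver.host=x there. Then build the container using ./gradlew :runners:spark:job-server:container:docker -Pdocker-repository-root="your-repo" -Pdocker-tag="your-tag".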
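For the configuration-file option, the file could be generated along the following lines. This is a sketch under assumptions: the helper name write_spark_defaults is made up, the host value reuses the airflow-scheduler Service name from the question, and the fixed driver port is an illustrative choice that would also have to be exposed by that Service. spark-defaults.conf is Spark's conventional properties file name; whether the job server container actually picks it up from SPARK_CONF_DIR is taken from the answer and not verified here.

```python
import os
import tempfile


def write_spark_defaults(conf_dir, properties):
    """Write properties to <conf_dir>/spark-defaults.conf, one 'key value' pair per line."""
    os.makedirs(conf_dir, exist_ok=True)
    path = os.path.join(conf_dir, "spark-defaults.conf")
    with open(path, "w") as f:
        for key, value in sorted(properties.items()):
            f.write(f"{key} {value}\n")
    return path


# Hypothetical values: the Service name comes from the question's manifest,
# the fixed port is an assumption made for this example.
props = {
    "spark.driver.host": "airflow-scheduler",
    "spark.driver.port": "44447",
}

conf_dir = tempfile.mkdtemp()  # stand-in for the directory mounted into the container
conf_path = write_spark_defaults(conf_dir, props)

with open(conf_path) as f:
    conf_text = f.read()
print(conf_text)
```

Pinning spark.driver.port matters as much as the host here, because a randomly chosen port cannot be listed in a Service definition ahead of time.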

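    【Discussion】:

    • Thanks for the great answer! The first setup would be perfect for me. Using this command I got the error that "spark_rest_url" is not defined; after configuring it and enabling the REST URL in the Spark master, I get the error "INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING ERROR:root:Exception from the cluster: java.nio.file.NoSuchFileException: /tmp/tmp029vgh_0.jar". But I can see that the file exists in the same container under /tmp, so maybe this is an access problem in the driver process? Do you know how to solve this? Thanks
    • Can you share the full stack trace of the NoSuchFileException?
    • Did you figure out the NoSuchFileException? I couldn't work it out.
    • The problem is that the job server "shares" the jar file through the /tmp folder. I tried to change this folder with the variable "SPARK_LOCAL_DIRS", but it didn't work. After mounting /tmp as a persistent volume in all the worker pods and the Airflow pod, I could execute the Beam pipeline with successful communication between them. Unfortunately, I now have a problem with the python_sdk container, because I can't use LOOPBACK or DOCKER as the environment_type. Do you know how to configure its logging with "--logging_endpoint"? Maybe I need to create another question for that. Thank you very much!
    • I created this related question with the details of the last issue, can you help me? Thanks again.
    【Solution 2】: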

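    Let me revise the answer. The job server needs to be able to communicate with the workers, and vice versa. The error of executors that keep exiting is caused by this. You need to configure things so that they can communicate. A k8s headless service can solve this.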
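A quick way to check the name-resolution half of this from inside a pod is a small resolver probe. The sketch below is an assumption-laden illustration: can_resolve is a made-up helper, and "localhost" is only a placeholder for whatever Service name the driver is expected to advertise.

```python
import socket


def can_resolve(hostname):
    """Return True if the hostname resolves to at least one address."""
    try:
        socket.getaddrinfo(hostname, None)
    except socket.gaierror:
        return False
    return True


# Placeholder name; inside a worker pod this would be the driver's Service name.
print(can_resolve("localhost"))
```

A successful lookup only shows that DNS works; the driver port still has to be reachable for executors to register.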

    See https://github.com/cometta/python-apache-beam-spark for a working example. If it works for you, you can help me by "starring" the repository.

    【Discussion】:
