[Question Title]: How to run Apache Beam Python pipelines on a Flink cluster in Kubernetes?
[Posted]: 2020-09-29 07:16:21
[Question Description]:

I'm trying to run the word count example on minikube by following the Flink Kubernetes instructions here, but the job never finishes. The Python Beam SDK worker pool doesn't seem to do any work.

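In addition to following the instructions for configuring the Flink Kubernetes cluster, I added a Python SDK worker pool to the taskmanager deployment. If I understand correctly, the purpose of the worker pool is to execute the Python parts of the pipeline. Here is the full k8s deployment: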

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink-test
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.10.2-scala_2.11
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      - name: beam-worker-pool
        image: apache/beam_python3.7_sdk:2.24.0
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

I run the example as follows:

python -m apache_beam.examples.wordcount \
--output /tmp/results/count \
--runner FlinkRunner \
--flink_master=localhost:8081 \
--environment_type=EXTERNAL \
--environment_config=localhost:50000

I used the documentation at https://beam.apache.org/documentation/runtime/sdk-harness-config/ to set the environment_type and environment_config values.
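With environment_type=EXTERNAL, environment_config is the host:port address of the worker pool that the Flink taskmanager connects to. Because the taskmanager and beam-worker-pool containers run in the same pod, they share a network namespace, so localhost:50000 reaches the sidecar. As a minimal sketch, assuming Python 3 is available where you run it (for example inside the beam-worker-pool container via kubectl exec), the hypothetical helper is_reachable below checks only that the address accepts TCP connections; it does not verify that the process behind the port is a working Beam worker pool.

```python
import socket


def is_reachable(address: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to "host:port" succeeds.

    This only proves that something is listening on the port; it does not
    confirm that the listener is a healthy Beam worker pool.
    """
    host, _, port = address.rpartition(":")
    try:
        # Open and immediately close a plain TCP connection.
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or the host could not be resolved.
        return False


if __name__ == "__main__":
    # Same value as --environment_config in the question.
    print(is_reachable("localhost:50000"))
```

If this prints False, the problem is networking or the worker pool not running; if it prints True, the worker pool is reachable and the failure lies elsewhere.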

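The job is submitted to the job manager and I can see it in the Flink UI, but it never finishes. I started looking through the container logs and noticed that beam-worker-pool logs the following: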

Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:46005', '--artifact_endpoint=localhost:43851', '--provision_endpoint=localhost:37079', '--control_endpoint=localhost:37961']
2020/09/28 16:44:00 Provision info:
pipeline_options:<fields: fields: > fields: > fields: > fields: > > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > > > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > fields: > > logging_endpoint: artifact_endpoint: control_endpoint: dependencies: 
2020/09/28 16:44:00 Initializing python harness: /opt/apache/beam/boot --id=1-1 --logging_endpoint=localhost:46005 --artifact_endpoint=localhost:43851 --provision_endpoint=localhost:37079 --control_endpoint=localhost:37961
2020/09/28 16:44:08 Failed to retrieve staged files: failed to retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for /tmp/staged/pickled_main_session
    caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
    caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
    caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
    caused by:
rpc error: code = Unknown desc = 

Likewise, the taskmanager logs:

2020-09-28 16:46:00,155 INFO  org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory  - Still waiting for startup of environment from localhost:50000 for worker id 1-1

I'm not sure what I'm missing. I checked /tmp/staging/pickled_main_session on the worker pool and it is empty.

Note that this question is similar to these earlier ones: "How do I run Beam Python pipelines using Flink deployed on Kubernetes?" and "Running Apache Beam python pipelines in Kubernetes".

[Question Discussion]:

    Tags: python kubernetes apache-beam


    [Solution 1]:

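    By default (as of this writing), Beam stages runtime dependencies ("artifacts") to a directory (/tmp/staged by default) that both the job server (in your case, the client) and the Beam workers need to be able to access. In your deployment the worker pool runs in a separate container with its own filesystem, so it cannot read the files the client staged there.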
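One way to check this diagnosis is to inspect the staging directory both on the client and inside the beam-worker-pool container (for example with kubectl exec, since the apache/beam_python3.7_sdk image ships Python) and compare the results. If the explanation above applies, you would expect the client's copy to be populated while the worker's copy is missing or contains empty files, like the empty pickled_main_session in the question. The helper find_unusable_artifacts below is a hypothetical sketch built on pathlib; it is not part of Beam.

```python
from pathlib import Path
from typing import List


def find_unusable_artifacts(staging_dir: str) -> List[str]:
    """List problems with the files in a Beam staging directory.

    Reports the directory itself if it does not exist, and every staged
    file (searched recursively) whose size is zero bytes.
    """
    root = Path(staging_dir)
    if not root.is_dir():
        return [f"missing directory: {root}"]
    problems = []
    for path in sorted(root.rglob("*")):
        if path.is_file() and path.stat().st_size == 0:
            problems.append(f"empty file: {path}")
    return problems


if __name__ == "__main__":
    # /tmp/staged is the default staging directory mentioned above.
    for problem in find_unusable_artifacts("/tmp/staged"):
        print(problem)
```

An empty result on the client combined with problems reported in the worker container points to the shared-directory requirement rather than to the pipeline code.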

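    You can work around this by setting the --flink_submit_uber_jar pipeline option. When --flink_submit_uber_jar is set, Beam packages all of your dependencies into a single jar and submits that jar to Flink, so the workers no longer depend on a shared staging directory.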
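Applied to the command from the question, the fix is a single extra flag. The sketch below uses a hypothetical helper, wordcount_command, to build the same argument list in Python (for example to pass to subprocess.run); it is equivalent to appending --flink_submit_uber_jar to the shell command. The addresses and output path are the ones from the question.

```python
import shlex
import sys
from typing import List


def wordcount_command(flink_master: str, worker_pool: str, output: str) -> List[str]:
    """Build the question's wordcount command with --flink_submit_uber_jar added."""
    return [
        sys.executable, "-m", "apache_beam.examples.wordcount",
        "--output", output,
        "--runner", "FlinkRunner",
        f"--flink_master={flink_master}",
        "--environment_type=EXTERNAL",
        f"--environment_config={worker_pool}",
        # Submit the dependencies inside a jar sent to Flink instead of
        # relying on a /tmp/staged directory shared with the workers.
        "--flink_submit_uber_jar",
    ]


if __name__ == "__main__":
    cmd = wordcount_command("localhost:8081", "localhost:50000", "/tmp/results/count")
    # Print a copy-pasteable shell command (shlex.join needs Python 3.8+).
    print(" ".join(shlex.quote(arg) for arg in cmd))
```

To launch the job from Python instead of printing it, pass the list to subprocess.run(cmd, check=True).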
