【Question title】: How do I run Beam Python pipelines using Flink deployed on Kubernetes?
【Posted】: 2021-01-28 16:53:37
【Question description】:

Does anyone know how to run Beam Python pipelines with Flink when Flink is running as pods in Kubernetes?

I have successfully run a Beam Python pipeline using the portable runner and a job service pointing to a local Flink server running in a Docker container.

I was able to mount the Docker socket into my Flink containers and run Flink as the root process, so the DockerEnvironmentFactory class could create the Python harness containers.

Unfortunately, I can't use the same solution when Flink runs in Kubernetes. Moreover, I don't want to create the Python harness containers with Docker commands from inside my pods.

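It seems that the Beam runner automatically selects Docker to execute Python pipelines. However, I noticed there is an implementation called ExternalEnvironmentFactory, but I don't know how to use it.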

Is there a way to deploy a sidecar container and use a different factory to run the Python harness process? What is the correct approach?

This is the patch for DockerEnvironmentFactory:

diff -pr beam-release-2.15.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java beam-release-2.15.0-1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
*** beam-release-2.15.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java   2019-08-14 22:33:41.000000000 +0100
--- beam-release-2.15.0-1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java 2019-09-09 16:02:07.000000000 +0100
*************** package org.apache.beam.runners.fnexecut
*** 19,24 ****
--- 19,26 ----

  import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;

+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
  import java.nio.file.Files;
  import java.nio.file.Paths;
  import java.time.Duration;
*************** public class DockerEnvironmentFactory im
*** 127,133 ****
          ImmutableList.<String>builder()
              .addAll(gcsCredentialArgs())
              // NOTE: Host networking does not work on Mac, but the command line flag is accepted.
!             .add("--network=host")
              // We need to pass on the information about Docker-on-Mac environment (due to missing
              // host networking on Mac)
              .add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
--- 129,135 ----
          ImmutableList.<String>builder()
              .addAll(gcsCredentialArgs())
              // NOTE: Host networking does not work on Mac, but the command line flag is accepted.
!             .add("--network=flink")
              // We need to pass on the information about Docker-on-Mac environment (due to missing
              // host networking on Mac)
              .add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
*************** public class DockerEnvironmentFactory im
*** 222,228 ****

      private static ServerFactory getServerFactory() {
        ServerFactory.UrlFactory dockerUrlFactory =
!           (host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, port).toString();
        if (RUNNING_INSIDE_DOCKER_ON_MAC) {
          // If we're already running in a container, we need to use a fixed port range due to
          // non-existing host networking in Docker-for-Mac. The port range needs to be published
--- 224,230 ----

      private static ServerFactory getServerFactory() {
        ServerFactory.UrlFactory dockerUrlFactory =
!               (host, port) -> HostAndPort.fromParts(getCanonicalHostName(), port).toString();
        if (RUNNING_INSIDE_DOCKER_ON_MAC) {
          // If we're already running in a container, we need to use a fixed port range due to
          // non-existing host networking in Docker-for-Mac. The port range needs to be published
*************** public class DockerEnvironmentFactory im
*** 237,242 ****
--- 239,252 ----
      }
    }

+   private static String getCanonicalHostName() throws RuntimeException {
+     try {
+       return InetAddress.getLocalHost().getCanonicalHostName();
+     } catch (UnknownHostException e) {
+       throw new RuntimeException(e);
+     }
+   }
+
    /** Provider for DockerEnvironmentFactory. */
    public static class Provider implements EnvironmentFactory.Provider {
      private final boolean retainDockerContainer;
*************** public class DockerEnvironmentFactory im
*** 269,275 ****
      public ServerFactory getServerFactory() {
        switch (getPlatform()) {
          case LINUX:
!           return ServerFactory.createDefault();
          case MAC:
            return DockerOnMac.getServerFactory();
          default:
--- 279,286 ----
      public ServerFactory getServerFactory() {
        switch (getPlatform()) {
          case LINUX:
!           return DockerOnMac.getServerFactory();
! //          return ServerFactory.createDefault();
          case MAC:
            return DockerOnMac.getServerFactory();
          default:

This is the Docker Compose file I use to run Flink:

version: '3.4'
services:
  jobmanager:
    image: tenx/flink:1.8.1
    command: 'jobmanager'
    environment:
      JOB_MANAGER_RPC_ADDRESS: 'jobmanager'
      DOCKER_MAC_CONTAINER: 1
      FLINK_JM_HEAP: 128
    volumes:
      - jobmanager-data:/data
      - /var/run/docker.sock:/var/run/docker.sock
    ports:
      - target: 8081
        published: 8081
        protocol: tcp
        mode: ingress
    networks:
      - flink
  taskmanager:
    image: tenx/flink:1.8.1
    command: 'taskmanager'
    environment:
      JOB_MANAGER_RPC_ADDRESS: 'jobmanager'
      DOCKER_MAC_CONTAINER: 1
      FLINK_TM_HEAP: 1024
      TASK_MANAGER_NUMBER_OF_TASK_SLOTS: 2
    networks:
      - flink
    volumes:
      - taskmanager-data:/data
      - /var/run/docker.sock:/var/run/docker.sock
      - /var/folders:/var/folders
volumes:
    jobmanager-data:
    taskmanager-data:
networks:
  flink:
    external: true

This is my Python pipeline:

import apache_beam as beam
import logging

class LogElements(beam.PTransform):

    class _LoggingFn(beam.DoFn):

        def __init__(self, prefix=''):
            super(LogElements._LoggingFn, self).__init__()
            self.prefix = prefix

        def process(self, element, **kwargs):
            logging.info(self.prefix + str(element))
            yield element

    def __init__(self, label=None, prefix=''):
        super(LogElements, self).__init__(label)
        self.prefix = prefix

    def expand(self, input):
        # Return the resulting PCollection so the transform's output is not dropped.
        return input | beam.ParDo(self._LoggingFn(self.prefix))


from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099"])

p = beam.Pipeline(options=options)

(p | beam.Create([1, 2, 3, 4, 5]) | LogElements())

# Block until the job submitted to the Flink job server completes.
p.run().wait_until_finish()

This is how I run the job service:

gradle :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081

Docker is automatically selected to run the Python harness.

I can change the image used to run the Python container:

options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099", "--environment_type=DOCKER", "--environment_config=beam/python:latest"])

I can disable Docker and enable the ExternalEnvironmentFactory:

options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099", "--environment_type=EXTERNAL", "--environment_config=server"])
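The two option lists above differ only in `--environment_type` and `--environment_config`: for DOCKER the config is the SDK harness image, for EXTERNAL it is the address of a service that starts the workers. A minimal sketch of a helper that assembles these flags, assuming the job server started with the Gradle command is listening on `localhost:8099`. The helper name `portable_runner_args` is hypothetical, and the set of accepted environment types is an assumption listing the commonly documented values.

```python
from typing import List, Optional

# Assumed set of values commonly used with --environment_type in Beam Python.
KNOWN_ENVIRONMENT_TYPES = {"DOCKER", "PROCESS", "EXTERNAL", "LOOPBACK"}


def portable_runner_args(
    job_endpoint: str,
    environment_type: Optional[str] = None,
    environment_config: Optional[str] = None,
) -> List[str]:
    """Build PortableRunner flags (hypothetical helper, not part of Beam)."""
    args = ["--runner=PortableRunner", f"--job_endpoint={job_endpoint}"]
    if environment_type is not None:
        env = environment_type.upper()
        if env not in KNOWN_ENVIRONMENT_TYPES:
            raise ValueError(f"unknown environment type: {environment_type!r}")
        args.append(f"--environment_type={env}")
    if environment_config is not None:
        args.append(f"--environment_config={environment_config}")
    return args


if __name__ == "__main__":
    # DOCKER: environment_config names the SDK harness image to start.
    print(portable_runner_args("localhost:8099", "DOCKER", "beam/python:latest"))
    # EXTERNAL: environment_config is the address of the external worker service.
    print(portable_runner_args("localhost:8099", "EXTERNAL", "server"))
```

The returned list is meant to be passed straight to `PipelineOptions(...)`, so switching environments only changes the two arguments to the helper.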

But then I have to implement a service that answers the callbacks at http://server:80.

Is there an implementation available?

【Question discussion】:

    Tags: python kubernetes apache-flink apache-beam


    【Solution 1】:

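    To answer the question above: basically, you want to add the beam_worker_pool container to the same pod as the Flink task manager container. So, in the YAML file you use to deploy the Flink task manager, add a new container: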

      - name: beam-worker-pool
        image: apache/beam_python3.7_sdk:2.22.0
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999
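The `livenessProbe` above only checks that something accepts TCP connections on port 50000. Containers in one pod share a network namespace, so when debugging you can run a similar check from inside the Flink task manager container against `localhost`. A minimal standard-library sketch; the function name `port_is_open` is hypothetical. With the sidecar reachable this way, the pipeline would presumably be submitted with `--environment_type=EXTERNAL --environment_config=localhost:50000` (an assumption based on the port in the manifest above).

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds, like a tcpSocket probe."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host could not be resolved.
        return False


if __name__ == "__main__":
    # Intended to run inside the task manager container: the beam-worker-pool
    # sidecar shares the pod's network namespace, so localhost reaches it.
    print("worker pool reachable:", port_is_open("localhost", 50000))
```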
    

    【Discussion】:

      【Solution 2】:

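      I found the solution. The newer Apache Beam release 2.16.0 provides an implementation to be used together with the EXTERNAL environment type. The implementation is based on worker_pool_main, which was created to support Kubernetes.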
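In the Beam Python SDK, worker_pool_main is the module `apache_beam.runners.worker.worker_pool_main`. The `--worker_pool` argument of the `apache/beam_python*_sdk` images used in Solution 1 starts this same service. A minimal sketch of launching it directly with the current Python interpreter, for example from a custom sidecar image. The helper name `worker_pool_command` is hypothetical, and the `--service_port` flag reflects my understanding of the module, so confirm it with `--help` for your Beam version.

```python
import shlex
import sys
from typing import List


def worker_pool_command(port: int = 50000) -> List[str]:
    """Command line that starts Beam's Python worker pool service on `port` (hypothetical helper)."""
    if not 0 < port < 65536:
        raise ValueError(f"invalid port: {port}")
    return [
        sys.executable,
        "-m",
        "apache_beam.runners.worker.worker_pool_main",
        # Assumed flag name; verify with `python -m ... --help`.
        f"--service_port={port}",
    ]


if __name__ == "__main__":
    # Only prints the command; see the note below for actually starting it.
    print(" ".join(shlex.quote(part) for part in worker_pool_command(50000)))
```

To start the pool, pass the list to `subprocess.run(worker_pool_command(), check=True)`. The call blocks while the service waits for the Flink runner to request workers at the address given in `--environment_config`.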

      【Discussion】:

      【Solution 3】:

      I know this is a bit dated, but there is now a Flink operator for Kubernetes.

      Here is an example of how to use the operator to run Apache Beam on Flink:

      https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/tree/master/examples/beam

      【Discussion】:
