【问题标题】:Flink checkpoints to Google Cloud StorageFlink 检查点到 Google Cloud Storage
【发布时间】:2018-08-15 14:45:38
【问题描述】:

我正在尝试为 GCS 中的 flink 作业配置检查点。 如果我在本地运行测试作业(没有 docker 和任何集群设置),一切正常,但如果我使用 docker-compose 或集群设置运行它并在 flink 仪表板中部署带有作业的 fat jar,它会失败并出现错误。

有什么想法吗? 谢谢!

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:61)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:441)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:379)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:247)
... 33 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)

Env 配置是这样的:

StreamExecutionEnvironment env = applicationContext.getBean(StreamExecutionEnvironment.class);
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.setFailOnCheckpointingErrors(false);
    checkpointConfig.setCheckpointInterval(10000);
    checkpointConfig.setMinPauseBetweenCheckpoints(5000);
    checkpointConfig.setMaxConcurrentCheckpoints(1);
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(
            String.format("gs://checkpoints/%s", jobClass.getSimpleName()), true);
    env.setStateBackend((StateBackend) rocksDBStateBackend);

这是我的core-site.xml 文件:

<configuration>
<property>
    <name>google.cloud.auth.service.account.enable</name>
    <value>true</value>
</property>
<property>
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value>${user.dir}/key.json</value>
</property>
<property>
    <name>fs.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
    <description>The FileSystem for gs: (GCS) uris.</description>
</property>
<property>
    <name>fs.AbstractFileSystem.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
    <description>The AbstractFileSystem for gs: (GCS) uris.</description>
</property>
<property>
    <name>fs.gs.application.name.suffix</name>
    <value>-kube-flink</value>
    <description>
        Appended to the user-agent header for API requests to GCS to help identify
        the traffic as coming from Dataproc.
    </description>
</property>

对 gcs-connector 的依赖:

<dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>1.9.4-hadoop2</version>
</dependency>

更新:

在对依赖项进行一些操作后,我已经能够编写检查点了。我目前的设置是:

<dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>hadoop2-1.9.5</version>
</dependency>
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
        <version>1.5.1</version>
</dependency>

另外我将 flink 镜像切换到版本flink:1.5.2-hadoop28

不幸的是,我仍然无法读取检查点数据,因为我的工作在恢复状态时总是失败并出现错误:

java.lang.NoClassDefFoundError: com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl$6
at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:666)
at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:323)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:136)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:1102)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80)
at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1005)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:974)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:443)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:149)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)

我相信这将是最后一个错误...

【问题讨论】:

  • 我检查了文档,但不确定您的依赖是否正确。如果您将此依赖项用于 gsc-connector,会发生什么? &lt;dependency&gt; &lt;groupId&gt;com.google.cloud.bigdataoss&lt;/groupId&gt; &lt;artifactId&gt;gcs-connector&lt;/artifactId&gt; &lt;version&gt;hadoop2-1.9.5&lt;/version&gt; &lt;scope&gt;provided&lt;/scope&gt; &lt;/dependency&gt;
  • 你好菲利普!实际上,我在故障排除方面取得了一些进展。我将 flink docker 映像从 flink:1.5.0 更改为 flink:1.5.2-hadoop28,因此异常消失了。不幸的是,我有另一个,你的建议没有帮助。我现在得到的是java.lang.ClassNotFoundException: com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.auth.oauth2.TokenResponseException 似乎是类加载问题。它无法从 GoogleHadoopFileSystemBase 中看到 GCS 类...
  • 能否提供完整的 pom.xml 文件并更新错误信息?
  • 嗨菲利普!我添加了更新信息。如果您能够帮助解决 NoClassDefFound 错误,那就太好了。感谢您的帮助。

标签: google-cloud-storage apache-flink google-cloud-dataproc


【解决方案1】:

终于找到解决方案here

您必须创建自己的映像并将 gcs-connector 放入 lib 目录。否则你总是会遇到类加载问题(用户代码和系统类加载器)。

要创建自定义 Docker 映像,我们创建以下 Dockerfile:

FROM registry.platform.data-artisans.net/trial/v1.0/flink:1.4.2-dap1-scala_2.11

RUN wget -O lib/gcs-connector-latest-hadoop2.jar https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar

RUN wget -O lib/gcs-connector-latest-hadoop2.jar https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar && \     
wget http://ftp.fau.de/apache/flink/flink-1.4.2/flink-1.4.2-bin-hadoop28-scala_2.11.tgz && \

tar xf flink-1.4.2-bin-hadoop28-scala_2.11.tgz && \
mv flink-1.4.2/lib/flink-shaded-hadoop2* lib/  && \
rm -r flink-1.4.2*  

RUN mkdir etc-hadoop
COPY <name of key file>.json etc-hadoop/
COPY core-site.xml etc-hadoop/

ENTRYPOINT ["/docker-entrypoint.sh"]
EXPOSE 6123 8081
CMD ["jobmanager"]

Docker 镜像将基于我们提供的 Flink 镜像 dA 平台试验的一部分。我们正在添加 Google Cloud Storage 连接器、Flink 的 Hadoop 包和配置密钥 文件。

要构建自定义映像,以下文件应位于您的 当前目录:core-site.xml、Dockerfile 和密钥文件 (.json)。

为了最终触发自定义图像的构建,我们运行以下命令 命令:

$ docker build -t flink-1.4.2-gs .

图像构建完成后,我们会将图像上传到 Google 的 容器注册表。配置 Docker 以正确访问 注册表,运行此命令一次:

$ gcloud auth configure-docker

接下来,我们将标记并上传容器:

$ docker tag flink-1.4.2-gs:latest eu.gcr.io/<your project id>/flink-1.4.2-gs
$ docker push eu.gcr.io/<your project id>/flink-1.4.2-gs

上传完成后,我们需要设置自定义图片 应用程序管理器部署。发送以下 PATCH 请求:

PATCH /api/v1/deployments/<your AppMgr deployment id>
 spec:
   template:
     spec:
       flinkConfiguration:
         fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/
       artifact:
         flinkImageRegistry: eu.gcr.io
         flinkImageRepository: <your project id>/flink-1.4.2-gs
         flinkImageTag: latest

或者,使用以下 curl 命令:

$ curl -X PATCH --header 'Content-Type: application/yaml' --header 'Accept: application/yaml' -d '  spec: \ 
    template: \ 
      spec: \ 
        flinkConfiguration:
          fs.hdfs.hadoopconf: /opt/flink/etc-hadoop/
        artifact: \ 
          flinkImageRegistry: eu.gcr.io \ 
          flinkImageRepository: <your project id>/flink-1.4.2-gs \ 
          flinkImageTag: latest' 'http://localhost:8080/api/v1/deployments/<your AppMgr deployment id>‘

实施此更改后,您将能够检查点到 Google 的 云储存。指定目录时使用以下模式 gs:///检查点。对于保存点,设置 state.savepoints.dir Flink 配置选项。

【讨论】:

  • 您能否使用有效的 URL 更新您的答案?
  • flink.apache.org/downloads.html "如果您计划将 Apache Flink 与 Apache Hadoop 一起使用(在 YARN 上运行 Flink,连接到 HDFS,连接到 HBase,或者使用一些基于 Hadoop 的文件系统连接器)然后选择下载捆绑匹配的Hadoop版本,下载可选的与您的版本匹配的预捆绑Hadoop并将其放在Flink的lib文件夹中,或者导出您的HADOOP_CLASSPATH。”
  • 添加后,我在类未找到异常上失败,与一些谷歌依赖项有关,如WARN org.apache.hadoop.fs.FileSystem [] - java.lang.NoClassDefFoundError: com/google/api/client/http/HttpRequestInitializer
【解决方案2】:

问题在于方案 gs:// 的实现。这是连接到 GCS 的协议。如果添加以下依赖项,java程序应该能够运行:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-storage</artifactId>
  <version>1.35.0</version>
</dependency>

this link 中,您将了解如何为任何其他编程语言添加此依赖项。

【讨论】:

  • 你好,查克!感谢您的回复,但 id 没有帮助。实际上,GoogleCloudStorageImpl 中定义了一个匿名类,应用程序类加载器无法加载该类。我已经深入研究了作业 jar,类路径中有一个类 GoogleCloudStorageImpl$6 但不知何故找不到。
  • 哦!我是的,根据您上次的更新,这样的类属于一个名为 gcsio 的包。我在 github [1] 中找到了 GoogleCloudStorageImpl.java,它的 pom.xml 有这个依赖: gcsio1.9.6-SNAPSHOT 我不确定这是否可以帮助你但我认为您只需要正确引用此依赖项即可。 (1)github.com/GoogleCloudPlatform/bigdata-interop/blob/master/…
【解决方案3】:

使用这个 docker 文件对我有用,关键是采用正确的依赖版本。

我的这个解决方案基于flink k8s operator

  1. 泊坞窗文件
ARG FLINK_VERSION=1.13.1
ARG SCALA_VERSION=2.12
FROM flink:${FLINK_VERSION}-scala_${SCALA_VERSION}-java8

ARG FLINK_HADOOP_VERSION=2.8.3-10.0
ARG GCS_CONNECTOR_VERSION=latest-hadoop2

ARG GCS_CONNECTOR_NAME=gcs-connector-${GCS_CONNECTOR_VERSION}.jar
ARG GCS_CONNECTOR_URI=https://storage.googleapis.com/hadoop-lib/gcs/${GCS_CONNECTOR_NAME}
ARG FLINK_HADOOP_JAR_NAME=flink-shaded-hadoop-2-uber-${FLINK_HADOOP_VERSION}.jar
ARG FLINK_HADOOP_JAR_URI=https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/${FLINK_HADOOP_VERSION}/${FLINK_HADOOP_JAR_NAME}

#COPY target/lib /opt/flink/lib

RUN echo "Downloading ${GCS_CONNECTOR_URI}" && \
  wget -q -O /opt/flink/lib/${GCS_CONNECTOR_NAME} ${GCS_CONNECTOR_URI}
RUN echo "Downloading ${FLINK_HADOOP_JAR_URI}" && \
  wget -q -O /opt/flink/lib/${FLINK_HADOOP_JAR_NAME} ${FLINK_HADOOP_JAR_URI}

COPY target/play-flink-1.0-SNAPSHOT.jar /opt/flink/usrlib/play-flink-1.0-SNAPSHOT.jar
  1. 创建 core.xml 文件:
<?xml version="1.0" ?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>fs.AbstractFileSystem.gs.impl</name>
        <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
        <description>The AbstractFileSystem for gs: uris.</description>
    </property>
    <property>
        <name>fs.gs.project.id</name>
        <value>projectName</value>
        <description>
            Optional. Google Cloud Project ID with access to GCS buckets.
            Required only for list buckets and create bucket operations.
        </description>
    </property>
    <property>
        <name>google.cloud.auth.service.account.enable</name>
        <value>true</value>
        <description>
            Whether to use a service account for GCS authorization.
        </description>
    </property>
</configuration>
  1. 将 core.xml 创建为 configmap kubectl create configmap hadoop-configmap --from-file core-site.xml

  2. 为服务帐户创建密钥 kubectl create secret generic gcp-secret --from-file=key.json=${SERVICE_ACCOUNT_FILE}

  3. 在 job.yaml 中加载 configmap 和服务帐户密码

 volumeMounts:
            - name: hadoop-configmap-volume
              mountPath: /etc/hadoop/conf
            - name: google-cloud-key
              mountPath: /etc/gcp/keys
  ....
   volumes:
        - name: google-cloud-key
          secret:
            secretName: gcp-secret
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: hadoop-configmap-volume
          configMap:
              name: hadoop-configmap
              items:
                - key: core-site.xml
                  path: core-site.xml
  1. 在flink conf中配置卷挂载路径 fs.hdfs.hadoopconf: /etc/hadoop/conf

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-01-31
    • 1970-01-01
    • 1970-01-01
    • 2018-07-16
    • 2019-10-02
    • 1970-01-01
    • 2019-07-11
    相关资源
    最近更新 更多