【问题标题】:External checkpoints to S3 on EMREMR 上 S3 的外部检查点
【发布时间】:2018-02-19 19:52:12
【问题描述】:

我正在尝试为我的 Flink 程序部署一个生产集群。我正在使用安装了 Flink 1.3.2 的标准 hadoop-core EMR 集群,使用 YARN 运行它。

我正在尝试配置我的 RocksDB 以将我的检查点写入 S3 存储桶。我正在尝试浏览这些文档:https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html#set-s3-filesystem。问题似乎是使依赖项正常工作。尝试运行程序时收到此错误:

java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.initialize(EmrFileSystem.java:93)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:328)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:350)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:293)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.<init>(FsCheckpointStreamFactory.java:99)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createStreamFactory(FsStateBackend.java:282)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createStreamFactory(RocksDBStateBackend.java:273    

我已经尝试离开和调整 core-site.xml 并保持原样。我尝试将 HADOOP_CLASSPATH 设置为包含(我假设是)上述指南中描述的大多数 JAR 的 /usr/lib/hadoop/share。我尝试下载 hadoop 2.7.2 二进制文件,并将它们复制到 flink/libs 目录中。所有导致相同的错误。

有没有人成功让 Flink 能够在 EMR 上写入 S3?

编辑:我的集群设置

AWS 门户:

1) EMR -> Create Cluster
2) Advanced Options
3) Release = emr-5.8.0
4) Only select Hadoop 2.7.3
5) Next -> Next -> Next -> Create Cluster ( I do fill out names/keys/etc)

一旦集群启动,我 ssh 进入 Master 并执行以下操作:

1  wget http://apache.claz.org/flink/flink-1.3.2/flink-1.3.2-bin-hadoop27-scala_2.11.tgz
2  tar -xzf flink-1.3.2-bin-hadoop27-scala_2.11.tgz
3  cd flink-1.3.2
4  ./bin/yarn-session.sh -n 2 -tm 5120 -s 4 -d
5  Change conf/flink-conf.yaml 
6  ./bin/flink run -m yarn-cluster -yn 1 ~/flink-consumer.jar

我的 conf/flink-conf.yaml 我添加了以下字段:

state.backend: rocksdb
state.backend.fs.checkpointdir: s3:/bucket/location
state.checkpoints.dir: s3:/bucket/location

我的程序的检查点设置:

env.enableCheckpointing(getCheckpointRate,CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(getCheckpointMinPause)
env.getCheckpointConfig.setCheckpointTimeout(getCheckpointTimeout)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.setStateBackend(new RocksDBStateBackend("s3://bucket/location", true))

如果您认为我缺少任何步骤,请告诉我

【问题讨论】:

    标签: amazon-s3 apache-flink emr amazon-emr rocksdb


    【解决方案1】:

    我假设你在 EMR Yarn 集群上自己安装了 Flink 1.3.2,因为亚马逊还没有提供 Flink 1.3.2,对吧?

    鉴于此,您似乎遇到了依赖冲突。 org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration) 方法仅在 Hadoop 2.4.0 中引入。因此我假设你已经部署了一个 Flink 1.3.2 版本,它是用 Hadoop 2.3.0 构建的。请部署在 EMR 上运行的 Hadoop 版本构建的 Flink 版本。这很可能会解决所有依赖冲突。

    将 Hadoop 依赖项放入 lib 文件夹似乎无法可靠地工作,因为 flink-shaded-hadoop2-uber.jar 似乎在类路径中具有优先权。

    【讨论】:

    • 你是对的,我在 EMR Yarn 集群上安装了 1.3.2,但是我使用的是使用 Hadoop 2.71.3.2 版本(因为这是 EMR 实例的 Hadoop 版本已安装)。 flink-1.3.2-bin-hadoop27-scala_2.11 是包含我正在使用的二进制文件的 tar 文件。使用二进制文件可能是问题吗?我应该从源代码安装,还是应该不重要?
    • 没关系。嗯,那么这个 Hadoop &lt;= 2.3.0 Configuration 必须来自其他地方。您能否检查日志并列出其中列出的完整类路径?
    • Classpath: /home/hadoop/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/home/hadoop/flink-1.3.2/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/home/hadoop/flink-1.3.2/lib/log4j-1.2.17.jar:/home/hadoop/flink-1.3.2/lib/slf4j-log4j12-1.7.7.jar:/home/hadoop/flink-1.3.2/lib/flink-dist_2.11-1.3.2.jar::/etc/hadoop/conf:
    • 您能否通过提取类并通过jad 对其进行反编译来检查flink-shaded-h‌​adoop2-uber-1.3.2.ja‌​r 是否包含正确的Hadoop Configuration 类。然后你可以检查它是否有方法addResource(Configuration)。此外,请确保flink-1.3.2/lib 不包含任何其他可能引入错误 Hadoop 版本的依赖项。
    • 嗯,那么冲突肯定来自EMR的Yarn集群带来的依赖之一。不过,这很奇怪。
    猜你喜欢
    • 2021-02-15
    • 2016-12-13
    • 1970-01-01
    • 1970-01-01
    • 2017-06-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-01
    相关资源
    最近更新 更多