【问题标题】:Running Java Jar with included config via maven on flink yarn cluster在 flink 纱线集群上通过 maven 运行包含配置的 Java Jar
【发布时间】:2019-12-20 20:33:51
【问题描述】:

我在 maven/java 项目中使用 flink,需要在创建的 jar 内部包含我的配置。

所以,我在我的 pom 文件中添加了以下内容。这包括我在 jar 中的所有 yml 配置(位于 src/main/resources 文件夹中),我将在执行时将其名称作为参数传递。

    <resources>
        <resource>
            <directory>src/main/resources</directory>
            <includes>
                <include>**/*.yml</include>
            </includes>
        </resource>
    </resources>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <finalName>${project.artifactId}-${project.version}</finalName>
                        <shadedArtifactAttached>true</shadedArtifactAttached>
                        <transformers>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.exmaple.MyApplication</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>

下面的主类代码接收一个 arg,根据它我决定从资源中选择什么配置,读取(使用snakeyaml)并使用。

public static void main(String[] args) throws Exception {
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);

    ClassLoader classLoader = MyApplication.class.getClassLoader();
    Yaml yaml = new Yaml();

    String filename = parameterTool.getRequired("configFilename");

    InputStream in  = classLoader.getSystemResourceAsStream(filename);
    MyConfigClass = yaml.loadAs(in, MyConfigClass.class);

    ...

}

mvn clean install 创建“my-shaded-jar.jar”

我使用命令执行的

java -jar /path/to/my-shaded-jar.jar --configFilename filename

如果我与其他人共享 jar,它可以在多个系统上运行。

但是,当我尝试使用以下命令在 Hadoop 上的纱线集群中运行相同的 jar 时,我遇到了问题:-

HADOOP_CLASSPATH=`hadoop classpath` HADOOP_CONF_DIR=/etc/hadoop/conf ./flink-1.6.2/bin/flink run -m yarn-cluster -yd -yn 5 -ys 30 -yjm 10240 -ytm 10240 -yst -ynm some-job-name -yqu queue-name ./my-shaded-jar.jar --configFilename filename

我收到以下错误:

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:238)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
Caused by: org.yaml.snakeyaml.error.YAMLException: java.io.IOException: Stream closed
    at org.yaml.snakeyaml.reader.StreamReader.update(StreamReader.java:200)
    at org.yaml.snakeyaml.reader.StreamReader.<init>(StreamReader.java:60)
    at org.yaml.snakeyaml.Yaml.loadAs(Yaml.java:444)
    at com.example.MyApplication.main(MyApplication.java:53)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    ... 13 more
Caused by: java.io.IOException: Stream closed
    at java.io.PushbackInputStream.ensureOpen(PushbackInputStream.java:74)
    at java.io.PushbackInputStream.read(PushbackInputStream.java:166)
    at org.yaml.snakeyaml.reader.UnicodeReader.init(UnicodeReader.java:90)
    at org.yaml.snakeyaml.reader.UnicodeReader.read(UnicodeReader.java:122)
    at java.io.Reader.read(Reader.java:140)
    at org.yaml.snakeyaml.reader.StreamReader.update(StreamReader.java:184)

为什么我的解决方案可以在任何普通的 linux/mac 系统上运行,但是在 yarn 集群上使用 flink run 命令运行时,具有相同 args 的同一个 jar 会失败。 我们通常执行 jars 的方式和 yarn 的执行方式有区别吗?

任何帮助表示赞赏。

【问题讨论】:

    标签: java maven hadoop apache-flink hadoop-yarn


    【解决方案1】:

    classLoader.getSystemResourceAsStream(filename) 替换为classLoader.getResourceAsStream(filename)

    1. java.lang.ClassLoader#getSystemResourceAsStream 通过系统类加载器定位资源,通常用于启动应用程序。

    2. java.lang.ClassLoader#getResourceAsStream 将首先搜索父类加载器。如果失败,它将调用当前类加载器的findResource

    为了避免依赖冲突,Flink 应用程序中的类被分为两个域 [1],这也适用于 Flink 客户端,例如CliFrontend

    Java 类路径包括 Apache Flink 的类及其核心依赖项。
    动态用户代码包括用户 jar 的类(和资源)。

    所以为了找到你的“配置文件”,它被打包在你的jar文件中,我们应该使用用户代码类加载器(你可以在org.apache.flink.client.program.PackagedProgram中找到userCodeClassLoader的详细信息),而不是系统类加载器。

    1. https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html

    【讨论】:

    • 嗨,roban,将classLoader.getSystemResourceAsStream(filename) 更改为classLoader.getResourceAsStream(filename) 怎么样?看来getSystemResourceAsStream 使用系统类加载器。
    • 嗨,维克多,它有效。非常感谢。您能否编辑您的初始答案并说明为什么将 classloader.getSystemResourceAsStream 更改为 classloader.getResourceAsStream 可以解决问题。
    • 嗨,rohan,我已经更新了答案。而且调试到 Flink 源码之后,我觉得用MyApplication.class.getClassLoader 是可以的,因为主类也加载了用户代码类加载器。您可以尝试验证一下吗?
    • 是的,可以使用“MyApplication.class.getClassLoader”。不错的更新。接受并赞成。谢谢。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-07-25
    • 1970-01-01
    • 1970-01-01
    • 2021-10-10
    • 1970-01-01
    • 2015-09-03
    • 1970-01-01
    相关资源
    最近更新 更多