【Question title】: Issue with job submission from Flink Job UI (Exception: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException)
【Posted】: 2020-06-18 03:17:14
【Question description】:

I have some simple Java code for a Flink job:

// Build a small in-memory collection of (index, label) pairs.
List<Tuple2<Integer, String>> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
    list.add(new Tuple2<>(i, "test" + i));
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Print every element, then submit the job under the name "job1".
env.fromCollection(list).print();
env.execute("job1");

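I packaged this code into a jar, say flink-processor-0.1-SNAPSHOT.jar, and uploaded it to the JobManager from the Submit Job UI. The upload went through without problems, and the Entry Class field shows the main class (com.abc.xyz.streaming.FlinkProcessor). I then submitted the job from the Submit Job UI with some arguments (--ns.conf1 abc.file --ns.conf2 xyz.file) and with the main class (com.abc.xyz.streaming.FlinkProcessor) specified. The job submission failed, and I see the following error in the JobManager log.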

org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
    at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:70)
    at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:53)
    at com.abc.xyz.streaming.FlinkProcessor.run(FlinkProcessor.java:114)
    at com.abc.xyz.streaming.FlinkProcessor.main(FlinkProcessor.java:53)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2020-06-16 16:14:20,588 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The program plan could not be fetched - the program aborted pre-maturely.

System.err: Running flink job :2020-06-16T23:14:20.476Z
 Error msg: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
    at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:70)
    at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:53)

When I submit the same job with the flink run command, it works fine, with no errors.

flink run -c com.abc.xyz.streaming.FlinkProcessor /Users/abc/target/flink-processor-0.1-SNAPSHOT.jar --ns.conf1 abc.file --ns.conf2 xyz.file

I'm not sure what I'm missing here. Any help is much appreciated.

【Question discussion】:

    Tags: apache-flink


    【Solution 1】:

    Found the issue. I had wrapped my code in a try block that caught Throwable:

    try {
        List<Tuple2<Integer, String>> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            list.add(new Tuple2<>(i, "test" + i));
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromCollection(list).print();
        env.execute("job1");
    } catch (Throwable t) {
        // ignore t: this also swallows the ProgramAbortException thrown by env.execute()
    }
    

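    I changed the catch block to catch Exception instead of Throwable, i.e. try { ..... } catch (Exception exp) { ignore exp }; after that, it started working. Presumably this helps because ProgramAbortException is thrown as an Error rather than an Exception in this Flink version, so catch (Exception) lets it propagate back to the web submission handler, which relies on it to extract the job plan.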
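
To illustrate why narrowing the catch clause matters, here is a minimal sketch that does not depend on Flink. PlanAbortSignal and execute() are hypothetical stand-ins (they are not Flink classes or methods) for the abort signal that the stack trace above shows being thrown from StreamPlanEnvironment.execute; the sketch assumes that signal is a subclass of Error, which is what the behaviour described in this answer suggests.

```java
public class CatchScopeDemo {

    // Hypothetical stand-in for Flink's ProgramAbortException: a control-flow
    // signal modelled as an Error (assumption), not as an Exception.
    static final class PlanAbortSignal extends Error {
        private static final long serialVersionUID = 1L;
    }

    // Hypothetical stand-in for env.execute(...) during plan extraction:
    // it always throws the abort signal.
    static void execute() {
        throw new PlanAbortSignal();
    }

    // Mirrors the failing job: catch (Throwable) swallows the signal,
    // so the outer caller never sees it.
    public static boolean signalReachesCallerWithCatchThrowable() {
        try {
            try {
                execute();
            } catch (Throwable t) {
                // ignored, as in the original job code
            }
            return false; // the signal was swallowed
        } catch (PlanAbortSignal signal) {
            return true;
        }
    }

    // Mirrors the fix: catch (Exception) does not match an Error,
    // so the signal propagates to the outer caller.
    public static boolean signalReachesCallerWithCatchException() {
        try {
            try {
                execute();
            } catch (Exception e) {
                // ordinary exceptions would still be handled here
            }
            return false;
        } catch (PlanAbortSignal signal) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("catch (Throwable): signal reaches caller = "
                + signalReachesCallerWithCatchThrowable());
        System.out.println("catch (Exception): signal reaches caller = "
                + signalReachesCallerWithCatchException());
    }
}
```

Running it prints false for the catch (Throwable) variant and true for the catch (Exception) variant. If a job really does need to catch Throwable, rethrowing any Error from the catch block should have the same effect, though that has not been tested against Flink here.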

    Thanks!
