【问题标题】:Error No Runner was specified and the DirectRunner was not found on the classpath错误没有指定运行程序并且在类路径上找不到 DirectRunner
【发布时间】:2018-10-28 08:03:05
【问题描述】:

我在运行 flink 1.6.1 的单节点 flink 集群上运行字数统计示例

并不断收到此错误。

我使用的是梁版本 2.8.0。

错误

jobmanager_1   | 2018-10-28 07:20:41,401 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Exception occurred in REST handler.
jobmanager_1   | org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
jobmanager_1   |    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
jobmanager_1   |    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
jobmanager_1   |    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
jobmanager_1   |    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
jobmanager_1   |    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
jobmanager_1   |    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
jobmanager_1   |    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
jobmanager_1   |    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
jobmanager_1   |    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
jobmanager_1   |    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
jobmanager_1   |    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
jobmanager_1   | Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
jobmanager_1   |    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:228)
jobmanager_1   |    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
jobmanager_1   |    ... 6 more
jobmanager_1   | Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
jobmanager_1   |    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
jobmanager_1   |    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
jobmanager_1   |    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
jobmanager_1   |    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:74)
jobmanager_1   |    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:226)
jobmanager_1   |    ... 7 more
jobmanager_1   | Caused by: java.lang.IllegalArgumentException: No Runner was specified and the DirectRunner was not found on the classpath.
jobmanager_1   | Specify a runner by either:
jobmanager_1   |     Explicitly specifying a runner by providing the 'runner' property
jobmanager_1   |     Adding the DirectRunner to the classpath
jobmanager_1   |     Calling 'PipelineOptions.setRunner(PipelineRunner)' directly
jobmanager_1   |    at org.apache.beam.sdk.options.PipelineOptions$DirectRunner.create(PipelineOptions.java:300)
jobmanager_1   |    at org.apache.beam.sdk.options.PipelineOptions$DirectRunner.create(PipelineOptions.java:286)
jobmanager_1   |    at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
jobmanager_1   |    at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
jobmanager_1   |    at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:158)
jobmanager_1   |    at org.apache.beam.sdk.options.PipelineOptionsValidator.validate(PipelineOptionsValidator.java:97)
jobmanager_1   |    at org.apache.beam.sdk.options.PipelineOptionsValidator.validate(PipelineOptionsValidator.java:47)
jobmanager_1   |    at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:44)
jobmanager_1   |    at org.apache.beam.sdk.Pipeline.create(Pipeline.java:145)
jobmanager_1   |    at com.rnd.beam.PipelineDataHandler.runPipeline(PipelineDataHandler.java:75)
jobmanager_1   |    at com.rnd.beam.PipelineDataHandler.main(PipelineDataHandler.java:29)
jobmanager_1   |    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
jobmanager_1   |    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
jobmanager_1   |    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
jobmanager_1   |    at java.lang.reflect.Method.invoke(Method.java:498)
jobmanager_1   |    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
jobmanager_1   |    ... 11 more

主类 `

public class PipelineDataHandler {
   public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);
        counter.inc();
        p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())
                .apply(
                        FlatMapElements.into(TypeDescriptors.strings())
                                .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
                // We use a Filter transform to avoid empty word
                .apply(Filter.by((String word) -> !word.isEmpty()))
                .apply(Count.perElement())
                .apply(
                        MapElements.into(TypeDescriptors.strings())
                                .via(
                                        (KV<String, Long> wordCount) ->
                                                wordCount.getKey() + ": " + wordCount.getValue()))
                .apply(TextIO.write().to("wordcounts"));

        p.run().waitUntilFinish();
  }
}

`

gradle 文件

`

repositories {
    jcenter()
    mavenLocal()
    mavenCentral()
}

apply plugin: 'org.owasp.dependencycheck'
apply plugin: 'application'
apply plugin: 'com.github.johnrengelman.shadow'

mainClassName = 'com.rnd.beam.PipelineDataHandler' //com/rnd/beam/PipelineDataHandler
sourceCompatibility = 1.8

dependencies {
    compile('org.apache.beam:beam-sdks-java-core:2.8.0')
    compile('org.apache.beam:beam-runners-flink_2.11:2.8.0')
//    testCompile('org.apache.beam:beam-runners-direct-java:2.8.0')
    testCompile('org.testng:testng:6.14.+')
}

jar {
    manifest {
        attributes(
                'Class-Path': configurations.compile.collect { it.getName() }.join(' '),
                'Main-Class': 'com.rnd.beam.PipelineDataHandler'
        )
    }
}

`

【问题讨论】:

    标签: java apache-flink apache-beam flink-cep


    【解决方案1】:

    必须在流水线中指定流道。有两种设置方式,一种是使用pipeline.setRunner()方法设置你要使用的Beam Runner,另一种是在命令行中指定Beam runner ,示例:

    java -jar your_application.jar --runner=FlinkRunner
    

    您的代码应如下所示:

    public class PipelineDataHandler {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);
        p.setRunner(FlinkRunner.class)
        counter.inc();
        p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())
            .apply(
                    FlatMapElements.into(TypeDescriptors.strings())
                            .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
            // We use a Filter transform to avoid empty word
            .apply(Filter.by((String word) -> !word.isEmpty()))
            .apply(Count.perElement())
            .apply(
                    MapElements.into(TypeDescriptors.strings())
                            .via(
                                    (KV<String, Long> wordCount) ->
                                            wordCount.getKey() + ": " + wordCount.getValue()))
            .apply(TextIO.write().to("wordcounts"));
        p.run().waitUntilFinish();}}
    

    【讨论】:

    • 必须在pipeline中指定runner。有两种设置方式,一种是使用pipeline.setRunner()方法设置你要使用的Beam Runner,另一种方式是在命令行中指定Beam runner,例如:java -jar your_application.jar --runner=FlinkRunner
    猜你喜欢
    • 1970-01-01
    • 2016-11-24
    • 2018-08-26
    • 2016-11-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-11-29
    相关资源
    最近更新 更多