【Posted】: 2019-12-05 15:25:32
【Question】:
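I'm trying to run a Java Dataflow pipeline built with Apache Beam locally in Eclipse (using the local runner).
The problem appears when I add Pub/Sub as the data source.
(If I remove that line and run a simple pipeline instead, it executes without any problems.)
Pipeline code: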
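import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class StarterPipeline {

    public static void main(String[] args) {
        // Options is the custom PipelineOptions interface of this project (not shown)
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        run(options);
    }

    public static PipelineResult run(Options options) {
        Pipeline pipeline = Pipeline.create(options);

        /*
         * Step 1: Read from PubSub (subscription or topic, depending on the options)
         */
        PCollection<PubsubMessage> messages;
        if (options.getUseSubscription()) {
            messages = pipeline.apply("ReadPubSubSubscription", PubsubIO.readMessagesWithAttributes()
                    .fromSubscription(options.getInputSubscription()).withIdAttribute("messageId"));
        } else {
            messages = pipeline.apply("ReadPubSubTopic", PubsubIO.readMessagesWithAttributes()
                    .fromTopic(options.getInputTopic()).withIdAttribute("messageId"));
        }
        <...>
        return pipeline.run();
    }
}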
And the error message:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/beam/sdk/util/DoFnAndMainOutput
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:462)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation$1.translateDoFn(ParDoTranslation.java:160)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:695)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:156)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation$ParDoPayloadTranslator.translate(ParDoTranslation.java:111)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:206)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:547)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:557)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformMatchers$4.matches(PTransformMatchers.java:194)
at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:282)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:460)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:260)
at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:210)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:170)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
at com.accenture.pipeline.StarterPipeline.run(StarterPipeline.java:128)
at com.accenture.pipeline.StarterPipeline.main(StarterPipeline.java:74)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.sdk.util.DoFnAndMainOutput
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:602)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
... 26 more
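To confirm what the `ClassNotFoundException` above says, it can help to check, from the same classpath Eclipse uses, whether `org.apache.beam.sdk.util.DoFnAndMainOutput` is loadable at all and which jars supply the neighbouring Beam classes. Below is a minimal stdlib-only sketch; the `ClassCheck` class and its `locate` helper are hypothetical names for illustration, and the two Beam class names besides the missing one are taken from the stack trace.

```java
import java.net.URL;
import java.security.CodeSource;

/**
 * Hypothetical diagnostic: reports whether a class can be loaded from the
 * current classpath and, if so, which jar or directory it was loaded from.
 */
public class ClassCheck {

    /** Returns the location the class was loaded from, or null if it cannot be loaded. */
    static String locate(String className) {
        try {
            // initialize = false: only load the class, do not run its static initializers
            Class<?> cls = Class.forName(className, false, ClassCheck.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // Classes from the bootstrap loader (e.g. java.lang.String) have no CodeSource
            if (source == null) {
                return "<bootstrap>";
            }
            URL location = source.getLocation();
            return location == null ? "<unknown>" : location.toString();
        } catch (ClassNotFoundException | LinkageError e) {
            // Not on the classpath, or present but unloadable (e.g. a missing dependency)
            return null;
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.beam.sdk.util.DoFnAndMainOutput",  // the class missing in the trace
            "org.apache.beam.sdk.Pipeline",                 // core SDK class
            "org.apache.beam.runners.direct.DirectRunner"   // direct runner class
        };
        for (String name : names) {
            String where = locate(name);
            System.out.println(name + " -> " + (where == null ? "NOT FOUND" : where));
        }
    }
}
```

If `Pipeline` and `DirectRunner` resolve to jars with different version numbers in their file names, that would be worth looking at before adding further dependencies.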
To run the pipeline locally, I tried using:
- Maven
- the Dataflow SDK
with the following options:
Should I add any dependencies?
Let me know if you need any more details.
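Before adding dependencies, one thing worth ruling out (an assumption, not confirmed in the question) is a mix of Beam versions on the classpath: the failing frames come from code repackaged inside the direct runner (`org.apache.beam.repackaged.beam_runners_direct_java`), while the missing class is requested from `org.apache.beam.sdk.util`. The sketch below is a hypothetical stdlib-only helper, `BeamVersionCheck`, that parses Maven-style jar names such as `beam-sdks-java-core-2.16.0.jar` from a classpath string and reports whether they share one version. It assumes plain `X.Y.Z` version suffixes (e.g. `-SNAPSHOT` names are ignored) and skips `beam-vendor-*` jars, which are versioned independently.

```java
import java.io.File;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper: lists Apache Beam jars found on a classpath string
 * and reports whether they all carry the same version number.
 */
public class BeamVersionCheck {

    // artifact id (lazy, so the version suffix is split off) + "-X.Y.Z.jar"
    private static final Pattern BEAM_JAR =
            Pattern.compile("(beam-[a-z0-9-]+?)-(\\d+\\.\\d+\\.\\d+)\\.jar");

    /** Maps each Beam artifact id to the version parsed from its jar file name. */
    static Map<String, String> beamVersions(String classpath) {
        Map<String, String> versions = new TreeMap<>();
        for (String entry : classpath.split(File.pathSeparator)) {
            String fileName = new File(entry).getName();
            Matcher m = BEAM_JAR.matcher(fileName);
            // Vendored jars (beam-vendor-*) use their own version scheme, so skip them
            if (m.matches() && !m.group(1).startsWith("beam-vendor-")) {
                versions.put(m.group(1), m.group(2));
            }
        }
        return versions;
    }

    public static void main(String[] args) {
        Map<String, String> versions = beamVersions(System.getProperty("java.class.path"));
        versions.forEach((artifact, version) -> System.out.println(artifact + " : " + version));
        long distinct = versions.values().stream().distinct().count();
        System.out.println(distinct <= 1 ? "Beam versions are consistent" : "MIXED Beam versions found");
    }
}
```

Running `main` from the same Eclipse launch configuration as the pipeline prints every Beam artifact with its version; more than one distinct version would point to a dependency alignment problem rather than a missing dependency.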
【Discussion】:
-
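Looking at the stack trace, it seems you are using the DirectRunner rather than the DataflowRunner. Could you follow the guide below to set up your Dataflow pipeline correctly? cloud.google.com/dataflow/docs/quickstarts/… Unfortunately, the provided Eclipse plugin does not seem to work with the latest version of Beam.
Tags: java google-cloud-dataflow apache-beam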