【Posted】: 2022-06-15 00:15:11
【Question】:
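I have an Apache Beam pipeline that processes unbounded data and writes the results to MySQL. As part of this, it needs to look up user names from user IDs. I pass the mapping from user ID to user name into the pipeline as a side input.
Because we keep adding users, the side input needs to be refreshed periodically. I have read about the side input patterns "Slowly updating global window side inputs" and "Slowly updating side input using windowing".
I am leaning toward the first one, because new users are not added very often.
The users are read from the database with JdbcIO: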
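For orientation, the first pattern can be pictured in plain Java: a timer fires every so often, the complete mapping is reloaded, and readers always see the most recent complete map. The sketch below is only an analogy, not Beam API; `UserNameSnapshot`, `refresh`, and `nameFor` are hypothetical names, and falling back to the raw ID for unknown users is an assumption.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Plain-Java analogy of a slowly updating singleton side input (not Beam API). */
final class UserNameSnapshot {
  // Holds the latest complete id -> name map; readers never see a half-built map.
  private final AtomicReference<Map<String, String>> current =
      new AtomicReference<>(Map.of());

  /** Replaces the whole snapshot, much like a new pane replacing the singleton view. */
  void refresh(Supplier<Map<String, String>> loader) {
    current.set(Map.copyOf(loader.get()));
  }

  /** Looks up a user name; falls back to the raw id when the user is not known yet. */
  String nameFor(String userId) {
    return current.get().getOrDefault(userId, userId);
  }
}
```

In the pipeline itself, `GenerateSequence` plays the role of the timer and the singleton view plays the role of the snapshot, which is why each tick has to produce one complete `Map<String, String>`.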
    final PCollection<KV<String, String>> userCollection =
        pipeline.apply("read-users-info", jdbcMgr.readUserInfo(userDsFn));
Reading the data from MySQL:
    // Builds a JdbcIO read that yields (user id, full name) pairs.
    public PTransform<PBegin, PCollection<KV<String, String>>> readUserInfo(
        SerializableFunction<Void, DataSource> dataSourceProviderFn) {
      LOG.info("reading users");
      return JdbcIO.<KV<String, String>>read()
          .withDataSourceProviderFn(dataSourceProviderFn)
          .withQuery("select id, concat(first_name, ' ', last_name) from users")
          .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
          .withRowMapper(
              (JdbcIO.RowMapper<KV<String, String>>)
                  rs -> KV.of(rs.getString(1), rs.getString(2)));
    }
Updating the side input using the global window:
    final PCollectionView<Map<String, String>> userMap =
        pipeline
            .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(30)))
            .apply(Window.into(FixedWindows.of(Duration.standardSeconds(30))))
            .apply(Sum.longsGlobally().withoutDefaults())
            .apply(
                ParDo.of(
                    new DoFn<Long, Map<String, String>>() {
                      @ProcessElement
                      public void process(
                          @Element Long input,
                          @Timestamp Instant timestamp,
                          OutputReceiver<PCollection<KV<String, String>>> o) {
                        o.output(userCollection);
                      }
                    }))
            .apply(
                Window.<Map<String, String>>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes())
            .apply(View.asSingleton());
I am sure that o.output(userCollection); is wrong. Could you please help me here?
I am getting the following error:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: com.streaming.pipelines.ContactPipeline$1, @ProcessElement process(Long, Instant, OutputReceiver), @ProcessElement process(Long, Instant, OutputReceiver), parameter of type DoFn.OutputReceiver<PCollection<KV<String, String>>> at index 2: OutputReceiver should be parameterized by java.util.Map<java.lang.String, java.lang.String>
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.IllegalArgumentException: com.streaming.pipelines.ContactPipeline$1, @ProcessElement process(Long, Instant, OutputReceiver), @ProcessElement process(Long, Instant, OutputReceiver), parameter of type DoFn.OutputReceiver<PCollection<KV<String, String>>> at index 2: OutputReceiver should be parameterized by java.util.Map<java.lang.String, java.lang.String>
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.throwIllegalArgument(DoFnSignatures.java:2397)
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.checkArgument(DoFnSignatures.java:2403)
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.analyzeExtraParameter(DoFnSignatures.java:1406)
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.analyzeProcessElementMethod(DoFnSignatures.java:1230)
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.parseSignature(DoFnSignatures.java:638)
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.lambda$getSignature$0(DoFnSignatures.java:294)
at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getSignature(DoFnSignatures.java:294)
at org.apache.beam.sdk.transforms.ParDo.validate(ParDo.java:614)
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:403)
at com.streaming.pipelines.ContactPipeline.buildPipeline(ContactPipeline.java:61)
at com.streaming.pipelines.StreamingApp.main(StreamingApp.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 8 more
Thanks, Suresh
【Comments】:
-
Do you get an error when running the pipeline above?
-
Yes @chamikara, I have updated the question with the error stack trace.
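-
The error message points at the declared parameter type: the DoFn is a `DoFn<Long, Map<String, String>>`, so its receiver has to be an `OutputReceiver<Map<String, String>>`. Beyond the type, a `PCollection` is a handle used while the pipeline graph is being built, not a value that a `DoFn` can emit at run time, so `userCollection` cannot be passed to `o.output(...)`. One possible direction, following the "Slowly updating global window side inputs" pattern, is to query the database inside the DoFn on every tick and emit the resulting map. The loading step might look like the sketch below; `UserMapLoader` and `load` are hypothetical names, the query is copied from the question, and using plain JDBC here instead of `JdbcIO` is an assumption.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;

/** Hypothetical helper: loads the whole id -> name mapping with one query. */
final class UserMapLoader {
  // Same query as in the question.
  static final String QUERY =
      "select id, concat(first_name, ' ', last_name) from users";

  /** Runs the query and returns a fresh map; all JDBC resources are closed on exit. */
  static Map<String, String> load(DataSource dataSource) throws SQLException {
    Map<String, String> users = new HashMap<>();
    try (Connection conn = dataSource.getConnection();
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(QUERY)) {
      while (rs.next()) {
        // Column 1 is the user id, column 2 the concatenated full name.
        users.put(rs.getString(1), rs.getString(2));
      }
    }
    return users;
  }
}
```

With such a helper, the third parameter of `process` would be declared as `OutputReceiver<Map<String, String>>` and the body could become `o.output(UserMapLoader.load(userDsFn.apply(null)))`, assuming `userDsFn` is the `SerializableFunction<Void, DataSource>` already passed to `readUserInfo` and that `process` is declared to throw `SQLException`.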
Tags: apache-beam