【Question title】: Apache Beam: update side input from database
【Posted】: 2022-06-15 00:15:11
【Question】:

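I have an Apache Beam pipeline that processes unbounded data and writes the results to MySQL. During processing, it needs to look up the user name for a user identifier. I'm passing the user ID to user name mapping into the pipeline as a side input.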
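For context, a main-input step that consumes such a map side input might look roughly like this. It is only a sketch: the class name AttachUserNameFn and the assumption that main-input elements are KV pairs of (user ID, payload) are hypothetical and not taken from the question.

```java
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;

/**
 * Hypothetical lookup step: replaces the user ID key of each (userId, payload)
 * element with the user's name taken from the side-input map.
 */
class AttachUserNameFn extends DoFn<KV<String, String>, KV<String, String>> {
  private final PCollectionView<Map<String, String>> userMapView;

  AttachUserNameFn(PCollectionView<Map<String, String>> userMapView) {
    this.userMapView = userMapView;
  }

  @ProcessElement
  public void process(ProcessContext c) {
    // Latest version of the user map visible for this element's window.
    Map<String, String> users = c.sideInput(userMapView);
    String userId = c.element().getKey();
    // Fall back to the raw ID if the user is not in the side input yet.
    String name = users.getOrDefault(userId, userId);
    c.output(KV.of(name, c.element().getValue()));
  }
}
```

It would be applied as ParDo.of(new AttachUserNameFn(userMap)).withSideInputs(userMap); without withSideInputs(...), the c.sideInput(...) call is rejected at runtime because the view is unknown to that ParDo.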

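Since we keep adding users, the side input needs to be refreshed periodically. I've looked at the side input patterns "Slowly updating global window side inputs" and "Slowly updating side input using windowing".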

I'm leaning toward the first one, because new users are not added very often.

The users are read from the database with JdbcIO:

   final PCollection<KV<String, String>> userCollection =
       pipeline.apply("read-users-info", jdbcMgr.readUserInfo(userDsFn));

Reading the data from MySQL:

  public PTransform<PBegin, PCollection<KV<String, String>>> readUserInfo(
      SerializableFunction<Void, DataSource> dataSourceProviderFn) {
    LOG.info("reading users");
    return JdbcIO.<KV<String, String>>read()
        .withDataSourceProviderFn(dataSourceProviderFn)
        .withQuery("select id, concat(first_name, ' ', last_name) from users")
        .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
        .withRowMapper(
            (JdbcIO.RowMapper<KV<String, String>>) rs -> KV.of(rs.getString(1), rs.getString(2)));
  }
}

Updating the side input with the global window:

    final PCollectionView<Map<String, String>> userMap =
        pipeline
            .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(30)))
            .apply(Window.into(FixedWindows.of(Duration.standardSeconds(30))))
            .apply(Sum.longsGlobally().withoutDefaults())
            .apply(
                ParDo.of(
                    new DoFn<Long, Map<String, String>>() {
                      @ProcessElement
                      public void process(
                          @Element Long input,
                          @Timestamp Instant timestamp,
                          OutputReceiver<PCollection<KV<String, String>>> o) {
                        o.output(userCollection);
                      }
                    }))
            .apply(
                Window.<Map<String, String>>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes())
            .apply(View.asSingleton());
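Following the shape of the "slowly updating global window side inputs" pattern in the Beam documentation, each tick can feed a DoFn that emits the complete map, which is then re-windowed into the global window with a repeating processing-time trigger and turned into a singleton view. The helper below is a sketch: the class and method names are hypothetical, and loader stands for any DoFn<Long, Map<String, String>> that reads the whole users table on every tick (for example by running the SQL query with plain JDBC). It omits the FixedWindows and Sum.longsGlobally() steps from the question.

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

/** Hypothetical helper for a periodically refreshed id -> name side input. */
class SlowlyUpdatingSideInput {
  static PCollectionView<Map<String, String>> userMapView(
      Pipeline pipeline,
      DoFn<Long, Map<String, String>> loader, // must emit the full map per tick
      Duration refreshInterval) {
    return pipeline
        // One tick per refresh interval; the tick value itself is ignored.
        .apply("refresh-ticks", GenerateSequence.from(0).withRate(1, refreshInterval))
        // Each tick re-reads the users table and emits a complete Map element
        // (an element, never a PCollection).
        .apply("load-users", ParDo.of(loader))
        // Global window, firing a new pane for every new map and discarding
        // the previous one, so the view always reflects the latest read.
        .apply(
            Window.<Map<String, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes())
        .apply(View.asSingleton());
  }
}
```

With new users being rare, a refresh interval in the range of minutes (for example Duration.standardMinutes(10)) is probably enough; every tick issues one full-table query, so the interval directly sets the database load.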

I'm sure o.output(userCollection); is the problem. Could you please help me here?

I'm getting the following error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: com.streaming.pipelines.ContactPipeline$1, @ProcessElement process(Long, Instant, OutputReceiver), @ProcessElement process(Long, Instant, OutputReceiver), parameter of type DoFn.OutputReceiver<PCollection<KV<String, String>>> at index 2: OutputReceiver should be parameterized by java.util.Map<java.lang.String, java.lang.String>
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.IllegalArgumentException: com.streaming.pipelines.ContactPipeline$1, @ProcessElement process(Long, Instant, OutputReceiver), @ProcessElement process(Long, Instant, OutputReceiver), parameter of type DoFn.OutputReceiver<PCollection<KV<String, String>>> at index 2: OutputReceiver should be parameterized by java.util.Map<java.lang.String, java.lang.String>
    at org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.throwIllegalArgument(DoFnSignatures.java:2397)
    at org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.checkArgument(DoFnSignatures.java:2403)
    at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.analyzeExtraParameter(DoFnSignatures.java:1406)
    at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.analyzeProcessElementMethod(DoFnSignatures.java:1230)
    at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.parseSignature(DoFnSignatures.java:638)
    at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.lambda$getSignature$0(DoFnSignatures.java:294)
    at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
    at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getSignature(DoFnSignatures.java:294)
    at org.apache.beam.sdk.transforms.ParDo.validate(ParDo.java:614)
    at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:403)
    at com.streaming.pipelines.ContactPipeline.buildPipeline(ContactPipeline.java:61)
    at com.streaming.pipelines.StreamingApp.main(StreamingApp.java:28)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 8 more
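The exception is raised while the pipeline is being constructed (ParDo.of validates the DoFn signature, as the trace shows): for a DoFn<Long, Map<String, String>>, the OutputReceiver parameter must be OutputReceiver<Map<String, String>>. Beyond the typing, a PCollection is a pipeline-construction object and cannot be emitted as an element, and a bounded JdbcIO.read() runs its query only once, so userCollection would never change anyway. One common workaround is to run the query directly inside the DoFn on every tick. A minimal sketch, assuming plain JDBC through the same SerializableFunction<Void, DataSource> the question passes to JdbcIO (the class name LoadUserMapFn is hypothetical):

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;

/**
 * Hypothetical DoFn: re-reads the users table on every tick and emits the
 * full id -> name map. Because the output type is Map<String, String>, the
 * receiver is OutputReceiver<Map<String, String>>.
 */
class LoadUserMapFn extends DoFn<Long, Map<String, String>> {
  private static final String QUERY =
      "select id, concat(first_name, ' ', last_name) from users";

  private final SerializableFunction<Void, DataSource> dataSourceProviderFn;
  private transient DataSource dataSource;

  LoadUserMapFn(SerializableFunction<Void, DataSource> dataSourceProviderFn) {
    this.dataSourceProviderFn = dataSourceProviderFn;
  }

  @Setup
  public void setup() {
    // Resolve the DataSource on the worker, once per DoFn instance.
    dataSource = dataSourceProviderFn.apply(null);
  }

  @ProcessElement
  public void process(@Element Long tick, OutputReceiver<Map<String, String>> out)
      throws SQLException {
    Map<String, String> users = new HashMap<>();
    try (Connection conn = dataSource.getConnection();
        Statement stmt = conn.createStatement();
        ResultSet rs = stmt.executeQuery(QUERY)) {
      while (rs.next()) {
        users.put(rs.getString(1), rs.getString(2));
      }
    }
    // Emit a fresh map per tick; it is not mutated after output.
    out.output(users);
  }
}
```

The DataSource is resolved in @Setup rather than stored as a field at construction time because DataSource implementations are generally not serializable, while the provider function is.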

Thanks, Suresh

【Discussion】:

  • Are you getting an error when you run the pipeline above?
  • Yes @chamikara, I've updated the question with the error stack trace.

Tags: apache-beam

