【问题标题】:Dataflow sideinput refresh数据流侧输入刷新
【发布时间】:2020-11-25 02:15:37
【问题描述】:

我们正在尝试在流式数据流作业中以特定间隔刷新侧输入。 关注了这个链接 https://beam.apache.org/documentation/patterns/side-inputs/ 但无法在语法上实现它也是正确的。有人可以帮助以正确的方式为下面的代码实现侧输入刷新吗?

PCollection<KV<String, String>> updateVariable = pipeline.apply(JdbcIO.<KV<String, String>>read()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                    .create("com.mysql.jdbc.Driver", "jdbc:mysql://" + sqlIp + "/" + sqlDb).withUsername(sqlUser)
                    .withPassword(sqlPwd))
            .withQuery(
                    "select * from OBD_LOOKUP")
            .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
            .withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {
                @Override
                public KV<String, String> mapRow(java.sql.ResultSet resultSet) throws Exception {
                    / TODO Auto-generated method stub
                    return KV.of(resultSet.getString(1), resultSet.getString(2));
                }
            }));

   final PCollectionView<Map<String, String>> lookupCollection = updateVariable             
            .apply("Assign into Global Window",
            Window.<KV<String, String>>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .accumulatingFiredPanes())
            .apply("SideInputViewFormed", View.<String, String>asMap());

    PCollection<String> resultnew = result
            .apply(ParDo.of(new ObdLookUpSideInput(lookupCollection)).withSideInputs(lookupCollection));

【问题讨论】:

  • 您尝试了指南中的哪个pattern? GenerateSequence 还是 PeriodicImpulse?

标签: google-cloud-platform google-cloud-dataflow dataflow


【解决方案1】:

模式利用

View.asSinglton()

View.asMap 本身就是一个聚合,它将继承触发并累积其输入。在此特定模式中,在此转换之前将管道设置为 accumulatingPanes 模式将导致重复键错误。

更多细节可以在Solve duplicate values with view as map这个问题的答案中找到。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-04-08
    • 1970-01-01
    • 2021-08-15
    • 2010-11-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多