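【Posted】: 2020-11-25 02:15:37
【Question】:
We are trying to refresh a side input at a fixed interval in a streaming Dataflow job. We followed https://beam.apache.org/documentation/patterns/side-inputs/ but could not get an implementation that is syntactically correct. Could someone help us implement the side input refresh properly for the code below?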
PCollection<KV<String, String>> updateVariable = pipeline.apply(JdbcIO.<KV<String, String>>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                .create("com.mysql.jdbc.Driver", "jdbc:mysql://" + sqlIp + "/" + sqlDb)
                .withUsername(sqlUser)
                .withPassword(sqlPwd))
        .withQuery("select * from OBD_LOOKUP")
        .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
        .withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {
            @Override
            public KV<String, String> mapRow(java.sql.ResultSet resultSet) throws Exception {
                // Column 1 is the lookup key, column 2 is the lookup value.
                return KV.of(resultSet.getString(1), resultSet.getString(2));
            }
        }));
final PCollectionView<Map<String, String>> lookupCollection = updateVariable
        .apply("Assign into Global Window",
                Window.<KV<String, String>>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                        .accumulatingFiredPanes())
        .apply("SideInputViewFormed", View.<String, String>asMap());
PCollection<String> resultnew = result
        .apply(ParDo.of(new ObdLookUpSideInput(lookupCollection)).withSideInputs(lookupCollection));
【Discussion】:
- Which pattern from the guide did you try: GenerateSequence or PeriodicImpulse?
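For reference, here is a minimal sketch of the GenerateSequence variant from the linked guide, adapted to the lookup table in the question. As far as I can tell, `JdbcIO.read()` runs its query only once, so the repeated trigger in the question has nothing new to fire on; the sketch instead uses a periodic tick that re-runs the query from inside a `DoFn`. The names `RefreshingLookup`, `ReadLookupFn`, and `lookupView` (and its parameters) are hypothetical and only for illustration. Opening a plain JDBC connection on every tick is an assumption made for brevity, not something the guide prescribes.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

// Hypothetical helper class; not part of Beam or of the original question.
class RefreshingLookup {

  /** On every tick, re-runs the lookup query and emits the whole table as one map. */
  static class ReadLookupFn extends DoFn<Long, Map<String, String>> {
    private final String jdbcUrl;
    private final String user;
    private final String password;

    ReadLookupFn(String jdbcUrl, String user, String password) {
      this.jdbcUrl = jdbcUrl;
      this.user = user;
      this.password = password;
    }

    @ProcessElement
    public void processElement(OutputReceiver<Map<String, String>> out) throws Exception {
      Map<String, String> lookup = new HashMap<>();
      // Assumption: a fresh connection per tick is acceptable for a small lookup table.
      try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
          Statement stmt = conn.createStatement();
          ResultSet rs = stmt.executeQuery("select * from OBD_LOOKUP")) {
        while (rs.next()) {
          lookup.put(rs.getString(1), rs.getString(2));
        }
      }
      out.output(lookup);
    }
  }

  /** Builds a side input view whose content is replaced once per refreshInterval. */
  static PCollectionView<Map<String, String>> lookupView(
      Pipeline pipeline, String jdbcUrl, String user, String password, Duration refreshInterval) {
    return pipeline
        // Unbounded source: one element per refreshInterval acts as the refresh tick.
        .apply("Tick", GenerateSequence.from(0).withRate(1, refreshInterval))
        .apply("ReadLookup", ParDo.of(new ReadLookupFn(jdbcUrl, user, password)))
        .apply(
            "GlobalWindow",
            Window.<Map<String, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                // Discard old panes so each firing carries only the newest snapshot.
                .discardingFiredPanes())
        .apply("KeepLatest", Latest.<Map<String, String>>globally())
        .apply("AsSingletonView", View.<Map<String, String>>asSingleton());
  }
}
```

Because the view is still a `PCollectionView<Map<String, String>>`, it should be usable in place of `lookupCollection` from the question, for example `RefreshingLookup.lookupView(pipeline, "jdbc:mysql://" + sqlIp + "/" + sqlDb, sqlUser, sqlPwd, Duration.standardMinutes(5))`, with the `withSideInputs(...)` call left unchanged. The five-minute interval is only an example value.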
Tags: google-cloud-platform google-cloud-dataflow dataflow