【发布时间】:2021-10-16 07:15:51
【问题描述】:
我在我的应用程序中使用 CDC。
CDC 总是以流的形式返回对象。StreamSource<ChangeRecord> source。
我的要求:-
1] 使用 cdc 捕获数据库更改并存储在地图中..
2] 在接下来的步骤中,我将根据用户 i/p 数据进行批处理。
这是我的代码。
public Pipeline returnPiple() {
StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
.setCustomProperty("plugin.name", "pgoutput").setDatabaseAddress("127.0.0.1").setDatabasePort(5432)
.setDatabaseUser("postgres").setDatabasePassword("root").setDatabaseName("postgres")
.setTableWhitelist("tblName").build();
Pipeline pipeline = Pipeline.create();
// this object as stream
pipeline.readFrom(source).withoutTimestamps().filter(deletedFalse)
.writeTo(Sinks.map("mapStore", e -> e.key(), e -> e.value()));
// from here I will be doing batch operation based on user i/p
pipeline.readFrom(Sources.map("mapStore")).writeTo(Sinks.logger());
return pipeline;
}
当我试图从mapStore 读取数据时。我越来越空了..
那么如何从 cdc 进行批处理。
【问题讨论】:
标签: java hazelcast hazelcast-jet