【发布时间】:2021-10-15 20:34:53
【问题描述】:
我在我的应用程序中使用 Hazelcast 变更数据捕获 (CDC)。 (我使用 CDC 的原因是因为如果使用 jdbc 或其他替代功能将数据加载到缓存中,它会花费很多时间)。所以 CDC 将在数据库和 Hazelcast Jet 之间进行数据同步。
StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
.setCustomProperty("plugin.name", "pgoutput").setDatabaseAddress("127.0.0.1").setDatabasePort(5432)
.setDatabaseUser("postgres").setDatabasePassword("root").setDatabaseName("postgres")
.setTableWhitelist("tblName").build();
这里我有以下步骤
Pipeline pipeline = Pipeline.create();
// filter records based on deleted false
StreamStage<ChangeRecord> deletedFlagRecords = pipeline.readFrom(source).withoutTimestamps()
.filter(deletedFalse);
deletedFlagRecords.filter(idBasedFetch).writeTo(Sinks.logger());
这里我使用StreamSource<ChangeRecord> source 对象作为pipeLine 的输入。如您所知,source 对象是 Stream 类型。但在我的情况下,管道数据过程取决于用户输入数据(一些元数据)。如果我在数据库中进行任何更新或删除。 jet 将在所有流实例中更新。由于我的数据处理取决于用户数据,因此我不想在第一步之后使用流类型。只有第一个StreamSource<ChangeRecord> source; 需要以流的形式。在下一步中,我只想将其用于批处理。那么如何在批处理中使用source。
pipeLine.readFrom(source) //总是返回 Stream 类型。那么如何将其转换为批处理类型。我尝试了另一种方法,例如从source 读取并将所有内容下沉到映射。
pipeLine.readFrom(source).writeTo(Sinks.map("dbStreamedData", e -> e.key(), e -> e.value()));
再次从地图构造 pipeLine readFrom。
pipeline.readFrom(Sources.map("dbStreamedData")).writeTo(Sinks.logger());
这只是返回空数据。所以任何建议都会有所帮助。
【问题讨论】:
标签: java hazelcast hazelcast-jet