【问题标题】:Hazelcast Jet change data captureHazelcast Jet 变更数据捕获
【发布时间】:2021-10-15 20:34:53
【问题描述】:

我在我的应用程序中使用 Hazelcast 变更数据捕获 (CDC)。 (我使用 CDC 的原因是因为如果使用 jdbc 或其他替代功能将数据加载到缓存中,它会花费很多时间)。所以 CDC 将在数据库和 Hazelcast Jet 之间进行数据同步。

StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
                .setCustomProperty("plugin.name", "pgoutput").setDatabaseAddress("127.0.0.1").setDatabasePort(5432)
                .setDatabaseUser("postgres").setDatabasePassword("root").setDatabaseName("postgres")
                .setTableWhitelist("tblName").build();

这里我有以下步骤

Pipeline pipeline = Pipeline.create();

// filter records based on deleted false
StreamStage<ChangeRecord> deletedFlagRecords = pipeline.readFrom(source).withoutTimestamps()
                .filter(deletedFalse);

deletedFlagRecords.filter(idBasedFetch).writeTo(Sinks.logger());

这里我使用StreamSource&lt;ChangeRecord&gt; source 对象作为pipeLine 的输入。如您所知,source 对象是 Stream 类型。但在我的情况下,管道数据过程取决于用户输入数据(一些元数据)。如果我在数据库中进行任何更新或删除。 jet 将在所有流实例中更新。由于我的数据处理取决于用户数据,因此我不想在第一步之后使用流类型。只有第一个StreamSource&lt;ChangeRecord&gt; source; 需要以流的形式。在下一步中,我只想将其用于批处理。那么如何在批处理中使用source

pipeLine.readFrom(source) //总是返回 Stream 类型。那么如何将其转换为批处理类型。我尝试了另一种方法,例如从source 读取并将所有内容下沉到映射。

pipeLine.readFrom(source).writeTo(Sinks.map("dbStreamedData", e -> e.key(), e -> e.value()));

再次从地图构造 pipeLine readFrom。

pipeline.readFrom(Sources.map("dbStreamedData")).writeTo(Sinks.logger());

这只是返回空数据。所以任何建议都会有所帮助。

【问题讨论】:

    标签: java hazelcast hazelcast-jet


    【解决方案1】:

    Pipeline.readFrom 返回StreamStageBatchStage,具体取决于源。 Sources.map() 是一个批处理源,它会读取一次地图并完成。 PostgresCdcSources.postgres() 是一个流媒体源,它将连接到数据库并在事件发生时不断返回事件,直到被取消。

    如果这是您的问题,您需要根据您的用例选择来源。

    【讨论】:

      【解决方案2】:

      仅当您需要不断更新数据时,使用 CDC 源才有意义。例如。对数据库中的每个更新做出反应,或者可能将数据加载到地图中,然后在内存中快照的某个时间间隔重复运行批处理作业。

      在这种情况下,您可能希望仅在 CDC 源是最新的之后发生第一个 - 在它从数据库中读取所有 当前状态 并且只接收更新时被制作到数据库中。不幸的是,目前(Hazelcast 5.0)无法使用 Jet API 判断何时发生这种情况。

      您可能可以使用一些特定于域的信息 - 具有您查询的时间戳字段,地图中存在最后插入的记录或类似信息。

      如果您想对数据库表中的数据运行单个批处理作业,您应该使用 jdbc 源。

      (我使用 CDC 的原因是因为如果使用 jdbc 或其他替代功能将数据加载到缓存中会花费很多时间)

      使用 CDC 有其开销,这不是我们通常看到的。使用带有 jdbc 源的纯 SQL 查询(如 SELECT * FROM table)比 CDC 源更快。也许您没有测量处理整个当前状态所需的时间?使用 jdbc 加载数据确实比 CDC 需要更多时间,请向复制者提出问题。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2021-11-11
        • 2023-03-08
        • 1970-01-01
        • 2021-12-14
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多