【问题标题】:StreamSource vs BatchStage in hazelcast jetHazelcast jet 中的 StreamSource 与 BatchStage
【发布时间】:2021-10-16 07:15:51
【问题描述】:

我在我的应用程序中使用 CDC

CDC 总是以流的形式返回对象。StreamSource<ChangeRecord> source

我的要求:-

1] 使用 cdc 捕获数据库更改并存储在地图中..

2] 在接下来的步骤中,我将根据用户 i/p 数据进行批处理。

这是我的代码。

public Pipeline returnPiple() {

        StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
                .setCustomProperty("plugin.name", "pgoutput").setDatabaseAddress("127.0.0.1").setDatabasePort(5432)
                .setDatabaseUser("postgres").setDatabasePassword("root").setDatabaseName("postgres")
                .setTableWhitelist("tblName").build();

        Pipeline pipeline = Pipeline.create();

        // this object as stream
        pipeline.readFrom(source).withoutTimestamps().filter(deletedFalse)
                .writeTo(Sinks.map("mapStore", e -> e.key(), e -> e.value()));

        // from here I will be doing batch operation based on user i/p
        pipeline.readFrom(Sources.map("mapStore")).writeTo(Sinks.logger());

        return pipeline;
    }

当我试图从mapStore 读取数据时。我越来越空了..

那么如何从 cdc 进行批处理。

【问题讨论】:

    标签: java hazelcast hazelcast-jet


    【解决方案1】:

    您的管道中有两个子管道。第一个是流式传输,将数据从 CDC 源流式传输到 mapStore IMap。另一个读取mapStore IMap 并将其写入记录器。但是,第二个作业是批处理作业 - 它读取地图一次并完成。之后它不会接收来自 CDC 的事件。

    【讨论】:

      【解决方案2】:

      将代码拆分为两个管道,分别提交。

      Pipeline cdcPipeline = Pipeline.create();
      
      cdcPipeline
          .readFrom(source)
          .withoutTimestamps()
          .filter(deletedFalse)
          .writeTo(Sinks.map("mapStore", e -> e.key(), e -> e.value()));
      
      jetService.newJobIfAbsent(cdcPipeline);
      
      // now Jet streams data from your Postgres DB to the IMap in the background
      
      Pipeline batchQuery = Pipeline.create();
      batchQuery
          .readFrom(Sources.map("mapStore"))
          .writeTo(Sinks.logger());
      
      // runs your query once and prints the results
      jetService.newJobIfAbsent(batchQuery).join(); 
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2023-03-08
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多