【发布时间】:2020-12-13 11:10:23
【问题描述】:
我正在创建一个光束管道来对数据包进行批处理。管道使用 CassandraIO 读取记录。我想分批处理 30 分钟的数据,然后对 30 分钟的数据进行分组/缝合并将其写入另一个表。 我为每个员工准备了 300 个捆绑包,我需要使用有限的资源(~2Gi)处理至少 50 名员工。 但目前堆使用率非常高,因此我只能处理 1 名员工(约 4Gi)。如果我提供更多数据,则会出现内存不足/堆错误。
是他们一次处理 1 名员工的一种方式。就像一个循环,这样我就可以用我的 ~2Gi 顺序处理所有员工
下面是我的示例代码
Pipeline p = Pipeline.create(options);
dataToWritetoDb = p.apply(CassandraIO.<LevelOneInputEntity>read()
.withHosts(Arrays.asList("127.0.0.1")).withPort(9042).withKeyspace("beam")
.withTable("LevelOneInputEntity").withEntity(LevelOneInputEntity.class).withConnectTimeout(1000)
.withReadTimeout(5000).withCoder(SerializableCoder.of(LevelOneInputEntity.class)))
.apply(ParDo.of(new ApplyTimeStampDoFnFunction()))
.apply(Window.<EntityClass>into(FixedWindows.of(Duration.standardMinutes(30)))
.apply("Group and create keyvalue bundles",ParDo.of(new BundleKVDoFn(mapper)))
有没有办法实现以下逻辑,以便管道将为每个员工运行一次?
Pipeline p = Pipeline.create(options);
for (String employeeId : employeelist) {
sql = "select * from LevelOneInputEntity where employeeId = "+employeeId;
dataToWritetoDb = p.apply(CassandraIO.<LevelOneInputEntity>read()
.withHosts(Arrays.asList("127.0.0.1")).withPort(9042).withKeyspace("beam")
.withTable("LevelOneInputEntity").withEntity(LevelOneInputEntity.class).withConnectTimeout(1000).withQuery(sql)
.withReadTimeout(5000).withCoder(SerializableCoder.of(LevelOneInputEntity.class)))
.apply(ParDo.of(new ApplyTimeStampDoFnFunction()))
.apply(Window.<EntityClass>into(FixedWindows.of(Duration.standardMinutes(30)))
.apply("Group and create keyvalue bundles",ParDo.of(new BundleKVDoFn(mapper)))
}
【问题讨论】:
标签: java apache-flink apache-beam