【问题标题】:Looping inside a beam transform. Process sequentially using Apache Beam在光束变换内循环。使用 Apache Beam 按顺序处理
【发布时间】:2020-12-13 11:10:23
【问题描述】:

我正在创建一个光束管道来对数据包进行批处理。管道使用 CassandraIO 读取记录。我想分批处理 30 分钟的数据,然后对 30 分钟的数据进行分组/缝合并将其写入另一个表。 我为每个员工准备了 300 个捆绑包,我需要使用有限的资源(~2Gi)处理至少 50 名员工。 但目前堆使用率非常高,因此我只能处理 1 名员工(约 4Gi)。如果我提供更多数据,则会出现内存不足/堆错误。

是他们一次处理 1 名员工的一种方式。就像一个循环,这样我就可以用我的 ~2Gi 顺序处理所有员工

下面是我的示例代码

Pipeline p = Pipeline.create(options);
 dataToWritetoDb = p.apply(CassandraIO.<LevelOneInputEntity>read()
                    .withHosts(Arrays.asList("127.0.0.1")).withPort(9042).withKeyspace("beam")
                    .withTable("LevelOneInputEntity").withEntity(LevelOneInputEntity.class).withConnectTimeout(1000)
                    .withReadTimeout(5000).withCoder(SerializableCoder.of(LevelOneInputEntity.class)))
.apply(ParDo.of(new ApplyTimeStampDoFnFunction()))
.apply(Window.<EntityClass>into(FixedWindows.of(Duration.standardMinutes(30)))
.apply("Group and create keyvalue bundles",ParDo.of(new BundleKVDoFn(mapper)))

有没有办法实现以下逻辑,以便管道将为每个员工运行一次?

  Pipeline p = Pipeline.create(options);
    
    for (String employeeId : employeelist) {
        sql = "select * from LevelOneInputEntity where employeeId = "+employeeId;
        dataToWritetoDb =  p.apply(CassandraIO.<LevelOneInputEntity>read()
                            .withHosts(Arrays.asList("127.0.0.1")).withPort(9042).withKeyspace("beam")
                            .withTable("LevelOneInputEntity").withEntity(LevelOneInputEntity.class).withConnectTimeout(1000).withQuery(sql)
                            .withReadTimeout(5000).withCoder(SerializableCoder.of(LevelOneInputEntity.class)))
        .apply(ParDo.of(new ApplyTimeStampDoFnFunction()))
        .apply(Window.<EntityClass>into(FixedWindows.of(Duration.standardMinutes(30)))
        .apply("Group and create keyvalue bundles",ParDo.of(new BundleKVDoFn(mapper)))
    }

【问题讨论】:

    标签: java apache-flink apache-beam


    【解决方案1】:

    Beam 没有内置方法来执行此操作,但您可以在昂贵的 DoFn 中添加锁定,它只允许一次处理一个元素。类似的东西

    class ExpensiveDoFn extends DoFn<Input, Output> {
      private static Lock lock;
      @ProcessElement
      public void Process(@Element Input input) {
        lock.lock();
        // Do memory-intensive processing here.
        lock.unlock()
      }
    }
    

    或者,您可以尝试查看程序中的堆转储,看看是否可以优化内存使用

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2010-11-05
      相关资源
      最近更新 更多