【问题标题】:Google Dataflow "elementCountExact" aggregation谷歌数据流“elementCountExact”聚合
【发布时间】:2017-06-01 00:15:42
【问题描述】:

我正在尝试将 PCollection<String> 聚合到 PCollection<List<String>> 中,每个元素约 60 个。

它们将被发送到每个请求接受 60 个元素的 API。 目前我正在通过窗口尝试它,但只有elementCountAtLeast,所以我必须将它们收集到一个列表中并再次计数并拆分以防它太长。这非常麻烦,并且会导致很多列表只有很少的元素:

Repeatedly.forever(AfterFirst.of(
                    AfterPane.elementCountAtLeast(maxNrOfelementsPerList),
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1)))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
            .apply("CollectIntoLists", Combine.globally(new StringToListCombinator()).withoutDefaults())
            .apply("SplitListsToMaxSize", ParDo.of(new DoFn<List<String>, List<String>>() {
                @ProcessElement
                public void apply(ProcessContext pc) {
                    splitList(pc.element(), maxNrOfelementsPerList).forEach(pc::output);
                }
            }));

是否有任何直接且更一致的方式来进行这种聚合?

【问题讨论】:

    标签: aggregate google-cloud-dataflow


    【解决方案1】:

    这可以使用Dataflow 2.x 中的State API 构建。

    基本上,您会编写一个有两个状态的有状态 DoFn——一个元素数量的计数和一个已缓冲元素的“包”。

    当一个元素到达时,你将它添加到包中并增加计数。然后检查计数,如果是 60,则输出它,并清除两个状态。

    由于有状态 DoFn 的每个键都将在一台机器上运行,因此将元素随机分布在 N 个键上可能会更好,这样您就可以扩展到 N 台机器(多个键可以在一台机器上运行)。

    【讨论】:

    • 非常感谢,我会尽快尝试并标记您的答案。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-01-16
    • 2018-07-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-12-18
    • 1970-01-01
    相关资源
    最近更新 更多