【发布时间】:2017-06-01 00:15:42
【问题描述】:
我正在尝试将 PCollection<String> 聚合到 PCollection<List<String>> 中,每个元素约 60 个。
它们将被发送到每个请求接受 60 个元素的 API。 目前我正在通过窗口尝试它,但只有elementCountAtLeast,所以我必须将它们收集到一个列表中并再次计数并拆分以防它太长。这非常麻烦,并且会导致很多列表只有很少的元素:
Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(maxNrOfelementsPerList),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1)))))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
.apply("CollectIntoLists", Combine.globally(new StringToListCombinator()).withoutDefaults())
.apply("SplitListsToMaxSize", ParDo.of(new DoFn<List<String>, List<String>>() {
@ProcessElement
public void apply(ProcessContext pc) {
splitList(pc.element(), maxNrOfelementsPerList).forEach(pc::output);
}
}));
是否有任何直接且更一致的方式来进行这种聚合?
【问题讨论】:
标签: aggregate google-cloud-dataflow