【Posted】: 2019-04-24 19:40:35
【Question】:
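I wrote a CombineFn whose input is KV<String, TableRow> and whose output is KV<String, Iterable<TableRow>>. I want to use it with Combine.GroupedValues (or Combine.PerKey), and the source seems to imply this is possible, but I get the following error: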
Incorrect number of type arguments for generic method <K, V>groupedValues(SerializableFunction<Iterable<V>,V>) of type Combine; it cannot be parameterized with arguments <String, TableRow, Iterable<TableRow>>
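We are using Beam v2.10. For context: we apply session windows to a PCollection of KV<String, TableRow>, then use GroupByKey to create a PCollection of KV<String, Iterable<TableRow>>. After that step, our CombineFn reduces each group to a KV<String, Iterable<TableRow>>, i.e. an Iterable of TableRows built from the contents of the input.
Transform step: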
public PCollection<KV<String, Iterable<TableRow>>> expand(PCollection<KV<String, TableRow>> rows) {
    // group by step
    PCollection<KV<String, Iterable<TableRow>>> groupedValues = rows.apply(
        "Group by Key",
        GroupByKey.<String, TableRow>create()
    );
    // combine step
    PCollection<KV<String, Iterable<TableRow>>> combinedValues = groupedValues.apply(
        "Generate New Rows",
        // errors here
        // Incorrect number of type arguments for generic
        // method <K, V>groupedValues(SerializableFunction<Iterable<V>,V>)
        // of type Combine; it cannot be parameterized with arguments
        // <String, TableRow, Iterable<TableRow>>
        Combine.<String, TableRow, Iterable<TableRow>>groupedValues(new CreateEvents())
    );
    return combinedValues;
}
Combine function:
private static class CreateEvents extends CombineFn<KV<String, TableRow>, CreateEvents.Accum, KV<String, Iterable<TableRow>>> {
    @DefaultCoder(AvroCoder.class)
    public static class Accum implements Serializable {
        Double startTime = 0.0;
        Double endTime = 0.0;
    }

    @Override
    public Accum createAccumulator() {
        return new Accum();
    }

    @Override
    public Accum addInput(Accum accumulator, KV<String, TableRow> input) {
        // the earliest and latest times in the set of table rows are set on the accumulator
        return accumulator;
    }

    @Override
    public Accum mergeAccumulators(Iterable<Accum> accumulators) {
        Accum merged = createAccumulator();
        for (Accum accumulator : accumulators) {
            // merge steps happen here to find the earliest and latest times
        }
        return merged;
    }

    @Override
    public KV<String, Iterable<TableRow>> extractOutput(Accum accumulator) {
        // this step will create two rows based on the start and end times found in this function
    }
}
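I expected the CombineFn to be compatible with Combine.GroupedValues, as the documentation suggests. However, that is not the case. Combine.PerKey is another option, but we have not found a way to use it with a CombineFn either.
Related links: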
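A minimal sketch of one likely fix, assuming the CombineFn only needs to see each group's TableRow values. In the Combine source, the three-type-argument overload is `<K, InputT, OutputT> groupedValues(GlobalCombineFn<? super InputT, ?, OutputT> fn)`, and `perKey` has the same shape: `InputT` is the per-element value, and the transform re-attaches the key to produce `KV<K, OutputT>`. Because `CreateEvents` declares `KV<String, TableRow>` as its input and `KV<String, Iterable<TableRow>>` as its output, it probably does not match that overload, so the compiler falls back to the two-type-argument `SerializableFunction` overload named in the error. The class name `CreateEventsTransform`, the `"timestamp"` input field, and the `"event"`/`"time"` output fields are hypothetical placeholders. The sketch targets the Beam 2.10 API, where `AvroCoder` lives in `org.apache.beam.sdk.coders`.

```java
import com.google.api.services.bigquery.model.TableRow;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical wrapper transform; the class name is a placeholder.
public class CreateEventsTransform
    extends PTransform<PCollection<KV<String, TableRow>>, PCollection<KV<String, Iterable<TableRow>>>> {

  @Override
  public PCollection<KV<String, Iterable<TableRow>>> expand(PCollection<KV<String, TableRow>> rows) {
    // Combine.perKey groups by key (per window) itself, so no explicit GroupByKey is needed.
    // Alternative that keeps the explicit grouping step:
    //   rows.apply(GroupByKey.<String, TableRow>create())
    //       .apply(Combine.<String, TableRow, Iterable<TableRow>>groupedValues(new CreateEvents()));
    return rows.apply(
        "Generate New Rows",
        Combine.<String, TableRow, Iterable<TableRow>>perKey(new CreateEvents()));
  }

  // InputT is the value type (TableRow), not KV<String, TableRow>.
  // OutputT is the per-key result; Combine pairs it with the key as KV<String, OutputT>.
  static class CreateEvents extends CombineFn<TableRow, CreateEvents.Accum, Iterable<TableRow>> {

    @DefaultCoder(AvroCoder.class)
    public static class Accum implements Serializable {
      // Identity values for min/max, so an empty accumulator merges correctly.
      double startTime = Double.POSITIVE_INFINITY;
      double endTime = Double.NEGATIVE_INFINITY;
    }

    @Override
    public Accum createAccumulator() {
      return new Accum();
    }

    @Override
    public Accum addInput(Accum accumulator, TableRow input) {
      // "timestamp" is a hypothetical field name holding a numeric time.
      double t = ((Number) input.get("timestamp")).doubleValue();
      accumulator.startTime = Math.min(accumulator.startTime, t);
      accumulator.endTime = Math.max(accumulator.endTime, t);
      return accumulator;
    }

    @Override
    public Accum mergeAccumulators(Iterable<Accum> accumulators) {
      Accum merged = createAccumulator();
      for (Accum accumulator : accumulators) {
        merged.startTime = Math.min(merged.startTime, accumulator.startTime);
        merged.endTime = Math.max(merged.endTime, accumulator.endTime);
      }
      return merged;
    }

    @Override
    public Iterable<TableRow> extractOutput(Accum accumulator) {
      // Two rows built from the earliest and latest times; field names are hypothetical.
      List<TableRow> out = new ArrayList<>();
      out.add(new TableRow().set("event", "start").set("time", accumulator.startTime));
      out.add(new TableRow().set("event", "end").set("time", accumulator.endTime));
      return out;
    }
  }
}
```

With `Combine.perKey`, the separate GroupByKey step becomes redundant because the combine groups by key within each session window. Keeping the GroupByKey and calling `groupedValues` with the same CombineFn should also type-check, because only the fn's declared input and output types needed to change.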
Documentation - Combine.GroupedValues
Documentation - Combine.PerKey
Documentation - Combine.CombineFn
Source - Combine.GroupedValues
Source - Combine.PerKey
Source - Combine.CombineFn
【Discussion】:
Tags: java google-cloud-dataflow apache-beam