【发布时间】:2020-06-14 22:10:08
【问题描述】:
我正在尝试将 PTransform 应用于 PCollectionTuple,但无法弄清楚编译器为何抱怨。
我想这样做是为了将加入一些 csv 行所需的多个步骤抽象成一个 PTransform(PCollectionTuple 中的每个 PCollection 都包含要加入的 csv 行),而我遇到的问题不在于加入本身,但如何将 PTransform 应用于 PCollectionTuple。
这是我的代码:
static class JoinCsvLines extends DoFn<PCollectionTuple, String[]> {
@ProcessElement
public void processElement(ProcessContext context) {
PCollectionTuple element = context.element();
// TODO: Implement the output
}
}
我这样称呼 PTransform:
TupleTag<String[]> tag1 = new TupleTag<>();
TupleTag<String[]> tag2 = new TupleTag<>();
PCollectionTuple toJoin = PCollectionTuple.of(tag1, csvLines1).and(tag2, csvLines2);
// Can't compile this line
PCollection<String[]> joinedLines = toJoin.apply("JoinLines", ParDo.of(new JoinCsvLines()));
当我将鼠标悬停在未编译的行上方时,IntelliJ IDEA 会输出以下内容:
Required type:
PTransform
<? super PCollectionTuple,
OutputT>
Provided:
SingleOutput
<PCollectionTuple,
String[]>
reason: no instance(s) of type variable(s) InputT exist so that PCollectionTuple conforms to PCollection<? extends InputT>
如何将 PTransform 应用于 PCollectionTuple?
【问题讨论】:
标签: java google-cloud-dataflow apache-beam dataflow