【发布时间】:2018-06-01 17:11:27
【问题描述】:
我正在尝试在 Apache Beam 中实现 Gamma 分布。首先,我正在使用 Apache Beam 的 TextIO 类读取 CSV 文件 CSV 文件:
Pipeline p = Pipeline.create();
p.apply(TextIO.read().from("gs://path/to/file.csv"));
之后,我应用一个转换来解析 CSV 文件中的每一行并返回一个对象。这里只有我正在尝试执行 Gamma Distribution 操作:
.apply(ParDo.of(new DoFn<String, Entity>() {
@ProcessElement
public void processElement(ProcessContext c) {
String[] strArr = c.element().split(",");
ClassxNorms xn = new ClassxNorms();
xn.setDuration(Double.parseDouble(strArr[0]));
xn.setAlpha(Double.parseDouble(strArr[1]));
xn.setBeta(Double.parseDouble(strArr[2]));
GammaDistribution gdValue = new GammaDistribution(Double.parseDouble(strArr[0]), Double.parseDouble(strArr[1]), Double.parseDouble(strArr[2]));
System.out.println("gdValue : " + gdValue);
c.output(xn);
}
}));
我正在创建一个 beamRecord,在下一步中,我会将 beam 记录转换为字符串,以将最终输出写入 Google 存储:
PCollection<String> gs_output_final = xnorm_trig.apply(ParDo.of(new DoFn<BeamRecord, String>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().toString());
System.out.println(c.element().toString());
}
}));
gs_output_final.apply(TextIO.write().to("gs://output/op_1/Q40test111"));
我得到了输出,但是伽玛分布操作没有得到实现。任何帮助将不胜感激。
【问题讨论】:
-
ClassxNorms在做什么?此外,您正在使用gdValue创建您的伽马分布,但是,我看不到您将其传递到下一步。 注意: 在谷歌云中,如果您在数据流中运行它,则不会保留打印到屏幕,除非您使用记录器或使用直接运行器。
标签: java google-cloud-platform apache-beam