【问题标题】:Gamma Distribution in Apache beamApache 光束中的伽马分布
【发布时间】:2018-06-01 17:11:27
【问题描述】:

我正在尝试在 Apache Beam 中实现 Gamma 分布。首先,我正在使用 Apache Beam 的 TextIO 类读取 CSV 文件 CSV 文件:

Pipeline p = Pipeline.create();  
p.apply(TextIO.read().from("gs://path/to/file.csv"));  

之后,我应用一个转换来解析 CSV 文件中的每一行并返回一个对象。这里只有我正在尝试执行 Gamma Distribution 操作:

.apply(ParDo.of(new DoFn<String, Entity>() {
@ProcessElement
public void processElement(ProcessContext c) {
    String[] strArr = c.element().split(",");
    ClassxNorms xn = new ClassxNorms();
    xn.setDuration(Double.parseDouble(strArr[0]));
    xn.setAlpha(Double.parseDouble(strArr[1]));
    xn.setBeta(Double.parseDouble(strArr[2]));
    GammaDistribution gdValue = new GammaDistribution(Double.parseDouble(strArr[0]), Double.parseDouble(strArr[1]), Double.parseDouble(strArr[2]));
    System.out.println("gdValue : " + gdValue);
    c.output(xn);
}
}));

我正在创建一个 beamRecord,在下一步中,我会将 beam 记录转换为字符串,以将最终输出写入 Google 存储:

PCollection<String> gs_output_final = xnorm_trig.apply(ParDo.of(new DoFn<BeamRecord, String>() {
                    private static final long serialVersionUID = 1L;
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        c.output(c.element().toString());
                        System.out.println(c.element().toString());
                    }
                })); 
   gs_output_final.apply(TextIO.write().to("gs://output/op_1/Q40test111"));  

我得到了输出,但是伽玛分布操作没有得到实现。任何帮助将不胜感激。

【问题讨论】:

  • ClassxNorms 在做什么?此外,您正在使用gdValue 创建您的伽马分布,但是,我看不到您将其传递到下一步。 注意: 在谷歌云中,如果您在数据流中运行它,则不会保留打印到屏幕,除非您使用记录器或使用直接运行器。

标签: java google-cloud-platform apache-beam


【解决方案1】:

我能够在 Apache Beam 中实现伽马分布。下面是代码 sn-p 供参考:

.apply(ParDo.of(new DoFn<String, ClassxNorms>() { 
    @ProcessElement
    public void processElement(ProcessContext c) throws ParseException {
      String[] strArr = c.element().split(",");
      ClassxNorms xn = new ClassxNorms();
      double sample = new GammaDistribution(Double.parseDouble(strArr[11]), Double.parseDouble(strArr[12])).cumulativeProbability(Double.parseDouble(strArr[6]));
      xn.setDuration(Double.parseDouble(strArr[6]));
      xn.setAlpha(Double.parseDouble(strArr[11]));
      xn.setBeta(Double.parseDouble(strArr[12]));
      xn.setVolume(Double.parseDouble(strArr[13]));
      xn.setSpend(Double.parseDouble(strArr[14]));
      xn.setEfficiency(Double.parseDouble(strArr[15]));
      xn.setXnorm(Double.parseDouble(strArr[16]));
      xn.setKey(strArr[17]);
      xn.setGamma(sample);
      c.output(xn);
    }
  }));

【讨论】:

    猜你喜欢
    • 2018-12-20
    • 1970-01-01
    • 2019-01-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-02-23
    • 2013-03-27
    • 1970-01-01
    相关资源
    最近更新 更多