【Title】: How can I use a custom CombineFn with Combine.GroupedValues?
【Posted】: 2019-04-24 19:40:35
【Question】:

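I have written a CombineFn with input KV<String, TableRow> and output KV<String, Iterable<TableRow>>. I would like to use it with Combine.GroupedValues (or Combine.PerKey). The source seems to imply that this is possible, but I get the following error: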

Incorrect number of type arguments for generic method <K, V>groupedValues(SerializableFunction<Iterable<V>,V>) of type Combine; it cannot be parameterized with arguments <String, TableRow, Iterable<TableRow>>

We are using Beam v2.10. The context is that we apply session windows to a PCollection of KV<String, TableRow>, then use GroupByKey to create a PCollection of KV<String, Iterable<TableRow>>. After this step, our CombineFn reduces each group to a KV<String, Iterable<TableRow>>, that is, an Iterable of TableRows created from the contents of the input.

Transform step:

public PCollection<KV<String, Iterable<TableRow>>> expand(PCollection<KV<String, TableRow>> rows) {

  // group by step
  PCollection<KV<String, Iterable<TableRow>>> groupedValues = rows.apply(
    "Group by Key",
    GroupByKey.<String, TableRow>create()
  );

  // combine step
  PCollection<KV<String, Iterable<TableRow>>> combinedValues = groupedValues.apply(
    "Generate New Rows",
    // errors here
    // Incorrect number of type arguments for generic 
    // method <K, V>groupedValues(SerializableFunction<Iterable<V>,V>) 
    // of type Combine; it cannot be parameterized with arguments 
    // <String, TableRow, Iterable<TableRow>>
    Combine.<String, TableRow, Iterable<TableRow>>groupedValues(new CreateEvents())
  );


  return combinedValues;
}

Combine function:

private static class CreateEvents extends CombineFn<KV<String, TableRow>, CreateEvents.Accum, KV<String, Iterable<TableRow>>> {

  @DefaultCoder(AvroCoder.class)
  public static class Accum implements Serializable {
    Double startTime = 0.0;
    Double endTime = 0.0;
  }

  @Override
  public Accum createAccumulator() {
    return new Accum();
  }

  @Override
  public Accum addInput(Accum accumulator, KV<String, TableRow> input) {
    // the earliest and latest times in the set of table rows is set on the accumulator

    return accumulator;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accumulators) {
    Accum merged = createAccumulator();
    for (Accum accumulator : accumulators) {
      // merge steps happen here to find the earliest and latest times
    }

    return merged;
  }

  @Override
  public KV<String, Iterable<TableRow>> extractOutput(Accum accumulator) {
    // this step will create two rows based on the start and end times found in this function
  }
}

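I expected a CombineFn to be compatible with Combine.GroupedValues, as the documentation implies, but that does not seem to be the case. Combine.PerKey is another option, but we have not found a way to use it with a CombineFn either.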

Related links:
Documentation - Combine.GroupedValues
Documentation - Combine.PerKey
Documentation - Combine.CombineFn
Source - Combine.GroupedValues
Source - Combine.PerKey
Source - Combine.CombineFn

【Comments】:

    Tags: java google-cloud-dataflow apache-beam


    【Answer 1】:

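    The signature of CreateEvents looks slightly off. To be used after a GroupByKey, it should be private static class CreateEvents extends CombineFn<TableRow, CreateEvents.Accum, Iterable<TableRow>>. The combine is applied to the values of each key, so the input type is TableRow (not KV<String, TableRow>) and the combined output type is Iterable<TableRow> (not KV<String, Iterable<TableRow>>). The key is carried through by the transform itself.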

    Here is the complete code:

    public PCollection<KV<String, Iterable<TableRow>>> expand(PCollection<KV<String, TableRow>> rows) {
    
        // group by step
        PCollection<KV<String, Iterable<TableRow>>> groupedValues = rows.apply(
            "Group by Key",
            GroupByKey.<String, TableRow>create()
        );
    
        // combine step
        PCollection<KV<String, Iterable<TableRow>>> combinedValues = groupedValues.apply(
            "Generate New Rows",
            // With the corrected CombineFn signature, the type arguments
            // <K, InputT, OutputT> = <String, TableRow, Iterable<TableRow>>
            // match what Combine.groupedValues expects.
            Combine.<String, TableRow, Iterable<TableRow>>groupedValues(new CreateEvents())
        );
    
    
        return combinedValues;
      }
    
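      // Accum is qualified as CreateEvents.Accum because a nested class is not
      // in scope inside its enclosing class's own extends clause.
      private static class CreateEvents extends CombineFn<TableRow, CreateEvents.Accum, Iterable<TableRow>> {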
    
        @DefaultCoder(AvroCoder.class)
        public static class Accum implements Serializable {
          Double startTime = 0.0;
          Double endTime = 0.0;
        }
    
        @Override
        public Accum createAccumulator() {
          return new Accum();
        }
    
        @Override
        public Accum addInput(Accum accumulator, TableRow input) {
          // the earliest and latest times in the set of table rows is set on the accumulator
    
          return accumulator;
        }
    
        @Override
        public Accum mergeAccumulators(Iterable<Accum> accumulators) {
          Accum merged = createAccumulator();
          for (Accum accumulator : accumulators) {
            // merge steps happen here to find the earliest and latest times
          }
    
          return merged;
        }
    
        @Override
        public Iterable<TableRow> extractOutput(Accum accumulator) {
          // this step will create two rows based on the start and end times found in this function
          return null;
        }
      }
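
To see why the function's input type is the value type rather than the whole KV, it may help to model what a grouped-values combine does. The following is a minimal plain-Java sketch with no Beam dependency. `MiniCombineFn`, `combineGroupedValues`, and `MinMaxFn` are hypothetical names invented for illustration, not Beam APIs, and a `Map<K, List<InputT>>` stands in for a `PCollection<KV<K, Iterable<InputT>>>`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupedValuesSketch {

  // Hypothetical stand-in for Beam's CombineFn<InputT, AccumT, OutputT>; not the real API.
  // mergeAccumulators is left out to keep the sketch short.
  interface MiniCombineFn<InputT, AccumT, OutputT> {
    AccumT createAccumulator();
    AccumT addInput(AccumT accumulator, InputT input);
    OutputT extractOutput(AccumT accumulator);
  }

  // Models the shape of a grouped-values combine: (K, values) in, (K, OutputT) out.
  // The key is carried through untouched; the function only ever sees the values.
  static <K, InputT, AccumT, OutputT> Map<K, OutputT> combineGroupedValues(
      Map<K, List<InputT>> grouped, MiniCombineFn<InputT, AccumT, OutputT> fn) {
    Map<K, OutputT> result = new LinkedHashMap<>();
    for (Map.Entry<K, List<InputT>> entry : grouped.entrySet()) {
      AccumT accumulator = fn.createAccumulator();
      for (InputT value : entry.getValue()) {
        accumulator = fn.addInput(accumulator, value);
      }
      result.put(entry.getKey(), fn.extractOutput(accumulator));
    }
    return result;
  }

  // Example function: reduces a group of numbers to the two-element list [min, max].
  static class MinMaxFn implements MiniCombineFn<Double, double[], List<Double>> {
    @Override
    public double[] createAccumulator() {
      return new double[] {Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY};
    }

    @Override
    public double[] addInput(double[] accumulator, Double input) {
      accumulator[0] = Math.min(accumulator[0], input);
      accumulator[1] = Math.max(accumulator[1], input);
      return accumulator;
    }

    @Override
    public List<Double> extractOutput(double[] accumulator) {
      return Arrays.asList(accumulator[0], accumulator[1]);
    }
  }

  public static void main(String[] args) {
    Map<String, List<Double>> grouped = new LinkedHashMap<>();
    grouped.put("a", Arrays.asList(3.0, 1.0, 2.0));
    grouped.put("b", Arrays.asList(5.0));
    System.out.println(combineGroupedValues(grouped, new MinMaxFn()));
  }
}
```

Note that `MinMaxFn` never mentions the key type. In the same way, the three type arguments in the answer are the key, the per-value input, and the per-key output.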
    

    Alternatively, you can group and combine in a single, more concise step using Combine.perKey:

    
      public PCollection<KV<String, Iterable<TableRow>>> expand(PCollection<KV<String, TableRow>> rows) {
        // combine step
        return rows.apply(Combine.perKey(new CreateEvents()));
      }
    
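      // Accum is qualified as CreateEvents.Accum because a nested class is not
      // in scope inside its enclosing class's own extends clause.
      private static class CreateEvents extends CombineFn<TableRow, CreateEvents.Accum, Iterable<TableRow>> {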
    
        @DefaultCoder(AvroCoder.class)
        public static class Accum implements Serializable {
          Double startTime = 0.0;
          Double endTime = 0.0;
        }
    
        @Override
        public Accum createAccumulator() {
          return new Accum();
        }
    
        @Override
        public Accum addInput(Accum accumulator, TableRow input) {
          // the earliest and latest times in the set of table rows is set on the accumulator
    
          return accumulator;
        }
    
        @Override
        public Accum mergeAccumulators(Iterable<Accum> accumulators) {
          Accum merged = createAccumulator();
          for (Accum accumulator : accumulators) {
            // merge steps happen here to find the earliest and latest times
          }
    
          return merged;
        }
    
        @Override
        public Iterable<TableRow> extractOutput(Accum accumulator) {
          // this step will create two rows based on the start and end times found in this function
          return null;
        }
      }
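
The method bodies above are left as comments. The following plain-Java sketch shows one way the earliest/latest tracking could be filled in, under several assumptions: a `Map<String, Object>` stands in for `TableRow`, the field names `"time"` and `"event"` are hypothetical, and the accumulator starts at plus and minus infinity rather than 0.0 (with an initial 0.0, a minimum over positive timestamps would always stay at 0.0).

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CreateEventsSketch {

  // Mirrors the answer's Accum, but starts from +/- infinity so the first
  // real timestamp always replaces the initial value (an assumption of this sketch).
  static class Accum implements Serializable {
    double startTime = Double.POSITIVE_INFINITY;
    double endTime = Double.NEGATIVE_INFINITY;
  }

  static Accum createAccumulator() {
    return new Accum();
  }

  // "time" is a hypothetical field name; a Map stands in for TableRow.
  static Accum addInput(Accum accumulator, Map<String, Object> row) {
    double time = ((Number) row.get("time")).doubleValue();
    accumulator.startTime = Math.min(accumulator.startTime, time);
    accumulator.endTime = Math.max(accumulator.endTime, time);
    return accumulator;
  }

  // Merging takes the earliest start and the latest end across partial results.
  static Accum mergeAccumulators(Iterable<Accum> accumulators) {
    Accum merged = createAccumulator();
    for (Accum accumulator : accumulators) {
      merged.startTime = Math.min(merged.startTime, accumulator.startTime);
      merged.endTime = Math.max(merged.endTime, accumulator.endTime);
    }
    return merged;
  }

  // Emits two rows, one for the start time and one for the end time.
  static List<Map<String, Object>> extractOutput(Accum accumulator) {
    List<Map<String, Object>> rows = new ArrayList<>();
    rows.add(row("start", accumulator.startTime));
    rows.add(row("end", accumulator.endTime));
    return rows;
  }

  // Builds a row with the hypothetical "event" and "time" fields.
  private static Map<String, Object> row(String event, double time) {
    Map<String, Object> r = new HashMap<>();
    r.put("event", event);
    r.put("time", time);
    return r;
  }

  public static void main(String[] args) {
    // Two partial accumulators, as if the inputs had been processed in two batches.
    Accum first = addInput(createAccumulator(), row("in", 30.0));
    first = addInput(first, row("in", 10.0));
    Accum second = addInput(createAccumulator(), row("in", 20.0));
    System.out.println(extractOutput(mergeAccumulators(Arrays.asList(first, second))));
  }
}
```

In the real CombineFn these would be the overridden instance methods, with TableRow in place of the map.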
    

    【Comments】:
