【Title】: Apache Beam Resampling of columns based on date
【Posted】: 2021-10-20 06:21:15
【Question】:

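I am processing data with Apache Beam and trying to achieve the following.

  1. Read the data from a CSV file. (done)
  2. Group the records by customer ID. (done)
  3. Resample the data by month and compute the sum for each month.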

Detailed explanation:

I have a CSV file like the one below.

customerId date amount
BS:89481 11/14/2012 124
BS:89480 11/14/2012 234
BS:89481 11/10/2012 189
BS:89480 11/02/2012 987
BS:89481 09/14/2012 784
BS:89480 11/14/2012 056

Intermediate stage: grouped by customerId and sorted by date

customerId date amount
BS:89481 09/14/2012 784
BS:89481 11/10/2012 189
BS:89481 11/14/2012 124
BS:89480 11/02/2012 987
BS:89480 11/14/2012 234
BS:89480 11/14/2012 056

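Expected output (resampled): here we compute, for each customer, the sum of all amounts in a given month. For example, customer BS:89481 has two expenses in November, so we compute that month's total (124 + 189 = 313).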

customerId date amount
BS:89481 09/30/2012 784
BS:89481 11/30/2012 313
BS:89480 11/30/2012 1277

I have completed steps 1 and 2, but I don't know how to implement step 3.

    Schema schema = new Schema.Parser().parse(schemaFile);

    Pipeline pipeline = Pipeline.create();

    // Reading schema
    org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

    final PCollectionTuple tuples = pipeline

            // Reading csv input
            .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))

            // Reading files that matches conditions //PRashanth needs to be looked at
            .apply("2", FileIO.readMatches())

            // Reading schema and validating with schema and converts to row and returns
            // valid and invalid list
            .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                    TupleTagList.of(invalidTag())));

    // Fetching only valid rows

    final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

    // Step2
    //Convert row to KV for grouping
    StringToKV stringtoKV = new StringToKV();
    stringtoKV.setColumnName("customerId");
    PCollection<KV<String, Row>> kvOrderRows = rows.apply(ParDo.of(stringtoKV)).setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));
            //setCoder(KvCoder.of(VoidCoder.of()), rows.getCoder()));

    
    // Group the rows by key: one KV<customerId, Iterable<Row>> per customer
    PCollection<KV<String,Iterable<Row>>> kvIterableForm = kvOrderRows.apply(GroupByKey.<String,Row>create());

Update

Schema transform:

    {
      "type" : "record",
      "name" : "Entry",
      "namespace" : "transform",
      "fields" : [  {
        "name" : "customerId",
        "type" : [ "string", "null" ]
      }, {
        "name" : "date",
        "type" : [ "long", "null" ]
      }, {
        "name" : "amount",
        "type" : [ "double", "null" ]
      }]
    }

CSV file

customerId date amount
BS:89481 11/14/2012 124
BS:89480 11/14/2012 234
BS:89481 11/10/2012 189
BS:89480 11/02/2012 987
BS:89481 09/14/2012 784
BS:89480 11/14/2012 056

class StringToKV1 extends DoFn<Row, KV<String, Row>> {

        private static final long serialVersionUID = -8093837716944809689L;
        String columnName=null;

        @ProcessElement
        public void processElement(ProcessContext context) {
            Row row = context.element();
            context.output(KV.of(row.getValue(columnName), row));
        }
        
        public void setColumnName(String columnName) {
            this.columnName = columnName;
        }
    }

Code:

        public class GroupByTest {
            public static void main(String[] args) throws IOException {
                System.out.println("We are about to start!!");

                final File schemaFile = new File(
                        "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\schema_transform2.avsc");

                File csvFile = new File(
                        "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\CustomerRequest-case2.csv");
                Schema schema = new Schema.Parser().parse(schemaFile);

                Pipeline pipeline = Pipeline.create();

                // Reading schema
                org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

                final PCollectionTuple tuples = pipeline

                        // Reading csv input
                        .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))

                        // Reading files that matches conditions //PRashanth needs to be looked at
                        .apply("2", FileIO.readMatches())

                        // Reading schema and validating with schema and converts to row and returns
                        // valid and invalid list
                        .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                                TupleTagList.of(invalidTag())));

                // Fetching only valid rows
                final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

                // Transformation
                //Convert row to KV
                StringToKV1 stringtoKV1 = new StringToKV1();
                stringtoKV1.setColumnName("customerId");
                PCollection<KV<String, Row>> kvOrderRows = rows.apply(ParDo.of(stringtoKV1)).setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));

                
                // Will throw error
                // rows.apply(Group.byFieldNames("customerId", "date").aggregateField("amount", Sum.ofIntegers(), //"totalAmount"));
                System.out.println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@  "+Group.byFieldNames("customerId", "date")
                 .aggregateField("amount", Sum.ofIntegers(), "totalAmount").getName());                 
                
                pipeline.run().waitUntilFinish();
                System.out.println("The end");

            }

            private static String getColumnValue(String columnName, Row row, Schema sourceSchema) {
                String type = sourceSchema.getField(columnName).schema().getType().toString().toLowerCase();
                LogicalType logicalType = sourceSchema.getField(columnName).schema().getLogicalType();
                if (logicalType != null) {
                    type = logicalType.getName();
                }

                switch (type) {
                case "string":
                    return row.getString(columnName);
                case "int":
                    return Objects.requireNonNull(row.getInt32(columnName)).toString();
                case "bigint":
                    return Objects.requireNonNull(row.getInt64(columnName)).toString();
                case "double":
                    return Objects.requireNonNull(row.getDouble(columnName)).toString();
                case "timestamp-millis":
                    return Instant.ofEpochMilli(Objects.requireNonNull(row.getDateTime("eventTime")).getMillis()).toString();

                default:
                    return row.getString(columnName);

                }
            }



        }

Corrected code:

        final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
                .aggregateField("amount", Sum.ofDoubles(), "sumAmount");

        final PCollection<Row> aggregate = rows.apply(combine);

        PCollection<String> pOutput = aggregate.apply(ParDo.of(new RowToString()));

The output I get is

Expected output

customerId date amount
BS:89481 09/30/2012 784
BS:89481 11/30/2012 313
BS:89480 11/30/2012 1277

【Comments】:

    Tags: java apache-beam


    【Answer 1】:

    Since you already have a PCollection of Rows, you can use schema-aware PTransforms. In your case you probably want a grouped aggregation, which could look like this:

    Group.byFieldNames("customerId", "date")
         .aggregateField("amount", Sum.ofIntegers(), "totalAmount")
    

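    This groups the rows by customer ID and date and then computes the total amount per day. If you want the total per month instead, you need to create a new column (or modify the existing one) that holds only the month of the date, and group by the ID and that column.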
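
    The "month column" idea can be sketched without Beam at all. The helper below is hypothetical (MonthBucket and its methods are not part of Beam or of the question's code). It assumes the date field holds epoch milliseconds interpreted in UTC, which the schema's long type suggests but does not confirm. It maps every date to the last day of its month, so all rows from the same month end up with the same grouping value:

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAdjusters;

/** Hypothetical helper: maps any date to the last day of its month. */
final class MonthBucket {
    // Date format used in the question's CSV sample.
    private static final DateTimeFormatter CSV_DATE = DateTimeFormatter.ofPattern("MM/dd/yyyy");

    private MonthBucket() {}

    /** Month-end (at 00:00) for an epoch-millis value; UTC is an assumption. */
    static long monthEndMillis(long epochMillis) {
        LocalDate day = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate();
        LocalDate end = day.with(TemporalAdjusters.lastDayOfMonth());
        return end.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
    }

    /** Same bucketing for the MM/dd/yyyy strings that appear in the CSV. */
    static String monthEnd(String csvDate) {
        LocalDate day = LocalDate.parse(csvDate, CSV_DATE);
        return day.with(TemporalAdjusters.lastDayOfMonth()).format(CSV_DATE);
    }

    public static void main(String[] args) {
        System.out.println(monthEnd("11/14/2012")); // 11/30/2012
        System.out.println(monthEnd("09/14/2012")); // 09/30/2012
    }
}
```

    In a pipeline, a function like monthEndMillis would be applied to each Row before the grouping step, writing its result into the new (or replaced) date column, and Group.byFieldNames would then be given customerId and that column.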

    It is also useful to flatten the output schema with Select.flattenedSchema. The Beam Schema API makes it simple and efficient to work with schema-aware PCollections.

    Another option is to implement the GroupBy/Aggregate logic by hand using KVs, but that is more complex and error-prone because it needs more boilerplate code.
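
    To make the manual route concrete, here is a Beam-free sketch of what "key by customer and month, then sum" computes on the question's sample data. MonthlyTotals is a hypothetical name, and the plain Map stands in for the keyed collection. In an actual pipeline the same shape would likely be a PCollection of KV<String, Double> combined per key (for example with Sum.doublesPerKey()), which is not shown here:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAdjusters;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of grouping by (customerId, month) and summing. */
final class MonthlyTotals {
    private static final DateTimeFormatter CSV_DATE = DateTimeFormatter.ofPattern("MM/dd/yyyy");

    private MonthlyTotals() {}

    /** Each input line is "customerId date amount", as in the question's CSV. */
    static Map<String, Double> sumPerCustomerAndMonth(List<String> lines) {
        Map<String, Double> totals = new LinkedHashMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split("\\s+");
            // Collapse the date to the last day of its month.
            String monthEnd = LocalDate.parse(parts[1], CSV_DATE)
                    .with(TemporalAdjusters.lastDayOfMonth())
                    .format(CSV_DATE);
            // Composite key: plays the role of the key in a KV.
            String key = parts[0] + " " + monthEnd;
            // Add this amount to the running total for the key.
            totals.merge(key, Double.parseDouble(parts[2]), Double::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "BS:89481 11/14/2012 124",
                "BS:89480 11/14/2012 234",
                "BS:89481 11/10/2012 189",
                "BS:89480 11/02/2012 987",
                "BS:89481 09/14/2012 784",
                "BS:89480 11/14/2012 056");
        sumPerCustomerAndMonth(lines)
                .forEach((key, total) -> System.out.println(key + " " + total));
    }
}
```

    On the sample rows this yields 313 for BS:89481 in November, 1277 for BS:89480 in November and 784 for BS:89481 in September, matching the expected output in the question.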

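    【Comments】:

    • Sorry if this sounds silly, but I am not sure what to do with your suggestion. I am new to Apache Beam. I even tried looking at a few examples of Group.byFieldNames, including this one: github.com/apache/beam/blob/master/sdks/java/core/src/test/java/… but I could not figure out how to make it work.
    • I tried adding Group.byFieldNames("customerId", "date").aggregateField("amount", Sum.ofIntegers(), "totalAmount") to my code, but it only prints Group.CombineFieldsByFields. I have spent the whole day trying to get it to work.
    • Can you share your pipeline code here?
    • I have updated the code!
    • What error do you get when you run "rows.apply(Group.byFieldNames()...)"?
    【Answer 2】: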

    Thanks to @Alexey Romanenko for the help.

    The final solution is:

    public class GroupByTest {
        public static void main(String[] args) throws IOException {
            System.out.println("We are about to start!!");
    
            final File schemaFile = new File(
                    "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\schema_transform2.avsc");
    
            File csvFile = new File(
                    "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\CustomerRequest-case2.csv");
            Schema schema = new Schema.Parser().parse(schemaFile);
    
            Pipeline pipeline = Pipeline.create();
    
            // Reading schema
            org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);
    
            final PCollectionTuple tuples = pipeline
    
                    // Reading csv input
                    .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))
    
                    // Reading files that matches conditions //PRashanth needs to be looked at
                    .apply("2", FileIO.readMatches())
    
                    // Reading schema and validating with schema and converts to row and returns
                    // valid and invalid list
                    .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                            TupleTagList.of(invalidTag())));
    
            // Fetching only valid rows
            final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));
    
            // Transformation
            // Group by customerId and date and sum the aggregated field
            final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
                .aggregateField("amount", Sum.ofDoubles(), "sumAmount");
    
            final PCollection<Row> aggregate = rows.apply(combine);
    
            PCollection<String> pOutput = aggregate.apply(Select.flattenedSchema()).apply(ParDo.of(new RowToString()));
            
                            
            
            pipeline.run().waitUntilFinish();
            System.out.println("The end");
    
        }
    
        private static String getColumnValue(String columnName, Row row, Schema sourceSchema) {
            String type = sourceSchema.getField(columnName).schema().getType().toString().toLowerCase();
            LogicalType logicalType = sourceSchema.getField(columnName).schema().getLogicalType();
            if (logicalType != null) {
                type = logicalType.getName();
            }
    
            switch (type) {
            case "string":
                return row.getString(columnName);
            case "int":
                return Objects.requireNonNull(row.getInt32(columnName)).toString();
            case "bigint":
                return Objects.requireNonNull(row.getInt64(columnName)).toString();
            case "double":
                return Objects.requireNonNull(row.getDouble(columnName)).toString();
            case "timestamp-millis":
                return Instant.ofEpochMilli(Objects.requireNonNull(row.getDateTime("eventTime")).getMillis()).toString();
    
            default:
                return row.getString(columnName);
    
            }
        }
    
    
    
    }
    

    【Comments】:
