【问题标题】:How to group by a dataframe of a specific class如何按特定类的数据框分组
【发布时间】:2019-08-23 12:45:55
【问题描述】:

我有一个具有此架构的数据框:

|-- Agreement_A1: string (nullable = true)
|-- Line_A1: string (nullable = true)
|-- Line_A2: string (nullable = true)

我用这段代码创建了一个新的数据框:

val df2 = df.map(row => new MapResultRequestLine().apply(row))(Encoders.bean(classOf[AgreementLine]))

函数 apply() 是这样的:

public AgreementLine apply(Row row) {
        AgreementLine agrLine = new AgreementLine();
        agrLine.Agreement_A1 = row.getAs("Agreement_A1");
        Line res = new Line();
        res.Line_A1 = row.getAs("Line_A1");
        res.Line_A2 = row.getAs("Line_A2");
        agrLine.line = res 
        return agrLine;
    }

类协议线如下所示:

public class AgreementLine{
    public String agreementCrocCode;
    public Line line;
}

类行是这样的:

public class Line{
    public String Line_A1;
    public String Line_A2;
}

如何对 df2 进行分组,使结果数据框有 Agreement_A1 列和 Line 列表?

我试过这样:

val groupedDF = df2.groupBy($"Agreement_A1").agg(collect_set((array($"line"))).as("lines"))

但它显示错误“无法解析'Agreement_A1'给定输入列:[];”

【问题讨论】:

  • df2 是 AgreementLine 类型的数据集,该类中没有 Agreement_A1。您尝试在 df2 中不存在的列 (Agreement_A1) 上 groupBy。您可以使用 df2.printSchema 检查数据集的架构
  • printSchema 仅显示空括号“()”。虽然 Count() 显示有行

标签: scala apache-spark apache-spark-sql apache-spark-dataset


【解决方案1】:

问题就在这里:

val df2 = df.map(row => new MapResultRequestLine().apply(row))(Encoders.bean(classOf[AgreementLine]))

scala 没有显示数据类型,所以你认为它是一个 DataFrame(as DataSet[Row])。

但实际上,它是一个 DataSet(作为 DataSet[AgreementLine])。感谢您的编码器,它丢失了所有架构,这就是您的 df2.printSchema 返回空结果的原因。

因此,当您调用df2.groupBy($"Agreement_A1") 时,它会抛出异常,因为没有名为“Agreement_A1”的列。

显然,解决方案是更新 DataSet 的架构(在您的情况下为 df2)。

遗憾的是,我不知道该怎么做(我也是新手)。

我唯一的解决方案是将数据集转换回 RDD[Row](如果您想使用 df2.rdd,请提及它是 RDD[AgreementLine]),并使用自定义架构构建一个新的 DataFrame。

希望你能得到更好的解决方案。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-07-15
    • 2017-01-31
    • 2019-08-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多