【问题标题】:Need to add a new column to a Dataset/Row in Spark, based on all existing columns需要基于所有现有列向 Spark 中的数据集/行添加新列
【发布时间】:2020-12-04 06:10:34
【问题描述】:

我有这个包含这些列的(简化的)Spark 数据集:

"col1", "col2", "col3", "col4"

我想添加一个新列:“结果”。 “result”的值是一个函数的返回值,该函数将所有其他列(“col1”、“col2”、...)值作为参数。

map/foreach 无法更改迭代行,UDF 函数不会将整行作为参数,所以我必须收集 all 列名作为输入,我还必须在 UDF 注册部分中指定每个列类型。

注意事项:

  1. 数据集没有很多行,所以我不介意低性能解决方案。
  2. 数据集确实有很多不同类型的列,因此在 UDF 注册部分指定所有列似乎不是最优雅的解决方案。
  3. 该项目是用 Java 编写的,所以我使用 Java API 与 Spark 进行交互。

我怎样才能实现这种行为?

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    您实际上可以添加一个带有地图的新列。

    df.map { row =>
      val col1 = row.getAs[String]("col1")
      val col2 = row.getAs[String]("col2")
      // etc, extract all your columns
      ....
      val newColumns = col1 + col2
      // do what you need to do to obtain value for a new column
      (col1, col2, ..., newColumn)
    }.toDF("col1", "col2", ..., "new")
    

    在 Java API 方面,这将与一些调整相同:

    data.map((MapFunction<Row, Tuple3<String, String, String>>) row -> {
        String col1 = row.getAs("col1");
        String col2 = row.getAs("col2");
        // whatever you need
        String newColumns = col1 + col2;
        return new Tuple3<>(col1, col2, newColumns);
      }, Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()))
      .toDF("col1", "col2", ..., "new")
    

    或者,您可以将所有列收集到数组中,然后在 UDF 中处理此数组。

    val transformer = udf { arr: Seq[Any] =>
        // do your stuff but bevare of types
    }
    
    data.withColumn("array", array($"col1", $"col2", ..., $"colN"))
        .select($"col1", $"col2",..., transformer($"array") as "newCol")
    

    【讨论】:

    • 在您的解决方案中,我仍然需要在此行中指定所有行的类型:“Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING())” .我的数据集有几十列,所以很麻烦。
    【解决方案2】:

    我找到了解决问题的方法:

    String[] allColumnsAsStrings = dataset.columns();
    final Column[] allColumns = Arrays.stream(allColumnsAsStrings).toArray(Column[]::new);
    
    UserDefinedFunction addColumnUdf = udf((Row row) -> {
        double score;
        // Calculate stuff based on the row values
        // ...
        return score;
    }, DataTypes.DoubleType
            );
    dataset = dataset.withColumn("score", addColumnUdf.apply(functions.struct(allColumns)));
    
    

    【讨论】:

      猜你喜欢
      • 2018-12-25
      • 2015-11-19
      • 1970-01-01
      • 1970-01-01
      • 2017-01-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多