【问题标题】:adding a new column using withColumn from a lookup table dynamically使用查找表中的 withColumn 动态添加新列
【发布时间】:2020-08-31 09:20:22
【问题描述】:

我在 Java 8 中使用 spark-sql-2.4.1v。我有一个场景,我需要从查找表中动态添加一列。

我有带有列的数据框 A, B, C, ..., X,Y, Z

当少数(原始)列(例如:A、B、C)值为 null 时,我需要取/替换列(例如:X、Y、Z)值,否则取原始列值。 我将获取此映射信息作为业务逻辑的一部分。 如果是这种情况,我将遵循下面的硬编码代码

 Dataset<Row>  substitutedDs = ds
                  .withColumn("A",
                             when(col("A").isNull() , col("X").cast(DataTypes.StringType))
                             .otherwise(col("A").cast(DataTypes.StringType))
                          )
                  .withColumn("C",
                             when(col("C").isNull() , col("Z").cast(DataTypes.StringType))
                             .otherwise(col("C").cast(DataTypes.StringType))
                         

哪个工作正常。但我需要动态/可配置地执行此操作以避免硬编码。

我将得到包含“code”和“code_substitutes”列信息的查找表,如下所示

-------------------------
| Code | Code_Substitute |
-------------------------
  A         X
  B         Y
  C         Z
-------------------------

我需要在上面动态构造“submittedDs”,怎么做?

【问题讨论】:

    标签: apache-spark apache-spark-sql


    【解决方案1】:

    使用 Java8,您可以使用此 Stream.reduce() 重载:

    final Dataset<Row> dataframe = ...;
    final Map<String, String> substitutes = ...;
    
    final Dataset<Row> afterSubstitutions = codeSubstitutes.entrySet().stream()
        .reduce(dataframe, (df, entry) ->
                df.withColumn(entry.getKey(), when(/* replace with col(entry.getValue()) when null */)),
                (left, right) -> { throw new IllegalStateException("Can't merge two dataframes. This stream should not be a parallel one!"); }
        );
    

    组合器(最后一个参数)应该合并两个并行处理的数据帧(如果流是 parallel() 流),但我们根本不允许这样做,因为我们只是在 @ 上调用此逻辑987654324@流。


    一个更具可读性/可维护性的版本涉及将上述逻辑提取到专用方法中的额外步骤,例如:

        // ...
        Dataset<Row> nullSafeDf = codeSubstitutes.entrySet().stream()
            .reduce(dataframe, this::replaceIfNull, this::throwingCombiner);
        // ...
    }
    
    
    private Dataset<Row> replaceIfNull(Dataset<Row> df, Map.Entry<String, String> substitution) {
        final String original = substitution.getKey();
        final String replacement = substitution.getValue();
        return df.withColumn(original, when(col(original).isNull(), col(replacement))
                .otherwise(col(original)));
    }
    
    private <X> X throwingCombiner(X left, X right) {
        throw new IllegalStateException("Combining not allowed");
    }
    

    【讨论】:

    • 附带说明,您还可以使用一些虚拟/随机组合器,例如(left, right) -&gt; left,并假设永远不会调用组合器。但是,如果有人将 .stream() 更改为 .parallelStream(),我会说你希望它失败(理想情况下,在测试中)。
    • 如果您不止一次使用这种方法,您可能希望将 throw 提取到一些 throwingCombiner() 实用方法中...
    • 谢谢,但是在这个 reduce(dataframe, (df, entry) ....what 是什么 df ?我们从哪里得到它?
    • 查看 reduce 方法的 Javadoc:第二个参数是一个 BiFunction,您可以控制它获取迄今为止累积的任何数据帧(从原始数据帧开始,作为 @987654332 传入@ param) 和要“附加”到它的新元素(在这种情况下是一个映射条目)并返回“包含”该元素的新 Dataframe ...因此,reduce 方法本身将为它提供 df每次调用,从dataframe 开始,然后对每个条目应用replaceIfNull 的结果。有意义吗?
    • .reduce function using javaapi is not working ...而不是 codeSubstitutes ,我正在尝试使用 List> ...这减少了抛出编译错误 ...该方法类型 Stream> 中的 reduce(Tuple2, BinaryOperator>) 不适用于参数 (Tuple2, ( df, entry) -> {})
    【解决方案2】:

    在 Scala 中,我会这样做

    val substitueMapping: Map[String, String] = ??? //this is your substitute map, this is small as it contains columns and their null substitutes
    
    val df = ??? //this is your main dataframe 
    
    val substitutedDf = substituteMapping.keys().foldLeft(df)((df, k) => {
        df.withColumn(k, when(col(k).isNull, col(substituteMapping(k))).otherwise(col(k)))
        //do approproate casting in above which you have done in post
    })
    

    我认为 foldLeft 在 Java 8 中不存在,您可以通过重复修改变量并在 substituteMapping 上进行迭代来模拟相同的情况。

    【讨论】:

    • 不幸的是,折叠功能在 Java 中不可用。当我尝试使用 stream() 时它不起作用
    • @Dee 感谢 Dee,但如果第一列为空,我需要将一列的值替换为另一列的值 ...
    • “重复修改变量” lambda 内部的任何修改都不能在 lambda 函数外部访问。
    • @BdEngineer 你应该修改var df,你可以从 lambda 访问它
    猜你喜欢
    • 1970-01-01
    • 2021-11-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-18
    • 1970-01-01
    • 2018-09-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多