【问题标题】:how to match 2 column with each other in Apache Spark - Pyspark如何在 Apache Spark 中匹配 2 列 - Pyspark
【发布时间】:2018-04-09 22:58:54
【问题描述】:

我有一个数据框,所以假设我的数据是表格格式。

|ID   |       Serial               |    Updated
-------------------------------------------------------
|10   |pers1                       |                  |
|20   |                            |                  |
|30   |entity_1, entity_2, entity_3|entity_1, entity_3|

现在使用 withColumn("Serial", explode(split(",")"Serial")))。我已经实现了将列分成多行,如下所示。这是要求的第一部分。

   |ID   |       Serial    |    Updated
    -------------------------------------------------------
    |10   |pers1           |                  |
    |20   |                |                  |
    |30   |entity_1        |entity_1, entity_3|
    |30   |entity_2        |entity_1, entity_3|
    |30   |entity_3        |entity_1, entity_3| 

现在对于没有值的列,它应该是 0, 对于 'Serial' 列中存在的值,应在 'Updated' 列中进行搜索。如果该值存在于“Updated”列中,则应显示“1”否则为“2”

所以在这种情况下,对于 entity_1 && entity_3 --> 必须显示 1 & 对于 entity_2 --> 应该显示 2

如何实现这个..?

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql spark-dataframe


    【解决方案1】:

    AFAIK,如果不使用udf,就无法直接检查一列是否包含在另一列中或是否是另一列的子字符串。

    但是,如果您想避免使用udf,一种方法是分解"Updated" 列。然后,您可以检查 "Serial" 列和展开的 "Updated" 列之间的相等性并应用您的条件(如果匹配则为 1,否则为 2)- 调用此 "contains"

    最后,您可以groupBy("ID", "Serial", "Updated") 并选择"contains" 列的最小值。

    例如,在两次调用explode() 并检查您的情况后,您将拥有这样的DataFrame:

    df.withColumn("Serial", f.explode(f.split("Serial", ",")))\
        .withColumn("updatedExploded", f.explode(f.split("Updated", ",")))\
        .withColumn(
            "contains",
            f.when(
                f.isnull("Serial") | 
                f.isnull("Updated") | 
                (f.col("Serial") == "") | 
                (f.col("Updated") == ""),
                0
            ).when(
                f.col("Serial") == f.col("updatedExploded"),
                1
            ).otherwise(2)
        )\
        .show(truncate=False)
    #+---+--------+-----------------+---------------+--------+
    #|ID |Serial  |Updated          |updatedExploded|contains|
    #+---+--------+-----------------+---------------+--------+
    #|10 |pers1   |                 |               |0       |
    #|20 |        |                 |               |0       |
    #|30 |entity_1|entity_1,entity_3|entity_1       |1       |
    #|30 |entity_1|entity_1,entity_3|entity_3       |2       |
    #|30 |entity_2|entity_1,entity_3|entity_1       |2       |
    #|30 |entity_2|entity_1,entity_3|entity_3       |2       |
    #|30 |entity_3|entity_1,entity_3|entity_1       |2       |
    #|30 |entity_3|entity_1,entity_3|entity_3       |1       |
    #+---+--------+-----------------+---------------+--------+
    

    ("ID", "Serial", "Updated") 分组并取"contains" 的最小值的“技巧”之所以有效,是因为:

    • 如果"Serial""Updated" 为null(或在这种情况下等于空字符串),则值为0。
    • 如果"Updated" 中的至少一个值与"Serial" 匹配,则其中一列的值为1。
    • 如果没有匹配项,您将只有 2 个

    最终输出:

    df.withColumn("Serial", f.explode(f.split("Serial", ",")))\
        .withColumn("updatedExploded", f.explode(f.split("Updated", ",")))\
        .withColumn(
            "contains",
            f.when(
                f.isnull("Serial") |
                f.isnull("Updated") |
                (f.col("Serial") == "") |
                (f.col("Updated") == ""),
                0
            ).when(
                f.col("Serial") == f.col("updatedExploded"),
                1
            ).otherwise(2)
        )\
        .groupBy("ID", "Serial", "Updated")\
        .agg(f.min("contains").alias("contains"))\
        .sort("ID")\
        .show(truncate=False)
    #+---+--------+-----------------+--------+
    #|ID |Serial  |Updated          |contains|
    #+---+--------+-----------------+--------+
    #|10 |pers1   |                 |0       |
    #|20 |        |                 |0       |
    #|30 |entity_3|entity_1,entity_3|1       |
    #|30 |entity_2|entity_1,entity_3|2       |
    #|30 |entity_1|entity_1,entity_3|1       |
    #+---+--------+-----------------+--------+
    

    我是chaining callspyspark.sql.functions.when() 来检查条件。第一部分检查任一列是否为null 或等于空字符串。我相信您可能只需要在实际数据中检查null,但我会根据您显示示例DataFrame的方式检查空字符串。

    【讨论】:

      猜你喜欢
      • 2015-07-28
      • 2020-08-10
      • 2015-09-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-03-03
      • 2017-10-11
      相关资源
      最近更新 更多