【问题标题】:Spark: how to process certain column content individually in dataframe?Spark:如何在数据框中单独处理某些列内容?
【发布时间】:2021-07-09 14:02:48
【问题描述】:

数据结构是这样的:

id name data
001 aaa true,false,false
002 bbb true,true,true
003 ccc false,true,true

我想将数据中的结果按照映射表中对应的顺序映射到它们的名字上。详细来说,第一步是获取数据中False的序号,然后在映射表中通过序号获取名称。

比如第一条记录有两个False,它们的索引号分别是2和3,所以映射结果是code2和code3。另外,第二条记录全部为真,所以映射结果为空字符串。

映射表:("code1","code2","code3")

预期结果:

id name data
001 aaa code2,code3
002 bbb
003 ccc code1

是否可以在数据框中实现这一点?

【问题讨论】:

  • 你能详细说明 true,false,false 如何映射到 code2,code3 和 true,true,true 映射到空吗?
  • @linusRian 抱歉忘记提及映射逻辑。详情已更新。
  • cnidaye,谢谢,抱歉耽搁了,我看到这个已经回答了:)

标签: apache-spark apache-spark-sql


【解决方案1】:

如果您使用的是 spark 3+,您可以将 filtertransform 函数用作

val df = Seq(
  ("001", "aaa", "true,false,false"),
  ("002", "bbb", "true,true,true"),
  ("003", "ccc", "false,true,true"),
).toDF("id", "name", "data")

val cols = Seq("col1", "col2", "col3")

val dfNew = df.withColumn("data", split($"data", ","))
  .withColumn("mapping", arrays_zip($"data", typedLit(cols)))
  .withColumn("new1", filter($"mapping", (c: Column) => c.getField("data") === "false"))
  .withColumn("data", transform($"new1", (c: Column) => c.getField("1")))
  .drop("new1", "mapping")

dfNew.show(false) 

输出:

+---+----+------------+
|id |name|data        |
+---+----+------------+
|001|aaa |[col2, col3]|
|002|bbb |[]          |
|003|ccc |[col1]      |
+---+----+------------+

【讨论】:

    【解决方案2】:

    以下内容应该有效,但请注意,它具有 posexplodeexplode 和具有位置值的 array),如果您拥有庞大的数据集,这可能是一项成本高昂的操作。

    val df = Seq(
    ("001", "aaa", "true,false,false"),
    ("002", "bbb", "true,true,true"),
    ("003", "ccc", "false,true,true")
    ).toDF("id", "name", "data")
    
    val codes = Seq(
    (0, "code1"),
    (1, "code2"),
    (2, "code3")
    ).toDF("code_id", "codes")
    
    
    val df1 = df.select($"*", posexplode(split($"data", ",")))
      .join(codes, $"pos" === $"code_id")
      .withColumn( "codes", when($"col" === "false", $"codes").otherwise(null) )
    
    //+---+----+----------------+---+-----+-------+-----+
    //| id|name|            data|pos|  col|code_id|codes|
    //+---+----+----------------+---+-----+-------+-----+
    //|001| aaa|true,false,false|  0| true|      0| null|
    //|001| aaa|true,false,false|  1|false|      1|code2|
    //|001| aaa|true,false,false|  2|false|      2|code3|
    //|002| bbb|  true,true,true|  0| true|      0| null|
    //|002| bbb|  true,true,true|  1| true|      1| null|
    //|002| bbb|  true,true,true|  2| true|      2| null|
    //|003| ccc| false,true,true|  0|false|      0|code1|
    //|003| ccc| false,true,true|  1| true|      1| null|
    //|003| ccc| false,true,true|  2| true|      2| null|
    //+---+----+----------------+---+-----+-------+-----+
    
    val finalDf = df1.groupBy($"id", $"name").agg(concat_ws(",", collect_list($"codes")).as("data"))
    
    //+---+----+-----------+
    //| id|name|       data|
    //+---+----+-----------+
    //|002| bbb|           |
    //|001| aaa|code2,code3|
    //|003| ccc|      code1|
    //+---+----+-----------+
    

    【讨论】:

    • 您好,感谢您的回答。我也同意这将是一个昂贵的操作,所以我正在考虑如果我使用df.map() 函数并将我的处理逻辑放在map() 函数中是否更有效?由于映射表存储在每台机器中并仅在本地处理它们,这似乎减少了额外的 IO 压力?
    • @cnidaye 如果您不能使用下面的 Spark 3 答案,您也可以尝试将性能与 Spark UDF 进行比较。您的操作应该很容易在 UDF 中实现。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-07-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多