【问题标题】:Create a new column from one of the value available in another columns as an array of Key Value pair从另一列中可用的值之一创建一个新列,作为键值对数组
【发布时间】:2019-10-09 18:14:12
【问题描述】:

我已经从 hive 中提取了一些数据到 dataframe,格式如下。

+--------------------+-----------------+--------------------+---------------+
| NUM_ID|            SIG1|           SIG2|             SIG3|            SIG4|
+----------------------+---------------+--------------------+---------------+
|XXXXX01|[{15695605310...|[{15695605310...|[{15695605310...|[{15695605310...|
|XXXXX02|[{15695604780...|[{15695604780...|[{15695604780...|[{15695604780...|
|XXXXX03|[{15695605310...|[{15695605310...|[{15695605310...|[{15695605310...|
|XXXXX04|[{15695605310...|[{15695605310...|[{15695605310...|[{15695605310...|
|XXXXX05|[{15695605310...|[{15695605310...|[{15695605310...|[{15695605310...|
|XXXXX06|[{15695605340...|[{15695605340...|[{15695605340...|[{15695605340...|
|XXXXX07|[{15695605310...|[{15695605310...|[{15695605310...|[{15695605310...|
|XXXXX08|[{15695605310...|[{15695605310...|[{15695605310...|[{15695605310...|

如果我们只接收一个信号,它将如下所示。

|XXXXX01|[{1569560531000,3.7825},{1569560475000,3.7812},{1569560483000,3.7812},{1569560491000,34.7875}]|
    [{1569560537000,3.7825},{1569560481000,34.7825},{1569560489000,34.7825},{1569560497000,34.7825}]|
    [{1569560505000,34.7825},{1569560513000,34.7825},{1569560521000,34.7825},{1569560527000,34.7825}]|
    [{1569560535000,34.7825},{1569560479000,34.7825},{1569560487000,34.7825}]

对于每个 NUM_ID ,每个 SIG 列都有一个 E 和 V 对数组。

上述数据的架构是

fromHive.printSchema
root
|-- NUM_ID: string (nullable = true)
|-- SIG1: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- E: long (nullable = true)
|    |    |-- V: double (nullable = true)
|-- SIG2: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- E: long (nullable = true)
|    |    |-- V: double (nullable = true)
|-- SIG3: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- E: long (nullable = true)
|    |    |-- V: double (nullable = true)
|-- SIG4: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- E: long (nullable = true)
|    |    |-- V: double (nullable = true)

我的要求是从特定 NUM_ID 的所有列中获取所有 E 值,并创建一个新的 cloumn,在其他列中具有相应的信号值,如下所示。

+-------+-------------+-------+-------+-------+-------+
| NUM_ID|            E| SIG1_V| SIG2_V| SIG3_V| SIG4_V|
+-------+-------------+-------+-------+-------+-------+
|XXXXX01|1569560531000|33.7825|34.7825|   null|96.3354|
|XXXXX01|1569560505000|   null|   null|35.5501|   null|
|XXXXX01|1569560531001|73.7825|   null|   null|   null|
|XXXXX02|1569560505000|34.7825|   null|35.5501|96.3354|
|XXXXX02|1569560531000|33.7825|34.7825|35.5501|96.3354|
|XXXXX02|1569560505001|73.7825|   null|   null|   null|
|XXXXX02|1569560502000|   null|   null|35.5501|96.3354|
|XXXXX03[1569560531000|73.7825|   null|   null|   null|
|XXXXX03|1569560505000|34.7825|   null|35.5501|96.3354|
|XXXXX03|1569560509000|   null|34.7825|35.5501|96.3354|

对于特定的 NUM_ID,所有四个信号列中的 E 值应被视为没有重复的单列,并且相应 E 的 V 值应填充到不同的列中。假设一个 Signal 没有特定 E 的任何 E-V 对,那么该列应该为空。如上图。

提前致谢。任何线索表示赞赏。

为了更好地理解,下面是输入和预期输出的示例结构。

输入:

+-------------------------+-----------------+-----------------+------------------+
| NUM_ID|             SIG1|           SIG2|             SIG3|            SIG4|
+-------------------------+-----------------+-----------------+------------------+
|XXXXX01|[{E1,V1},{E2,V2}]|[{E1,V3},{E3,V4}]|[{E4,V5},{E5,V6}]|[{E5,V7},{E2,V8}] |
|XXXXX02|[{E7,V1},{E8,V2}]|[{E1,V3},{E3,V4}]|[{E1,V5},{E5,V6}]|[{E9,V7},{E8,V8}]|
|XXXXX03|[{E1,V1},{E2,V2}]|[{E1,V3},{E3,V4}]|[{E4,V5},{E5,V6}]|[{E5,V7},{E2,V8}] |

预期输出:



+-------+---+--------+-------+-------+-------+
| NUM_ID|  E| SIG1_V| SIG2_V| SIG3_V| SIG4_V|
+-------+---+-------+-------+-------+-------+
|XXXXX01| E1|     V1|     V3|   null|   null|
|XXXXX01| E2|     V2|   null|   null|     V8|
|XXXXX01| E3|   null|     V4|   null|   null|
|XXXXX01| E4|   null|   null|     V5|   null|
|XXXXX01| E5|   null|   null|     V6|     V7|

|XXXXX02| E1|   null|     V3|     V5|   null|
|XXXXX02| E3|   null|     V4|   null|   null|
|XXXXX02| E5|   null|   null|     V6|   null|
|XXXXX02[ E7|     V1|   null|   null|   null|
|XXXXX02| E8|     V2|   null|   null|     V7|
|XXXXX02| E9|   null|34.7825|   null|     V8|

【问题讨论】:

  • @Nikk- 是否可以编写一个将输入参数作为 Row 的 UDF?
  • 是的,您可以在 UDF 中通过竞争行。如果您需要任何线索,请告诉我。
  • 您能否提供您作为输入提供的数据的预期输出。输入和输出数据似乎存在差异。我可以看到 SIG1 中有四个可能的 E 值,用于 XXXXX01 的输入,但得到不同的输出
  • 我已经给出了一个样本数据作为预期的输出。它不完全是给定输入数据的输出
  • @Nikk- 是的,我尝试通过传递一行来编写 UDF,但无法继续。如果你能提供一些线索,那将会很有帮助。

标签: scala dataframe apache-spark apache-spark-sql


【解决方案1】:

输入的CSV文件如下:

NUM_ID|SIG1|SIG2|SIG3|SIG4 XXXXX01|[{1569560531000,3.7825},{1569560475000,3.7812},{1569560483000,3.7812},{1569560491000,34.7875}]|[{1569560537000,3.7825},{1569560531000, 34.7825},{1569560489000,34.7825},{1569560497000,34.7825}]|[{1569560505000,34.7825},{1569560513000,34.7825},{1569560521000,34.7825},{1569560531000,34.7825}]|[{1569560535000,34.7825},{ 1569560531000,34.7825},{1569560483000,34.7825}

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.UserDefinedFunction

    val df = spark.read.format("csv").option("header","true").option("delimiter", "|").load("path .csv")
    df.show(false)
    +-------+----------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------+
    |NUM_ID |SIG1                                                                                          |SIG2                                                                                            |SIG3                                                                                             |SIG4                                                                     |
    +-------+----------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------+
    |XXXXX01|[{1569560531000,3.7825},{1569560475000,3.7812},{1569560483000,3.7812},{1569560491000,34.7875}]|[{1569560537000,3.7825},{1569560531000,34.7825},{1569560489000,34.7825},{1569560497000,34.7825}]|[{1569560505000,34.7825},{1569560513000,34.7825},{1569560521000,34.7825},{1569560531000,34.7825}]|[{1569560535000,34.7825},{1569560531000,34.7825},{1569560483000,34.7825}]|
    +-------+----------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------+


    //UDF to generate column E

    def UDF_E:UserDefinedFunction=udf((r: Row)=>{
    val SigColumn = "SIG1,SIG2,SIG3,SIG4"
    val colList = SigColumn.split(",").toList
    val rr = "[\\}],[\\{]".r
    var out = ""
    colList.foreach{ x =>
    val a = (rr replaceAllIn(r.getAs(x).toString, "|")).replaceAll("\\[\\{","").replaceAll("\\}\\]","")
    val b = a.split("\\|").map(x => x.split(",")(0)).toSet
    out = out + "," + b.mkString(",")
    }
    val out1 = out.replaceFirst(s""",""","").split(",").toSet.mkString(",")
    out1
    })


    //UDF to generate column value with Signal

    def UDF_V:UserDefinedFunction=udf((E: String, SIG:String)=>{
    val Signal = SIG.replaceAll("\\{", "\\(").replaceAll("\\}", "\\)").replaceAll("\\[", "").replaceAll("\\]", "")
    val SigMap = "(\\w+),([\\w 0-9 .]+)".r.findAllIn(Signal).matchData.map(i => {(i.group(1), i.group(2))}).toMap
    var out = ""
    if(SigMap.keys.toList.contains(E)){
    out = SigMap(E).toString
    }
    out})

    //new DataFrame with Column "E"
    val df1 = df.withColumn("E", UDF_E(struct(df.columns map col: _*))).withColumn("E", explode(split(col("E"), ",")))

     df1.show(false)
    +-------+----------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------+-------------+
    |NUM_ID |SIG1                                                                                          |SIG2                                                                                            |SIG3                                                                                             |SIG4                                                                     |E            |
    +-------+----------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------+-------------+
    |XXXXX01|[{1569560531000,3.7825},{1569560475000,3.7812},{1569560483000,3.7812},{1569560491000,34.7875}]|[{1569560537000,3.7825},{1569560531000,34.7825},{1569560489000,34.7825},{1569560497000,34.7825}]|[{1569560505000,34.7825},{1569560513000,34.7825},{1569560521000,34.7825},{1569560531000,34.7825}]|[{1569560535000,34.7825},{1569560531000,34.7825},{1569560483000,34.7825}]|1569560483000|
    |XXXXX01|[{1569560531000,3.7825},{1569560475000,3.7812},{1569560483000,3.7812},{1569560491000,34.7875}]|[{1569560537000,3.7825},{1569560531000,34.7825},{1569560489000,34.7825},{1569560497000,34.7825}]|[{1569560505000,34.7825},{1569560513000,34.7825},{1569560521000,34.7825},{1569560531000,34.7825}]|[{1569560535000,34.7825},{1569560531000,34.7825},{1569560483000,34.7825}]|1569560497000|
    |XXXXX01|[{1569560531000,3.7825},{1569560475000,3.7812},{1569560483000,3.7812},{1569560491000,34.7875}]|[{1569560537000,3.7825},{1569560531000,34.7825},{1569560489000,34.7825},{1569560497000,34.7825}]|[{1569560505000,34.7825},{1569560513000,34.7825},{1569560521000,34.7825},{1569560531000,34.7825}]|[{1569560535000,34.7825},{1569560531000,34.7825},{1569560483000,34.7825}]|1569560475000|
    |XXXXX01|[{1569560531000,3.7825},{1569560475000,3.7812},{1569560483000,3.7812},{1569560491000,34.7875}]|[{1569560537000,3.7825},{1569560531000,34.7825},{1569560489000,34.7825},{1569560497000,34.7825}]|[{1569560505000,34.7825},{1569560513000,34.7825},{1569560521000,34.7825},{1569560531000,34.7825}]|[{1569560535000,34.7825},{1569560531000,34.7825},{1569560483000,34.7825}]|1569560489000|
    |XXXXX01|[{1569560531000,3.7825},{1569560475000,3.7812},{1569560483000,3.7812},{1569560491000,34.7875}]|[{1569560537000,3.7825},{1569560531000,34.7825},{1569560489000,34.7825},{1569560497000,34.7825}]|[{1569560505000,34.7825},{1569560513000,34.7825},{1569560521000,34.7825},{1569560531000,34.7825}]|[{1569560535000,34.7825},{1569560531000,34.7825},{1569560483000,34.7825}]|1569560535000|
    |XXXXX01|[{1569560531000,3.7825},{1569560475000,3.7812},{1569560483000,3.7812},{1569560491000,34.7875}]|[{1569560537000,3.7825},{1569560531000,34.7825},{1569560489000,34.7825},{1569560497000,34.7825}]|[{1569560505000,34.7825},{1569560513000,34.7825},{1569560521000,34.7825},{1569560531000,34.7825}]|[{1569560535000,34.7825},{1569560531000,34.7825},{1569560483000,34.7825}]|1569560531000|
    |XXXXX01|[{1569560531000,3.7825},{1569560475000,3.7812},{1569560483000,3.7812},{1569560491000,34.7875}]|[{1569560537000,3.7825},{1569560531000,34.7825},{1569560489000,34.7825},{1569560497000,34.7825}]|[{1569560505000,34.7825},{1569560513000,34.7825},{1569560521000,34.7825},{1569560531000,34.7825}]|[{1569560535000,34.7825},{1569560531000,34.7825},{1569560483000,34.7825}]|1569560513000|
    |XXXXX01|[{1569560531000,3.7825},{1569560475000,3.7812},{1569560483000,3.7812},{1569560491000,34.7875}]|[{1569560537000,3.7825},{1569560531000,34.7825},{1569560489000,34.7825},{1569560497000,34.7825}]|[{1569560505000,34.7825},{1569560513000,34.7825},{1569560521000,34.7825},{1569560531000,34.7825}]|[{1569560535000,34.7825},{1569560531000,34.7825},{1569560483000,34.7825}]|1569560537000|
    |XXXXX01|[{1569560531000,3.7825},{1569560475000,3.7812},{1569560483000,3.7812},{1569560491000,34.7875}]|[{1569560537000,3.7825},{1569560531000,34.7825},{1569560489000,34.7825},{1569560497000,34.7825}]|[{1569560505000,34.7825},{1569560513000,34.7825},{1569560521000,34.7825},{1569560531000,34.7825}]|[{1569560535000,34.7825},{1569560531000,34.7825},{1569560483000,34.7825}]|1569560491000|
    |XXXXX01|[{1569560531000,3.7825},{1569560475000,3.7812},{1569560483000,3.7812},{1569560491000,34.7875}]|[{1569560537000,3.7825},{1569560531000,34.7825},{1569560489000,34.7825},{1569560497000,34.7825}]|[{1569560505000,34.7825},{1569560513000,34.7825},{1569560521000,34.7825},{1569560531000,34.7825}]|[{1569560535000,34.7825},{1569560531000,34.7825},{1569560483000,34.7825}]|1569560521000|
    |XXXXX01|[{1569560531000,3.7825},{1569560475000,3.7812},{1569560483000,3.7812},{1569560491000,34.7875}]|[{1569560537000,3.7825},{1569560531000,34.7825},{1569560489000,34.7825},{1569560497000,34.7825}]|[{1569560505000,34.7825},{1569560513000,34.7825},{1569560521000,34.7825},{1569560531000,34.7825}]|[{1569560535000,34.7825},{1569560531000,34.7825},{1569560483000,34.7825}]|1569560505000|
    +-------+----------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------+-------------+

    //Final DataFrame
    val df2 =  df1.withColumn("SIG1_V", UDF_V(col("E"),col("SIG1"))).withColumn("SIG2_V", UDF_V(col("E"),col("SIG2"))).withColumn("SIG3_V", UDF_V(col("E"),col("SIG3"))).withColumn("SIG4_V", UDF_V(col("E"),col("SIG4"))).drop("SIG1","SIG2","SIG3","SIG4")

    df2.show()
    +-------+-------------+-------+-------+-------+-------+
    | NUM_ID|            E| SIG1_V| SIG2_V| SIG3_V| SIG4_V|
    +-------+-------------+-------+-------+-------+-------+
    |XXXXX01|1569560475000| 3.7812|       |       |       |
    |XXXXX01|1569560483000| 3.7812|       |       |34.7825|
    |XXXXX01|1569560489000|       |34.7825|       |       |
    |XXXXX01|1569560491000|34.7875|       |       |       |
    |XXXXX01|1569560497000|       |34.7825|       |       |
    |XXXXX01|1569560505000|       |       |34.7825|       |
    |XXXXX01|1569560513000|       |       |34.7825|       |
    |XXXXX01|1569560521000|       |       |34.7825|       |
    |XXXXX01|1569560531000| 3.7825|34.7825|34.7825|34.7825|
    |XXXXX01|1569560535000|       |       |       |34.7825|
    |XXXXX01|1569560537000|       | 3.7825|       |       |
    +-------+-------------+-------+-------+-------+-------+

【讨论】:

  • @Antony 我使用了正则表达式 "(\\w+),([\\w 0-9 .]+)" 如果您的数据发生更改,可能需要更改。如果数据格式为 {1569560531000,3.7825} 则没有问题。
  • @Nikk- 创建 E 数组的以下部分给我如下输出,其中没有所有 E 可用于所有信号。 val df1 = df.withColumn("E", UDF_E(struct(df.columns map col: _*)))E栏如下|WrappedArray([1570180028000,WrappedArray([1570180017000|
  • 你确定你使用的是正确的代码,因为我试图复制同样的问题,但它给了我正确的结果。你能用我使用的数据框运行相同的代码吗(检查上面的代码dataframe df) 并确认它是否工作。我保留了 csv 文件,请将其复制到某处并阅读我提到的 run all above 命令。
  • 这是 val out = out+","+a |354476053762294 |[[1570180028000, 53.0], [1570180027000, 53.0], [1570179967000, 53.0]]|[[1570180028000, 58.0], [1570179968000, 58.0], [1570179967000, 58.0]]|[[1570180028000, 58.0], [1570179968000, 58.0], [1570179967000, 58.0]]|[[1570180028000, 59.0], [1570179968000, 59.0], [1570179967000, 59.0]]|[[1570180028000, 57.0], [1570179968000, 57.0], [1570179967000, 57.0]]|[[1570180017000, 58.0], [1570179968000, 58.0], [1570179967000, 58.0]]|,WrappedArray([1570180028000,53.0], 的 o/p
  • [1570180027000,53.0], [1570179967000,53.0]),WrappedArray([1570180028000,58.0], [1570179968000,58.0], [1570179967000,58.0]),WrappedArray([1570180028000,58.0], [1570179968000,58.0], [1570179967000,58.0]),WrappedArray([1570180028000,59.0], [1570179968000,59.0], [1570179967000,59.0]),WrappedArray([1570180028000,57.0], [1570179968000,57.0], [1570179967000,57.0]),WrappedArray([1570180017000,58.0], [1570179968000,58.0], [1570179967000,58.0])|
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-02-26
  • 2020-12-14
  • 1970-01-01
  • 2023-01-08
  • 1970-01-01
  • 1970-01-01
  • 2019-01-01
相关资源
最近更新 更多