【问题标题】:Scala transform and split String Column to MapType Column in DataframeScala将String列转换并拆分为Dataframe中的MapType列
【发布时间】:2020-12-31 19:21:49
【问题描述】:

我有一个带有列的 DataFrame,其中包含带有字段的跟踪请求 url,看起来像这样

df.show(truncate = false)
+--------------------------------
| request_uri
+-----------------------------------
| /i?aid=fptplay&ast=1582163970763&av=4.6.1&did=83295772a8fee349 ...
| /i?p=fplay-ottbox-2019&av=2.0.18&nt=wifi&ov=9&tv=1.0.0&tz=GMT%2B07%3A00 ...
| ...

我需要将此列转换为如下所示的内容

df.show(truncate = false)
+--------------------------------
| request_uri
+--------------------------------
| (aid -> fptplay, ast -> 1582163970763, tz -> [timezone datatype], nt -> wifi , ...) 
| (p -> fplay-ottbox-2019, av -> 2.0.18, ov -> 9, tv -> 1.0.0 , ...) 
| ...

基本上,我必须将字段名称 (delimiter = "&" ) 及其值拆分为某种 MapType,然后将其添加到列中。

谁能给我指点如何编写自定义函数将字符串列拆分为 MapType 列? 我被告知要使用 withColumn() 和 mapPartition,但我不知道如何以拆分字符串并将它们转换为 MapType 的方式实现它。

我们衷心感谢任何帮助,即使是微不足道的帮助。我对 Scala 完全陌生,并且已经坚持了一周。

【问题讨论】:

标签: scala dataframe apache-spark


【解决方案1】:

解决方法是使用UserDefinedFunctions

让我们一步一步解决问题。

// We need a function which converts strings into maps
// based on the format of request uris
def requestUriToMap(s: String): Map[String, String] = {
  s.stripPrefix("/i?").split("&").map(elem => {
    val pair = elem.split("=")     
    (pair(0), pair(1)) // evaluate each element to a tuple
  }).toMap
}

// Now we convert this function into a UserDefinedFunction.
import org.apache.spark.sql.functions.{col, udf}
// Given to a request uri string, convert it to map, the correct format is assumed.
val requestUriToMapUdf = udf((requestUri: String) => requestUriToMap(requestUri))

现在,我们进行测试。

// Test data
val df = Seq(
  ("/i?aid=fptplay&ast=1582163970763&av=4.6.1&did=83295772a8fee349"),
  ("/i?p=fplay-ottbox-2019&av=2.0.18&nt=wifi&ov=9&tv=1.0.0&tz=GMT%2B07%3A00")
).toDF("request_uri")

df.show(false)
//+-----------------------------------------------------------------------+
//|request_uri                                                            |
//+-----------------------------------------------------------------------+
//|/i?aid=fptplay&ast=1582163970763&av=4.6.1&did=83295772a8fee349         |
//|/i?p=fplay-ottbox-2019&av=2.0.18&nt=wifi&ov=9&tv=1.0.0&tz=GMT%2B07%3A00|
//+-----------------------------------------------------------------------+

// Now we execute our UDF to create a column, using the same name replaces that column
val mappedDf = df.withColumn("request_uri", requestUriToMapUdf(col("request_uri")))
mappedDf.show(false)
//+---------------------------------------------------------------------------------------------+
//|request_uri                                                                                  |
//+---------------------------------------------------------------------------------------------+
//|[aid -> fptplay, ast -> 1582163970763, av -> 4.6.1, did -> 83295772a8fee349]                 |
//|[av -> 2.0.18, ov -> 9, tz -> GMT%2B07%3A00, tv -> 1.0.0, p -> fplay-ottbox-2019, nt -> wifi]|
//+---------------------------------------------------------------------------------------------+

mappedDf.printSchema
//root
// |-- request_uri: map (nullable = true)
// |    |-- key: string
// |    |-- value: string (valueContainsNull = true)

mappedDf.schema
//org.apache.spark.sql.types.StructType = StructType(StructField(request_uri,MapType(StringType,StringType,true),true))

这就是你想要的。


替代方案:如果您不确定您的字符串是否符合要求,您可以尝试该函数的不同变体,即使该字符串不符合假定格式(不包含 = 或输入是空字符串)。

def requestUriToMapImproved(s: String): Map[String, String] = {
  s.stripPrefix("/i?").split("&").map(elem => {
    val pair = elem.split("=")
    pair.length match {
      case 0 => ("", "") // in case the given string produces an array with no elements e.g. "=".split("=") == Array() 
      case 1 => (pair(0), "") // in case the given string contains no = and produces a single element e.g. "potato".split("=") == Array("potato")
      case _ => (pair(0), pair(1)) // normal case e.g. "potato=masher".split("=") == Array("potato", "masher")
    }
  }).toMap
}

【讨论】:

  • 谢谢,这段代码 sn-p 运行良好。但是当我用我原来的 df 交换你的 Seq df 时,它在“(pair(0),pair(1))”行抛出了 ArrayIndexOutOfBoundsException。我不明白为什么。
  • @VũJean,这意味着&s 之间的字符串之一在文本正文中没有=,因此我们所做的拆分只是返回字符串一个数组,因此该数组没有第二个元素。 "potato".split("=") => Array(potato) 有一个元素。所以问题是,这些字符串应该怎么处理?
  • 字符串也可以完全为空。确保过滤掉空记录。
  • 我刚刚添加了一个处理不合格字符串情​​况的函数。因此,如果字符串不包含=,它将生成"key" -> "",如果给定字符串为空,则生成"" -> ""
【解决方案2】:

以下代码执行两阶段拆分过程。因为 uris 没有特定的结构,所以您可以使用如下 UDF 来实现:

val keys = List("aid", "p", "ast", "av", "did", "nt", "ov", "tv", "tz")

def convertToMap(keys: List[String]) = udf  {
  (in : mutable.WrappedArray[String]) =>
    in.foldLeft[Map[String, String]](Map()){ (a, str) =>
      keys.flatMap { key =>
        val regex = s"""${key}="""
        val arr = str.split(regex)
        val value = {
          if(arr.length == 2) arr(1)
            else ""
        }

        if(!value.isEmpty)
          a + (key -> value)
        else
          a
      }.toMap
    }
}

df.withColumn("_tmp",
  split($"request_uri","""((&)|(\?))"""))
  .withColumn("map_result", convertToMap(keys)($"_tmp"))
  .select($"map_result")
  .show(false)

它给出了一个 MapType 列:

+------------------------------------------------------------------------------------------------+
|map_result                                                                                      |
+------------------------------------------------------------------------------------------------+
|Map(aid -> fptplay, ast -> 1582163970763, av -> 4.6.1, did -> 83295772a8fee349)                 |
|Map(av -> 2.0.18, ov -> 9, tz -> GMT%2B07%3A00, tv -> 1.0.0, p -> fplay-ottbox-2019, nt -> wifi)|
+------------------------------------------------------------------------------------------------+ 

【讨论】:

    猜你喜欢
    • 2017-02-26
    • 1970-01-01
    • 2019-03-03
    • 1970-01-01
    • 2022-01-27
    • 2023-04-03
    • 2018-11-29
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多