The solution is to use a UserDefinedFunction (UDF).
Let's work through the problem step by step.
// We need a function which converts strings into maps
// based on the format of request uris
def requestUriToMap(s: String): Map[String, String] = {
  s.stripPrefix("/i?").split("&").map(elem => {
    val pair = elem.split("=")
    (pair(0), pair(1)) // evaluate each element to a tuple
  }).toMap
}
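Before wrapping the function in a UDF, it can be checked in plain Scala without Spark. The snippet below is a minimal sketch: it repeats the function so it runs on its own, and the names `parsed` and `malformed` are only illustrative. It also shows what the "correct format" assumption means in practice: an element without `=` makes `pair(1)` fail.

```scala
// Same function as above, repeated so this snippet runs on its own.
def requestUriToMap(s: String): Map[String, String] =
  s.stripPrefix("/i?").split("&").map { elem =>
    val pair = elem.split("=")
    (pair(0), pair(1))
  }.toMap

// Well-formed input: every element has the shape key=value.
val parsed = requestUriToMap("/i?aid=fptplay&av=4.6.1")
println(parsed("aid")) // fptplay
println(parsed("av"))  // 4.6.1

// An element without "=" breaks the format assumption: pair(1) does not exist,
// so the call throws. Try captures the exception instead of crashing the session.
val malformed = scala.util.Try(requestUriToMap("/i?potato"))
println(malformed.isFailure) // true
```

Inside a Spark job the same exception would surface as a failed task, which is why the function is only safe on input that is known to be well formed.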
// Now we convert this function into a UserDefinedFunction.
import org.apache.spark.sql.functions.{col, udf}
// Given a request uri string, convert it to a map; the input is assumed to be in the correct format.
val requestUriToMapUdf = udf((requestUri: String) => requestUriToMap(requestUri))
Now let's test it.
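// Test data
// toDF needs the Spark implicits; spark-shell imports them automatically,
// elsewhere add: import spark.implicits._
val df = Seq(
  "/i?aid=fptplay&ast=1582163970763&av=4.6.1&did=83295772a8fee349",
  "/i?p=fplay-ottbox-2019&av=2.0.18&nt=wifi&ov=9&tv=1.0.0&tz=GMT%2B07%3A00"
).toDF("request_uri")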
df.show(false)
//+-----------------------------------------------------------------------+
//|request_uri |
//+-----------------------------------------------------------------------+
//|/i?aid=fptplay&ast=1582163970763&av=4.6.1&did=83295772a8fee349 |
//|/i?p=fplay-ottbox-2019&av=2.0.18&nt=wifi&ov=9&tv=1.0.0&tz=GMT%2B07%3A00|
//+-----------------------------------------------------------------------+
// Now we apply our UDF to create a column; reusing the name "request_uri" replaces the original column
val mappedDf = df.withColumn("request_uri", requestUriToMapUdf(col("request_uri")))
mappedDf.show(false)
//+---------------------------------------------------------------------------------------------+
//|request_uri |
//+---------------------------------------------------------------------------------------------+
//|[aid -> fptplay, ast -> 1582163970763, av -> 4.6.1, did -> 83295772a8fee349] |
//|[av -> 2.0.18, ov -> 9, tz -> GMT%2B07%3A00, tv -> 1.0.0, p -> fplay-ottbox-2019, nt -> wifi]|
//+---------------------------------------------------------------------------------------------+
mappedDf.printSchema
//root
// |-- request_uri: map (nullable = true)
// | |-- key: string
// | |-- value: string (valueContainsNull = true)
mappedDf.schema
//org.apache.spark.sql.types.StructType = StructType(StructField(request_uri,MapType(StringType,StringType,true),true))
That is the result you wanted.
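Note that the values in the output above are still percent-encoded (for example `tz -> GMT%2B07%3A00`). If decoded values are wanted, one possible sketch is to run each value through `java.net.URLDecoder` from the Java standard library. The function name `requestUriToDecodedMap` is hypothetical and not part of the original answer, and the same "correct format" assumption applies.

```scala
import java.net.URLDecoder

// Hypothetical variant: same parsing as requestUriToMap, plus percent-decoding of each value.
// Assumes well-formed input, so pair(1) always exists.
def requestUriToDecodedMap(s: String): Map[String, String] =
  s.stripPrefix("/i?").split("&").map { elem =>
    val pair = elem.split("=")
    (pair(0), URLDecoder.decode(pair(1), "UTF-8"))
  }.toMap

val decoded = requestUriToDecodedMap("/i?av=2.0.18&tz=GMT%2B07%3A00")
println(decoded("tz")) // GMT+07:00
```

`URLDecoder` follows form-encoding rules, so a literal `+` in a value is turned into a space. That usually matches query strings, but it is worth checking against your own data.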
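Alternative: if you are not sure that your strings follow the expected format, you can use a more tolerant variant of the function, which still works when an element does not match the assumed format (it contains no = or the input is an empty string).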
def requestUriToMapImproved(s: String): Map[String, String] = {
  s.stripPrefix("/i?").split("&").map(elem => {
    val pair = elem.split("=")
    pair.length match {
      case 0 => ("", "") // in case the given string produces an array with no elements e.g. "=".split("=") == Array()
      case 1 => (pair(0), "") // in case the given string contains no = and produces a single element e.g. "potato".split("=") == Array("potato")
      case _ => (pair(0), pair(1)) // normal case e.g. "potato=masher".split("=") == Array("potato", "masher")
    }
  }).toMap
}
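The three cases can be exercised in plain Scala to see what the tolerant variant returns for malformed input. This is a small sketch that repeats the function so it runs on its own; the value names are only illustrative.

```scala
// Same function as above, repeated so this snippet runs on its own.
def requestUriToMapImproved(s: String): Map[String, String] =
  s.stripPrefix("/i?").split("&").map { elem =>
    val pair = elem.split("=")
    pair.length match {
      case 0 => ("", "")           // element was just "="
      case 1 => (pair(0), "")      // element had no "=" or no value
      case _ => (pair(0), pair(1)) // normal key=value element
    }
  }.toMap

val wellFormed = requestUriToMapImproved("/i?potato=masher") // normal case
val noEquals   = requestUriToMapImproved("/i?potato")        // key without "="
val onlyEquals = requestUriToMapImproved("/i?=")             // "=".split("=") is empty
val emptyInput = requestUriToMapImproved("")                 // "".split("&") is Array("")

println(wellFormed) // Map(potato -> masher)
println(noEquals)   // Map(potato -> )
println(onlyEquals) // Map( -> )
println(emptyInput) // Map( -> )
```

Malformed elements end up as entries with an empty key or an empty value instead of raising an exception. The function can be wrapped with `udf` in the same way as `requestUriToMapUdf`.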