【问题标题】:Spark UDF in Scala for Extracting Relevant DataScala 中的 Spark UDF 用于提取相关数据
【发布时间】:2019-02-10 01:09:58
【问题描述】:

我有一个 Dataframe,其中有一列需要清理。 我期待一个正则表达式模式,它可以应用于 Java/Scala 中的 Spark UDF,它将从字符串中提取有效内容。

userId 列的示例输入行如下 DataFrame 所示:

[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]

名为“userId”的列的预期转换:

一个字符串,看起来像:

105286112|115090439|29818926

我需要修改userId 列的逻辑/方法,以便制作相同的UDF。可以用正则表达式或其他方法实现吗?

输入的 DataFrame 如下所示:

+--------------------+--------------------+
|    dt_geo_cat_brand|        userId      |
+--------------------+--------------------+
|2017-10-30_17-18 ...|[[133207500,2017-...|
|2017-10-19_21-22 ...|[[194112773,2017-...|
|2017-10-29_17-18 ...|[[274188233,2017-...|
|2017-10-29_14-16 ...|[[86281353,2017-1...|
|2017-10-01_09-10 ...|[[92478766,2017-1...|
|2017-10-09_17-18 ...|[[156663365,2017-...|
|2017-10-06_17-18 ...|[[111869972,2017-...|
|2017-10-13_09-10 ...|[[64404465,2017-1...|
|2017-10-13_07-08 ...|[[146355663,2017-...|
|2017-10-22_21-22 ...|[[54096488,2017-1...|
+--------------------+--------------------+

架构:

root
 |-- dt_geo_cat_brand: string (nullable = true)
 |-- userId: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: string (nullable = true)

期望的输出:

+--------------------+--------------------+
|    dt_geo_cat_brand|         userId     |
+--------------------+--------------------+
|2017-10-30_17-18 ...|133207500,1993333444|
|2017-10-19_21-22 ...|122122212,3432323333|
|2017-10-29_17-18 ...|274188233,8869696966|
|2017-10-29_14-16 ...|862813534,444344444,43444343434|
|2017-10-01_09-10 ...|92478766,880342342,4243244432,5554335535|
+--------------------+--------------------+

等等……

【问题讨论】:

  • 但是您为什么要尝试使用正则表达式从数据框中提取数据?
  • 我需要使用从该列中提取的值(数值),以便稍后在处理模型中生成位图。您看到这样的数据的原因是我习惯于 Cassandra 按键对数据进行分组,并根据键将值组合在一起。

标签: java regex scala apache-spark dataframe


【解决方案1】:

使用下面的正则表达式编写 UDF。它将提取所需的内容。

import ss.implicits._

val df = ss.read.csv(path).as("")
df.show()

val reg = "\\[\\[(\\d*).*\\],\\s*\\[(\\d*).*\\],\\s*\\[(\\d*).*" // regex which can extract the required data

val input = "[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]"   // input string
val mat = reg.r.findAllIn(input)  // extracting the data

println(mat)
while (mat.hasNext) {
    mat.next()
    println(mat.group(1) + "|" + mat.group(2)+ "|" +  mat.group(3)) // each group will print the 3 extracted fields
}

输出:

105286112|115090439|29818926

使用 UDF:

import ss.implicits._

    val reg = "\\[\\[(\\d*).*\\],\\s*\\[(\\d*).*\\],\\s*\\[(\\d*).*"

    def reg_func = { (s: String) =>
        {
            val mat = reg.r.findAllIn(s)

            println(mat)
            var out = ""
            while (mat.hasNext) {
                mat.next()
                out = mat.group(1) + "|" + mat.group(2) + "|" + mat.group(3)
            }
            out
        }
    }

    val reg_udf = udf(reg_func)

    val df = ss.read.text(path)
    .withColumn("Extracted_fields", reg_udf($"value"))
    df.show(false)

输入:创建了一些示例第二条记录

[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]
[[105286113,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090440,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818927,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]

输出:

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
|value                                                                                                                                                                                       |Extracted_fields            |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
|[[105286112,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090439,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818926,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]|105286112|115090439|29818926|
|[[105286113,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [115090440,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX], [29818927,2017-11-19_14-16 >> ABCDE >> GrocersRetail >> XXX]]|105286113|115090440|29818927|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+

【讨论】:

  • 你好,可以提取3个以上的字段,目前你使用的是mat.group(1) + "|" + mat.group(2) + "|" + mat.group(3),我们可以让它动态而不是硬编码吗?
【解决方案2】:

您不需要正则表达式来解决这个问题。数据被格式化为结构数组,并查看架构您想要的是每个结构的_1 字符串。这可以通过提取值的 UDF 来解决,然后将所有内容转换为带有 mkString("|") 的字符串以获得预期的输出:

val extract_id = udf((arr: Seq[Row]) => { 
  arr.map(_.getAs[String](0)).mkString("|")
})

df.withColumn("userId", extract_id($"userId"))

根据评论 #1 添加:

如果要将在dt_geo_cat_brand 上分区的结果保存在 csv 文件中(所有值都在其自己的行中),您可以按如下方式进行。首先,从 udf 返回一个列表而不是一个字符串并使用explode

val extract_id = udf((arr: Seq[Row]) => { 
  arr.map(_.getAs[String](0))
})

val df2 = df.withColumn("userId", explode(extract_id($"userId")))

然后在保存时使用partitionBy(dt_geo_cat_brand)。这将根据dt_geo_cat_brand 列中的值创建一个文件夹结构。根据分区的不同,每个文件夹中 csv 文件的数量可能会有所不同,但它们的值都来自 dt_geo_cat_brand 中的单个值(如果您想要单个文件并有足够的内存,请在保存之前使用 repartition(1))。

df2.write.partitionBy("dt_geo_cat_brand").csv(baseOutputBucketPath)

根据评论 #2 进行补充:

在保存为单独文件时不使用partitionBy,您可以执行以下操作(推荐使用partitioBy 方法)。首先,找到dt_geo_cat_brand中所有不同的值:

val vals = df.select("dt_geo_cat_brand").distinct().as[String].collect()

对于每个值,过滤数据框并保存(在此处使用分解的 df2 数据框作为补充 #1):

vals.foreach { v =>
  df2.filter($"dt_geo_cat_brand" === v)
    .select("userId")
    .write
    .csv(s"$baseOutputBucketPath=$v/")})
}

或者,如果使用该 udf,请不要使用分解的数据帧,而是在 "|" 上拆分:

vals.foreach { v =>
  df.filter($"dt_geo_cat_brand" === v)
    .select(split($"userId", "\\|").as("userId"))
    .write
    .csv(s"$baseOutputBucketPath=$v/")})
}

【讨论】:

  • 嗨@Shaido,帮助! Udf 生成 - ABCD, 2323|4343434|644646|5454545|4756456 EFGH, 456464564|432444|4244554|525454 我想根据第一个列(ABCD/EFGH)存储 id - ABCD 分区应该有一个 csv 文件,其 id 由\n - 222323 323233 323232 。我试过 - import sparkSession.implicits._ dataFrame.collect.foreach(t => { val dt_geo_cat_brand= t.dt_geo_cat_brand val mbid = t.mbid.split("\\|").toList.toDF("mbid") mbid .repartition(1).write.csv(s"$baseOutputBucketPath=$dt_geo_cat_brand/")}) 由于内存问题而失败?如何并行?
  • 嗨@CodeReaper,因为在评论中回答有点太长了,所以我对上面的答案做了补充。希望它有所帮助:)
  • 如果我使用 partitionBy 那么它会导致很多洗牌吗?最初我为每一行连接 ID 的原因是我可以避免基于第一列的 partitionBy。最初我加载了整个数据,在第一列分区,写入分区需要很长时间。因此,我使用 Cassandra 为我获取了 ID 连续出现的 DF。我想避免数据洗牌。
  • @CodeReaper: partitionBy 不应该导致任何数据洗牌,只要您将数据保存在节点所属的分布式文件系统(例如 HDFS)上(但是,repartition 将和collect 会将所有数据放在驱动节点上)。这个答案解释得更多:stackoverflow.com/questions/40416357/…
  • 如果我们使用explode,那么从技术上讲,每一行数据都会被分解成尽可能多的数值。那么 ABCD, 4445,7778,8899 我们会分成 3 行吗?然后当我们使用 partitionBy 时,它必须从所有执行程序中获取与每个键对应的值?此外,我已经根据您的上一个 UDF 处理了数据,并且我正在寻找该阶段之后的解决方案以并行写入数据。
猜你喜欢
  • 2022-12-09
  • 1970-01-01
  • 1970-01-01
  • 2018-09-17
  • 2016-12-02
  • 2017-09-16
  • 1970-01-01
  • 2018-11-22
  • 2017-05-04
相关资源
最近更新 更多