【问题标题】:Transforming one column into multiple ones in a Spark Dataframe在 Spark Dataframe 中将一列转换为多列
【发布时间】:2017-02-20 02:11:36
【问题描述】:

我有一个具有这种结构的大数据框(或多或少 1.2GB):

+---------+-------------+------------------------ -------------------------------------------------- --------------------------------------------+ |国家 |日期数据 |正文 | +---------+-------------+------------------------ -------------------------------------------------- --------------------------------------------+ | 《欧洲联盟》 | "2016-10-03" | "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12\nT_ADD: Sec_1_P\n ......\nT_R: 45ee" | | 《欧洲联盟》 | "2016-10-03" | "T_D: QQAA\nT_NAME: name_2\nT_IN: ind_2\nT_C: c1ws12\nT_ADD: Sec_1_P\n ......\nT_R: 46ee" | | . | . | . | | . | . | . | | 《欧洲联盟》 | "2016-10-03" | "T_D: QQWE\nT_NAME: name_300000\nT_IN: ind_65\nT_C: c1ws12\nT_ADD: Sec_1_P\n ......\nT_R: 47aa" | +---------+-------------+------------------------ -------------------------------------------------- --------------------------------------------+

行数为 300.000,“文本”字段为大约 5000 个字符的字符串。

我想在这个新字段中分离字段“文本”:

+---------+------------+------+-------------+-- ---+--------+---------+--------+------+ |国家 |日期数据 | t_d | t_name | t_in | t_c | t_add | ...... | t_r | +---------+------------+------+-------------+-- ---+--------+---------+--------+------+ | EEUU | 2016-10-03 | QQWE |名称_1 | ind_1 | c1ws12 | Sec_1_P | ...... | 45ee | | EEUU | 2016-10-03 | QQAA |名称_2 | ind_2 | c1ws12 | Sec_1_P | ...... | 45ee | | . | . | . | . | . | . | . | . | | | . | . | . | . | . | . | . | . | | | . | . | . | . | . | . | . | . | | | EEUU | 2016-10-03 | QQWE | name_300000 | ind_65 | c1ws12 | Sec_1_P | ...... | 47aa | +---------+------------+------+-------------+-- ---+--------+---------+--------+------+

目前,我正在使用正则表达式来解决这个问题。首先,我编写正则表达式并创建一个函数来从文本中提取单个字段(总共 90 个正则表达式):

val D_text = "((?<=T_D: ).*?(?=\\\\n))".r
val NAME_text = "((?<=nT_NAME: ).*?(?=\\\\n))".r
val IN_text = "((?<=T_IN: ).*?(?=\\\\n))".r
val C_text = "((?<=T_C: ).*?(?=\\\\n))".r
val ADD_text = "((?<=T_ADD: ).*?(?=\\\\n))".r
        .
        .
        .
        .
val R_text = "((?<=T_R: ).*?(?=\\\\n))".r   

//UDF function:
 def getFirst(pattern2: scala.util.matching.Regex) = udf(
          (url: String) => pattern2.findFirstIn(url) match { 
              case Some(texst_new) => texst_new
              case None => "NULL"
              case null => "NULL"
          }
   )

然后,我创建了一个新的数据框 (tbl_separate_fields ),因为该函数使用正则表达式从文本中提取每个新字段。

val tbl_separate_fields = hiveDF.select(
          hiveDF("country"),
          hiveDF("date_data"),   
          getFirst(D_text)(hiveDF("texst")).alias("t_d"),
          getFirst(NAME_text)(hiveDF("texst")).alias("t_name"),
          getFirst(IN_text)(hiveDF("texst")).alias("t_in"),
          getFirst(C_text)(hiveDF("texst")).alias("t_c"),
          getFirst(ADD_text)(hiveDF("texst")).alias("t_add"),
                            .
                            .
                            .
                            .

        getFirst(R_text)(hiveDF("texst")).alias("t_r") 

        )

最后,我将这个数据框插入到 Hive 表中:

tbl_separate_fields.registerTempTable("tbl_separate_fields") 
hiveContext.sql("INSERT INTO TABLE TABLE_INSERT PARTITION (date_data)  SELECT * FROM tbl_separate_fields")

此解决方案对整个数据帧持续 1 小时,因此我希望优化并减少执行时间。有什么解决办法吗?

我们正在使用 Hadoop 2.7.1Apache-Spark 1.5.1。 Spark 的配置是:

val conf = new SparkConf().set("spark.storage.memoryFraction", "0.1")
val sc = new SparkContext(conf)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

提前致谢。

编辑数据:

+---------+-------------+------------------------ -------------------------------------------------- --------------------------------------------+ |国家 |日期数据 |正文 | +---------+-------------+------------------------ -------------------------------------------------- --------------------------------------------+ | 《欧洲联盟》 | "2016-10-03" | "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12\nT_ADD: Sec_1_P\n ......\nT_R: 45ee" | | 《欧洲联盟》 | "2016-10-03" | "T_NAME: name_2\nT_D: QQAA\nT_IN: ind_2\nT_C: c1ws12 ......\nT_R: 46ee" | | . | . | . | | . | . | . | | 《欧洲联盟》 | "2016-10-03" | "T_NAME: name_300000\nT_ADD: Sec_1_P\nT_IN: ind_65\nT_C: c1ws12\n ......\nT_R: 47aa" | +---------+-------------+------------------------ -------------------------------------------------- --------------------------------------------+

【问题讨论】:

    标签: scala apache-spark dataframe hadoop apache-spark-sql


    【解决方案1】:

    在这种情况下使用正则表达式既慢又脆弱。

    如果您知道所有记录具有相同的结构,即所有“文本”值具有相同的“部分”编号顺序,则以下代码将工作(对于任意数量的列),主要利用org.apache.spark.sql.functions 中的split 函数:

    import org.apache.spark.sql.functions._
    
    // first - split "text" column values into Arrays
    val textAsArray: DataFrame = inputDF
      .withColumn("as_array", split(col("text"), "\n"))
      .drop("text")
      .cache()
    
    // get a sample (first row) to get column names, can be skipped if you want to hard-code them:
    val sampleText = textAsArray.first().getAs[mutable.WrappedArray[String]]("as_array").toArray
    val columnNames: Array[(String, Int)] = sampleText.map(_.split(": ")(0)).zipWithIndex
    
    // add Column per columnName with the right value and drop the no-longer-needed as_array column
    val withValueColumns: DataFrame = columnNames.foldLeft(textAsArray) {
      case (df, (colName, index)) => df.withColumn(colName, split(col("as_array").getItem(index), ": ").getItem(1))
    }.drop("as_array")
    
    withValueColumns.show()
    // for the sample data I created, 
    // with just 4 "parts" in "text" column, this prints:
    // +-------+----------+----+------+-----+------+
    // |country| date_data| T_D|T_NAME| T_IN|   T_C|
    // +-------+----------+----+------+-----+------+
    // |   EEUU|2016-10-03|QQWE|name_1|ind_1|c1ws12|
    // |   EEUU|2016-10-03|QQAA|name_2|ind_2|c1ws12|
    // +-------+----------+----+------+-----+------+
    

    或者,如果上面的假设不成立,可以使用UDF将文本列转换为Map,然后对硬编码的部分进行类似的reduceLeft操作所需列的列表:

    import sqlContext.implicits._
    
    // sample data: not the same order, not all records have all columns:
    val inputDF: DataFrame = sc.parallelize(Seq(
      ("EEUU", "2016-10-03", "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12"),
      ("EEUU", "2016-10-03", "T_D: QQAA\nT_IN: ind_2\nT_NAME: name_2")
    )).toDF("country", "date_data", "text")
    
    // hard-coded list of expected column names:
    val columnNames: Seq[String] = Seq("T_D", "T_NAME", "T_IN", "T_C")
    
    // UDF to convert text into key-value map
    val asMap = udf[Map[String, String], String] { s =>
      s.split("\n").map(_.split(": ")).map { case Array(k, v) => k -> v }.toMap
    }
    
    
    val textAsMap = inputDF.withColumn("textAsMap", asMap(col("text"))).drop("text")
    
    // for each column name - lookup the value in the map
    val withValueColumns: DataFrame = columnNames.foldLeft(textAsMap) {
      case (df, colName) => df.withColumn(colName, col("textAsMap").getItem(colName))
    }.drop("textAsMap")
    
    withValueColumns.show()
    // prints:
    // +-------+----------+----+------+-----+------+
    // |country| date_data| T_D|T_NAME| T_IN|   T_C|
    // +-------+----------+----+------+-----+------+
    // |   EEUU|2016-10-03|QQWE|name_1|ind_1|c1ws12|
    // |   EEUU|2016-10-03|QQAA|name_2|ind_2|  null|
    // +-------+----------+----+------+-----+------+
    

    【讨论】:

    • 感谢您的回答。正如您所说的那样,如果“知道所有记录具有相同的结构,即所有“文本”值具有相同数量和“部分”顺序”,则此解决方案是有效的。在我们的特殊情况下,文本的结构可以随机改变(顺序、“部分”的数量、重复的部分等等)。问题末尾添加了一个更好的示例。
    • 我明白了 - 更新了答案并提供了解决方案。
    猜你喜欢
    • 1970-01-01
    • 2016-12-01
    • 2018-04-17
    • 2021-09-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-26
    • 1970-01-01
    相关资源
    最近更新 更多