【问题标题】:How to extract efficiently multiple columns from a single string column RDD?如何从单个字符串列 RDD 中有效地提取多列?
【发布时间】:2020-05-09 00:16:25
【问题描述】:

我有一个包含 20 多列的文件,我想从中提取一些。到目前为止,我有以下代码。我确信有一种聪明的方法可以做到这一点,但无法让它成功运行。有什么想法吗?

mvnmdata 是 RDD[String] 类型

val strpcols = mvnmdata.map(x => x.split('|')).map(x => (x(0),x(1),x(5),x(6),x(7),x(8),x(9),x(10),x(11),x(12),x(13),x(14),x(15),x(16),x(17),x(18),x(19),x(20),x(21),x(22),x(23) ))```

【问题讨论】:

  • 上述代码执行时出现了什么样的错误?是不是像“元组的元素太多”?
  • 没有错误,我得到了预期的结果。但我在想是否有一条捷径可以在不重复“x(n)”的情况下编写相同的命令。

标签: scala apache-spark


【解决方案1】:

下一个解决方案提供了一种简单且可扩展的方式来管理您的列名和索引。它基于确定列名/索引关系的映射。该地图还将帮助我们处理提取列的索引及其名称。

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructType, StructField}

val rdd = spark.sparkContext.parallelize(Seq(
"1|500|400|300",
"1|34|67|89",
"2|10|20|56",
"3|2|5|56",
"3|1|8|22"))

val dictColums = Map("c0" -> 0, "c2" -> 2)

// create schema from map keys
val schema = StructType(dictColums.keys.toSeq.map(StructField(_, StringType, true)))

val mappedRDD = rdd.map{line => line.split('|')}
                    .map{
                      cols => Row.fromSeq(dictColums.values.toSeq.map{cols(_)})
                    }

val df = spark.createDataFrame(mappedRDD, schema).show

//output
+---+---+
| c0| c2|
+---+---+
|  1|400|
|  1| 67|
|  2| 20|
|  3|  5|
|  3|  8|
+---+---+
  • 首先我们声明dictColums,在本例中,我们将提取列“c0”-> 0 和“c2”-> 2
  • 接下来我们根据地图的键创建架构
  • 第一个映射(您已经拥有)将用| 分割行,第二个映射将创建一个Row,其中包含与dictColums.values 的每个项目对应的值

更新:

您还可以从上述功能创建一个函数,以便能够多次重复使用它:

import org.apache.spark.sql.DataFrame

def stringRddToDataFrame(colsMapping: Map[String, Int], rdd: RDD[String]) : DataFrame = {
  val schema = StructType(colsMapping.keys.toSeq.map(StructField(_, StringType, true)))

  val mappedRDD = rdd.map{line => line.split('|')}
                    .map{
                      cols => Row.fromSeq(colsMapping.values.toSeq.map{cols(_)})
                    }

  spark.createDataFrame(mappedRDD, schema)
}

然后将其用于您的情况:

val cols = Map("c0" -> 0, "c1" -> 1, "c5" -> 5, ... "c23" -> 23)

val df = stringRddToDataFrame(cols, rdd)

【讨论】:

  • 感谢您的解决方案。我需要一点时间来了解每个步骤背后发生了什么!感谢您的时间。
  • 一定要慢慢来。这里有一些进一步的细节: 1. 我们基于 dictColums 映射的键创建模式。首先,我们通过将每个键映射到一个新的 StructField 实例来创建一个应该用 StructField 填充的 StructType,其中键是名称,类型将是 StringType(如果您愿意,可以更改它以支持更多类型)。在我们的例子中,c0、c1 将是模式的列名。 2.我们在|上分割每一行,这将产生一个字符串数组
  • 3.对于每个数组,我们生成一个新的 Row 实例,其中包含我们用dictColums.values.toSeq.map{cols(_)} 提取的元素。最后一个使用字典的值从数组中提取所需的列,这实际上将是所需的数组索引。 c0 的索引为 0,c2 的索引为 2。这两个x(0), x(2) 对应的值也将是新创建的行的内容。 4. 最后我们使用 mappedRDD 和 schema 填充一个新的数据框
【解决方案2】:

如下,如果不想写重复的x(i),可以循环处理。示例 1:

val strpcols = mvnmdata.map(x => x.split('|'))
  .map(x =>{
    val xbuffer = new ArrayBuffer[String]()
    for (i <- Array(0,1,5,6...)){
      xbuffer.append(x(i))
    }
    xbuffer
  })

如果您只想定义带有 start&end 的索引列表以及要排除的数字,请参见下面的示例 2:

scala> (1 to 10).toSet
res8: scala.collection.immutable.Set[Int] = Set(5, 10, 1, 6, 9, 2, 7, 3, 8, 4)

scala> ((1 to 10).toSet -- Set(2,9)).toArray.sortBy(row=>row)
res9: Array[Int] = Array(1, 3, 4, 5, 6, 7, 8, 10)

你想要的最终代码:

  //define the function to process indexes
  def getSpecIndexes(start:Int, end:Int, removedValueSet:Set[Int]):Array[Int] = {
    ((start to end).toSet -- removedValueSet).toArray.sortBy(row=>row)
  }

  val strpcols = mvnmdata.map(x => x.split('|'))
    .map(x =>{
      val xbuffer = new ArrayBuffer[String]()
      //call the function
      for (i <- getSpecIndexes(0,100,Set(3,4,5,6))){
        xbuffer.append(x(i))
      }
      xbuffer
    })

【讨论】:

  • 谢谢。这就是我要找的东西!
  • 抱歉,没看到。现在改不了了。上面的解决方案在我的版本(Scala 2.11)中不起作用,您是在哪个版本中编写的?
  • 您能描述一下您遇到的错误吗?我的 Scala 版本也是 2.11。
  • 您可以只定义一个函数,然后在示例1的map块中调用该函数。
  • 现在我已经写好了上面的最终代码。getSpecIndexes函数正是你想要的。看看吧。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-12-06
  • 2019-05-19
  • 2020-04-18
  • 1970-01-01
  • 1970-01-01
  • 2022-06-10
相关资源
最近更新 更多