【问题标题】:How to filter from RDDs and DataFrames in Spark?如何从 Spark 中的 RDD 和 DataFrame 中过滤?
【发布时间】:2016-02-09 17:13:46
【问题描述】:

我有一个.tsv 文件pageviews_by_secondtimestamp siterequestsfields 组成:

"timestamp"              "site"   "requests"
"2015-03-16T00:09:55"   "mobile"    1595
"2015-03-16T00:10:39"   "mobile"    1544
"2015-03-16T00:19:39"   "desktop"   2460

我希望第一行消失,因为它会导致我必须对数据执行的操作出错。

我尝试了以下方式:

1.在拆分之前过滤RDD

val RDD1 = sc.textFile("pageviews_by_second")       
val top_row = RDD1.first() 
//returns: top_row: String = "timestamp"    "site"  "requests"    
val RDD2 = RDD1.filter(x => x!= top_row)
RDD2.first() 
//returns: "2015-03-16T00:09:55"    "mobile"    1595

2.RDD拆分后过滤

val RDD1 = sc.textFile("pageviews_by_second").map(_.split("\t")
RDD1.first()  //returns res0: Array[String] = Array("timestamp, 'site", "requests")
val top_row = RDD1.first()
val RDD2 = RDD1.filter(x => x!= top_row)
RDD2.first() //returns: res1: Array[String] = Array("timestamp", "site" ,"requests")
val RDD2 = RDD1.filter(x => x(0)!="timestamp" && x(1)!="site" && x(2)!="requests")
 RDD2.first() //returns: res1: Array[String] = Array("timestamp", "site" ,"requests")

3.使用“案例类”转换为DataFrame并对其进行过滤

case class Wiki(timestamp: String, site: String, requests: String)
val DF = sc.textFile("pageviews_by_second").map(_.split("\t")).map(w => Wiki(w(0), w(1), w(2))).toDF()
val top_row = DF.first()
//returns: top_row: org.apache.spark.sql.Row = ["timestamp","site","requests"]
DF.filter(_ => _ != top_row)
//returns: error: missing parameter type
val DF2 = DF.filter(_ => _ != top_row2)

为什么只有第一种方法能够过滤掉第一行而其他两种方法不能?在方法 3 中,为什么会出现错误以及如何纠正?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql spark-dataframe


    【解决方案1】:

    您首先需要了解在删除顶行时要比较的数据类型。

    比较两个字符串将在方法 1 中产生真或假。因此它会过滤掉顶行

    在方法 2 中,您正在比较 2 个数组。使用数组的 deep 方法对 scala 中的数组进行更深入的比较

    Method2
    val RDD1 = sc.textFile("D:\\trial.txt").map(_.split("\t"))
    val top_row = RDD1.first()
    val RDD2 = RDD1.filter(x => x.deep!= top_row.deep)
    RDD2.first().foreach(println(_))
    

    在方法 3 中,您正在比较数据框的两行对象。如果将 row 转换为 toSeq 后跟 toArray ,然后使用 deep 方法过滤掉第一行数据框会更好。

    //Method 3    
    DF.filter(_ => _.toSeq.toArray.deep!=top_row.toSeq.toArray.deep)
    

    如果有帮助,请回复。谢谢!!!

    【讨论】:

    【解决方案2】:

    首先,你真的应该使用spark-csv-package - 它可以在创建DataFrame(或rdd)时自动过滤掉标题。您只需指定 :)

    其次,rdds 的排序方式并非您认为的那样。调用 first 不能保证返回 csv 文件的第一行。这是您的第一个场景,显然您确实获得了第一行,但如果您认为自己在这种情况下很幸运,那就更好了。此外,从可能非常大的数据集中删除这样的标头效率非常低,因为 Spark 需要搜索所有行以仅过滤掉一行。

    如果订购对您进行进一步计算很重要,您可以随时发送zipWithIndex。这样,您就可以对rdd 进行排序以保留顺序。

    【讨论】:

    • 谢谢 Glennie,我知道 first() 方法不一定是第一个,我只是很幸运,我不想使用 spark-csv 包,因为我只是在学习和想从头开始编写代码。但是spark-csv 的使用如何不影响使用filter() 的效率呢?在后台,spark-csv 必须执行类似的操作,不是吗?
    • 我还没有阅读代码,但我想spark-csv 只是省略了第一行,当它将数据通过管道传输到rdd :) 另外,我不确定我是否完全了解从头开始编写代码 - 恕我直言,学习 Spark 就是学习 Spark 库以及如何使用它们。
    • 顺便说一句,使用 spark-csv 仅仅意味着你写 sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load(myFile) 而不是 sc.textFile(myFile).map(_.split("\t"))
    • 感谢您的帮助
    【解决方案3】:

    有办法去除标题,而不是深度比较:

    data = sc.textFile('path_to_data') header = data.first() #extract header data = data.filter(lambda x:x !=header) #filter out header

    可能与您相关: How do I skip a header from CSV files in Spark?

    【讨论】:

    • 但这正是我所做的。您的代码适用于 PySpark
    • 你应该投票,因为你得到了正确的参考来找到你的有效答案。
    【解决方案4】:

    我发现了另一种方法,它比我使用的过滤方法更有效。将其作为答案发布,因为其他人可能会发现它有帮助:

    rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
    

    来源:https://stackoverflow.com/a/27857878/3546389

    【讨论】:

      猜你喜欢
      • 2022-11-02
      • 2019-07-16
      • 2020-12-14
      • 2015-06-27
      • 1970-01-01
      • 2020-09-26
      • 2017-01-05
      • 2019-10-20
      • 1970-01-01
      相关资源
      最近更新 更多