【问题标题】:How to compare 2 columns and concatenate in Scala如何在Scala中比较2列并连接
【发布时间】:2019-06-10 18:02:31
【问题描述】:

这是我作为程序输入的文本文件:

Id       Title Copy
B2002010 gyh   1
D2001001 abc   12
M2003005 zxc   3
D2002003 qwe   13
M2001002 efg   1
D2001004 asd   6
D2003005 zxc   3
M2001006 wer   6
D2001006 wer   6
B2004008 sxc   10
D2002007 sdf   9
D2004008 sxc   10

ID 格式为Xyyyyrrr 其中:

  • XB => 书籍或 M => 杂志
  • yyyy 是年份
  • rrr 是随机数。

我要做的是:获取同一年的书籍或杂志的总副本数。另外,对“复制”列进行小数据清理,如果我发现数字以外的内容,我会将其替换为“0”。

我的 Spark 项目在 Eclipse 上,我正在使用 Maven 和 Scala IDE 我需要使用 MapReduce 函数。

我已经启动了拆分文本文件的地图功能。

这是我开始的代码:

package bd.spark_app

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import scala.io.Source
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray
import org.apache.log4j._
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row
import scala.Array

object alla {
  def main(args:Array[String]) = {
    val conf = new SparkConf().setMaster("local").setAppName("trying")
    val sc = new SparkContext(conf)
    val x = sc.textFile("/home/hadoopusr/sampledata")

    x.map(_.split(" ")).foreach(r => 
      println(r(0).dropRight(3), r(2))
    )

    sc.stop()
  }
} 

这是我上面展示的 Map 函数的结果

(B2002,1)
(D2001,12)
(M2003,3)
(D2002,13)
(M2001,1)
(D2001,6)
(D2003,3)
(M2001,6)
(D2001,6)
(B2004,10)
(D2002,9)
(D2004,10)
(M2004,11)
(D2004,11)

我只需要某种 reduce 函数,它会抓取同一年的所有书籍和杂志,并将副本数加在一起,并检查“副本”列是否为数字

例如:记录(B2002,12)(B2002,16) 的结果应该是(B2002,28)

【问题讨论】:

  • 您提到需要书籍和杂志的计数,但没有尝试过滤掉第三个值(D)。
  • 是的,我可能错过了。但是对于书籍、杂志和字典,同样的工作也会完成

标签: eclipse scala apache-spark


【解决方案1】:

可以使用方法“reduceByKey”:

val converted = x.map(_.split(" ")).map(r => (r(0).dropRight(3), r(2).toInt))
val result = converted.reduceByKey(_ + _)

输出:

(M2001,7)
(D2001,24)
(M2003,3)
(D2003,3)
(D2002,22)
(D2004,10)
(B2002,1)
(B2004,10)

注意:看起来输入文件是“csv”格式,最好使用“spark.read.csv”来读取数据,并且使用DataFrame而不是RDD。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-01-03
    • 2020-08-25
    • 2011-02-06
    • 2014-01-02
    • 2022-12-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多