【发布时间】:2018-06-04 05:21:25
【问题描述】:
在这段代码中,我们有两个文件:包含名称的运动员.csv 和包含推文消息的 twitter.test。我们想为 twitter.test 中与运动员.csv 中的名称匹配的每一行找到名称 我们应用 map 函数来存储来自于运动员.csv 的名称,并希望将所有名称迭代到测试中的所有行文件。
object twitterAthlete {
def loadAthleteNames() : Map[String, String] = {
// Handle character encoding issues:
implicit val codec = Codec("UTF-8")
codec.onMalformedInput(CodingErrorAction.REPLACE)
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
// Create a Map of Ints to Strings, and populate it from u.item.
var athleteInfo:Map[String, String] = Map()
//var movieNames:Map[Int, String] = Map()
val lines = Source.fromFile("../athletes.csv").getLines()
for (line <- lines) {
var fields = line.split(',')
if (fields.length > 1) {
athleteInfo += (fields(1) -> fields(7))
}
}
return athleteInfo
}
def parseLine(line:String): (String)= {
var athleteInfo = loadAthleteNames()
var hello = new String
for((k,v) <- athleteInfo){
if(line.toString().contains(k)){
hello = k
}
}
return (hello)
}
def main(args: Array[String]){
Logger.getLogger("org").setLevel(Level.ERROR)
val sc = new SparkContext("local[*]", "twitterAthlete")
val lines = sc.textFile("../twitter.test")
var athleteInfo = loadAthleteNames()
val splitting = lines.map(x => x.split(";")).map(x => if(x.length == 4 && x(2).length <= 140)x(2))
var hello = new String()
val container = splitting.map(x => for((key,value) <- athleteInfo)if(x.toString().contains(key)){key}).cache
container.collect().foreach(println)
// val mapping = container.map(x => (x,1)).reduceByKey(_+_)
//mapping.collect().foreach(println)
}
}
第一个文件看起来像:
id,name,nationality,sex,height........
001,Michael,USA,male,1.96 ...
002,Json,GBR,male,1.76 ....
003,Martin,female,1.73 . ...
第二个文件看起来像:
time, id , tweet .....
12:00, 03043, some message that contain some athletes names , .....
02:00, 03023, some message that contain some athletes names , .....
有些人是这样想的……
但运行此代码后我得到空结果,非常感谢任何建议
我得到的结果是空的:
()....
()...
()...
但我期望的结果是:
(name,1)
(other name,1)
【问题讨论】:
-
您能否发布文件样本和您的预期输出?
-
刚刚编辑的问题请看一下谢谢
-
您可以尝试在 for 循环中使用
yield key而不是仅使用key吗? -
正如@philantrovert 所说,您缺少
yield以将值返回到您的地图
标签: scala csv apache-spark twitter dataset