【问题标题】:Looping through Map Spark Scala循环通过 Map Spark Scala
【发布时间】:2018-06-04 05:21:25
【问题描述】:

在这段代码中,我们有两个文件:包含名称的运动员.csv 和包含推文消息的 twitter.test。我们想为 twitter.test 中与运动员.csv 中的名称匹配的每一行找到名称 我们应用 map 函数来存储来自于运动员.csv 的名称,并希望将所有名称迭代到测试中的所有行文件。

object twitterAthlete {

  def loadAthleteNames() : Map[String, String] = {

    // Handle character encoding issues:
    implicit val codec = Codec("UTF-8")
    codec.onMalformedInput(CodingErrorAction.REPLACE)
    codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

    // Create a Map of Ints to Strings, and populate it from u.item.
    var athleteInfo:Map[String, String] = Map()
    //var movieNames:Map[Int, String] = Map() 
     val lines = Source.fromFile("../athletes.csv").getLines()
     for (line <- lines) {
       var fields = line.split(',')
       if (fields.length > 1) {
        athleteInfo += (fields(1) -> fields(7))
       }
     }

     return athleteInfo
  }

  def parseLine(line:String): (String)= {
    var athleteInfo = loadAthleteNames()
    var hello = new String
    for((k,v) <- athleteInfo){
      if(line.toString().contains(k)){
        hello = k
      }
    }
    return (hello)
  }


  def main(args: Array[String]){
    Logger.getLogger("org").setLevel(Level.ERROR)

    val sc = new SparkContext("local[*]", "twitterAthlete")

    val lines = sc.textFile("../twitter.test")
    var athleteInfo = loadAthleteNames()

    val splitting = lines.map(x => x.split(";")).map(x => if(x.length == 4 && x(2).length <= 140)x(2)) 

    var hello = new String()
    val container = splitting.map(x => for((key,value) <- athleteInfo)if(x.toString().contains(key)){key}).cache


    container.collect().foreach(println)  

   // val mapping = container.map(x => (x,1)).reduceByKey(_+_)
    //mapping.collect().foreach(println)
  }
}

第一个文件看起来像:

id,name,nationality,sex,height........  
001,Michael,USA,male,1.96 ...
002,Json,GBR,male,1.76 ....
003,Martin,female,1.73 . ...

第二个文件看起来像:

time, id , tweet .....
12:00, 03043, some message that contain some athletes names  , .....
02:00, 03023, some message that contain some athletes names , .....

有些人是这样想的……

但运行此代码后我得到空结果,非常感谢任何建议

我得到的结果是空的:

()....
()...
()...

但我期望的结果是:

(name,1)
(other name,1)

【问题讨论】:

  • 您能否发布文件样本和您的预期输出?
  • 刚刚编辑的问题请看一下谢谢
  • 您可以尝试在 for 循环中使用 yield key 而不是仅使用 key 吗?
  • 正如@philantrovert 所说,您缺少yield 以将值返回到您的地图

标签: scala csv apache-spark twitter dataset


【解决方案1】:

您需要使用yield 将值返回给您的map

 val container = splitting.map(x => for((key,value) <- athleteInfo ; if(x.toString().contains(key)) ) yield (key, 1)).cache

【讨论】:

    【解决方案2】:

    我认为你应该先从最简单的选项开始...

    我会使用 DataFrame,因此您可以使用内置的 CSV 解析并利用 Catalyst、Tungsten 等。

    然后您可以使用内置的 Tokenizer 将推文拆分为单词、爆炸并进行简单的连接。根据运动员姓名数据的大小,您最终将获得更优化的广播连接并避免洗牌。

    import org.apache.spark.sql.functions._
    import org.apache.spark.ml.feature.Tokenizer
    
    val tweets = spark.read.format("csv").load(...)
    val athletes = spark.read.format("csv").load(...)
    
    val tokenizer = new Tokenizer()
    tokenizer.setInputCol("tweet")
    tokenizer.setOutputCol("words")
    
    val tokenized = tokenizer.transform(tweets)
    
    val exploded = tokenized.withColumn("word", explode('words))
    
    val withAthlete = exploded.join(athletes, 'word === 'name)
    
    withAthlete.select(exploded("id"), 'name).show()
    

    【讨论】:

    • 我想您会发现,如果您将 DF 解决方案与您的手动编码解决方案进行比较,那么这个解决方案的性能和可扩展性会更高,尤其是随着数据量的增加。如果觉得有用请点赞。
    猜你喜欢
    • 2013-03-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-02-02
    • 1970-01-01
    相关资源
    最近更新 更多