【问题标题】:Scala how to use reduceBykey when I have two keysScala当我有两个键时如何使用reduceBykey
【发布时间】:2018-06-01 02:58:40
【问题描述】:

一行数据格式:

id: 123456  
Topiclist: ABCDE:1_8;5_10#BCDEF:1_3;7_11 

一个id可以有很多行:

id: 123456 
Topiclist:ABCDE:1_1;7_2;#BCDEF:1_2;7_11# 

目标:(123456, (ABCDE,9,2),(BCDEF,5,2))

主题列表中的记录被#分割,所以ABCDE:1_8;5_10是一条记录。

记录的格式为<topicid>:<topictype>_<topicvalue>

例如ABCDE:1_8

topicid = ABCDE

主题类型 = 1

主题值 = 8

目标:求和TopicType1的总和,和TopicType1的计数频率 所以应该是(id, (topicid, value,frequency)),例如:(123456, (ABCDE,9,2),(BCDEF,5,2))

【问题讨论】:

  • 你有任何代码示例来说明你到目前为止所做的事情吗?
  • 一行是什么意思?那是一条线还是三条线?而 reduceBykey 是 spark 的 api。
  • 我更好奇主题类型的来源。

标签: scala mapreduce


【解决方案1】:

假设您的数据是“123456!ABCDE:1_8;5_10#BCDEF:1_3;7_11”和“123456!ABCDE:1_1;7_2#BCDEF:1_2;7_11”,所以我们使用“!”获取您的用户 ID“123456”

rdd.map{f=>
          val userID = f.split("!")(0)
          val items = f.split("!")(1).split("#")
          var result = List[Array[String]]()
          for (item <- items){
            val topicID = item.split(":")(0)
            for (topicTypeValue <- item.split(":")(1).split(";") ){
              println(topicTypeValue);
              if (topicTypeValue.split("_")(0)=="1"){result = result:+Array(topicID,topicTypeValue.split("_")(1),"1") }
            }
          }
          (userID,result)
          }
    .flatMapValues(x=>x).filter(f=>f._2.length==3)
    .map{f=>( (f._1,f._2(0)),(f._2(1).toInt,f._2(2).toInt) )}
    .reduceByKey{case(x,y)=> (x._1+y._1,x._2+y._2) }
    .map(f=>(f._1._1,(f._1._2,f._2._1,f._2._2)))   // (userID, (TopicID,valueSum,frequences) )

输出是 ("12345",("ABCDE",9,2)), ("12345",("BCDEF",5,2)) 与您的输出略有不同,您可以将此结果分组,如果你真的需要 ("12345",("ABCDE",9,2), ("BCDEF",5,2))

【讨论】:

  • 非常感谢!很有用
  • 错误:取消注册 ApplicationMaster 失败(诊断消息:用户类抛出异常:java.lang.UnsupportedOperationException:不支持 scala.Iterable[(String, Double, Double)] 类型的架构)跨度>
猜你喜欢
  • 2017-02-05
  • 2015-03-26
  • 2020-12-09
  • 2019-08-20
  • 1970-01-01
  • 2023-03-11
  • 2017-10-13
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多