【问题标题】:Changing key during reduce phase在减少阶段更改密钥
【发布时间】:2018-07-08 20:17:14
【问题描述】:

假设我的 (key,value) 对现在是这些:

(word1,d1=1)
(word1,d2=1)
(word2,d1=2)
(word3,d1=1)

是否有可能减少到以下其中键更改并且值是原始次数/出现次数的情况?

(word1@d1, 1/2)
(word1@d2, 1/2)
(word2@d1, 2/1)
(word3@d1, 1/1)

我阅读了 Spark 的文档。 reduceByKey() 将返回一个 (K, V) 的数据集,其中 V1,V2 -> V 并且密钥将保持为 K。但在上述情况下,K 将是 K',我必须为不同的密钥更新 V .有什么方法可以实现上述目标?刚开始学习 Spark,我现在很困惑。感谢您的帮助!

【问题讨论】:

  • 你能分享一下你到目前为止的尝试吗?
  • 我尝试使用 countByKey() 保存到地图变量,它确实告诉我它出现了多少次。然后我尝试用 countByKey() 值映射相同的 K,V 对,但似乎有些结果可能是 0 可能是由于 map-reduce 的性质?所以我想countByKey() 可能不起作用,我只剩下reduceByKey()?我无法想象 reduce 将如何在这里工作?
  • 您是说问题中的 (word2@d1, 2/1) 而不是 (word2@d1, 2/2) 吗?
  • 是的!忽略了这一点。谢谢。

标签: java apache-spark mapreduce


【解决方案1】:

按字连接全局计数,可以使用countByKey(广播连接)和标准join 完成。目前还不清楚有哪些类型,所以我们假设:

val sc: SparkContext

val rdd = sc.parallelize(Seq(
  ("word1", "d1=1"), ("word1", "d2=1"), ("word2", "d1=2"), ("word3", "d1=1")
))

countByKey:

val cnts = sc.broadcast(rdd.countByKey)

map:

rdd.map { case (k, v) => (k, (v, cnts.value.getOrElse(k, 0L))) }

collected 给出:

Array((word1,(d1=1,2)), (word1,(d2=1,2)), (word2,(d1=2,1)), (word3,(d1=1,1)))

join

val cntsRDD = rdd.mapValues(_ => 1L).reduceByKey(_ + _)
rdd.join(cntsRDD)

collected 给出:

Array((word2,(d1=2,1)), (word3,(d1=1,1)), (word1,(d1=1,2)), (word1,(d2=1,2)))

我将根据确切的输入类型重新调整结果,作为用户练习。

【讨论】:

    猜你喜欢
    • 2013-07-05
    • 1970-01-01
    • 2015-09-12
    • 2011-08-07
    • 2013-04-15
    • 1970-01-01
    • 2022-12-25
    • 2023-03-19
    • 2016-07-22
    相关资源
    最近更新 更多