【问题标题】:Spark Streaming - Count distinct element in stateSpark Streaming - 计算状态中的不同元素
【发布时间】:2017-07-27 11:33:18
【问题描述】:

我有一个具有 VideoID-UserID 键值对的 dstream,按 VideoID 计算不同 UserID 组的良好做法是什么?

// VideoID,UserID
foo,1
foo,2
bar,1
bar,2
foo,1
bar,2

如上,我想随时去掉多余的foo,1bar,2得到VideoID-CountUserID,所以结果应该是:

foo: 2
bar: 2

换句话说,我想在内存中保存一个大型状态数据集。当新一批 dstream 到达时,将其与数据集进行比较,以计算每个视频的不同用户。

怎么做?

我正在开发 Spark 1.6,但接受了后续版本的答案。可能的话 Python 代码。

【问题讨论】:

    标签: python scala apache-spark spark-streaming spark-dataframe


    【解决方案1】:

    为了获得按视频 ID 分组的不同用户 ID 计数,请考虑使用 aggregateByKey。抱歉,这是 Scala,所以你必须翻译。

    val rdd = sc.textFile("your_file.txt")
    
    val initialSet = Set.empty[Int]
    val addToSet = (s: Set[Int], v:Int) => s + v
    val mergeSets = (s1: Set[Int], s2: Set[Int]) => s1 ++ s2
    
    val distinctValSets = rdd.aggregateByKey(initialSet)(addToSet, mergeSets)
    val distinctValCountd = rdd.map({case(k,s) => (k,s.size)})
    

    Initial set 是聚合对象的初始值,addToSet 和 mergeSets 指定如何将值添加到您的集合中,并根据键合并不同的集合。这应该为您提供与每个视频相关联的不同数量的用户,并且比 reduceByKey 和 groupByKey 更便宜(在空间方面)。

    【讨论】:

    • 感谢您抽出宝贵时间!现在我认为在内存中保存一个大数据集不是一个好方法,所以我使用窗口函数来计算一个周期内的不同,而不是它。谢谢!
    【解决方案2】:
      val rdd1 = sc.parallelize(Seq(("foo", 1),("foo", 2),("foo", 1)))
      rdd1.groupByKey.mapValues(x=>x.toSet.toSeq).flatMapValues(x=>x).collect
    

    【讨论】:

      猜你喜欢
      • 2017-04-19
      • 2016-06-12
      • 2017-03-06
      • 2014-09-11
      • 2016-12-24
      • 2023-01-20
      • 1970-01-01
      • 1970-01-01
      • 2016-04-10
      相关资源
      最近更新 更多