【问题标题】:Counting with Spark Streaming使用 Spark Streaming 计数
【发布时间】:2016-04-13 08:38:20
【问题描述】:

我在 Spark Streaming 中有一个程序可以检测 HDFS 中的传入文件,我想做的是分析每个文件并测试每个文件中是否存在两个单词,并且在每个点都知道有多少文件包含这两个字。 我试图做的是:

val recherche1 = lines.map(x => (x.split(":")(0),x.split(":")(1))).filter(x => x._2 == "mot1")
    val recherche2 = lines.map(x => (x.split(":")(0),x.split(":")(1))).filter(x => x._2 == "mot2")
    val n1 = recherche1.count()
    val n2 = recherche1.count()
   val p = n1.foreachRDD(rdd => {cont1 = rdd.count()
    if (cont1 > 0)
    {n2.foreachRDD(rdd => {cont2 = rdd.count()
      if (cont2 > 0)
      {number = number + 1}
      else
      {number = number}
    })}
    })

我想要的是使用变量“number”的值,所以由于我无法在 Spark Streaming 中打印它,所以我尝试将它放在 HBase 中,不幸的是它不起作用,当我启动脚本时spark-submit,它返回错误:

adding new inputs transformations and output operations after starting a context is not supported

有谁知道我做错了什么或者可以告诉我该怎么做?

提前谢谢你

【问题讨论】:

  • 这个错误的原因是在你调用streamingContext.start之后调用了一些DStream操作,因此问题必须超出这里显示的代码sn-p的范围。
  • 对该问题的旁注:应该可以避免对输入进行两次映射以找到这两个单词。让我们先克服当前的错误,然后考虑优化这个过程。
  • @maasg 当我删除这段代码的 sn-p 时,错误消失了,这就是为什么我认为这是问题的原因
  • 将此代码移到流上下文开始之前。
  • 这就是我所做的:ssc.start() ssc.awaitTermination() 总是在脚本的末尾

标签: apache-spark spark-streaming


【解决方案1】:

我通过我的问题的答案更改了问题中显示的代码的整个结构:filter the lines by two words Spark Streaming 并且它起作用了,也许这种结构不适合 Spark Streaming ..

【讨论】:

    猜你喜欢
    • 2014-09-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-01-08
    • 2020-03-19
    • 2016-11-29
    • 1970-01-01
    相关资源
    最近更新 更多