【发布时间】:2016-04-13 08:38:20
【问题描述】:
我在 Spark Streaming 中有一个程序可以检测 HDFS 中的传入文件,我想做的是分析每个文件并测试每个文件中是否存在两个单词,并且在每个点都知道有多少文件包含这两个字。 我试图做的是:
val recherche1 = lines.map(x => (x.split(":")(0),x.split(":")(1))).filter(x => x._2 == "mot1")
val recherche2 = lines.map(x => (x.split(":")(0),x.split(":")(1))).filter(x => x._2 == "mot2")
val n1 = recherche1.count()
val n2 = recherche1.count()
val p = n1.foreachRDD(rdd => {cont1 = rdd.count()
if (cont1 > 0)
{n2.foreachRDD(rdd => {cont2 = rdd.count()
if (cont2 > 0)
{number = number + 1}
else
{number = number}
})}
})
我想要的是使用变量“number”的值,所以由于我无法在 Spark Streaming 中打印它,所以我尝试将它放在 HBase 中,不幸的是它不起作用,当我启动脚本时spark-submit,它返回错误:
adding new inputs transformations and output operations after starting a context is not supported
有谁知道我做错了什么或者可以告诉我该怎么做?
提前谢谢你
【问题讨论】:
-
这个错误的原因是在你调用
streamingContext.start之后调用了一些DStream操作,因此问题必须超出这里显示的代码sn-p的范围。 -
对该问题的旁注:应该可以避免对输入进行两次映射以找到这两个单词。让我们先克服当前的错误,然后考虑优化这个过程。
-
@maasg 当我删除这段代码的 sn-p 时,错误消失了,这就是为什么我认为这是问题的原因
-
将此代码移到流上下文开始之前。
-
这就是我所做的:ssc.start() ssc.awaitTermination() 总是在脚本的末尾
标签: apache-spark spark-streaming