【问题标题】:How to create a stop condition on Spark streaming?如何在 Spark 流上创建停止条件?
【发布时间】:2018-03-20 19:16:09
【问题描述】:

我想使用火花流从 HDFS 读取数据。这个想法是另一个程序将继续将新文件上传到 HDFS 目录,我的 spark 流作业将处理该目录。但是,我也想要一个结束条件。也就是说,将文件上传到 HDFS 的程序可以向 spark 流式传输程序发出信号的方式是,它已完成所有文件的上传。

举个简单的例子,取Here的程序。代码如下所示。假设另一个程序正在上传这些文件,那么该程序(不需要我们按 CTRL+C)如何向 spark 流程序发出结束条件的信号?

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage StreamingWordCount <input-directory> <output-directory>")
      System.exit(0)
    }
    val inputDir=args(0)
    val output=args(1)
    val conf = new SparkConf().setAppName("Spark Streaming Example")
    val streamingContext = new StreamingContext(conf, Seconds(10))
    val lines = streamingContext.textFileStream(inputDir)
    val words = lines.flatMap(_.split(" "))
    val wc = words.map(x => (x, 1))
    wc.foreachRDD(rdd => {
      val counts = rdd.reduceByKey((x, y) => x + y)
      counts.saveAsTextFile(output)
      val collectedCounts = counts.collect
      collectedCounts.foreach(c => println(c))
    }
    )

    println("StreamingWordCount: streamingContext start")
    streamingContext.start()
    println("StreamingWordCount: await termination")
    streamingContext.awaitTermination()
    println("StreamingWordCount: done!")
  }
}

【问题讨论】:

  • 能否在上传数据的作业末尾添加一些控制字节,然后在 Spark Streaming 程序中观察这些字节,并在这些字节匹配时终止?添加类似 0x1c0x0d 的内容?另外,为什么在这个用例中使用 Spark 流而不是在上传文件后开始另一个工作?

标签: scala hadoop apache-spark spark-streaming hadoop-streaming


【解决方案1】:

好的,我明白了。基本上,您从调用ssc.stop() 的位置创建另一个线程,以指示流处理停止。例如,像这样。

val ssc = new StreamingContext(sparkConf, Seconds(1))
//////////////////////////////////////////////////////////////////////
val thread = new Thread 
{
    override def run 
    {
        ....
        // On reaching the end condition
        ssc.stop()
    }
}
thread.start
//////////////////////////////////////////////////////////////////////
val lines = ssc.textFileStream("inputDir")
.....

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-11-30
    • 1970-01-01
    • 2015-07-16
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多