【问题标题】:Is there a way to modify this code to let spark streaming read from json?有没有办法修改此代码以让火花流从 json 中读取?
【发布时间】:2021-09-11 02:39:00
【问题描述】:

我正在开发一个 spark 流应用程序/代码,它不断地从 localhost 9098 读取数据。有没有办法将 localhost 修改为 以便自动从文件夹路径或 json 读取数据?

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.Logger
import org.apache.log4j.Level

object StreamingApplication extends App {

  Logger.getLogger("Org").setLevel(Level.ERROR)

  //creating spark streaming context
  val sc = new SparkContext("local[*]", "wordCount")
  val ssc = new StreamingContext(sc, Seconds(5))

  // lines is a Dstream
  val lines = ssc.socketTextStream("localhost", 9098)

  // words is a transformed Dstream
  val words = lines.flatMap(x => x.split(" "))

  // bunch of transformations
  val pairs = words.map(x=> (x,1))
  val wordsCount = pairs.reduceByKey((x,y) => x+y)

  // print is an action
  wordsCount.print()

  // start the streaming context
  ssc.start()

ssc.awaitTermination()


}

基本上,我需要帮助来修改以下代码:

val lines = ssc.socketTextStream("localhost", 9098)

到这里:

val lines = ssc.socketTextStream("<folder path>")

仅供参考,我正在使用 IntelliJ Idea 来构建它。

【问题讨论】:

标签: json scala apache-spark spark-streaming


【解决方案1】:

我建议阅读 Spark 文档,尤其是 scaladoc。

似乎存在一个方法fileStream

https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/streaming/StreamingContext.html

【讨论】:

  • 也试过 textFileStream 但不知何故它不起作用。我错过了什么吗? val lines = ssc.textFileStream("Users/Desktop/raw.json")
  • 它侦听目录中的新文件,它不会读取 1 个现有文件。
  • 好的,但是有没有办法读取文件内的数据?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-04-02
  • 2017-12-09
  • 1970-01-01
  • 2021-04-30
  • 2019-06-17
  • 1970-01-01
相关资源
最近更新 更多