Socket数据实时计算:
准备工作
nc -lk 9999

hadoop spark sqoop hadoop spark hive hadoop
使用IDEA编写StructuredStreaming_scoket

代码演示 :

def main(args: Array[String]): Unit = {
//1 创建sparksession
val spark: SparkSession = SparkSession.builder()
.master(“local[*]”)
.appName(“StructStreaming_socket”)
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel(“WARN”)
//2 读物实时数据 数据类型是Row
val socketDatasRow: DataFrame = spark.readStream.option(“host”,“hadoop01”).
option(“port”,“9999”)
.format(“socket”)
.load()
//3 对数据进行处理和计算
import spark.implicits._
val socketDatasString: Dataset[String] = socketDatasRow.as[String]
val Word: Dataset[String] = socketDatasString.flatMap(a=>{a.split(" ")})
//使用DSL (SQL)对数据进行计算
val StructWordCount: Dataset[Row] = Word.groupBy(“value”).count().sort($“count”)
//4 输出(启动-等待关闭)
StructWordCount.writeStream
.trigger(Trigger.ProcessingTime(0)) //尽快执行
.format(“console”) //数据输出到控制台
.outputMode(“complete”) //输出所有数据
.start() //开始计算
.awaitTermination() //=等待关闭
}

相关文章:

  • 2021-07-29
  • 2021-04-12
  • 2021-12-09
  • 2021-06-17
  • 2021-12-08
  • 2022-01-23
  • 2022-01-21
  • 2021-04-17
猜你喜欢
  • 2021-04-03
  • 2021-10-30
  • 2022-03-03
  • 2021-11-05
  • 2022-12-23
  • 2021-04-18
  • 2021-10-26
相关资源
相似解决方案