import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

object WordCount {
def main(args: Array[String]): Unit = {
//设置环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
//设置数据源
val context: DataStream[String] = env.socketTextStream("node01", 9999)
//计算逻辑
val process: DataStream[(String, Int)] = context
.flatMap(_.split(" "))
.map((_, 1))
.keyBy(_._1)
.reduce((x, y) => {
val key: String = x._1
val value = x._2 + y._2
(key, value)
})
//输出结果
process.addSink(x => {
print(x)
})
//提交任务
env.execute("word count")
}
}

相关文章:

  • 2022-12-23
  • 2021-12-03
  • 2022-01-18
  • 2021-11-27
  • 2022-12-23
  • 2022-01-19
猜你喜欢
  • 2021-11-18
  • 2021-07-07
  • 2022-12-23
  • 2021-08-05
  • 2022-02-16
  • 2021-07-07
  • 2022-12-23
相关资源
相似解决方案