通过IntelliJ IDEA搭建Flink开发环境,首先要安装Flink和Scala,具体操作请参照:
Flink安装:https://blog.csdn.net/x976269167/article/details/105700963
Scala安装:https://blog.csdn.net/x976269167/article/details/105740307
1、创建一个maven工程
2、填入项目名称、GroupId和ArtifactId,点击Finish后选择New Window新开一个窗口
3、打开Setting,选择Plugins,下载Scala插件;如果搜不到可以通过官网下载,下载完成后,解压到IntelliJ IDEA安装目录的plugins目录下,地址如下
https://plugins.jetbrains.com/plugin/1347-scala
4、下载完成,按照提示重启IntelliJ IDEA,然后打开Project Structure,选择Libraries,点击添加,选择Scala SDK版本
5、配置Flink环境,继续选择Project Structure,选择Libraries,点击添加,选择Java,选择自己下载安装的Flink,选择lib包
6、在pom.xml文件中添加如下配置
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
<flink.version>1.9.2</flink.version>
<scala.version>2.12</scala.version>
</properties>
<dependencies>
<!-- flink的scala的api -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<!-- flink streaming的scala的api -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
7、在src下新建一个scala目录,然后选择Project Structure,将scala目录标记为Sources
8、验证之前,要先装netcat,解压后将文件放到C盘 Windows System32目录下
9、验证
(1)在scala目录下新建一个scala,代码如下,
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object WordCountScala {
def main(args: Array[String]): Unit = {
//生成了配置对象
val config = new Configuration()
//打开flink-webui
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
//配置webui的日志文件,否则打印日志到控制台,这样你的控制台就清净了
config.setString("web.log.path", "D:\\Java\\Logs\\Flink\\log.file")
//配置taskManager的日志文件,否则打印日志到控制台,这样你的控制台就清净了
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "D:\\Java\\Logs\\Flink\\log.file")
//获得local运行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
//定义socket的source源
val text: DataStream[String] = env.socketTextStream( hostname="localhost", port = 6666)
//scala开发需要加一行隐式转换,否则在调用operator的时候会报错,作用是找到scala类型的TypeInformation
import org.apache.flink.api.scala._
//定义operators,作用是解析数据,分组,并且求wordCount
val wordCount: DataStream[(String, Int)] = text.flatMap(_.split(" ")).map((_,1)).keyBy(_._1).sum( position = 1)
//定义sink,打印数据到控制台
wordCount.print()
//定义任务的名称并运行
//注意:operator是惰性的,只有遇到execute才执行
env.execute(jobName = "SocketWordCount")
}
}
(2) 打开cmd,输入如下命令,
nc -l -p 6666
(3) 右键刚才scala下添加的方法,点击运行main方法
(4)在cmd中随便输入数字,可以看到控制台在计算了,验证成功