- Set up the Flink Maven project
- pom file
- Add the Scala framework and a scala source folder
- Batch WordCount
- Stream processing: StreamWordCount
pom file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.lxh</groupId>
    <artifactId>flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.10.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12.12</scala.version>
    </properties>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These would normally be marked provided so they are not packaged into the JAR file;
             the scope is commented out here so the job can be run directly from the IDE. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <!-- Table API planners (old planner and Blink planner) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Connectors: Kafka, Redis, Elasticsearch, JDBC (MySQL) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- The Bahir Redis connector 1.0 is only published for Scala 2.11. -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>
        <!-- Scala library, also provided by Flink. -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Add connector dependencies here. They must be in the default scope (compile). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- This plugin compiles the Scala code into class files. -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <!-- Bind both the compile and testCompile goals to Maven's compile
                             phases, so main and test Scala sources both get compiled. -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
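With the pom in place, the project can be packaged from the command line (assuming Maven is installed; the scala-maven-plugin above takes care of compiling the Scala sources during the build):

nc -lk 7777 is shown later for testing; here the only command needed is

mvn clean package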
Add the Scala framework and a scala source folder: in IDEA, right-click the project, choose Add Framework Support... and tick Scala, then create a src/main/scala directory and mark it as a sources root.
Batch WordCount
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    // create the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // read the input data from a file
    val inputPath = "D:\\flink\\src\\main\\resources\\hello.txt"
    val inputDataSet = env.readTextFile(inputPath)
    // split each line into words, group by the word (field 0), and sum the counts (field 1)
    val wordCountDataSet = inputDataSet
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)
    // print the result
    wordCountDataSet.print()
  }
}
The hello.txt file sits at the path hard-coded in the program.
Running WordCount prints the word counts to the console.
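To make the output concrete: assuming a hypothetical hello.txt containing the two lines

hello world
hello flink

the batch job prints one (word, count) tuple per distinct word, along the lines of

(world,1)
(flink,1)
(hello,2)

The row order may differ between runs, since the result is computed in parallel.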
Note: Flink programs can be written in either Java or Scala; this post uses Scala.
Stream processing: StreamWordCount
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // read the hostname and port from the program arguments
    val params = ParameterTool.fromArgs(args)
    val hostname: String = params.get("hostname")
    val port: Int = params.getInt("port")
    // create a streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // receive the socket text stream
    val textDataStream = env.socketTextStream(hostname, port)
    // split each line on whitespace, drop empty tokens, then key by word and sum the counts
    val wordCountDataStream = textDataStream
      .flatMap(_.split("\\s"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)
    // print the result; a sink parallelism of 1 keeps the output in a single readable stream
    wordCountDataStream.print().setParallelism(1)
    // the result could also be written out, e.g. wordCountDataStream.writeAsCsv(...)
    // trigger job execution
    env.execute("Stream word count job")
  }
}
Set the program arguments in the IDE run configuration, as sketched below.
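A minimal arguments line, assuming the netcat sender runs on the same machine (swap localhost for the address of your Linux host; the port matches the nc command below):

--hostname localhost --port 7777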
Test it: on a Linux system, use the netcat command to send data.
nc -lk 7777
Run StreamWordCount, then type words into the netcat session on the Linux machine.
The console prints a running count for each word.
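As a sketch of the expected behavior: typing

hello flink
hello

into the netcat session yields console output along the lines of

(hello,1)
(flink,1)
(hello,2)

because the keyed sum emits an updated count every time another occurrence of a word arrives.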
I am writing this blog to record my learning journey; pointers and corrections are welcome.