一、简介

Apache Flume 是一个分布式,高可用的数据收集系统,可以从不同的数据源收集数据,经过聚合后发送到分布式计算框架或者存储系统中。Spark Straming 提供了以下两种方式用于 Flume 的整合。

二、推送式方法

在推送式方法 (Flume-style Push-based Approach) 中,Spark Streaming 程序需要对某台服务器的某个端口进行监听,Flume 通过 avro Sink 将数据源源不断推送到该端口。这里以监听日志文件为例,具体整合方式如下:

2.1 配置日志收集Flume

新建配置 netcat-memory-avro.properties,使用 tail 命令监听文件内容变化,然后将新的文件内容通过 avro sink 发送到 hadoop001 这台服务器的 8888 端口:

#指定agent的sources,sinks,channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

#配置sources属性
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /tmp/log.txt
a1.sources.s1.shell = /bin/bash -c
a1.sources.s1.channels = c1

#配置sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1
a1.sinks.k1.channel = c1

#配置channel类型
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

2.2 项目依赖

项目采用 Maven 工程进行构建,主要依赖为 spark-streamingspark-streaming-flume

<properties>
   <scala.version>2.11</scala.version>
   <spark.version>2.4.0</spark.version>
</properties>

<dependencies>
   <!-- Spark Streaming-->
   <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_${scala.version}</artifactId>
       <version>${spark.version}</version>
   </dependency>
   <!-- Spark Streaming 整合 Flume 依赖-->
   <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming-flume_${scala.version}</artifactId>
       <version>2.4.3</version>
   </dependency>
</dependencies>

2.3 Spark Streaming接收日志数据

调用 FlumeUtils 工具类的 createStream 方法,对 hadoop001 的 8888 端口进行监听,获取到流数据并进行打印:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object PushBasedWordCount {
   
 def main(args: Array[String]): Unit = {
   val sparkConf = new SparkConf()
   val ssc = new StreamingContext(sparkConf, Seconds(5))
   // 1.获取输入流
   val flumeStream = FlumeUtils.createStream(ssc, "hadoop001", 8888)
   // 2.打印输入流的数据
   flumeStream.map(line => new String(line.event.getBody.array()).trim).print()

   ssc.start()
   ssc.awaitTermination()
}
}

2.4 项目打包

因为 Spark 安装目录下是不含有 spark-streaming-flume 依赖包的,所以在提交到集群运行时候必须提供该依赖包,你可以在提交命令中使用 --jar 指定上传到服务器的该依赖包,或者使用 --packages org.apache.spark:spark-streaming-flume_2.12:2.4.3 指定依赖包的完整名称,这样程序在启动时会先去中央仓库进行下载。

这里我采用的是第三种方式:使用 maven-shade-plugin 插件进行 ALL IN ONE 打包,把所有依赖的 Jar 一并打入最终包中。需要注意的是 spark-streaming 包在 Spark 安装目录的 jars 目录中已经提供,所以不需要打入。插件配置如下:

<build>
   <plugins>
       <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-compiler-plugin</artifactId>
           <configuration>
               <source>8</source>
               <target>8</target>
           </configuration>
       </plugin>
       <!--使用 shade 进行打包-->
       <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-shade-plugin</artifactId>
           <configuration>
               <createDependencyReducedPom>true</createDependencyReducedPom>
               <filters>
                   <filter>
                       <artifact>*:*</artifact>
                       <excludes>
                           <exclude>META-INF/*.SF</exclude>
                           <exclude>META-INF/*.sf</exclude>
                           <exclude>META-INF/*.DSA</exclude>
                           <exclude>META-INF/*.dsa</exclude>
                           <exclude>META-INF/*.RSA</exclude>
                           <exclude>META-INF/*.rsa</exclude>
                           <exclude>META-INF/*.EC</exclude>
                           <exclude>META-INF/*.ec</exclude>
                           <exclude>META-INF/MSFTSIG.SF</exclude>
                           <exclude>META-INF/MSFTSIG.RSA</exclude>
                       </excludes>
                   </filter>
               </filters>
               <artifactSet>
                   <excludes>
                       <exclude>org.apache.spark:spark-streaming_${scala.version}</exclude>
                       <exclude>org.scala-lang:scala-library</exclude>
                       <exclude>org.apache.commons:commons-lang3</exclude>
                   </excludes>
               </artifactSet>
           </configuration>
           <executions>
               <execution>
                   <phase>package</phase>
                   <goals>
                       <goal

相关文章: