kafka作为一个实时的分布式消息队列,实时的生产和消费消息,这里我们可以

1.KafkaUils.createDstream方式:利用SparkStreaming实时计算框架实时的读取kafka中的数据然后进行计算。在spark1.3版本后,kafkaUtils里面提供了两个创建dstream的方法,一种为KafkaUtils.createDstream,另一种为KafkaUtils.createDirectStream。

构造函数为KafkaUtils.createDstream(ssc,[zk], [consumer group id], [per-topic,partitions] ) 使用了receivers来接收数据,利用的是Kafka高层次的消费者api,对于所有的receivers接收到的数据将会保存在Spark executors中,然后通过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,它同步将接受到数据保存到分布式文件系统上比如HDFS。 所以数据在出错的情况下可以恢复出来 。

SparkStreaming整合kafka实践

 

A、创建一个receiver来对kafka进行定时拉取数据,ssc的rdd分区和kafka的topic分区不是一个概念,故如果增加特定主消费的线程数仅仅是增加一个receiver中消费topic的线程数,并不增加spark的并行处理数据数量。
B、对于不同的group和topic可以使用多个receivers创建不同的DStream 
C、如果启用了WAL(spark.streaming.receiver.writeAheadLog.enable=true)

同时需要设置存储级别(默认StorageLevel.MEMORY_AND_DISK_SER_2),

即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)
 

1.1KafkaUtils.createDstream实践

(1) 添加pom依赖

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.10.6</scala.version>
<scala.compat.version>2.10</scala.compat.version>
<encoding>UTF-8</encoding>
</properties>

<dependencies>
<!-- 添加scala的依赖 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- 添加spark的依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>2.2.0</version>
</dependency>
<!-- 导入spark sql的依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.2.0</version>
</dependency>
<!-- spark steaming的依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>2.2.0</version>
</dependency>
<!--kafka依赖0.8-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
<version>2.2.0</version>
</dependency>
<!--kafka衣依赖0.10-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
<version>2.2.0</version>
</dependency>
<!--hadoop的依赖-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.5</version>
</dependency>
</dependencies>

<!-- 指定插件-->
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-make:transitive</arg>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.bie.WordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>


(2)启动zookeeper集群:zkServer.sh start

(3)启动kafka集群:

kafka-server-start.sh -daemon /home/bigdata/kafka_0.10/config/server.properties 

SparkStreaming整合kafka实践

(4)创建topic:

kafka-topics.sh --create --zookeeper 192.168.11.11:2181,192.168.11.12:2181,192.168.11.13:2181 --replication-factor 2 --partitions 3 --topic demo

(5)启动生产者:

kafka-console-producer.sh --broker-list 192.168.11.11:9092,192.168.11.12:9092,192.168.11.13:9092 --topic demo

(6)消费者测试:

kafka-console-consumer.sh --zookeeper 192.168.11.11:2181,192.168.11.12:2181,192.168.11.13:2181 --topic demo1 --from-beginnin

(7)编写SparkStreaming(基于kafka高级API——偏移量有zk保存)

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingKafka_Receiver {
def main(args: Array[String]): Unit = {
//1.创建sparkConf
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName(s"${this.getClass.getName}")
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
//开启wal预写日志,保存数据源的可靠性
//2. 创建sparkContext
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
//3.创建StreamingContext
val ssc = new StreamingContext(sc, Seconds(10))
//4.设置checkpoint
ssc.checkpoint("hdfs://192.168.11.11:8020/Kafka_Receiver")
//5.定义zk地址
val zkQuorum = "192.168.11.11:2181,192.168.11.12:2181,192.168.11.13:2181"
//6.定义消费者组
val groupId = "spark_receiver"
//7.定义topic相关信息Map[String,Int] value 表示的topic中每一个分区被N个线程消费
val topics = Map("demo1" -> 2)
//8.通过KafkaUtils.createStream对接kafka 开启3个receiver接受数据
val receiverInputDStream = (1 to 3).map(x => {
val stream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
stream
})
//9.union合并receiver中的数据
val union = ssc.union(receiverInputDStream)
//10.获取topic中的数据
val topicData = union.map(_._2)
//11.切分每一行,每个单词记为1
val wordAndOne = topicData.flatMap(_.split(" ")).map((_, 1))
//12.相同单词出现次数累加
val result = wordAndOne.reduceByKey(_ + _)
result.print()
ssc.start()
ssc.awaitTermination()
}
}

SparkStreaming整合kafka实践

SparkStreaming整合kafka实践

总结:

       通过这种方式实现,刚开始的时候系统正常运行,没有发现问题,但是如果系统异常重新启动sparkstreaming程序后,发现程序会重复处理已经处理过的数据,这种基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。官方现在也已经不推荐这种整合方式,官网相关地址下面我们使用官网推荐的第二种方式kafkaUtils的createDirectStream()方式。
 

2.KafkaUtils.createDirectStream方式

        不同于Receiver接收数据,这种方式定期地从kafka的topic下对应的partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,Spark通过调用kafka简单的消费者Api读取一定范围的数据。
相比基于Receiver方式有几个优点: 
A、简化并行

不需要创建多个kafka输入流,然后union它们,sparkStreaming将会创建和kafka分区一种的rdd的分区数,而且会从kafka中并行读取数据,spark中RDD的分区数和kafka中的分区数据是一一对应的关系。

B、高效

第一种实现数据的零丢失是将数据预先保存在WAL中,会复制一遍数据,会导致数据被拷贝两次,第一次是被kafka复制,另一次是写到WAL中。而没有receiver的这种方式消除了这个问题。 
C、恰好一次语义(Exactly-once-semantics)

Receiver读取kafka数据是通过kafka高层次api把偏移量写入zookeeper中,虽然这种方法可以通过数据保存在WAL中保证数据不丢失,但是可能会因为sparkStreaming和ZK中保存的偏移量不一致而导致数据被消费了多次。EOS通过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于zookeeper的kafka监控工具

2.1KafkaUtils.createDirectStream实战

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkStreamingKafka_Direct1 {
def main(args: Array[String]): Unit = {
//创建sparkConf
val sparkConf: SparkConf = new SparkConf().setAppName(s"${this.getClass.getName}")
.setMaster("local[2]")
//创建sparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))
val kafkaParams = Map("metadata.broker.list" -> 
"192.168.11.11:9092,192.168.11.12:9092,192.168.11.13:9092", "group.id" -> "Kafka_Direct")
val topics = Set("demo")
//6、通过 KafkaUtils.createDirectStream接受kafka数据,这里采用是kafka低级api偏移量不受zk管理
val dstream: InputDStream[(String, String)] = 
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
val topicData: DStream[String] = dstream.map(_._2)
val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_, 1))
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
result.print()
ssc.start()
ssc.awaitTermination()
}
}

SparkStreaming整合kafka实践

 

 

相关文章: