Table of Contents
1.1 Receiver-based Approach (not recommended)
1.2 Direct Approach (No Receivers)
1.3 Spark Streaming + Kafka 0.8: guaranteeing no data loss (managing offsets yourself)
2.1 Only the Direct Approach (No Receivers)
2.2 Spark Streaming + Kafka 0.10: guaranteeing no data loss (managing offsets yourself)
2.3 How a Spark Streaming application guarantees Exactly-Once
1. Spark Streaming Integration with Kafka 0.8
1.1 Receiver-based Approach (not recommended)
(1) Receiving data with a Receiver.
The Receiver is implemented with Kafka's high-level consumer API.
Data received from Kafka through the Receiver is stored in Spark executors, and jobs launched by Spark Streaming then process it. However, under the default configuration this approach can lose data on failure (receiver reliability, as shown in the figure).
To guarantee zero data loss, you must additionally enable Write Ahead Logs (WAL) in Spark Streaming (introduced in Spark 1.2). This synchronously writes all data received from Kafka to a write-ahead log on a distributed file system (e.g. HDFS), so that all data can be recovered after a failure, but performance suffers.
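A minimal sketch of what enabling the WAL looks like (imports as in the example below; the checkpoint path, ZooKeeper quorum and topic are placeholders):
val sparkConf = new SparkConf()
  .setAppName("ReceiverWithWAL")
  //persist every received block to the write-ahead log before it is acknowledged
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(sparkConf, Seconds(5))
//the WAL is written under the checkpoint directory, so one must be set
ssc.checkpoint("hdfs://hadoop1:9000/spark/receiver-wal")
//the WAL already provides durability, so the non-replicated storage level is enough
val kafkaStream = KafkaUtils.createStream(ssc,
  "hadoop1:2181,hadoop2:2181,hadoop3:2181", "testflink", Map("flink" -> 1),
  StorageLevel.MEMORY_AND_DISK_SER)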
(2) API usage:
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
(3) Example
Add the pom dependency:
<!-- <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
Scala code:
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Receiver-based approach, Kafka 0.8.
 *
 * A basic understanding is enough; this approach is not used in real work.
 */
object ReceiverKafkaWordCount {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
//Step 1: initialize the entry point
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ReceiverKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String](
"zookeeper.connect"->"hadoop2:2181,hadoop3:2181,hadoop1:2181",
"group.id" -> "testflink"
)
val topics = "flink".split(",").map((_,1)).toMap
//Step 2: create the input stream
//By default a single createStream call creates only one receiver, so 20 streams are created below and unioned
//
// val lines = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](
// ssc,kafkaParams,topics, StorageLevel.MEMORY_AND_DISK_SER_2)
val kafkaStreams = (1 to 20).map(_ => {
KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER)
})
val lines = ssc.union(kafkaStreams)
//Step 3: business logic
lines.map(_._2).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
1.2 Direct Approach (No Receivers)
This approach periodically queries Kafka for the latest offset of each topic+partition and uses that to define the offset range of each batch. When the job that processes the data starts, it uses Kafka's simple consumer API to read the specified offset ranges from Kafka.
(1) Advantages of this approach:
- 1. Simplified parallelism: to read multiple partitions you no longer need to create multiple input DStreams and union them. Spark creates as many RDD partitions as there are Kafka partitions and reads from Kafka in parallel, so there is a one-to-one mapping between Kafka partitions and RDD partitions.
- 2. High performance: with the receiver-based approach, guaranteeing zero data loss requires the WAL, which is inefficient because the data is effectively copied twice: Kafka itself already has a high-reliability mechanism that replicates the data, and it is copied once more into the WAL. The direct approach does not depend on a Receiver and needs no WAL; as long as the data is replicated in Kafka, it can be recovered from Kafka's replicas.
- 3. Exactly-once semantics:
- The receiver-based approach uses Kafka's high-level API to store consumed offsets in ZooKeeper, which is the traditional way of consuming Kafka. Combined with the WAL it can achieve zero data loss, but it cannot guarantee that each record is processed exactly once; a record may be processed twice, because Spark and ZooKeeper can get out of sync.
- 4. Lower resource usage.
- Direct needs no Receivers: every requested executor takes part in the computation. The receiver-based approach needs dedicated Receivers that only read Kafka data and do no computation, so with the same resource request Direct can support a larger workload.
- 5. Lower memory.
- With the receiver-based approach the Receiver runs asynchronously from the other executors and receives data continuously. That is fine for small workloads, but for large workloads you have to increase the Receiver's memory even though the computing executors do not need that much. Direct has no Receiver; data is read only when a batch is computed and then processed immediately, so the memory requirement is much lower. In practice we have been able to cut executor memory from 10 GB down to about 2-4 GB.
- 6. Better robustness.
- With the receiver-based approach the Receivers keep reading data asynchronously, so when network or storage pressure makes the streaming job fall behind, the Receivers keep reading anyway, which easily crashes the computation. Direct has no such concern: the Driver reads data only when it triggers a batch, so a backlog does not make the program fail.
(2) API usage:
val lines = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
(3) Example:
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object DirectKafkaWordCount {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
//Step 1: initialize the entry point
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
/**
 * The 0.8 client API is used here to consume data from a 1.0 Kafka cluster.
 */
val kafkaParams = Map[String, String](
"bootstrap.servers"->"node01:9092",
"group.id" -> "testsparkstreaming",
"enable.auto.commit" -> "false" //自动提交关闭
)
val topics = "flink".split(",").toSet
/**
*
* ssc: StreamingContext,
* kafkaParams: Map[String, String],
* topics: Set[String]
* StringDecoder: the deserializer
*/
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
topics).map(_._2)
val result = lines.flatMap(_.split(",")).map((_, 1))
.reduceByKey(_ + _)
result.print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
Send a few test messages to the flink topic (for example with the Kafka console producer on node01: kafka-console-producer.sh --broker-list node01:9092 --topic flink) and watch the console output.
The simplest way to avoid losing data is to rely on the checkpoint mechanism, but checkpoints have one drawback: once the application code is upgraded, the old checkpoint can no longer be used. So if you really want to guarantee no data loss, you have to manage the offsets yourself.
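For comparison, a minimal sketch of the checkpoint-based pattern with the 0.8 direct API (the checkpoint path, broker address and topic are placeholders, and the object name is made up). The stream must be created inside the factory function so it can be rebuilt from the checkpoint on restart:
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object CheckpointedDirectKafka {
  val checkpointDir = "hdfs://node01:9000/spark/checkpoint-demo" //placeholder path
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("CheckpointedDirectKafka")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir) //offsets are saved as part of the checkpoint
    val kafkaParams = Map("bootstrap.servers" -> "node01:9092", "group.id" -> "testsparkstreaming")
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, "flink".split(",").toSet).map(_._2)
    lines.flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _).print()
    ssc
  }
  def main(args: Array[String]): Unit = {
    //On restart the context, including pending batches and offsets, is rebuilt from the checkpoint.
    //If the application jar changes, the old checkpoint can no longer be deserialized and has to be deleted,
    //which is exactly why the next section manages offsets manually instead.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}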
1.3 Spark Streaming + Kafka 0.8: guaranteeing no data loss (managing offsets yourself)
With the 0.8 integration the offsets are stored in ZooKeeper.
Code steps:
- 1) Write a class that extends Serializable;
- 2) Provide a createDirectStream method that creates the stream;
- 3) Before reading the offsets from ZooKeeper, call setOrUpdateOffsets to bring the offsets up to date;
- 4) Implement the update method setOrUpdateOffsets:
- 1. iterate over the topics and get their partitions;
- 2. get the consumer offsets for the groupId and those partitions;
- 3. check whether the group has consumed before: if it has, update any stale offsets; if it has not, initialize the consumer offsets;
- 5) Read the offsets from ZooKeeper and start consuming messages (calling KafkaUtils.createDirectStream with explicit offsets);
- 6) Provide a Java-friendly createDirectStream overload;
- 7) Update the offsets in ZooKeeper;
- 8) Write a listener class that checks whether any task in the batch failed; if so, skip the offset commit, otherwise store the offsets in ZooKeeper.
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaPairInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils}
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import java.util.{Map => JMap, Set => JSet}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
/**
 * Manage the offsets yourself.
 * With the 0.8 integration the offsets are stored in ZooKeeper.
 * @param kafkaParams
 */
class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {
  private val kc = new KafkaCluster(kafkaParams) //client for talking to the Kafka cluster
  //(2) Bring the offsets up to date according to the actual situation
  //--------------------------------------------
  private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach(topic => {
      var hasConsumed = true
      val partitionsE = kc.getPartitions(Set(topic))
      if (partitionsE.isLeft) { //Left holds an error or invalid value, Right holds the valid value
        throw new SparkException(s"get kafka partition failed:${partitionsE.left.get}")
      }
      val partitions = partitionsE.right.get //take the valid value
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions) //get the consumer offsets
      if (consumerOffsetsE.isLeft) hasConsumed = false
      if (hasConsumed) { //the group has consumed before
        /**
         * If the streaming program throws kafka.common.OffsetOutOfRangeException,
         * the offsets saved in ZooKeeper are stale, i.e. Kafka's retention policy has already
         * deleted the log segments containing those offsets.
         * In that case, compare the consumerOffsets in ZooKeeper with earliestLeaderOffsets:
         * if consumerOffsets is smaller than earliestLeaderOffsets, it is stale,
         * so update consumerOffsets to earliestLeaderOffsets.
         */
        val earlistLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
        if (earlistLeaderOffsetsE.isLeft) {
          throw new SparkException(s"get earlist leader offsets failed:${earlistLeaderOffsetsE.left.get}")
        }
        val earlistLeaderOffsets = earlistLeaderOffsetsE.right.get //earliest offsets still available on the leaders
        val consumerOffsets = consumerOffsetsE.right.get
        //Possibly only some partitions are stale, so only the stale partitions are updated to earliestLeaderOffsets
        var offsets: Map[TopicAndPartition, Long] = Map()
        consumerOffsets.foreach({ case (tp, n) =>
          val earlistLeaderOffset = earlistLeaderOffsets(tp).offset
          if (n < earlistLeaderOffset) { //compare
            println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
              " offsets are stale, updating to " + earlistLeaderOffset)
            offsets += (tp -> earlistLeaderOffset) //record the stale partition
          }
        })
        if (!offsets.isEmpty) { //only write back if anything needs updating
          kc.setConsumerOffsets(groupId, offsets) //store the corrected consumer offsets
        }
      } else { //the group has never consumed
        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase())
        var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
        if (reset == Some("smallest")) {
          val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft) {
            throw new SparkException(s"get earlist leader offsets failed:${leaderOffsetsE.left.get}")
          }
          leaderOffsets = leaderOffsetsE.right.get
        } else {
          val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft) {
            throw new SparkException(s"get latest leader offsets failed:${leaderOffsetsE.left.get}")
          }
          leaderOffsets = leaderOffsetsE.right.get
        }
        val offsets = leaderOffsets.map { case (tp, offset) => (tp, offset.offset) }
        kc.setConsumerOffsets(groupId, offsets) //store the initial consumer offsets
      }
    })
  }
  //-------------------------------------------------
  //(1) Create the stream
  def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]): InputDStream[(K, V)] = {
    val groupId = kafkaParams.get("group.id").get
    //Before reading the offsets from ZooKeeper, bring them up to date according to the actual situation
    setOrUpdateOffsets(topics, groupId) //(2) the update method defined above
    //(3) Read the offsets from ZooKeeper and start consuming messages
    val messages = {
      val partitionsE = kc.getPartitions(topics) //get the partitions of the topics
      if (partitionsE.isLeft) {
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      }
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft) {
        throw new SparkException(s"get kafka consumer offsets failed:${consumerOffsetsE.left.get}")
      }
      val consumerOffsets = consumerOffsetsE.right.get
      KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)]( //call KafkaUtils.createDirectStream with explicit offsets
        ssc,
        kafkaParams,
        consumerOffsets,
        (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
    }
    messages
  }
  //(4) Java-friendly createDirectStream overload
  def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
      jssc: JavaStreamingContext,
      keyClass: Class[K],
      valueClass: Class[V],
      keyDecoderClass: Class[KD],
      valueDecoderClass: Class[VD],
      kafkaParams: JMap[String, String],
      topics: JSet[String]): JavaPairInputDStream[K, V] = {
    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    createDirectStream[K, V, KD, VD](jssc.ssc, Map(kafkaParams.asScala.toSeq: _*), Set(topics.asScala.toSeq: _*))
  }
  /**
   * (5) Update the consumer offsets in ZooKeeper
   */
  def updateZKOffsets[K, V](rdd: RDD[(K, V)]): Unit = {
    val groupId = kafkaParams.get("group.id").get
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (offsets <- offsetsList) {
      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
      if (o.isLeft) {
        println(s"Error updating the offset to kafka cluster:${o.left.get}")
      }
    }
  }
}
(6) Write a listener class that implements StreamingListener, so that the offsets are not committed when a task in the batch fails.
import kafka.common.TopicAndPartition;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.apache.spark.streaming.scheduler.*;
import scala.Option;
import scala.collection.JavaConversions;
import scala.collection.immutable.List;
import java.util.HashMap;
import java.util.Map;
public class MyListener implements StreamingListener {
    private KafkaCluster kc;
    public scala.collection.immutable.Map<String, String> kafkaParams;
    public MyListener(scala.collection.immutable.Map<String, String> kafkaParams) {
        this.kafkaParams = kafkaParams;
        kc = new KafkaCluster(kafkaParams);
    }
    @Override
    public void onReceiverStarted(StreamingListenerReceiverStarted receiverStarted) {
    }
    @Override
    public void onReceiverError(StreamingListenerReceiverError receiverError) {
    }
    @Override
    public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
    }
    @Override
    public void onBatchSubmitted(StreamingListenerBatchSubmitted batchSubmitted) {
    }
    @Override
    public void onBatchStarted(StreamingListenerBatchStarted batchStarted) {
    }
    /**
     * Called when a batch has completed.
     * @param batchCompleted
     */
    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        //If any output operation in this batch failed, abort the offset commit
        scala.collection.immutable.Map<Object, OutputOperationInfo> opsMap = batchCompleted.batchInfo().outputOperationInfos();
        Map<Object, OutputOperationInfo> javaOpsMap = JavaConversions.mapAsJavaMap(opsMap);
        for (Map.Entry<Object, OutputOperationInfo> entry : javaOpsMap.entrySet()) {
            //If failureReason is not None (Scala's None), something failed, so do not save the offsets
            if (!"None".equalsIgnoreCase(entry.getValue().failureReason().toString())) {
                return;
            }
        }
        long batchTime = batchCompleted.batchInfo().batchTime().milliseconds();
        /**
         * topic, partition, offset
         */
        Map<String, Map<Integer, Long>> offset = getOffset(batchCompleted);
        for (Map.Entry<String, Map<Integer, Long>> entry : offset.entrySet()) {
            String topic = entry.getKey();
            Map<Integer, Long> paritionToOffset = entry.getValue();
            //just put the offset information into ZooKeeper
            for (Map.Entry<Integer, Long> p2o : paritionToOffset.entrySet()) {
                Map<TopicAndPartition, Object> map = new HashMap<TopicAndPartition, Object>();
                TopicAndPartition topicAndPartition = new TopicAndPartition(topic, p2o.getKey());
                map.put(topicAndPartition, p2o.getValue());
                scala.collection.immutable.Map<TopicAndPartition, Object> topicAndPartitionObjectMap =
                        TypeHelper.toScalaImmutableMap(map); //uses the TypeHelper class defined below
                kc.setConsumerOffsets(kafkaParams.get("group.id").get(), topicAndPartitionObjectMap);
            }
        }
    }
    @Override
    public void onOutputOperationStarted(StreamingListenerOutputOperationStarted outputOperationStarted) {
    }
    @Override
    public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted outputOperationCompleted) {
    }
    private Map<String, Map<Integer, Long>> getOffset(StreamingListenerBatchCompleted batchCompleted) {
        Map<String, Map<Integer, Long>> map = new HashMap<>();
        scala.collection.immutable.Map<Object, StreamInputInfo> inputInfoMap = batchCompleted.batchInfo().streamIdToInputInfo();
        Map<Object, StreamInputInfo> infos = JavaConversions.mapAsJavaMap(inputInfoMap);
        infos.forEach((k, v) -> {
            Option<Object> optOffsets = v.metadata().get("offsets");
            if (!optOffsets.isEmpty()) {
                Object objOffsets = optOffsets.get();
                if (List.class.isAssignableFrom(objOffsets.getClass())) {
                    List<OffsetRange> scalaRanges = (List<OffsetRange>) objOffsets;
                    Iterable<OffsetRange> ranges = JavaConversions.asJavaIterable(scalaRanges);
                    for (OffsetRange range : ranges) {
                        if (!map.containsKey(range.topic())) {
                            map.put(range.topic(), new HashMap<>());
                        }
                        map.get(range.topic()).put(range.partition(), range.untilOffset());
                    }
                }
            }
        });
        return map;
    }
}
(7) Write the TypeHelper class, whose purpose is to make the API usable from other languages (it converts Java maps into Scala immutable maps).
import scala.Tuple2;
public class TypeHelper {
    @SuppressWarnings("unchecked")
    public static <K, V> scala.collection.immutable.Map<K, V> toScalaImmutableMap(java.util.Map<K, V> javaMap) {
        final java.util.List<Tuple2<K, V>> list = new java.util.ArrayList<>(javaMap.size());
        for (final java.util.Map.Entry<K, V> entry : javaMap.entrySet()) {
            list.add(Tuple2.apply(entry.getKey(), entry.getValue()));
        }
        final scala.collection.Seq<Tuple2<K, V>> seq =
                scala.collection.JavaConverters.asScalaBufferConverter(list).asScala().toSeq();
        return (scala.collection.immutable.Map<K, V>) scala.collection.immutable.Map$.MODULE$.apply(seq);
    }
}
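Putting the pieces together, here is a minimal sketch of a driver program using the classes above (the object name is made up; the broker address, group and topic follow the earlier examples; KafkaManager, MyListener and TypeHelper are assumed to be compiled in the same package):
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object ManagedOffsetKafka08WordCount {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ManagedOffsetKafka08WordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "node01:9092",
      "group.id" -> "testsparkstreaming",
      "auto.offset.reset" -> "smallest" //on the very first run, start from the earliest available offset
    )
    //The listener writes offsets to ZooKeeper only for batches whose output operations all succeeded
    ssc.addStreamingListener(new MyListener(kafkaParams))
    //KafkaManager checks/updates the offsets in ZooKeeper before creating the direct stream
    val km = new KafkaManager(kafkaParams)
    val lines = km.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, "flink".split(",").toSet)
    lines.map(_._2).flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _).print()
    //Alternatively, km.updateZKOffsets(rdd) could be called inside foreachRDD after each batch is processed
    ssc.start()
    ssc.awaitTermination()
  }
}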
2. Spark Streaming Integration with Kafka 0.10
2.1 Only the Direct Approach (No Receivers)
With the 0.10 integration the offsets are stored in the Kafka cluster itself.
(1) Change pom.xml as follows:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${spark.version}</version>
</dependency>-->
(2) Example:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object KafkaDirect010 {
def main(args: Array[String]): Unit = {
//Step 1: configuration
val conf = new SparkConf().setAppName("KafkaDirect010").setMaster("local[2]")
conf.set("spark.streaming.kafka.maxRatePerPartition", "5")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val streamingContext = new StreamingContext(conf,Seconds(5))
val brokers = "node01:9092,node02:9092,node03:9092"
val topics = "flink"
val groupId = "flink" //注意,这个也就是我们的消费者的名字
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String,Object](
"bootstrap.servers" -> brokers,
"group.id" -> groupId,
"fetch.message.max.bytes" -> "209715200",
"key.deserializer" -> classOf[StringDeserializer], //反序列化
"value.deserializer" -> classOf[StringDeserializer],
"enable.auto.commit" -> "false" //关闭自动提交
)
//Step 2: create the input stream
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
streamingContext,
//location strategy
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
)
stream.foreachRDD(rdd =>{
//Step 3: business logic
val newRDD:RDD[String] = rdd.map(_.value())
newRDD.foreach( line =>{
println(line)
})
//Step 4: commit the offset information back to Kafka
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()
}
}
2.2 Spark Streaming + Kafka 0.10: guaranteeing no data loss (managing offsets yourself)
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object DirectKafka010Kafka {
def main(args: Array[String]): Unit = {
// Logger.getLogger("org").setLevel(Level.ERROR)
//Step 1: configuration
val conf = new SparkConf().setAppName("DirectKafka010").setMaster("local[5]")
conf.set("spark.streaming.kafka.maxRatePerPartition", "5")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
val ssc = new StreamingContext(conf,Seconds(5))
val brokers = "node01:9092"
val topics = "flink"
val groupId = "flink_consumer2" //注意,这个也就是我们的消费者的名字
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> brokers,
"group.id" -> groupId,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
//Step 2: create the input stream
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
//register the listener
ssc.addStreamingListener(new MyListener(stream))
val result = stream.map(_.value()).flatMap(_.split(","))
.map((_, 1))
.reduceByKey(_ + _)
result.print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
Write the listener class MyListener:
package com.kkb.kafka.ten10.offsetManag10
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, OffsetRange}
import org.apache.spark.streaming.scheduler._
class MyListener(var stream:InputDStream[ConsumerRecord[String, String]]) extends StreamingListener {
override def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit =
super.onStreamingStarted(streamingStarted)
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit =
super.onReceiverStarted(receiverStarted)
override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit =
super.onReceiverError(receiverError)
override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit =
super.onReceiverStopped(receiverStopped)
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
val info: Map[Int, StreamInputInfo] = batchSubmitted.batchInfo.streamIdToInputInfo
var offsetRangesTmp:List[OffsetRange]=null;
var offsetRanges:Array[OffsetRange]=null;
for( k <- info){ //iterate over the input info of each stream
val offset: Option[Any] = k._2.metadata.get("offsets") //get the "offsets" metadata
if(!offset.isEmpty){ //check whether the offsets are present
try {
val offsetValue = offset.get
offsetRangesTmp= offsetValue.asInstanceOf[List[OffsetRange]] //the metadata is a List[OffsetRange]
offsetRanges=offsetRangesTmp.toSet.toArray; //convert it to an array
} catch {
case e:Exception => println(e)
}
}
}
if(offsetRanges != null){
//commit the offsets, as described in the official documentation
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges);
}
}
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit ={
}
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit ={
}
override def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit =
super.onOutputOperationStarted(outputOperationStarted)
override def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit =
super.onOutputOperationCompleted(outputOperationCompleted)
}
2.3 How a Spark Streaming Application Guarantees Exactly-Once
For a streaming computation to guarantee Exactly-Once, three requirements must be met:
- (1) The Source supports replay.
- (2) The streaming engine itself guarantees Exactly-Once processing.
- (3) The Sink supports idempotent or transactional updates.
In other words, to make a Spark Streaming program Exactly-Once, look at it from these three angles:
- (1) Receiving data: reading from the Source.
- (2) Transforming data: with DStream and RDD operators (Spark Streaming guarantees Exactly-Once here by nature).
- (3) Storing data: saving the results to an external system.
If a Spark Streaming program needs Exactly-Once semantics, every single one of these steps must guarantee Exactly-Once.
(1) Add the following to pom.xml:
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc_2.11</artifactId>
<version>3.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc-config -->
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc-config_2.11</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.39</version>
</dependency>
(2) Code
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory
import scalikejdbc.{ConnectionPool, DB, _}
/**
 * Kafka 0.10
 * Spark Streaming EOS (Exactly-Once Semantics):
 * Input: Kafka
 * Process: Spark Streaming
 * Output: MySQL
 *
 * To guarantee EOS:
 * 1. Manage the offsets yourself (enable.auto.commit=false); here they are stored in MySQL
 *    (in this solution the offsets are not stored in ZooKeeper or in Kafka)
 * 2. Use createDirectStream
 * 3. Transactional output: the result write and the offset commit run in the same MySQL transaction on the Driver
 */
object SparkStreamingEOSKafkaMysqlAtomic {
@transient lazy val logger = LoggerFactory.getLogger(this.getClass) //does not need to be serialized
def main(args: Array[String]): Unit = {
val topic="topic1"
val group="spark_app1"
//Kafka configuration
val kafkaParams= Map[String, Object](
"bootstrap.servers" -> "node1:6667,node2:6667,node3:6667",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest",//latest earliest
"enable.auto.commit" -> (false: java.lang.Boolean),
"group.id" -> group)
//create the JDBC connection pool on the Driver
ConnectionPool.singleton("jdbc:mysql://node3:3306/bigdata", "", "")
val conf = new SparkConf().setAppName(this.getClass.getSimpleName.replace("$",""))
val ssc = new StreamingContext(conf,Seconds(5))
//1) On first start or on restart, TopicPartitions are built from the partitions/offsets stored in MySQL
//2) While running, each partition's offset is kept in the internal currentOffsets = Map[TopicPartition, Long]() variable
//3) If the topic's partitions are expanded later, this is not detected automatically while the job is running
val initOffset=DB.readOnly(implicit session=>{
sql"select `partition`,offset from kafka_topic_offset where topic =${topic} and `group`=${group}"
.map(item=> new TopicPartition(topic, item.get[Int]("partition")) -> item.get[Long]("offset"))
.list().apply().toMap
})
//createDirectStream
//start consuming from the specified topic, partitions and offsets
val sourceDStream =KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String,String](initOffset.keys,kafkaParams,initOffset)
)
sourceDStream.foreachRDD(rdd=>{
if (!rdd.isEmpty()){
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetRanges.foreach(offsetRange=>{
logger.info(s"Topic: ${offsetRange.topic},Group: ${group},Partition: ${offsetRange.partition},fromOffset: ${offsetRange.fromOffset},untilOffset: ${offsetRange.untilOffset}")
})
//aggregation
//collect the results to the Driver
val sparkSession = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import sparkSession.implicits._
val dataFrame = sparkSession.read.json(rdd.map(_.value()).toDS)
dataFrame.createOrReplaceTempView("tmpTable")
val result=sparkSession.sql(
"""
|select
| -- per minute
| eventTimeMinute,
| -- per language
| language,
| -- number of events (pv)
| count(1) pv,
| -- number of distinct users (uv)
| count(distinct(userID)) uv
|from(
| select *, substr(eventTime,0,16) eventTimeMinute from tmpTable
|) as tmp group by eventTimeMinute,language
""".stripMargin
).collect()
//Store the results and commit the offsets on the Driver
//The result write and the offset commit execute atomically in the same transaction
//Here the offsets are kept in MySQL
DB.localTx(implicit session=>{
//store the results
result.foreach(row=>{
sql"""
insert into twitter_pv_uv (eventTimeMinute, language,pv,uv)
value (
${row.getAs[String]("eventTimeMinute")},
${row.getAs[String]("language")},
${row.getAs[Long]("pv")},
${row.getAs[Long]("uv")}
)
on duplicate key update pv=pv,uv=uv
""".update.apply()
})
//commit the offsets
offsetRanges.foreach(offsetRange=>{
val affectedRows = sql"""
update kafka_topic_offset set offset = ${offsetRange.untilOffset}
where
topic = ${topic}
and `group` = ${group}
and `partition` = ${offsetRange.partition}
and offset = ${offsetRange.fromOffset}
""".update.apply()
if (affectedRows != 1) {
throw new Exception(s"""Commit Kafka Topic: ${topic} Offset Failed!""")
}
})
})
}
})
ssc.start()
ssc.awaitTermination()
}
}
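The example above assumes two MySQL tables already exist: kafka_topic_offset must be pre-seeded with one row per topic/group/partition (the offset commit is an update that expects the previous offset to match), and twitter_pv_uv needs a unique key so that on duplicate key update makes replayed inserts idempotent. A minimal sketch of how they might be created with scalikejdbc; the column types, credentials and the seeded row are assumptions, adjust them to your data:
import scalikejdbc._
object CreateEosTables {
  def main(args: Array[String]): Unit = {
    ConnectionPool.singleton("jdbc:mysql://node3:3306/bigdata", "user", "password") //placeholders
    DB.autoCommit { implicit session =>
      //offset bookkeeping table: one row per (topic, group, partition)
      sql"""
        create table if not exists kafka_topic_offset (
          topic varchar(128) not null,
          `group` varchar(128) not null,
          `partition` int not null,
          offset bigint not null,
          primary key (topic, `group`, `partition`)
        )
      """.execute.apply()
      //result table: the primary key is what makes `on duplicate key update` fire on replays
      sql"""
        create table if not exists twitter_pv_uv (
          eventTimeMinute varchar(16) not null,
          language varchar(32) not null,
          pv bigint,
          uv bigint,
          primary key (eventTimeMinute, language)
        )
      """.execute.apply()
      //seed the starting offset for each partition to be consumed, e.g. partition 0 from offset 0
      sql"insert ignore into kafka_topic_offset (topic, `group`, `partition`, offset) values ('topic1', 'spark_app1', 0, 0)".update.apply()
    }
  }
}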