问题描述
原文地址:http://blog.csdn.net/xueba207/article/details/50381821
最近使用 Spark Streaming 处理 Kafka 的数据,业务数据量比较大,就使用了 KafkaUtils 的 createDirectStream() 方式。此方法直接从 Kafka broker 的分区中读取数据,不经过 ZooKeeper,也没有 receiver,而是由 Spark 的 task 直接对接 Kafka topic partition,能保证每条消息恰好被接收一次(exactly-once)的语义。但这种方式因为不经过 ZooKeeper,topic 的 offset 也就没有被保存,job 重启后只能从最新的 offset 开始消费消息,造成重启过程中的消息丢失。
解决方案
一般有两种方式让 Spark Streaming 保存 offset:Spark 的 checkpoint 机制,以及在程序中自己实现保存 offset 的逻辑,下面分别介绍。
checkpoint机制
Spark Streaming job 可以通过 checkpoint 的方式保存 job 的执行断点,断点中包含 StreamingContext 的全部信息(包括 Kafka 每个 topic partition 的 offset)。checkpoint 有两种:一种同时 checkpoint 数据和 metadata,另一种只 checkpoint metadata。一般情况下只保存 metadata 即可,因此这里只介绍 checkpoint metadata。需要注意的是,checkpoint 中保存的是序列化后的程序对象,修改代码并重新部署后通常无法再从旧的 checkpoint 恢复,这也是下文自己保存 offset 的方案的意义所在。
-
package com.nsfocus.bsa.example; -
import kafka.serializer.StringDecoder; -
import org.apache.spark.SparkConf; -
import org.apache.spark.api.java.function.Function; -
import org.apache.spark.streaming.Duration; -
import org.apache.spark.streaming.api.java.JavaDStream; -
import org.apache.spark.streaming.api.java.JavaPairInputDStream; -
import org.apache.spark.streaming.api.java.JavaStreamingContext; -
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory; -
import org.apache.spark.streaming.kafka.KafkaUtils; -
import scala.Tuple2; -
import java.util.HashMap; -
import java.util.HashSet; -
import java.util.Set; -
/** -
* Checkpoint example -
* -
* @author Shuai YUAN -
* @date 2015/10/27 -
*/ -
public class CheckpointTest { -
private static String CHECKPOINT_DIR = "/checkpoint"; -
public static void main(String[] args) { -
// get javaStreamingContext from checkpoint dir or create from sparkconf -
JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(CHECKPOINT_DIR, new JavaStreamingContextFactory() { -
public JavaStreamingContext create() { -
return createContext(); -
} -
}); -
jssc.start(); -
jssc.awaitTermination(); -
} -
public static JavaStreamingContext createContext() { -
SparkConf sparkConf = new SparkConf().setAppName("tachyon-test-consumer"); -
Set<String> topicSet = new HashSet<String>(); -
topicSet.add("test_topic"); -
HashMap<String, String> kafkaParam = new HashMap<String, String>(); -
kafkaParam.put("metadata.broker.list", "test1:9092,test2:9092"); -
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); -
// do checkpoint metadata to hdfs -
jssc.checkpoint(CHECKPOINT_DIR); -
JavaPairInputDStream<String, String> message = -
KafkaUtils.createDirectStream( -
jssc, -
String.class, -
String.class, -
StringDecoder.class, -
StringDecoder.class, -
kafkaParam, -
topicSet -
); -
JavaDStream<String> valueDStream = message.map(new Function<Tuple2<String, String>, String>() { -
public String call(Tuple2<String, String> v1) throws Exception { -
return v1._2(); -
} -
}); -
valueDStream.count().print(); -
return jssc; -
} -
}
自己实现保存offset到zk
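开发者也可以自己实现把 offset 保存到 ZooKeeper 的逻辑。Direct 方式生成的 RDD(KafkaRDD)实现了 HasOffsetRanges 接口,将其强制转换为 HasOffsetRanges 即可得到该批次每个 partition 的 offset 范围。注意这个转换只对 createDirectStream 直接返回的 DStream 上的 RDD 有效,经过 map 等转换后的 RDD 不再是 KafkaRDD。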
scala的实现网上很容易搜到,这里贴个Java实现的代码。
-
package com.xueba207.test; -
import kafka.common.TopicAndPartition; -
import kafka.message.MessageAndMetadata; -
import kafka.serializer.StringDecoder; -
import org.apache.spark.SparkConf; -
import org.apache.spark.api.java.JavaRDD; -
import org.apache.spark.api.java.function.Function; -
import org.apache.spark.broadcast.Broadcast; -
import org.apache.spark.sql.DataFrame; -
import org.apache.spark.sql.SaveMode; -
import org.apache.spark.sql.hive.HiveContext; -
import org.apache.spark.streaming.Duration; -
import org.apache.spark.streaming.api.java.JavaDStream; -
import org.apache.spark.streaming.api.java.JavaInputDStream; -
import org.apache.spark.streaming.api.java.JavaStreamingContext; -
import org.apache.spark.streaming.kafka.HasOffsetRanges; -
import org.apache.spark.streaming.kafka.KafkaCluster; -
import org.apache.spark.streaming.kafka.KafkaUtils; -
import org.apache.spark.streaming.kafka.OffsetRange; -
import scala.Predef; -
import scala.Tuple2; -
import scala.collection.JavaConversions; -
import java.util.HashMap; -
import java.util.HashSet; -
import java.util.Map; -
import java.util.Set; -
import java.util.concurrent.atomic.AtomicReference; -
/** -
* KafkaOffsetExample -
* -
* @author Shuai YUAN -
* @date 2015/10/28 -
*/ -
public class KafkaOffsetExample { -
private static KafkaCluster kafkaCluster = null; -
private static HashMap<String, String> kafkaParam = new HashMap<String, String>(); -
private static Broadcast<HashMap<String, String>> kafkaParamBroadcast = null; -
private static scala.collection.immutable.Set<String> immutableTopics = null; -
public static void main(String[] args) { -
SparkConf sparkConf = new SparkConf().setAppName("tachyon-test-consumer"); -
Set<String> topicSet = new HashSet<String>(); -
topicSet.add("test_topic"); -
kafkaParam.put("metadata.broker.list", "test:9092"); -
kafkaParam.put("group.id", "com.xueba207.test"); -
// transform java Map to scala immutable.map -
scala.collection.mutable.Map<String, String> testMap = JavaConversions.mapAsScalaMap(kafkaParam); -
scala.collection.immutable.Map<String, String> scalaKafkaParam = -
testMap.toMap(new Predef.$less$colon$less<Tuple2<String, String>, Tuple2<String, String>>() { -
public Tuple2<String, String> apply(Tuple2<String, String> v1) { -
return v1; -
} -
}); -
// init KafkaCluster -
kafkaCluster = new KafkaCluster(scalaKafkaParam); -
scala.collection.mutable.Set<String> mutableTopics = JavaConversions.asScalaSet(topicSet); -
immutableTopics = mutableTopics.toSet(); -
scala.collection.immutable.Set<TopicAndPartition> topicAndPartitionSet2 = kafkaCluster.getPartitions(immutableTopics).right().get(); -
// kafka direct stream 初始化时使用的offset数据 -
Map<TopicAndPartition, Long> consumerOffsetsLong = new HashMap<TopicAndPartition, Long>(); -
// 没有保存offset时(该group首次消费时), 各个partition offset 默认为0 -
if (kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet2).isLeft()) { -
System.out.println(kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet2).left().get()); -
Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet2); -
for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) { -
consumerOffsetsLong.put(topicAndPartition, 0L); -
} -
} -
// offset已存在, 使用保存的offset -
else { -
scala.collection.immutable.Map<TopicAndPartition, Object> consumerOffsetsTemp = kafkaCluster.getConsumerOffsets("com.nsfocus.bsa.ys.test", topicAndPartitionSet2).right().get(); -
Map<TopicAndPartition, Object> consumerOffsets = JavaConversions.mapAsJavaMap(consumerOffsetsTemp); -
Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet2); -
for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) { -
Long offset = (Long)consumerOffsets.get(topicAndPartition); -
consumerOffsetsLong.put(topicAndPartition, offset); -
} -
} -
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000)); -
kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam); -
// create direct stream -
JavaInputDStream<String> message = KafkaUtils.createDirectStream( -
jssc, -
String.class, -
String.class, -
StringDecoder.class, -
StringDecoder.class, -
String.class, -
kafkaParam, -
consumerOffsetsLong, -
new Function<MessageAndMetadata<String, String>, String>() { -
public String call(MessageAndMetadata<String, String> v1) throws Exception { -
return v1.message(); -
} -
} -
); -
// 得到rdd各个分区对应的offset, 并保存在offsetRanges中 -
final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>(); -
JavaDStream<String> javaDStream = message.transform(new Function<JavaRDD<String>, JavaRDD<String>>() { -
public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception { -
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); -
offsetRanges.set(offsets); -
return rdd; -
} -
}); -
// output -
javaDStream.foreachRDD(new Function<JavaRDD<String>, Void>() { -
public Void call(JavaRDD<String> v1) throws Exception { -
if (v1.isEmpty()) return null; -
//处理rdd数据,这里保存数据为hdfs的parquet文件 -
HiveContext hiveContext = SQLContextSingleton.getHiveContextInstance(v1.context()); -
DataFrame df = hiveContext.jsonRDD(v1); -
df.save("/offset/test", "parquet", SaveMode.Append); -
for (OffsetRange o : offsetRanges.get()) { -
// 封装topic.partition 与 offset对应关系 java Map -
TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition()); -
Map<TopicAndPartition, Object> topicAndPartitionObjectMap = new HashMap<TopicAndPartition, Object>(); -
topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset()); -
// 转换java map to scala immutable.map -
scala.collection.mutable.Map<TopicAndPartition, Object> testMap = -
JavaConversions.mapAsScalaMap(topicAndPartitionObjectMap); -
scala.collection.immutable.Map<TopicAndPartition, Object> scalatopicAndPartitionObjectMap = -
testMap.toMap(new Predef.$less$colon$less<Tuple2<TopicAndPartition, Object>, Tuple2<TopicAndPartition, Object>>() { -
public Tuple2<TopicAndPartition, Object> apply(Tuple2<TopicAndPartition, Object> v1) { -
return v1; -
} -
}); -
// 更新offset到kafkaCluster -
kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"), scalatopicAndPartitionObjectMap); -
// System.out.println( -
// o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() -
// ); -
} -
return null; -
} -
}); -
jssc.start(); -
jssc.awaitTermination(); -
} -
}