import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Producer
 */
public class TestProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Kafka cluster addresses
        props.put("bootstrap.servers", "node4:9092,node2:9092,node3:9092");
        // serializer classes for the key and the value
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100000; i++) {
            ProducerRecord<String, String> km = new ProducerRecord<String, String>(
                    "itheima01", "this is a msg=====" + i);
            producer.send(km);
            producer.flush();
            // pause for 10 seconds after every 100 messages
            if (i % 100 == 0 && i != 0) {
                Thread.sleep(10000);
            }
        }
        producer.close();
    }
}
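send() above is asynchronous, so the loop never learns whether a message actually reached the broker. Below is a minimal sketch of sending with a delivery callback and an explicit record key; the class name TestProducerWithCallback and the key value are illustrative additions, not part of the original example. The callback reports the partition and offset the broker assigned, or the exception if delivery failed.

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestProducerWithCallback {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node4:9092,node2:9092,node3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // the record carries a key, so messages with the same key always land in the same partition
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>("itheima01", "key-1", "this is a msg=====1");
        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // delivery failed after the client's retries
                    exception.printStackTrace();
                } else {
                    // the broker acknowledged the record; metadata carries its partition and offset
                    System.out.println("sent to partition " + metadata.partition()
                            + " at offset " + metadata.offset());
                }
            }
        });
        producer.close();
    }
}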
---------Consumer------------------------
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Kafka cluster addresses
        props.put("bootstrap.servers", "node2:9092,node3:9092,node4:9092");
        // consumer group id
        props.put("group.id", "test1");
        // deserializer class for the key
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // deserializer class for the value
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // create the consumer from the configuration
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        // subscribe to the topic
        kafkaConsumer.subscribe(Arrays.asList("itheima01"));
        while (true) {
            // poll for data
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                // only print records that came from partition 0
                if (record.partition() == 0) {
                    String topic = record.topic();
                    int partition = record.partition();
                    long offset = record.offset();
                    String key = record.key();
                    String value = record.value();
                    System.out.println("topic:" + topic);
                    System.out.println("partition:" + partition);
                    System.out.println("offset:" + offset);
                    System.out.println("key:" + key);
                    System.out.println("value:" + value);
                    System.out.println("------------------------------");
                }
            }
        }
    }
}
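The consumer above relies on the default enable.auto.commit=true, so offsets are committed in the background regardless of whether the printed records were actually processed. Below is a sketch of committing offsets manually after processing, using the same topic and group id; the class name TestConsumerManualCommit is an illustrative addition, not part of the original code.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumerManualCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node2:9092,node3:9092,node4:9092");
        props.put("group.id", "test1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // turn off automatic offset commits; offsets are committed only after processing succeeds
        props.put("enable.auto.commit", "false");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("itheima01"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition:" + record.partition()
                        + " offset:" + record.offset() + " value:" + record.value());
            }
            // synchronously commit the offsets of everything returned by this poll
            consumer.commitSync();
        }
    }
}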
---------------------------------
Real-time integration with Spark Streaming
package test
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{HashPartitioner, SparkConf, TaskContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @author:keffer
* @nowTime:2019/8/31
*/
object Kafka2Streaming {
  System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.1")

  // checkpoint directory; getOrCreate and ssc.checkpoint must point at the same path,
  // otherwise the driver can never be recovered from the checkpoint it writes
  val checkpointDir = "d://cp-20190715-2"

  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getOrCreate(checkpointDir,
      () => {
        createContext()
      })
    ssc.start()
    ssc.awaitTermination()
  }

  // create the context and define the computation; return the resulting context
  def createContext(): StreamingContext = {
    // create the context
    val sparkConf = new SparkConf()
      .setAppName("loadkafkadatatest")
      .setMaster("local[2]")
      .set("spark.testing.memory", "512000000")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // checkpointing is required (for updateStateByKey and driver recovery)
    ssc.checkpoint(checkpointDir)
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "NODE01:9092,NODE02:9092,NODE03:9092",
      // deserializers for the Kafka key and value
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "ts02",
      // where to start consuming when the group has no committed offset
      "auto.offset.reset" -> "earliest",
      // periodically commit the consumed offsets back to Kafka automatically
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
val topics = Array("charge")
val messages: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
/**
* 数据操作
*/
// 查看数据信息
messages.foreachRDD(rdd=> {
val offsetranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition(it => {
it.foreach(line => {
val ranges = offsetranges(TaskContext.get().partitionId())
println(line)
println(ranges.topic)
println(ranges.topicPartition())
println(ranges.fromOffset)
println(ranges.untilOffset)
println(ranges.partition)
})
})
})
// // 对数据进行单词计数
// // 因为DStream里的key是offset值,把DStream里的value数据取出来
val lines: DStream[String] = messages.map(_.value())
val tup = lines.flatMap(_.split(" ")).map((_, 1))
val res: DStream[(String, Int)] = tup.updateStateByKey(
func, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
res.print()
ssc
}
  // update function for updateStateByKey:
  // it receives (word, counts for this batch, previously accumulated count)
  val func = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
    it.map {
      // pattern matching is used inside map, so braces are required
      case (x, y, z) =>
        (x, y.sum + z.getOrElse(0))
    }
  }
}