Kafka Architecture

 

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Producer: writes test messages to the "itheima01" topic.
 */
public class TestProducer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Kafka cluster address
        props.put("bootstrap.servers", "node4:9092,node2:9092,node3:9092");
        // Serializer classes for the record key and value
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        for (int i = 0; i < 100000; i++) {
            ProducerRecord<String, String> km = new ProducerRecord<String, String>(
                    "itheima01", "this is a msg=====" + i);
            producer.send(km);
            // flush() blocks until the record has actually been sent to the broker
            producer.flush();
            // Pause for 10 seconds after every 100 messages
            if (i % 100 == 0 && i != 0) {
                Thread.sleep(10000);
            }
        }
    }
}
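The producer above sends value-only records, so the partition each message lands on is left to the default partitioner, and calling flush() after every send makes the loop fully synchronous. As a minimal sketch of the alternative (the class name TestKeyedProducer and the key scheme are illustrative, not part of the original code), supplying a key routes all records with the same key to the same partition, and an asynchronous send callback reports the partition and offset the broker assigned:

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestKeyedProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node4:9092,node2:9092,node3:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 10; i++) {
            // Records that share a key are always hashed to the same partition
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                    "itheima01", "user-" + (i % 3), "keyed msg " + i);
            // The callback fires once the broker has acknowledged (or rejected) the record
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.println("partition=" + metadata.partition()
                                + " offset=" + metadata.offset());
                    }
                }
            });
        }
        // close() flushes any buffered records before the process exits
        producer.close();
    }
}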

 

 

--------- Consumer ------------------------

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Kafka cluster address
        props.put("bootstrap.servers", "node2:9092,node3:9092,node4:9092");
        // Consumer group id
        props.put("group.id", "test1");
        // Deserializer class for the record key
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Deserializer class for the record value
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // Create the consumer from the configuration
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);

        // Subscribe to the topic
        kafkaConsumer.subscribe(Arrays.asList("itheima01"));

        while (true) {
            // Poll for new records
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                // Only print records that came from partition 0
                if (record.partition() == 0) {
                    String topic = record.topic();
                    int partition = record.partition();
                    long offset = record.offset();
                    String key = record.key();
                    String value = record.value();
                    System.out.println("topic" + topic);
                    System.out.println("partition" + partition);
                    System.out.println("offset" + offset);
                    System.out.println("key" + key);
                    System.out.println("value" + value);
                    System.out.println("------------------------------");
                }
            }
        }
    }
}
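The consumer above subscribes to the whole topic and then discards every record that did not come from partition 0. When only one partition is of interest, the consumer can be assigned that partition directly instead of relying on group-based subscription; a minimal sketch under the same topic and cluster assumptions (the class name is illustrative):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class TestPartition0Consumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node2:9092,node3:9092,node4:9092");
        props.put("group.id", "test1");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Read only partition 0 of the topic; no group rebalancing is involved
        consumer.assign(Collections.singletonList(new TopicPartition("itheima01", 0)));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.partition() + " / " + record.offset()
                        + " : " + record.value());
            }
        }
    }
}

With assign() the consumer reads exactly the partitions it is given; the group.id is then only used for committing offsets, not for partition assignment.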

 

 

---------------------------------

 

Real-time integration with Spark Streaming

 

 

package test

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{HashPartitioner, SparkConf, TaskContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * @author:keffer
 * @nowTime:2019/8/31
 */
object Kafka2Streaming {
  // Needed on Windows so that Hadoop/Spark can find winutils.exe under the Hadoop home
  System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.1")
  def main(args: Array[String]): Unit = {
    val checkpointDir = "d://cp-20190715-2"
    val ssc = StreamingContext.getOrCreate(checkpointDir,
      () => {
        createContext()
      })
    ssc.start()
    ssc.awaitTermination()
  }

  // Create the streaming context, define the computation, and return it
  def createContext(): StreamingContext = {
    // Create the context
    val sparkConf = new SparkConf()
      .setAppName("loadkafkadatatest").setMaster("local[2]").set("spark.testing.memory", "512000000")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // updateStateByKey requires checkpointing; the directory must match the one passed to getOrCreate
    ssc.checkpoint("d://cp-20190715-2")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "NODE01:9092,NODE02:9092,NODE03:9092",
      // Deserializer classes for the Kafka record key and value
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "ts02",
      // Where to start consuming when there is no committed offset
      "auto.offset.reset" -> "earliest",
      // Periodically commit consumed offsets back to Kafka automatically
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val topics = Array("charge")

    val messages: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
      )

    /**
     * Data operations
     */
    // Print each record together with the offset range of the partition it came from
    messages.foreachRDD(rdd => {
      val offsetranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition(it => {
        it.foreach(line => {
          val ranges = offsetranges(TaskContext.get().partitionId())
          println(line)
          println(ranges.topic)
          println(ranges.topicPartition())
          println(ranges.fromOffset)
          println(ranges.untilOffset)
          println(ranges.partition)
        })
      })
    })

    // Word count over the stream.
    // Each DStream element is a ConsumerRecord, so extract the message value first.
    val lines: DStream[String] = messages.map(_.value())
    val tup = lines.flatMap(_.split(" ")).map((_, 1))
    val res: DStream[(String, Int)] = tup.updateStateByKey(
      func, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    res.print()

    ssc
  }

  // State update function for updateStateByKey: add the new counts for each key to its previous total
  val func = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
    it.map {    // pattern matching inside map requires braces
      case (x, y, z) => {
        (x, y.sum + z.getOrElse(0))
      }
    }
  }
}
