Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。主要应用场景是:日志收集系统和消息系统。
Kafka主要设计目标如下:
- 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
- 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
- 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
- 同时支持离线数据处理和实时数据处理。
- Scale out:支持在线水平扩展
消息系统介绍
一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需关注于数据,无需关注数据在两个或多个应用间是如何传递的。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。大部分的消息系统选用发布-订阅模式。Kafka就是一种发布-订阅模式。
点对点消息传递模式
在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费者消费队列中的数据。但是一条消息只能被消费一次。当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式即使有多个消费者同时消费数据,也能保证数据处理的顺序。这种架构描述示意图如下:
生产者发送一条消息到queue,只有一个消费者能收到。
发布-订阅消息传递模式
在发布-订阅消息系统中,消息被持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。该模式的示例图如下:
发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。
Kafka中的术语
概述
在深入理解Kafka之前,先介绍一下Kafka中的术语。下图展示了Kafka的相关术语以及之间的关系
broker
Kafka 集群包含一个或多个服务器,服务器节点称为broker。
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,类似于数据库的表名。
Partition
topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
Producer
生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
Consumer
消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
Leader
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
Follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。
环境部署
Zookeeper的安装
首页:Apache ZooKeeper
安装:
# 解压 tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz # 修改配置文件 cd conf cp coo_sample.cfg zoo.cfg vim zoo.cfg #启动 bin/zkServer.sh start #查看 jps #状态查看 bin/zkServer.sh status #停止 bin/zkServer.sh stop #启动客户端 bin/zkCli.sh #退出 quit
- tickTime = 2000:通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
- initLimit = 10:LF初始通信时限,Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量)
- syncLimit = 5:LF同步通信时限,Leader和Follower之间通信时间如果超过syncLimit * tickTime,Leader认为Follwer死 掉,从服务器列表中删除Follwer。
- dataDir:保存Zookeeper中的数据,注意:默认的tmp目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录。
- clientPort = 2181:客户端连接端口,通常不做修改。
Kafka的安装
官网:Apache Kafka
#解压 tar -zxvf kafka_2.11-2.4.0.tgz #修改配置文件 cd config vim server.properties # 修改以下配置 #broker.id属性在kafka集群中必须要是唯⼀ broker.id=0 #kafka部署的机器ip和提供服务的端⼝号(内网) #listeners=PLAINTEXT://服务器地址:9092 #阿里云外网 advertised.listeners=PLAINTEXT://阿里云地址:9092 #kafka的消息存储⽂件 log.dir=/usr/local/data/kafka-logs #kafka连接zookeeper的地址 zookeeper.connect=192.168.65.60:2181 #是否可以删除 delete.topic.enable=true # 启动 cd bin ./kafka-server-start.sh -daemon ../config/server.properties # 检查是否启动 jps #查看端口问题 netstat -an | grep 9092 #或者 lsof -i:9092 # 防火墙开发端口 firewall-cmd --zone=public --add-port=9092/tcp --permanent firewall-cmd --reload #停止kafka ./kafka-server-stop.sh ../config/server.properties
基本命令
创建topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my1
查看创建的topic
./kafka-topics.sh --list --zookeeper localhost:2181
删除某个topic
删除topic的前提是需要将kafka的消费者和生产者停止
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic my
查看某个topic的信息
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic: my-replicated-topic PartitionCount: 1 ReplicationFactor: 1 Configs: Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
发送消息
./kafka-console-producer.sh --broker-list 服务器地址:9092 --topic my1
接收消息
# 从头消费 ./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --topic my --from-beginning # :从当前主题中的最后⼀条消息的offset(偏移量位置)+1开始消费 ./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --topic my
消息的有序性
- ⽣产者将消息发送给broker,broker会将消息保存在本地的⽇志⽂件中
- 消息的保存是有序的,通过offset偏移量来描述消息的有序性
- 消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置
消费者组
单播消费
在⼀个kafka的topic中,启动两个消费者,⼀个⽣产者,问:⽣产者发送消息,这条消息是否 同时会被两个消费者消费? 如果多个消费者在同⼀个消费组,那么只有⼀个消费者可以收到订阅的topic中的消息。换⾔ 之,同⼀个消费组中只能有⼀个消费者收到⼀个topic中的消息。
./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --consumer-property group.id=testGroup --topic my --from-beginning1
多播消费
不同的消费组订阅同⼀个topic,那么不同的消费组中只有⼀个消费者能收到消息。实际上也 是多个消费组中的多个消费者收到了同⼀个消息。
./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --consumer-property group.id=testGroup01 --topic my --from-beginning ./kafka-console-consumer.sh --bootstrap-server 服务器地址:9092 --consumer-property group.id=testGroup02 --topic my --from-beginning
查看消费组的信息
./kafka-consumer-groups.sh --bootstrap-server 服务器地海:9092 --describe --group testGroup
重点关注以下⼏个信息:
- current-offset: 最后被消费的消息的偏移量
- Log-end-offset: 消息总量(最后⼀条消息的偏移量)
- Lag:积压了多少条消息
主题和分区的概念
主题
主题-topic在kafka中是⼀个逻辑的概念,kafka通过topic将消息进⾏分类。不同的topic会被订阅该topic的消费者消费。但是有⼀个问题,如果说这个topic中的消息⾮常⾮常多,多到需要⼏T来存,因为消息是会被保存到log⽇志⽂件中的。为了解决这个⽂件过⼤的问题,kafka提出了Partition分区的概念
分区
通过partition将⼀个topic中的消息分区来存储。这样的好处有多个:
- 分区存储,可以解决统⼀存储⽂件过⼤的问题
- 提供了读写的吞吐量:读和写可以同时在多个分区中进⾏
./kafka-topics.sh --create --zookeeper localhost:2181 --replicationfactor 1 --partitions 2 --topic test1
日志信息
- 00000.log: 这个⽂件中保存的就是消息
- __consumer_offsets-49: kafka内部⾃⼰创建了__consumer_offsets主题包含了50个分区。这个主题⽤来存放消费 者消费某个主题的偏移量。因为每个消费者都会⾃⼰维护着消费的主题的偏移量,也就是 说每个消费者会把消费的主题的偏移量⾃主上报给kafka中的默认主题: consumer_offsets。
- 因此kafka为了提升这个主题的并发性,默认设置了50个分区。 提交到哪个分区:通过hash函数:hash(consumerGroupId) % __consumer_offsets 主题的分区数 提交到该主题中的内容是:key是consumerGroupId+topic+分区号,value就是当前 offset的值 ⽂件中保存的消息,默认保存7天。
- 七天到后消息会被删除。
集群搭建
Zookeeper集群的搭建
- 注意开放端口,以及关闭防火墙
- ip:2181,ip:2182,ip:2183
- 修改配置文件
cd conf #修改配置文件 vim zoo.cfg # The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/environment/zookeeper/apache-zookeeper-3.6.3-bin/data_log # the port at which the clients will connect clientPort=2182 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1 ## Metrics Providers # # https://prometheus.io Metrics Exporter #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider #metricsProvider.httpPort=7000 #metricsProvider.exportJvmInfo=true server.1=ip1:2888:3888 server.2=ip2:2888:3888 server.3=ip3:2888:3888 quorumListenOnAllIPs=true #启动zookeeper,修改其他机器的配置文件 bin/zkServer.sh start、 # 等待一下,查看选举状态 bin/zkServer.sh status [root@shu apache-zookeeper-3.6.3-bin]# bin/zkServer.sh status /usr/bin/java ZooKeeper JMX enabled by default Using config: /environment/zookeeper/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg Client port found: 2182. Client address: localhost. Client SSL: false. Mode: leader [root@shu apache-zookeeper-3.6.3-bin]# # 问题:端口开发问题,防火墙问题 # 防火墙开发端口 firewall-cmd --zone=public --add-port=2182/tcp --permanent firewall-cmd --reload #关闭防火墙 systemctl stop firewalld
Kafka集群的搭建
- 注意开放端口,以及关闭防火墙
- ip:9092,ip:9093,ip:9094
- 修改配置文件
cd config #修改配置文件 vim server.properties #修改zookeeper连接 zookeeper.connect=ip:2181,ip:2182,ip:2183 # 分布修改三台的机器的配置文件,并启动 #broker.id属性在kafka集群中必须要是唯⼀ broker.id=0 ./kafka-server-start.sh -daemon ../config/server.properties # 检查是否启动 jps #查看端口问题 netstat -an | grep 9092 #或者 lsof -i:9092 # 防火墙开发端口 firewall-cmd --zone=public --add-port=9092/tcp --permanent firewall-cmd --reload #停止kafka ./kafka-server-stop.sh ../config/server.properties # 验证,我们在lead机器上面创建一个topic ./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic my #查看其余机器上的topic [root@xlc bin]# ./kafka-topics.sh --list --zookeeper localhost:2183 my [root@shu bin]# ./kafka-topics.sh --list --zookeeper localhost:2181 my
副本的概念
副本是为了为主题中的分区创建多个备份,多个副本在kafka集群的多个broker中,会有⼀个 副本作为leader,其他是follower(就是备份)。
# 创建topic ./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 3 --partitions 2 --topic my-replicated-topic # 查看topic详细信息 ./kafka-topics.sh --describe --zookeeper localhost:2182 --topic my-replicated-topic [root@shu bin]# ./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 3 --partitions 2 --topic my-replicated-topic Created topic my-replicated-topic. [root@shu bin]# ./kafka-topics.sh --describe --zookeeper localhost:2182 --topic my-replicated-topic Topic: my-replicated-topic PartitionCount: 2 ReplicationFactor: 3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic: my-replicated-topic Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
- leader: kafka的写和读的操作,都发⽣在leader上。
- leader负责把数据同步给follower。当leader挂 了,经过主从选举,从多个follower中选举产⽣⼀个新的leader follower 接收leader的同步的数据
- isr:可以同步和已同步的节点会被存⼊到isr集合中。这⾥有⼀个细节:如果isr中的节点性能较差,会被提出isr集合。
- 集群中有多个broker,创建主题时可以指明主题有多个分区(把消息拆分到不同的分区中存 储),可以为分区创建多个副本,不同的副本存放在不同的broker⾥。
集群消费
我们在leader服务器中,创建主体,发送消息
# 创建topic ./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 3 --partitions 2 --topic my-replicated-topic # 查看topic信息 ./kafka-topics.sh --describe --zookeeper localhost:2182 --topic my-replicated-topic # 创建消息 ./kafka-console-producer.sh --broker-list ip:9093 --topic my-replicated-topic >nihao
其余机器接受消息
./kafka-console-consumer.sh --bootstrap-server ip:9092 --topic my-replicated-topic ./kafka-console-consumer.sh --bootstrap-server ip:9093 --topic my-replicated-topic2
集群消费组命令,参考前面的消费者组命令。⼀个partition只能被⼀个消费组中的⼀个消费者消费,⽬的是为了保证消费的顺序性,但 是多个partion的多个消费者消费的总的顺序性是得不到保证的,那怎么做到消费的总顺 序性呢?
partition的数量决定了消费组中消费者的数量,建议同⼀个消费组中消费者的数量不要超 过partition的数量,否则多的消费者消费不到消息。
集群中的controller
集群中谁来充当controller 每个broker启动时会向zk创建⼀个临时序号节点,获得的序号最⼩的那个broker将会作为集群中的controller。它负责这么⼏件事: 当集群中有⼀个副本的leader挂掉,需要在集群中选举出⼀个新的leader,选举的规则是 从isr集合中最左边获得。当集群中有broker新增或减少,controller会同步信息给其他broker 当集群中有分区新增或减少,controller会同步信息给其他broker。
rebalance机制
- 前提:消费组中的消费者没有指明分区来消费。
- 触发的条件:当消费组中的消费者和分区的关系发⽣变化的时候。
- 分区分配的策略:在rebalance之前,分区怎么分配会有这么三种策略。
- range:根据公示计算得到每个消费消费哪⼏个分区:前⾯的消费者是分区总数/消费 者数量+1,之后的消费者是分区总数/消费者数量。
- 轮询:⼤家轮着来。
- sticky:粘合策略,如果需要rebalance,会在之前已分配的基础上调整,不会改变之 前的分配情况。如果这个策略没有开,那么就要进⾏全部的重新分配。建议开启。
HW和LEO
LEO是某个副本最后消息的消息位置(log-end-offset)。HW是已完成同步的位置。消息在写⼊broker时,且每个broker完成这条消息的同步后,hw 才会变化。在这之前消费者是消费不到这条消息的。在同步完成之后,HW更新之后,消费者 才能消费到这条消息,这样的⽬的是防⽌消息的丢失。
Java代码实现
消息生产者
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency>
import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; public class MySimpleProducer { private final static String TOPIC_NAME = "my-replicated-topic"; public static void main(String[] args) throws ExecutionException, InterruptedException { //1.设置参数 Properties props = new Properties(); //领导者主机 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9093"); //把发送的key从字符串序列化为字节数组 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //把发送消息value从字符串序列化为字节数组 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //2.创建⽣产消息的客户端,传⼊参数 Producer<String,String> producer = new KafkaProducer<String, String>(props); //3.创建消息 //key:作⽤是决定了往哪个分区上发,value:具体要发送的消息内容 ProducerRecord<String,String> producerRecord = new ProducerRecord<>(TOPIC_NAME,"value","hello-kafka-ok"); //4.发送消息,得到消息发送的元数据并输出 RecordMetadata metadata = producer.send(producerRecord).get(); System.out.println( "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset()); } }
⽣产者中的ack的配置
同步
-
ack = 0 kafka-cluster:不需要任何的broker收到消息,就⽴即返回ack给⽣产者,最容易 丢消息的,效率是最⾼的 -
ack=1(默认): 多副本之间的leader已经收到消息,并把消息写⼊到本地的log中,才 会返回ack给⽣产者,性能和安全性是最均衡的 -
ack=-1/all:⾥⾯有默认的配置min.insync.replicas=2(默认为1,推荐配置⼤于等于2), 此时就需要leader和⼀个follower同步完后,才会返回ack给⽣产者(此时集群中有2个 broker已完成数据的接收),这种⽅式最安全,但性能最差。
props.put(ProducerConfig.ACKS_CONFIG, "1"); /* 发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造 成消息重复发送,⽐如⽹络抖动,所以需要在 接收者那边做好消息接收的幂等性处理 */ props.put(ProducerConfig.RETRIES_CONFIG, 3); //重试间隔设置 props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
-
producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
-
producer 将消息发送给该 leader
-
leader 将消息写入本地 log
-
followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
-
leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
异步
异步发送,⽣产者发送完消息后就可以执⾏之后的业务,broker在收到消息后异步调⽤⽣产 者提供的callback回调⽅法。但是容易造成消息丢失。
//异步发送消息 producer.send(producerRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.err.println("发送消息失败:" +exception.getStackTrace()); } if (metadata != null) { System.out.println("异步⽅式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset()); } } });
消息缓冲区
如上图所示:
- kafka默认会创建⼀个消息缓冲区,⽤来存放要发送的消息,缓冲区是32m
- kafka本地线程会去缓冲区中⼀次拉16k的数据,发送到broker
- 如果线程拉不到16k的数据,间隔10ms也会将已拉到的数据发到broker
//缓存区默认大小 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432); //拉取数据默认大小 props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384); //如果数据未满16k,也提交 props.put(ProducerConfig.LINGER_MS_CONFIG,10);
消息消费者
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; /** * @Description 消费者 **/ public class MySimpleConsumer { //主题名 private final static String TOPIC_NAME = "my-replicated-topic"; //分组 private final static String CONSUMER_GROUP_NAME = "testGroup"; public static void main(String[] args) { Properties props =new Properties(); //消息地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.104.223.187:9093"); //分组 props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME); //序列化 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //自动提交,拉取到信息之后,立马提交偏移量给consumer_offset,保证顺序消费,但是会造成消息丢失问题 // 是否⾃动提交offset,默认就是true props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // ⾃动提交offset的间隔时间 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); //手动提交,当消费者消费消息完毕之后,返回偏移量 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); //⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置 // props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); //props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000); //1.创建⼀个消费者的客户端 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //2. 消费者订阅主题列表 consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { /* * 3.poll() API 是拉取消息的⻓轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { //4.打印消息 System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } //所有的消息已消费完 if (records.count() > 0) {//有消息 // ⼿动同步提交offset,当前线程会阻塞直到offset提交成功 // ⼀般使⽤同步提交,因为提交之后⼀般也没有什么逻辑代码了 consumer.commitSync();//=======阻塞=== 提交成功 } } } }
自动提交与手动提交
- 消费者⽆论是⾃动提交还是⼿动提交,都需要把所属的消费组+消费的某个主题+消费的某个 分区及消费的偏移量,这样的信息提交到集群的
_consumer_offsets主题⾥⾯,保证顺序。 - 自动提交:消费者poll消息下来以后就会⾃动提交offset,但是会造成消失丢失。
//自动提交,拉取到信息之后,立马提交偏移量给consumer_offset,保证顺序消费,但是会造成消息丢失问题 // 是否⾃动提交offset,默认就是true props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // ⾃动提交offset的间隔时间 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
- 手动提交:当消费者消费完毕之后,提交偏移量给
_consumer_offsets
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); //⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000); while (true) { /* * 3.poll() API 是拉取消息的⻓轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { //4.打印消息 System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } //所有的消息已消费完 if (records.count() > 0) {//有消息 // ⼿动同步提交offset,当前线程会阻塞直到offset提交成功 // ⼀般使⽤同步提交,因为提交之后⼀般也没有什么逻辑代码了 consumer.commitSync();//=======阻塞=== 提交成功 } }
⻓轮询poll消息
- 默认情况下,消费者⼀次会poll500条消息。
//⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); //⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); //如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能⼒过弱,将其踢出消费组。将分区分配给其他消费者。-rebalance props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000); while (true) { /* * poll() API 是拉取消息的⻓轮询 */ ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:partition = %d,offset = %d, key = %s,value = %s%n", record.partition(), record.offset(), record.key(), record.value()); }
- 如果⼀次poll到500条,就直接执⾏for循环 如果这⼀次没有poll到500条。
- 且时间在1秒内,那么⻓轮询继续poll,要么到500 条,要么到1s 如果多次poll都没达到500条,且1秒时间到了,那么直接执⾏for循环
- 如果两次poll的间隔超过30s,集群会认为该消费者的消费能⼒过弱,该消费者被踢出消 费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数, 让⼀次poll的消息条数少⼀点
心跳检查
消费者每隔1s向kafka集群发送⼼跳,集群发现如果有超过10s没有续约的消费者,将被踢出 消费组,触发该消费组的rebalance机制,将该分区交给消费组⾥的其他消费者进⾏消费。
//consumer给broker发送⼼跳的间隔时间 props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000); //kafka如果超过10秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进⾏ rebalance,把分区分配给其他消费者。 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
指定分区和偏移量、时间消费
- 分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
- 从头消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
- 指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0))); consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
- 指定时间消费,根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该 offset之后的消息开始消费。
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME); //从1⼩时前开始消费 long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; Map<TopicPartition, Long> map = new HashMap<>(); for (PartitionInfo par : topicPartitions) { map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime); } Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map); for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) { TopicPartition key = entry.getKey(); OffsetAndTimestamp value = entry.getValue(); if (key == null || value == null) continue; Long offset = value.offset(); System.out.println("partition-" + key.partition() + "|offset-" + offset); System.out.println(); //根据消费⾥的timestamp确定offset if (value != null) { consumer.assign(Arrays.asList(key)); consumer.seek(key, offset); } }
SpringBoot代码实现
依赖导入
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
配置文件编写
server.port=8080 #########kafka配置############# # lead机器 spring.kafka.bootstrap-servers=ip:9093 #########producer############ # ack spring.kafka.producer.acks=1 # 拉取大小 spring.kafka.producer.batch-size=16384 # 重试次数 spring.kafka.producer.retries=10 # 缓冲区大小 spring.kafka.producer.buffer-memory=33554432 # 序列化 spring.kafka.producer.key-serializer= org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #########consumer############ # 关闭自动提交 spring.kafka.consumer.enable-auto-commit=false # 消费组 spring.kafka.consumer.group-id=default-group # spring.kafka.consumer.auto-offset-reset=earliest # 反序列化 spring.kafka.consumer.key-deserializer= org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer= org.apache.kafka.common.serialization.StringDeserializer # 最大消息 spring.kafka.consumer.max-poll-records=500 spring.kafka.listener.ack-mode=manual_immediate # redis spring.redis.host=ip
服务端
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class KafkaProvide { private final static String TOPIC_NAME = "my-replicated-topic"; @Autowired private KafkaTemplate<String,String> kafkaTemplate; @RequestMapping("/send") public String sendMessage(){ kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!"); return "send success!"; } }
消费端
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.PartitionOffset; import org.springframework.kafka.annotation.TopicPartition; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; @Component public class KafkaConsumer { /** * 单条消息消费 * @param record * @param ack */ @KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1") public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); //⼿动提交offset ack.acknowledge(); } /** * 其他分区消费配置 * @param record * @param ack */ @KafkaListener(groupId = "testGroup", topicPartitions = { @TopicPartition(topic = "topic1", partitions = {"0", "1"}), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) },concurrency = "3")//concurrency就是同组下的消费者个数,就是并发消费数,建议⼩于等于分区总数 public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println(value); System.out.println(record); //⼿动提交offset ack.acknowledge(); } }
消息实体类
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; import java.util.Date; /** * @Description 消息实体类 **/ @Data @AllArgsConstructor @NoArgsConstructor public class MsgInfo implements Serializable { private Long id; private String name; private Long msg; private Date time; }
测试主程序
import com.demo.demo.pojo.MsgInfo; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.Acknowledgment; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @Component public class KafkaTest { //topic private final static String TOPIC_NAME = "my-replicated-topic"; //程序执行的初始时间,只会保留一份 private static final AtomicLong lastRecieveMessage = new AtomicLong(System.currentTimeMillis()); //时间转换 private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //前缀 private static final String KEY_PREFIX = "test"; //缓存 private final List<ConsumerRecord<String,String>> DataList = new ArrayList<>(); //json private final Gson gson = new GsonBuilder().create(); //kafka @Autowired private KafkaTemplate<String,String> kafkaTemplate; /** * 消息接受者(每隔1分钟执行) */ @Scheduled(cron = "0 */1 * * * ?") public void Consumer() { long last = lastRecieveMessage.get(); long current = System.currentTimeMillis(); if ((current - last) > (60 * 1000)){ System.out.println(DataList); for (ConsumerRecord<String, String> consumerRecord : DataList) { MsgInfo info = gson.fromJson(consumerRecord.value(), MsgInfo.class); System.out.println("消息:"+info); } DataList.clear(); } } /** * 消息发送者(30s执行一次) */ @Scheduled(cron = "0/30 * * * * ? ") public void Provide(){ long last = lastRecieveMessage.get(); long current = System.currentTimeMillis(); if ((current - last) > (30 * 1000) ){ MsgInfo msgInfo=new MsgInfo(current-last,"测试",last,new Date()); kafkaTemplate.send(TOPIC_NAME,"test",gson.toJson(msgInfo)); } } /** * 单条消费 * @param record * @param ack */ @KafkaListener(topics = TOPIC_NAME,groupId = "MyGroup1") public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) { DataList.add(record); //⼿动提交offset ack.acknowledge(); } }