前言
消息队列的主要有3大作用
进程通信(IPC):Interprocess Communication
程序解耦:程序由异步变为了异步,提升程序并发(规避IO等待时间)能力。
数据流量削峰:把消息暂时缓冲在消息队列里面。
NSQ传递的消息通常是无序的,当然你也可以保留下信息去check时间戳,因此NSQ更适合处理数据量大但是彼此间没有顺序关系的消息。
消息队列的2种消息传递的模式:
1.点对点模式(queue)
消息生产者生成消息发送到queue中,然后消费者从queue中主动获取数据进行消费。
当1条消息被消费之后,这条消息就会从queue中消失,不存在重复消费。
golang中的channel就是这种模型。
2.发布和订阅者(topic)
生产者生成消息时把消息分类成不同Topic,消费者通过订阅(subcrible)这些Topic,把不同的消息获出来消费。
发布和订阅模式和点对点模式最大的区别:
引入 Publish---> Topic<--->Subcrible概念,Topic把消息分类了意味着生产端可以有多个不同的生成者生成不同的消息,消费端也可有多个不同的消费者订阅不同的Topic。
这种设计模式为生产和消费2端都提供了扩展/收缩的弹性空间 。
在发布和订阅这模式中,消费者端从消息队列获取数据的方式也有2种:
2.1.队列主动推送消息到消费端(供大于求)
2.2.消费者主动去队列拉取消息 (求大于供)
kafka概念
kafka是由Linkedln公司开发的用于处理公司内部海量日志传输问题的,由Scala和大数据实时处理场景。
kafka具有高吞吐、低延迟、容错率高的特点。
kafka架构
从宏观角度来看kafka就是1个非常粗大管子。在这个大管子的两端有2中角色生产者(producer)和消费者(consumer)。
broker:kafka的分布式就体现在kafka集群中可以灵活扩展broker(kafka集群中的1个节点,服务器)方便我们对kafka集群进行弹性扩展。broker-id不在集群里不能重复。
topic:对消息的分类。
partion:每个Topic可以有N个partion,同1个Topic的数据分布在不同Partion且数据是不重复的,所以partion实现了topic数据的负载均衡,partion的表现形式就是1个个的文件夹。
每个partion类似于Python里面的list,来1条消息apend进去,保证了消息的顺序。
这个list中的每条消息都会分配1个的index(offset偏移量),offset保证消息快速读取,而不是遍历随机读,这也是kafka读取消息快的原因。
follower:每个leader partion(主分区)都有多个 follower pation(副本分区)也就是是备胎(follower),follower实现partion的备份。
当leader partion故障时kafak会选择1个follower partion成为leader partion,kafaka中主分区可以设置的最大的副本数量为10,leader和folloer partion不能在同一个服务器上,followers的数量也不能大于brokers(1个kafka集群中服务器)的数量。
consumer group: 多个消费者组成1个组对同1个topc的消息进行消费(增加了kafka的消费能力)
在同1个consumer group中1个consumer可消费多个分区,但是1个分区只能被consumer group里其中1个consumer消费。
也就是说只要N个consumer在同1 consumer group中,它们消费的数据永远是不一致的。这也是kafka的partion和NSQ的channel之间的本质区别!
producer写数据到kafka的工作流程
1.producer先从kafka集群中获取分区的leader信息
2.producer将消息发送给leader
3.leader将消息写入本地磁盘
4.follower从leader分区拉取消息数据
5.follower将消息写入本地磁盘,向leader partion发送ACK确定.
6.leader收到所有follower的ACK之后,向Producer发生ACK确定。
ps:ack应答机制保证了Producer数据写入的可靠性。
关于以上步骤细节:
producer选择Leader partion的原则?
在kafka中1个topic对应多个leader分区,producer在获取leader信息的时候是如何选择其中1个leader的呢?
1.根据producer指定leader分区来帮它存储数据。(Producer指定了Partion)
2.如果producer没有指定特定的partion那么会根据producer设置的key,hash出1个值自动判断选择哪个leader partion.(Producer指定了Key)
3.如果既没有指定partion也没有指定key就采取轮询的方式写入到不同的partion(轮着来雨露均沾)
producer往kafka发送数据成功之后,ACK应答机制都也哪些?
producer在向kafka发布消息时,可以设置消息数据写入kafka成功之后是否需要ACK应答机制?有以下3个参数!
0:producer往集群中发布数据不需要等待kafka的ACK。(可靠性低,效率高!)
1:代表producer要求只需要leader进行ACK,follower从leader拉取数据的时候就别ACK了。(折中方案)
all:producer--->leader----->leader下面所有的follower数据接收成功后都需要 ACK应答。(可靠性高,效率低)
ps:如果producer往不存在的Topic中发送数据时,kafka会自动创建该Topic,partion和replication的数量默认配置都为1.
安装kafka
1.下载二进制包
二进制包下载之后,根据操作系统 执行kafka和zookeeper的启动脚本即可
[root@zhanggen bin]# ls connect-distributed.sh kafka-producer-perf-test.sh connect-mirror-maker.sh kafka-reassign-partitions.sh connect-standalone.sh kafka-replica-verification.sh kafka-acls.sh kafka-run-class.sh kafka-broker-api-versions.sh kafka-server-start.sh kafka-configs.sh kafka-server-stop.sh kafka-console-consumer.sh kafka-streams-application-reset.sh kafka-console-producer.sh kafka-topics.sh kafka-consumer-groups.sh kafka-verifiable-consumer.sh kafka-consumer-perf-test.sh kafka-verifiable-producer.sh kafka-delegation-tokens.sh trogdor.sh kafka-delete-records.sh windows kafka-dump-log.sh zookeeper-security-migration.sh kafka-leader-election.sh zookeeper-server-start.sh kafka-log-dirs.sh zookeeper-server-stop.sh kafka-mirror-maker.sh zookeeper-shell.sh kafka-preferred-replica-election.sh [root@zhanggen bin]#
2.启动zookeeper
zookeeprt是kafka集群中注册、自动发现服务,类似于NSQ中的Lookupd。kafka的二进制包包含了zookeeper无需单独下载
zookeeper配置kafka集群信息
tickTime=2000 dataDir=/home/myname/zookeeper clientPort=2181 initLimit=5 syncLimit=2 server.1=192.168.229.160:2888:3888 server.2=192.168.229.161:2888:3888 server.3=192.168.229.162:2888:3888