kafka→分布式发布-订阅消息系统
一、kafka简介
1.kafka是什么?
- 最初由 LinkedIn 公司开发,使用 Scala语言编写,之后成为 Apache 项目的一部分。
- 是一个分布式的,可划分的,多订阅者,冗余备份的持久性的服务。
- 同时为发布和订阅提供高吞吐量。每秒可以生产约 25 万消息(50 MB),每秒处理 55 万消息(110 MB)。
- kafka的优点
- 降低系统组网复杂度。
- 降低编程复杂度,各个子系统不再是相互协商接口,各个子系统类似插口插在插座上,Kafka 承担高速数据总线的作用。
- 分布式系统,易于向外扩展。所有的 producer、broker 和 consumer 都会有多个,均为分布式的。无需停机即可扩展机器。
- 可进行持久化操作。将消息持久化到磁盘,可以防止数据丢失。
2.kafka核心概念
- 消息的发布(publish)称做producer。
- 消息的订阅(subscribe)称做consumer。
- 中间的存储阵列称做broker。
- 多个broker协同合作,producer、consumer和broker三者之间通过zookeeper来协调请求和转发。
- producer产生和推送(push)数据到broker,consumer从broker拉取(pull)数据并进行处理。
3.kafka的应用场景?
- 处理活跃的流式数据。
- 同时处理在线应用(消息)和离线应用(数据文件,日志)。
- 大数据系统中的数据流转。
4.其他消息队列介绍
- RabbitMQ:支持的协议多,非常重量级消息队列,对路由(Routing),负载均衡(Load balance)或者数据持久化都有很好的支持。
- ZeroMQ:号称最快的消息队列系统,尤其针对大吞吐量的需求场景,擅长的高级/复杂的队列,但是技术也复杂,并且只提供非持久性的队列。
- ActiveMQ:Apache下的一个子项,类似ZeroMQ,能够以代理人和点对点的技术实现队列。
- Redis:是一个key-Value的NOSql数据库,但也支持MQ功能,数据量较小,性能优于RabbitMQ,数据超过10K就慢的无法忍受。
5.kafka版本下载
- http://kafka.apache.org/downloads
- 下载最新版本稳定版的安装包,如下图所示。
二、kafka安装
1.软件环境
- CentOS7
- JDK1.8
- kafka10.1.1
2.安装准备
安装zookeeper单机版并启动
3.kafka安装和启动
- 输入 mkdir /usr/local/kafka建立kafka目录
- 将下载好的tar包放到该目录
- 上传tar包到 /usr/local/kafka 并解压,进入解压后的目录
- 自定义一个放kafka日志的目录,例如我这里在kafka解压目录下新建一个log目录
输入mkdir /usr/local/kafka/log
- 输入cp /usr/local/kafka/kafka_2.11-0.10.1.1/config/server.properties /usr/local/kafka/kafka_2.11-0.10.1.1/config/server.properties.bak
- vi /usr/local/kafka/kafka_2.11-0.10.1.1/config/server.properties
输入以下内容:
|
# broker的全局唯一编号,不能重复 broker.id=0 # 用来监听链接的端口,producer或consumer将在此端口建立连接 port=9092 # 处理网络请求的线程数量 num.network.threads=3 # 用来处理磁盘IO的线程数量 num.io.threads=8 # 发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 # 接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 # 请求套接字的缓冲区大小 socket.request.max.bytes=104857600 # kafka运行日志存放的路径 log.dirs=/usr/local/kafka/log # topic在当前broker上的分片个数 num.partitions=2 # 用来恢复和清理data下数据的线程数量 num.recovery.threads.per.data.dir=1 # segment文件保留的最长时间,超时将被删除 log.retention.hours=168 # 滚动生产新的segment文件的最大时间 log.retention.hours=168 # zookeeper集群的地址,可以是多个,多个之间用逗号分割 zookeeper.connect=master:2181,node1:2181,node2:2181 #ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大zookeeper.connection.timeout.ms=6000 |
- 在master主机上,任意目录输入
scp -r /usr/local/kafka/ [email protected]:/usr/local/将文件分发到节点node1上
scp -r /usr/local/kafka/[email protected]:/usr/local/将文件分发到节点node2上
- 再次修改配置文件
依次修改各服务器上配置文件的的broker.id,分别是0,1,2不得重复。
- 输入cp /usr/local/kafka/kafka_2.11-0.10.1.1/config/server.properties /usr/local/kafka/kafka_2.11-0.10.1.1/config/server.properties.bak
- vi /usr/local/kafka/kafka_2.11-0.10.1.1/config/server.properties
输入以下内容:
再次修改配置文件
输入cd /usr/local/kafka/kafka_2.11-0.10.1.1/config/进入config目录下 vi server.properties
依次修改各服务器上配置文件的的broker.id,分别是0,1,2不得重复。
- 依次在各节点上启动kafka
进入kafka的bin目录,cd /usr/local/kafka/kafka_2.11-0.10.1.1/bin/,输入 ./kafka-server-start.sh /usr/local/kafka/kafka_2.11-0.10.1.1/config/server.properties 启动kafka
- 克隆一个会话,可输入 ps aux|grep kafka 查看进程
4.kafka测试
- cd /usr/local/kafka/kafka_2.11-0.10.1.1/bin/
- 进入kafka的bin目录,输入 ./kafka-console-producer.sh --broker-list master:9092 --topic test 运行producer
- 再克隆一个会话,进入kafka的bin目录,输入 ./kafka-console-consumer.sh --zookeeper master:2181 --topic test --from-beginning 运行consumer
- 在producer窗口输入一些内容,然后查看consumer窗口,可以查看到输入的内容,证明kafka连通性没问题