1. Kafka是什么
1.1 Kafka是什么?三个定义
- Kafka是一个消息队列(生产者消费者模式)
- Kafka目标:构建企业中统一的、高通量、低延时的消息平台
- 大多数的消息队列(消息中间件)都是基于JMS标准实现的,Kafka类似于JMS的实现。
1.2 Kafka有什么用?(消息队列有什么用?)
- 作为缓冲,来异构、解耦系统
- 用户注册需要完成多个步骤,每个步骤执行都需要很长时间。代表用户等待时间是所有步骤的累计时间。
- 为了减少用户等待的时间,使用并行执行,有多少个步骤,就开启多少个线程来执行。代表用户等待时间是所有步骤中耗时最长的那个步骤时间。
-有了新得问题:开启多线程执行每个步骤,如果以一个步骤执行异常,或者严重超时,用户等待的时间就不可控了。 - 通过消息队列来保证。
- 注册时,立即返回成功。
- 发送注册成功的消息到消息平台。
- 对注册信息感兴趣的程序,可以消息消息。
2. Kafka的基本架构
Kafka Cluster:由多个服务器组成。每个服务器单独的名字broker(掮客)。
Kafka Producer:生产者、负责生产数据。
Kafka consumer:消费者、负责消费数据。
Kafka Topic: 主题,一类消息的名称。存储数据时将一类数据存放在某个topic下,消费数据也是消费一类数据。
订单系统:创建一个topic,叫做order。
用户系统:创建一个topic,叫做user。
商品系统:创建一个topic,叫做product。
注意:Kafka的元数据都是存放在zookeeper中。
3. 搭建Kafka集群
3.1 准备3台虚拟机
192.168.140.128 kafka01
192.168.140.129 kafka02
192.168.140.130 kafka03
3.2 初始化环境
1)安装jdk、安装zookeeper
2)安装目录
安装包存放的目录:/export/software
安装程序存放的目录:/export/servers
数据目录:/export/data
日志目录:/export/logs
mkdir -p /export/servers/
mkdir -p /export/software /
mkdir -p /export/data /
mkdir -p /export/logs /
3)安装用户
安装hadoop,会创建一个hadoop用户
安装kafka,创建一个kafka用户
或者 创建bigdata用户,用来安装所有的大数据软件。
本例:使用root用户
4)验证环境
- jdk环境
- zookeeper环境
zkServer.sh status
4.3 搭建Kafka集群
4.3.1 准备安装包
由于kafka是scala语言编写的,基于scala的多个版本,kafka发布了多个版本。
其中2.11是推荐版本。
4.3.2 下载安装包及解压
tar -zxvf kafka_2.11-1.0.0.tgz -C /export/servers/
cd /export/servers/
rm -rf /export/servers/kafka
rm -rf /export/logs/kafka/
rm -rf /export/data/kafka
mv kafka_2.11-1.0.0 kafka
1)解压文件
2)删除之前的安装记录
3)重命名
4.3.3 查看目录及修改配置文件
- 查看目录
- 修改配置文件
进入配置目录,查看server.properties文件
cat server.properties |grep -v “#”
通过以上命令,查看到了默认的配置文件,对默认的文件进行修改。
修改三个地方
1)Borker.id
2)数据存放的目录,注意目录如果不存在,需要新建下。
3)zookeeper的地址信息
# broker.id 标识了kafka集群中一个唯一broker。
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 存放生产者生产的数据 数据一般以topic的方式存放
# 创建一个数据存放目录 /export/data/kafka --- mkdir -p /export/data/kafka
log.dirs=/export/data/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# zk的信息
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
4.3.4 分发配置文件及修改brokerid
将修改好的配置文件,分发到node02,node03上。
- 先在node02、node03上删除以往的安装记录
rm -rf /export/servers/kafka
rm -rf /export/logs/kafka/
rm -rf /export/data/kafka
- 分发安装包
scp -r /export/servers/kafka/ node02:/export/servers/
scp -r /export/servers/kafka/ node03:/export/servers/
vi /export/servers/kafka/config/server.properties
vi /export/servers/kafka/config/server.properties
4.3.5 启动集群
cd /export/servers/kafka/bin
./kafka-server-start.sh /export/servers/kafka/config/server.properties
4.3.6 查看Kafka集群
由于kafka集群并没有UI界面可以查看。
需要借助外部工具,来查看卡夫卡的集群
这个工具是一个java程序,必须要安装好JDK