kafka集群搭建
环境搭建参考:https://www.cnblogs.com/luotianshuai/p/5206662.html

1、软件环境

1、linux一台或多台,大于等于2

2、已经搭建好的zookeeper集群

3、软件版本kafka_2.11-0.9.0.1.tgz

2.解压

#解压软件
tar -zxvf kafka_2.11-0.9.0.1.tgz

3、修改配置文件

进入到config目录

[[email protected] config]# pwd
/opt/kafka_2.11-2.1.1/config
[[email protected] config]# ll
total 68
-rw-r--r--. 1 root root  906 Feb  8 10:30 connect-console-sink.properties
-rw-r--r--. 1 root root  909 Feb  8 10:30 connect-console-source.properties
-rw-r--r--. 1 root root 5321 Feb  8 10:30 connect-distributed.properties
-rw-r--r--. 1 root root  883 Feb  8 10:30 connect-file-sink.properties
-rw-r--r--. 1 root root  881 Feb  8 10:30 connect-file-source.properties
-rw-r--r--. 1 root root 1111 Feb  8 10:30 connect-log4j.properties
-rw-r--r--. 1 root root 2262 Feb  8 10:30 connect-standalone.properties
-rw-r--r--. 1 root root 1221 Feb  8 10:30 consumer.properties
-rw-r--r--. 1 root root 4727 Feb  8 10:30 log4j.properties
-rw-r--r--. 1 root root 1925 Feb  8 10:30 producer.properties
-rw-r--r--. 1 root root 7136 Apr 29 03:21 server.properties
-rw-r--r--. 1 root root 1032 Feb  8 10:30 tools-log4j.properties
-rw-r--r--. 1 root root 1169 Feb  8 10:30 trogdor.conf
-rw-r--r--. 1 root root 1023 Feb  8 10:30 zookeeper.properties

主要关注:server.properties 这个文件即可,我们可以发现在目录下:

有很多文件,这里可以发现有Zookeeper文件,我们可以根据Kafka内带的zk集群来启动,但是建议使用独立的zk集群
参数解释:

broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=19092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口

实际的修改项为:

#broker.id=0  每台服务器的broker.id都不能相同


#hostname
host.name=192.168.7.100

#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#设置zookeeper的连接端口
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:12181

4.启动kafka集群进行测试

4.1启动服务

#从后台启动Kafka集群(3台都需要启动)
cd
/opt/kafka/kafka_2.11-0.9.0.1//bin #进入到kafka的bin目录 
./kafka-server-start.sh -daemon ../config/server.properties

如果需要前台打印启动日志,将-daemon去掉即可
后台运行也可以使用:

nohup ./kafka-server-start.sh ../config/server.properties &

4.2 检查是否启动成功

[[email protected] bin]# jps
3235 QuorumPeerMain
9539 Jps
8612 Kafka

4.3、创建Topic来验证是否创建成功

#创建Topic
./kafka-topics.sh --create --zookeeper 192.168.7.100:12181 --replication-factor 2 --partitions 1 --topic shuaige
#解释
--replication-factor 2   #复制两份
--partitions 1 #创建1个分区
--topic #主题为shuaige

'''在一台服务器上创建一个发布者'''
#创建一个broker,发布者
./kafka-console-producer.sh --broker-list 192.168.7.100:19092 --topic shuaige

'''在一台服务器上创建一个订阅者'''
最早是用下面这个命令:
./kafka-console-consumer.sh --zookeeper localhost:12181 --topic shuaige --from-beginning(执行不识别了)
但是现在好像改为使用:
./kafka-console-consumer.sh --bootstrap-server 192.168.7.100:19092 -topic shuaige--from-beginning

测试(在发布者那里发布消息看看订阅者那里是否能正常收到~):
4.3.1自己的例子:
生产者:

[[email protected] bin]# ./kafka-console-producer.sh --broker-list 192.168.197.129:9092 --topic 129tiancai
>12
>123
>

消费者:

[[email protected] bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.197.129:9092 -topic 129tiancai --from-beginning
12

4.3.2其他命令:
查看topic

./kafka-topics.sh --list --zookeeper localhost:12181
#就会显示我们创建的所有topic

查看topic状态

/kafka-topics.sh --describe --zookeeper localhost:12181 --topic shuaige
#下面是显示信息
Topic:ssports    PartitionCount:1    ReplicationFactor:2    Configs:
    Topic: shuaige    Partition: 0    Leader: 1    Replicas: 0,1    Isr: 1
#分区为为1  复制因子为2   他的  shuaige的分区为0 
#Replicas: 0,1   复制的为0,1
#

5.其他说明标注

5.1、日志说明

默认kafka的日志是保存在/opt/kafka/kafka_2.10-0.9.0.0/logs目录下的,这里说几个需要注意的日志

server.log #kafka的运行日志
state-change.log  #kafka他是用zookeeper来保存状态,所以他可能会进行切换,切换的日志就保存在这里

controller.log #kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.

5.2、上面的大家你完成之后可以登录zk来查看zk的目录情况

#使用客户端进入zk
./zkCli.sh -server 127.0.0.1:12181  #默认是不用加’-server‘参数的因为我们修改了他的端口

#查看目录情况 执行“ls /”
[zk: 127.0.0.1:12181(CONNECTED) 0] ls /

#显示结果:[consumers, config, controller, isr_change_notification, admin, brokers, zookeeper, controller_epoch]
'''
上面的显示结果中:只有zookeeper是,zookeeper原生的,其他都是Kafka创建的
'''

#标注一个重要的
[zk: 127.0.0.1:12181(CONNECTED) 1] get /brokers/ids/0
{"jmx_port":-1,"timestamp":"1456125963355","endpoints":["PLAINTEXT://192.168.7.100:19092"],"host":"192.168.7.100","version":2,"port":19092}
cZxid = 0x1000001c1
ctime = Mon Feb 22 15:26:03 CST 2016
mZxid = 0x1000001c1
mtime = Mon Feb 22 15:26:03 CST 2016
pZxid = 0x1000001c1
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x152e40aead20016
dataLength = 139
numChildren = 0
[zk: 127.0.0.1:12181(CONNECTED) 2] 

#还有一个是查看partion
[zk: 127.0.0.1:12181(CONNECTED) 7] get /brokers/topics/shuaige/partitions/0
null
cZxid = 0x100000029
ctime = Mon Feb 22 10:05:11 CST 2016
mZxid = 0x100000029
mtime = Mon Feb 22 10:05:11 CST 2016
pZxid = 0x10000002a
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
[zk: 127.0.0.1:12181(CONNECTED) 8]

6.遇到的错误记录

上面的内容来自https://www.cnblogs.com/luotianshuai/p/5206662.html
最后一部分记录自己搭建环境的过程中遇到的错误:
6.1 主机名、ip一定要和kafka配置文件里的保持一致
路径:vi /etc/hosts
6.2 配置文件中broker.id=0 每台服务器的broker.id都不能相同,而且设定好之后,启动过了最好不要轻易修改,否则数据文件会丢失
6.3 困扰我一下午才解决的问题:报错本机连接超时timeout错误信息
解决办法:修改kafka配置文件中的zookeeper.connection.timeout.ms,往大了修改。可能就是因为自己的机器,反应慢需要的时间太久,默认超时,连不上zookeeper
kafka集群环境搭建及错误记录
6.4 搭建好之后:只有一台服务器可以作为kafka的消息生产者,其他的服务器都无法作为生产者,但是可以成为消费者,获取消息。
原因:启动三台服务器kafka之后,其中两台挂掉了,只有一台kafka在运行,我却本能的认为其他两台也在跑,使用jps查看即可发现其余两台kafka并未运行。重启其余两台kafka,便遇到了6.3的问题。

相关文章: