Prerequisite: ZooKeeper is already installed on the cluster.
1 Upload the tarball to the virtual machine and extract it:
tar -zxvf kafka_2.11-0.11.0.3.tgz
2 Rename the directory for convenience:
mv kafka_2.11-0.11.0.3 kafka
3 Edit the configuration file. Go to the config directory under the Kafka installation directory:
vim server.properties
Add the following:
broker.id=1 //like myid in ZooKeeper: must be unique for each broker
port=9092 //default value
host.name=slave1 //different on each of the three nodes
zookeeper.connect=master:2181,slave1:2181 //ZooKeeper connection addresses
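To make the per-node differences concrete, here is a sketch (the original only names master and slave1 but says there are three machines, so slave2, the broker.id values other than 1, and the /opt/kafka install path are assumptions):

```shell
# Assumed per-node values in config/server.properties:
#   master: broker.id=0  host.name=master
#   slave1: broker.id=1  host.name=slave1
#   slave2: broker.id=2  host.name=slave2
# port and zookeeper.connect are identical on every node.
# A quick sanity check that no broker.id is duplicated across the cluster:
for h in master slave1 slave2; do
  ssh "$h" 'grep "^broker.id" /opt/kafka/config/server.properties'
done | sort | uniq -d   # prints nothing when all broker.id values are unique
```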
4 Start the ZooKeeper cluster (run the start command on every node)
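The exact command depends on your ZooKeeper layout; assuming a standard install with zkServer.sh on each node's PATH, a sketch is:

```shell
# Run on every node (zkServer.sh location/PATH is an assumption):
zkServer.sh start
zkServer.sh status   # shows Mode: leader on one node, Mode: follower on the others
```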
5 Start Kafka (run on every node)
Go to Kafka's bin directory
Run: ./kafka-server-start.sh -daemon ../config/server.properties
6 Check the processes to verify the service is running
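jps (shipped with the JDK) is one way to check: the broker started in step 5 shows up as a process named Kafka, and ZooKeeper from step 4 as QuorumPeerMain:

```shell
jps
# Expected entries on each node (PIDs will differ):
#   Kafka            <- the broker started in step 5
#   QuorumPeerMain   <- ZooKeeper started in step 4
```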
7 Send a message to verify the setup
Create a topic: ./kafka-topics.sh --create --zookeeper 192.168.146.128:2181,192.168.146.129:2181 --replication-factor 2 --partitions 1 --topic apple
--create: create a new topic
--zookeeper <String: urls>: REQUIRED. The connection string for the ZooKeeper connection, in the form host:port. Multiple URLs can be given to allow fail-over.
--topic: the topic to be created, altered, or described
--partitions: the number of partitions for the topic being created or altered
--replication-factor: the replication factor for each partition in the topic being created
When creating a topic, list every ZooKeeper machine in the cluster after --zookeeper.
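To confirm the topic was actually created before moving on, kafka-topics.sh also supports --list and --describe with the same ZooKeeper addresses:

```shell
./kafka-topics.sh --describe --zookeeper 192.168.146.128:2181,192.168.146.129:2181 --topic apple
# Prints the partition count, replication factor, and the leader/replica brokers per partition.
```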
Start a producer on master: ./kafka-console-producer.sh --broker-list 192.168.146.129:9092 --topic apple
--broker-list <String: broker-list>: REQUIRED. The broker list string, in the form HOST1:PORT1,HOST2:PORT2.
When starting the producer, --broker-list only needs the IP and port of a Kafka broker (not ZooKeeper).
Start a consumer on slave1: ./kafka-console-consumer.sh --zookeeper 192.168.146.129:2181 --topic apple --from-beginning
--zookeeper <String: urls>: REQUIRED (only when using the old consumer). The connection string for the ZooKeeper connection, in the form host:port. Multiple URLs can be given to allow fail-over.
--from-beginning: if the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest.
When starting the consumer, --zookeeper only needs the IP and port of one ZooKeeper node.
Test it: publish a few messages from the producer and check that the consumer receives them.
How to stop? Simply press Ctrl-C.
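Besides typing into the producer terminal interactively, a message can be sent non-interactively by piping into the console producer, which is handy for a quick scripted check:

```shell
echo "hello kafka" | ./kafka-console-producer.sh --broker-list 192.168.146.129:9092 --topic apple
# The consumer running on slave1 should print: hello kafka
```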
8 Stop Kafka: ./bin/kafka-server-stop.sh