在Kafka0.9版本之前,Kafka集群时没有安全机制的。Kafka Client应用可以通过连接Zookeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。来获取存储在Zookeeper中的Kafka元数据信息。拿到Kafka Broker地址后,连接到Kafka集群,就可以操作集群上的所有主题了。由于没有权限控制,集群核心的业务主题时存在风险的。
本文主要使用SASL+ACL
二、技术关键点
配置文件
修改broker启动所需的server.properties文件,你至少需要配置(或修改)以下这些参数:
listeners=SASL_PLAINTEXT://:9092 advertised.listeners=SASL_PLAINTEXT://$advertised_hostname:9092 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer #SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN allow.everyone.if.no.acl.found=true super.users=User:admin
其他参数讲解,请参考链接:
https://www.cnblogs.com/xiao987334176/p/10065844.html
这里主要讲解几个重点参数
默认情况下,如果资源R没有关联acl,除了超级用户,没有用户允许访问。如果你想改变这种方式你可以做如下配置
allow.everyone.if.no.acl.found=true
什么意思呢?上面的配置已经启动了acl,除了超级用户之外,其他用户无法访问。那么问题就来了,在kafka集群中,其它节点需要同步数据,需要相互访问。
它默认会使用ANONYMOUS的用户名连接集群。在这种情况下,启动kafka集群,必然失败!所以这个参数一定要配置才行!
listeners=SASL_PLAINTEXT://:9092
这个参数,表示kafka监听的地址。此参数必须要配置,默认是注释掉的。默认会使用listeners=PLAINTEXT://:9092,但是我现在开启了SASL,必须使用SASL协议连接才行。
//:9092 这里虽然没有写IP地址,根据官方解释,它会监听所有IP。注意:这里只能是IP地址,不能是域名。否则启动时,会提示无法绑定IP。
advertised.listeners 这个参数,表示外部的连接地址。这里可以写域名,也可以写IP地址。建议使用域名,为什么呢?因为IP可能会变动,但是主机名是不会变动的。
所以在java代码里面写死,就可以了!注意:必须是SASL协议才行!
super.users=User:admin 表示启动超级用户admin,注意:此用户名不允许更改,否则使用生产模式时,会有异常!
启动脚本
bin/kafka-server-start.sh 这个是kafka的启动脚本,要使用ACL,需要增加一个参数才行。
有2种方法修改,这里分别介绍一下:
1. 增加环境变量KAFKA_OPTS(推荐)
先来看一下,默认的bin/kafka-server-start.sh的最后一行
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
只需要在最后一行的上面一行,添加一个环境变量即可
export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf" exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
2. 增加参数-Djava.security.auth.login.config
直接将最后一行修改为
exec $base_dir/kafka-run-class.sh -Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf $EXTRA_ARGS kafka.Kafka "$@"
JAAS文件
kafka_cluster_jaas.conf
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456" user_admin="123456" user_reader="123456" user_writer="123456"; };
这个文件,是专门用来做认证的。用户名和密码的格式如下:
user_用户名="密码"
注意:对于超级用户,这几行是固定的
username="admin" password="123456" user_admin="admin"
这里指定的是admin用户密码为123456,密码可自行更改。
下面的,才是普通用户。最后一个用户,要有一个分号才行!
三、正式部署
环境介绍
本文采用的环境,参考以下链接
https://www.cnblogs.com/xiao987334176/p/10088497.html#autoid-3-0-0
使用了3台zookeeper和5台kafka。都是在一台服务器上面运行的!
其中zookeeper的镜像,不需要变动,直接启动即可。
但是kafka的镜像,需要重新构建,请看下面的内容。
创建镜像
创建空目录
mkdir /opt/kafka_cluster_acl
dockerfile
FROM ubuntu:16.04 # 修改更新源为阿里云 ADD sources.list /etc/apt/sources.list ADD kafka_2.12-2.1.0.tgz / ADD kafka_cluster_jaas.conf / # 安装jdk RUN apt-get update && apt-get install -y openjdk-8-jdk --allow-unauthenticated && apt-get clean all EXPOSE 9092 # 添加启动脚本 ADD run.sh . RUN chmod 755 run.sh ENTRYPOINT [ "/run.sh"]
kafka_cluster_jaas.conf
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456" user_admin="123456" user_reader="123456" user_writer="123456"; };
run.sh
#!/bin/bash if [ -z $broker_id ];then echo "broker_id变量不能为空" exit 1 fi if [ -z $zookeeper ];then echo "zookeeper变量不能为空" exit 2 fi if [ -z $advertised_hostname ];then echo "advertised_hostname变量不能为空" exit 3 fi # 开启kafka acl验证 echo " listeners=SASL_PLAINTEXT://:9092 advertised.listeners=SASL_PLAINTEXT://$advertised_hostname:9092 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer #SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN allow.everyone.if.no.acl.found=true super.users=User:admin " >> /kafka_2.12-2.1.0/config/server.properties cd /kafka_2.12-2.1.0 # 设置唯一id sed -i "21s/0/$broker_id/" /kafka_2.12-2.1.0/config/server.properties # 设置zookeeper连接地址 sed -i "123s/localhost/$zookeeper/" /kafka_2.12-2.1.0/config/server.properties # 配置启动脚本,最后一行之前添加环境变量 sed -i -e "44"i'\export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf"' bin/kafka-server-start.sh # 添加配置文件 mv /kafka_cluster_jaas.conf /kafka_2.12-2.1.0/config/ # 临时添加5条hosts echo "172.168.0.5 kafka-1.default.svc.cluster.local" >> /etc/hosts echo "172.168.0.6 kafka-2.default.svc.cluster.local" >> /etc/hosts echo "172.168.0.7 kafka-3.default.svc.cluster.local" >> /etc/hosts echo "172.168.0.8 kafka-4.default.svc.cluster.local" >> /etc/hosts echo "172.168.0.9 kafka-5.default.svc.cluster.local" >> /etc/hosts # 启动kafka bin/kafka-server-start.sh config/server.properties