在Kafka0.9版本之前,Kafka集群时没有安全机制的。Kafka Client应用可以通过连接Zookeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。来获取存储在Zookeeper中的Kafka元数据信息。拿到Kafka Broker地址后,连接到Kafka集群,就可以操作集群上的所有主题了。由于没有权限控制,集群核心的业务主题时存在风险的。

本文主要使用SASL+ACL

二、技术关键点

配置文件

修改broker启动所需的server.properties文件,你至少需要配置(或修改)以下这些参数:

listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://$advertised_hostname:9092
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
super.users=User:admin

 

其他参数讲解,请参考链接:

https://www.cnblogs.com/xiao987334176/p/10065844.html

 

这里主要讲解几个重点参数

 

默认情况下,如果资源R没有关联acl,除了超级用户,没有用户允许访问。如果你想改变这种方式你可以做如下配置

allow.everyone.if.no.acl.found=true

什么意思呢?上面的配置已经启动了acl,除了超级用户之外,其他用户无法访问。那么问题就来了,在kafka集群中,其它节点需要同步数据,需要相互访问。

它默认会使用ANONYMOUS的用户名连接集群。在这种情况下,启动kafka集群,必然失败!所以这个参数一定要配置才行!

 

listeners=SASL_PLAINTEXT://:9092

这个参数,表示kafka监听的地址。此参数必须要配置,默认是注释掉的。默认会使用listeners=PLAINTEXT://:9092,但是我现在开启了SASL,必须使用SASL协议连接才行。

//:9092 这里虽然没有写IP地址,根据官方解释,它会监听所有IP。注意:这里只能是IP地址,不能是域名。否则启动时,会提示无法绑定IP。

 

advertised.listeners 这个参数,表示外部的连接地址。这里可以写域名,也可以写IP地址。建议使用域名,为什么呢?因为IP可能会变动,但是主机名是不会变动的。

所以在java代码里面写死,就可以了!注意:必须是SASL协议才行!

 

super.users=User:admin  表示启动超级用户admin,注意:此用户名不允许更改,否则使用生产模式时,会有异常!

 

启动脚本

bin/kafka-server-start.sh 这个是kafka的启动脚本,要使用ACL,需要增加一个参数才行。

有2种方法修改,这里分别介绍一下:

1. 增加环境变量KAFKA_OPTS(推荐)

先来看一下,默认的bin/kafka-server-start.sh的最后一行

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

只需要在最后一行的上面一行,添加一个环境变量即可

export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

 

2. 增加参数-Djava.security.auth.login.config

直接将最后一行修改为

exec $base_dir/kafka-run-class.sh -Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf $EXTRA_ARGS kafka.Kafka "$@"

 

JAAS文件

kafka_cluster_jaas.conf

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="123456"
user_admin="123456"
user_reader="123456"
user_writer="123456";
};

 

这个文件,是专门用来做认证的。用户名和密码的格式如下:

user_用户名="密码"

 

注意:对于超级用户,这几行是固定的

username="admin"
password="123456"
user_admin="admin"

这里指定的是admin用户密码为123456,密码可自行更改。

下面的,才是普通用户。最后一个用户,要有一个分号才行!

 

三、正式部署

环境介绍

本文采用的环境,参考以下链接

https://www.cnblogs.com/xiao987334176/p/10088497.html#autoid-3-0-0

 

使用了3台zookeeper和5台kafka。都是在一台服务器上面运行的!

其中zookeeper的镜像,不需要变动,直接启动即可。

但是kafka的镜像,需要重新构建,请看下面的内容。

 

创建镜像

创建空目录

mkdir /opt/kafka_cluster_acl

 

dockerfile

FROM ubuntu:16.04
# 修改更新源为阿里云
ADD sources.list /etc/apt/sources.list
ADD kafka_2.12-2.1.0.tgz /
ADD kafka_cluster_jaas.conf /
# 安装jdk
RUN apt-get update && apt-get install -y openjdk-8-jdk --allow-unauthenticated && apt-get clean all

EXPOSE 9092
# 添加启动脚本
ADD run.sh .
RUN chmod 755 run.sh
ENTRYPOINT [ "/run.sh"]

 

kafka_cluster_jaas.conf

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="123456"
user_admin="123456"
user_reader="123456"
user_writer="123456";
};

 

run.sh

#!/bin/bash

if [ -z $broker_id ];then
    echo "broker_id变量不能为空"
    exit 1
fi

if [ -z $zookeeper ];then
    echo "zookeeper变量不能为空"
    exit 2
fi

if [ -z $advertised_hostname ];then
    echo "advertised_hostname变量不能为空"
    exit 3
fi

# 开启kafka acl验证
echo "

listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://$advertised_hostname:9092
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
super.users=User:admin

" >> /kafka_2.12-2.1.0/config/server.properties

cd /kafka_2.12-2.1.0
# 设置唯一id
sed -i "21s/0/$broker_id/" /kafka_2.12-2.1.0/config/server.properties
# 设置zookeeper连接地址
sed -i "123s/localhost/$zookeeper/" /kafka_2.12-2.1.0/config/server.properties

# 配置启动脚本,最后一行之前添加环境变量
sed -i -e "44"i'\export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf"' bin/kafka-server-start.sh

# 添加配置文件
mv /kafka_cluster_jaas.conf /kafka_2.12-2.1.0/config/

# 临时添加5条hosts
echo "172.168.0.5 kafka-1.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.6 kafka-2.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.7 kafka-3.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.8 kafka-4.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.9 kafka-5.default.svc.cluster.local" >> /etc/hosts

# 启动kafka
bin/kafka-server-start.sh config/server.properties
View Code

相关文章: