【问题标题】:How to block creating/deleting kafka topic by unauthorized users?如何阻止未经授权的用户创建/删除 kafka 主题?
【发布时间】:2019-10-31 16:41:37
【问题描述】:

我已经设置了 Kafka 和 zookeeper 身份验证,使用 SASL+ACL 和 Kafka 通过 SSL 两种身份验证(包括加密)对生产者和消费者进行身份验证。

通过在 Kafka 和 zookeeper 之间启用 SASL 和 ACL,它不允许未经授权的 Kafka 代理登录到 zookeeper 集群。但是,仍然可以不受任何限制地创建和删除主题。

zookeeper.properties

dataDir=/x02/lsesv2-s/data/Zookeeper

clientPort=15300

tickTime=2000

initLimit=10

syncLimit=5


authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

requireClientAuthScheme=sasl

jaasLoginRenew=3600000

quorum.auth.enableSasl=true

quorum.auth.learnerRequireSasl=true

quorum.auth.serverRequireSasl=true

quorum.auth.learner.loginContext=QuorumLearner

quorum.auth.server.loginContext=QuorumServer

server.1=172.25.33.12:15302:15301
server.2=172.25.33.13:15302:15301
server.3=172.25.33.11:15302:15301

zookeeper_jaas.conf

Server {
        org.apache.zookeeper.server.auth.DigestLoginModule required
        username="admin"
        password="abc123"
        user_admin="abc123";
};

QuorumServer {
        org.apache.zookeeper.server.auth.DigestLoginModule required
        user_admin="abc123";
};

QuorumLearner {
        org.apache.zookeeper.server.auth.DigestLoginModule required
        username="admin"
        password="abc123";
};

通过以下代码设置ACL

 final CountDownLatch connectedSignal = new CountDownLatch(1);
        String connect = "localhost:15300";
        ZooKeeper zooKeeper = null;

        try
        {
            String userName = "admin";
            String password = "mit123";

            zooKeeper = new ZooKeeper(connect, 5000, we ->
            {
                if (we.getState() == Watcher.Event.KeeperState.SyncConnected)
                {
                    connectedSignal.countDown();
                }
            });

            connectedSignal.await();

            zooKeeper.addAuthInfo("digest", (userName + ":" + password).getBytes());

            final String aclString = "auth:" + userName + ":" + password + ":" + "cdrwa" + 
            ",sasl:" + userName + ":" + "cdrwa";

            zooKeeper.setACL("/", parseACLs(aclString), -1);

        } finally
        {
            if (zooKeeper != null)
            {
                zooKeeper.close();
            }
        }

上面的代码正在运行,下面是执行代码后的结果。

Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
getAcl /
'sasl,'admin
: cdrwa
'digest,'admin:oiasY+rmnmmK9mec8kpnvv281HE=
: cdrwa

我在启动时覆盖了 Kafka 属性,而不是 server.properties 文件。 *

Kafka 属性

kafka/bin/kafka-server-start.sh /x02/lsesv2-s/current/kafka/config/server.properties 
--override broker.id=1 
--override zookeeper.connect=10g-flton-onl01:15300,10g-flton-onl02:15300,10g-flton-nor02:15300 
--override num.network.threads=16 
--override num.io.threads=16 
--override socket.send.buffer.bytes=10240000
--override socket.receive.buffer.bytes=10240000 
--override log.dirs=/x02/lsesv2-s/data/Kafka 
--override offsets.topic.replication.factor=1 
--override min.insync.replicas=1 
--override inter.broker.listener.name=INTERNAL 
--override listeners=INTERNAL://10g-flton-onl01:15307 
--override advertised.listeners=INTERNAL://10g-flton-onl01:15307 
--override listener.security.protocol.map=INTERNAL:SSL 
--override security.protocol=SSL 
--override ssl.client.auth=required 
--override ssl.key.password=abc123 
--override ssl.keystore.location=configs/MHV/kafka.server.keystore.jks 
--override ssl.keystore.password=abc123 
--override ssl.truststore.location=configs/MHV/kafka.server.truststore.jks 
--override ssl.truststore.password=abc123 
--override ssl.endpoint.identification.algorithm=

Kafka 到生产者/消费者的身份验证工作正常,zookeeper 到 kafka 的身份验证也工作正常。 不过,未授权用户也可以创建和删除主题。

主题创建

kafka/bin/kafka-topics.sh --create --zookeeper localhost:15300 --replication-factor 3 --partitions 8 --topic test

主题删除

kafka/bin/kafka-topics.sh --zookeeper localhost:15300 --delete --topic test

注意:我在创建或删除主题时没有设置-Djava.security.auth.login.config=kafka_server_jaas.conf。所以应该限制这个操作。但实际上并没有。

仅针对授权用户帮助我创建和删除主题。

【问题讨论】:

    标签: ssl apache-kafka apache-zookeeper acl sasl


    【解决方案1】:

    这似乎是本地测试所需的属性。

     KAFKA_ZOOKEEPER_SET_ACL: "true"
    

    也可以直接用于 Confluent 图像或地图。

    zookeeper.set.acl
    

    Reference

    也如Kafka 101 Confluent中所述

    存储在 ZooKeeper 中的元数据只有代理才能修改相应的 znode,但 znode 是全球可读的。

    因为我们将 ZooKeeper 配置为需要 SASL 身份验证,所以我们需要在启动 kafka-topics 工具时设置 java.security.auth.login.config 系统属性:

    代码示例和 docker-compose 文件显示在here

    【讨论】:

      猜你喜欢
      • 2013-05-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-11-12
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多