【问题标题】:Kafka 1.0.0 admin client cannot create topic with EOFExceptionKafka 1.0.0 管理客户端无法使用 EOFException 创建主题
【发布时间】:2017-12-18 15:38:41
【问题描述】:

使用 1.0.0 Kafka 管理客户端,我希望以编程方式在代理上创建主题。我碰巧在使用 Scala。我尝试使用以下代码在 Kafka 代理上创建主题或仅列出可用主题

import org.apache.kafka.clients.admin.{AdminClient, ListTopicsOptions, NewTopic}
import scala.collection.JavaConverters._

val zkServer = "localhost:2181"
val topic = "test1"

val zookeeperConnect = zkServer
val sessionTimeoutMs = 10 * 1000
val connectionTimeoutMs = 8 * 1000


val partitions = 1
val replication:Short = 1
val topicConfig = new Properties() // add per-topic configurations settings here

import org.apache.kafka.clients.admin.AdminClientConfig
val config = new Properties
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, zkServer)
val admin = AdminClient.create(config)

val existing = admin.listTopics(new ListTopicsOptions().timeoutMs(500).listInternal(true))
val nms = existing.names()
nms.get().asScala.foreach(nm => println(nm)) // nms.get() fails

val newTopic = new NewTopic(topic, partitions, replication)
newTopic.configs(Map[String,String]().asJava)
val ret = admin.createTopics(List(newTopic).asJavaCollection)
ret.all().get() // Also fails
admin.close()

使用任一命令,ZooKeeper (3.4.10) 端都会抛出 EOFException 并关闭连接。调试 ZooKeeper 端本身,它似乎无法反序列化管理客户端正在发送的消息(它试图读取的字节用完了)

谁能让 1.0.0 Kafka 管理客户端用于创建或列出主题?

【问题讨论】:

    标签: scala apache-kafka


    【解决方案1】:

    AdminClient直接连接Kafka,不需要访问Zookeeper。

    您需要将AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG 设置为指向您的Kafka 代理(例如localhost:9092)而不是Zookeeper。

    【讨论】:

    猜你喜欢
    • 2018-10-11
    • 1970-01-01
    • 2018-02-22
    • 1970-01-01
    • 2017-08-12
    • 2019-04-12
    • 2019-12-19
    • 1970-01-01
    相关资源
    最近更新 更多