【问题标题】:Kafka: Get broker host from ZooKeeperKafka:从 ZooKeeper 获取代理主机
【发布时间】:2015-06-11 23:06:24
【问题描述】:

出于特殊原因,我需要同时使用 ConsumerGroup(又名高级消费者)和 SimpleConsumer(又名低级消费者)来读取 Kafka。对于ConsumerGroup,我使用基于 ZooKeeper 的配置并且对它完全满意,但SimpleConsumer 需要实例化种子代理。

我不想同时保留 ZooKeeper 和代理主机的列表。因此,我正在寻找一种方法来从 ZooKeeper自动发现特定主题的代理

由于一些间接信息,我相信这些数据存储在 ZooKeeper 中的以下路径之一:

  • /brokers/topics/<topic>/partitions/<partition-id>/state
  • /brokers/ids/

但是,当我尝试从这些节点读取数据时,出现序列化错误(我为此使用 com.101tec.zkclient):

org.I0Itec.zkclient.exception.ZkMarshallingError:java.io.StreamCorruptedException:无效流标头:7B226A6D 在 org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37) 在 org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740) 在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773) 在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761) 在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750) 在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744) ... 64 省略 引起:java.io.StreamCorruptedException:无效的流标头:7B226A6D 在 java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804) 在 java.io.ObjectInputStream.(ObjectInputStream.java:299) 在 org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.(TcclAwareObjectIputStream.java:30) 在 org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31) ... 69 更多

我可以毫无问题地编写和读取自定义 Java 对象(例如字符串),所以我相信这不是客户端的问题,而是棘手的编码问题。因此,我想知道:

  1. 如果这是正确的方法,如何正确读取这些节点
  2. 如果整个方法都是错误的,什么是正确的

【问题讨论】:

标签: apache-zookeeper apache-kafka


【解决方案1】:

这就是我的一位同事获取 Kafka 代理列表的方式。当您想要动态获取经纪人列表时,我认为这是一种正确的方式。

这是一个示例代码,展示了如何获取列表。

public class KafkaBrokerInfoFetcher {

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
        List<String> ids = zk.getChildren("/brokers/ids", false);
        for (String id : ids) {
            String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
            System.out.println(id + ": " + brokerInfo);
        }
    }
}

在由三个代理组成的集群上运行代码会导致

1: {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093}
2: {"jmx_port":-1,"timestamp":"1428512955512","host":"192.168.0.11","version":1,"port":9094}
3: {"jmx_port":-1,"timestamp":"1428512961043","host":"192.168.0.11","version":1,"port":9095}

【讨论】:

  • 感谢和抱歉迟到的回复。您能否也发布对您在代码中使用的 ZooKeeper 客户端库的引用?在我这边,我想出了如何修复我描述的错误,并将使用 Kafka 库附带的 com.101toc.ZkClient 发布替代解决方案。无论如何,我接受你的回答是完全有效的。
  • 应该是import org.apache.zookeeper.ZooKeeper;
  • +1 太简单和太有效的方式来获取有关 kafka 的信息(brokes,topics,partitions per topic ...等)谢谢。它只需要import org.apache.zookeeper.ZooKeeper
【解决方案2】:

事实证明,Kafka 使用ZKStringSerializer 将数据读写到 znode 中。因此,要修复错误,我只需将其添加为 ZkClient 构造函数中的最后一个参数:

val zkClient = new ZkClient(zkQuorum, Integer.MAX_VALUE, 10000, ZKStringSerializer)

使用它,我编写了几个有用的函数来发现经纪人 ID、他们的地址和其他东西:

import kafka.utils.Json
import kafka.utils.ZKStringSerializer
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.KafkaException


def listBrokers(): List[Int] = {
  zkClient.getChildren("/brokers/ids").toList.map(_.toInt)
}

def listTopics(): List[String] = {
  zkClient.getChildren("/brokers/topics").toList
}

def listPartitions(topic: String): List[Int] = {
  val path = "/brokers/topics/" + topic + "/partitions"
  if (zkClient.exists(path)) {
    zkClient.getChildren(path).toList.map(_.toInt)
  } else {
    throw new KafkaException(s"Topic ${topic} doesn't exist")
  }
}

def getBrokerAddress(brokerId: Int): (String, Int) = {
  val path = s"/brokers/ids/${brokerId}"
  if (zkClient.exists(path)) {
    val brokerInfo = readZkData(path)
    (brokerInfo.get("host").get.asInstanceOf[String], brokerInfo.get("port").get.asInstanceOf[Int])
  } else {
    throw new KafkaException("Broker with ID ${brokerId} doesn't exist")
  }
}

def getLeaderAddress(topic: String, partitionId: Int): (String, Int) = {
  val path = s"/brokers/topics/${topic}/partitions/${partitionId}/state"
  if (zkClient.exists(path)) {
    val leaderStr = zkClient.readData[String](path)
    val leaderId = Json.parseFull(leaderStr).get.asInstanceOf[Map[String, Any]].get("leader").get.asInstanceOf[Int]
    getBrokerAddress(leaderId)
  } else {
    throw new KafkaException(s"Topic (${topic}) or partition (${partitionId}) doesn't exist")
  }
}

【讨论】:

  • kafka.utils.ZkUtils 是什么库? 'org.apache.kafka',名称:'kafka_2.13'?这意味着我们必须指定我们正在使用的 kafka 的版本?
【解决方案3】:

使用 shell 执行此操作:

zookeeper-shell myzookeeper.example.com:2181
ls /brokers/ids
  => [2, 1, 0]
get /brokers/ids/2
get /brokers/ids/1
get /brokers/ids/0 

【讨论】:

    【解决方案4】:

    实际上,Kafka 中有 ZkUtils(至少对于 0.8.x 行),您可以使用一个小警告:您需要重新实现 ZkStringSerializer,它将字符串转换为 UTF-8 编码字节数组。如果您想使用 Java8 的流 API,可以通过 scala.collection.JavaConversions 迭代 Scala 集合。这对我的案子有帮助。

    【讨论】:

      【解决方案5】:
       public KafkaProducer(String zookeeperAddress, String topic) throws IOException,
              KeeperException, InterruptedException {
      
          this.zookeeperAddress = zookeeperAddress;
          this.topic = topic;
      
          ZooKeeper zk = new ZooKeeper(zookeeperAddress, 10000, null);
          List<String> brokerList = new ArrayList<String>();
      
          List<String> ids = zk.getChildren("/brokers/ids", false);
          for (String id : ids) {
              String brokerInfoString = new String(zk.getData("/brokers/ids/" + id, false, null));
              Broker broker = Broker.createBroker(Integer.valueOf(id), brokerInfoString);
              if (broker != null) {
                  brokerList.add(broker.connectionString());
              }
          }
      
          props.put("serializer.class", KAFKA_STRING_ENCODER);
          props.put("metadata.broker.list", String.join(",", brokerList));
          producer = new Producer<String, String>(new ProducerConfig(props));
      }
      

      【讨论】:

        猜你喜欢
        • 2020-08-27
        • 1970-01-01
        • 1970-01-01
        • 2018-02-17
        • 1970-01-01
        • 2016-05-24
        • 2019-05-17
        • 1970-01-01
        • 2018-06-24
        相关资源
        最近更新 更多