【发布时间】:2015-06-11 23:06:24
【问题描述】:
出于特殊原因,我需要同时使用 ConsumerGroup(又名高级消费者)和 SimpleConsumer(又名低级消费者)来读取 Kafka。对于ConsumerGroup,我使用基于 ZooKeeper 的配置并且对它完全满意,但SimpleConsumer 需要实例化种子代理。
我不想同时保留 ZooKeeper 和代理主机的列表。因此,我正在寻找一种方法来从 ZooKeeper自动发现特定主题的代理。
由于一些间接信息,我相信这些数据存储在 ZooKeeper 中的以下路径之一:
-
/brokers/topics/<topic>/partitions/<partition-id>/state - /brokers/ids/
但是,当我尝试从这些节点读取数据时,出现序列化错误(我为此使用 com.101tec.zkclient):
org.I0Itec.zkclient.exception.ZkMarshallingError:java.io.StreamCorruptedException:无效流标头:7B226A6D 在 org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37) 在 org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740) 在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773) 在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761) 在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750) 在 org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744) ... 64 省略 引起:java.io.StreamCorruptedException:无效的流标头:7B226A6D 在 java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804) 在 java.io.ObjectInputStream.(ObjectInputStream.java:299) 在 org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.(TcclAwareObjectIputStream.java:30) 在 org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31) ... 69 更多
我可以毫无问题地编写和读取自定义 Java 对象(例如字符串),所以我相信这不是客户端的问题,而是棘手的编码问题。因此,我想知道:
- 如果这是正确的方法,如何正确读取这些节点?
- 如果整个方法都是错误的,什么是正确的?
【问题讨论】:
-
我发现有助于在 Zookeeper 数据中查看的一个工具是:code.google.com/p/zooviewer
标签: apache-zookeeper apache-kafka