【问题标题】:Kafka Consumer path must not end with / characterKafka 消费者路径不能以 / 字符结尾
【发布时间】:2015-06-30 22:31:01
【问题描述】:

我正在使用 Apache Kafka 0.8.2.1 将 Web 事件流式传输到其他数据源。我编写的 Kafka Producer 运行良好,当我运行 kafka-console-consumer.sh 时,我能够看到数据通过我的主题流式传输。但是,我没有任何运气试图让我的 Kafka 消费者检索消息。有什么想法吗?

当我的代码尝试运行 consumer.createMessageStreams(topicCountMap) 时,输出以下有关不正确路径的错误

Exception in thread "main" java.lang.IllegalArgumentException: Path must not end with / character
        at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:58)
        at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1024)
        at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1073)
        at org.I0Itec.zkclient.ZkConnection.exists(ZkConnection.java:95)
        at org.I0Itec.zkclient.ZkClient$11.call(ZkClient.java:827)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:824)
        at org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:136)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer$4.apply(ZookeeperConsume
rConnector.scala:901)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer$4.apply(ZookeeperConsume
rConnector.scala:898)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:
898)
        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:240)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97)

这是来自我的 Kafka Consumer 的代码。

  val consumer: ConsumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig())

  var executor: ExecutorService = null

  def run(a_numThreads: Integer) {
    var topicCountMap: java.util.Map[String, Integer] = new java.util.HashMap[String, Integer]()

    topicCountMap.put("testEvent", new Integer(a_numThreads))

    var consumerMap = consumer.createMessageStreams(topicCountMap)

    var streams = consumerMap.get("testEvent")
    // now launch all the threads
    executor = Executors.newFixedThreadPool(a_numThreads)

    // now create an object to consume the messages
    //
    var threadNumber: Integer = 0
    var streamsItr = streams.iterator()
    while (streamsItr.hasNext()) {
      var stream = streamsItr.next()
      executor.submit(new EventConsumer(stream, threadNumber))
      threadNumber = threadNumber + 1
    }
  }

  def createConsumerConfig(): ConsumerConfig = {
    var props: Properties = new Properties()
    props.put("zookeeper.connect", "127.0.0.1:2181")
    props.put("zk.connect", "127.0.0.1:2181")
    props.put("group.id", "testConsumer")
    props.put("groupid", "tesConsumer")
    props.put("zookeeper.session.timeout.ms", "400")
    props.put("zookeeper.sync.time.ms", "200")
    props.put("auto.commit.interval.ms", "1000")

    return new ConsumerConfig(props)
  }

【问题讨论】:

  • 这可能不是实际问题,但为什么你需要 zookeeper.connectzk.connect ?还有group.id & groupid ?
  • 在某个时候,Kafka 团队更改了消费者使用的配置属性,但不一定正确更新文档。不幸的是,我不知道它使用的是哪一个,所以我把它们都包括在内了。
  • 从 0.8 开始,您需要 zookeeper.connect & group.id .. 与异常相关,这看起来与 zookeeper 有关,但您的配置对我来说似乎没问题.. 您可能会深入了解并获得如果可能的话,提供更多细节..希望这可以为我们提供一些线索

标签: apache-kafka messagebroker kafka-consumer-api


【解决方案1】:

Spark CheckpointWriter 在无法访问存储的检查点路径时产生此异常消息。请确保检查点已禁用或提供正确的路径。由于连接成功后发生异常

在 org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:58)

似乎 writer 无法访问将保存检查点信息的目录。

https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-06-04
    • 1970-01-01
    • 1970-01-01
    • 2020-09-19
    相关资源
    最近更新 更多