【问题标题】:Unable to communicate with kafka server using kafka Producer API无法使用 kafka Producer API 与 kafka 服务器通信
【发布时间】:2015-05-19 21:14:53
【问题描述】:

我已经在单个节点上设置了 kafka 并启动了 zookeeper 和 kafka 服务器。我在控制台上为内部生产者和消费者测试了它,它运行良好。但是当我在控制台和我的运行内部 kafka 消费者时自定义生产者不起作用。

下面是我的Producer类

    Properties props = new Properties();

    props.put("metadata.broker.list", "xx.xx.xx.xx:9092");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("partitioner.class", "com.example.producer.SimplePartitioner");
    props.put("request.required.acks", "1");

    ProducerConfig config = new ProducerConfig(props);

    Producer<String, String> producer = new Producer<String, String>(config);
    KeyedMessage<String, String> data = new KeyedMessage<String, String>(
            "mails", "xxxx");
    producer.send(data);

当控件到达 producer.send() 时,它会在 3 次尝试后停止,但出现以下异常

java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)

【问题讨论】:

  • 请您尝试评论props.put("partitioner.class", "com.example.producer.SimplePartitioner") 部分

标签: java apache-kafka kafka-consumer-api


【解决方案1】:

我试图从外部 VM 的 Eclipse 中的 Producer 类连接到 kafka 服务器。 我不得不在 kafka 的 config/ 中的 producer.properties 中将 localhost 替换为 ip 地址。

【讨论】:

  • 也在 /etc/hosts 文件中插入 IP
猜你喜欢
  • 1970-01-01
  • 2019-06-13
  • 2021-11-09
  • 2020-05-26
  • 2020-09-11
  • 1970-01-01
  • 2021-04-23
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多