【问题标题】:unable to read kafka messages but can list the available topics无法阅读 kafka 消息,但可以列出可用主题
【发布时间】:2021-05-11 14:03:08
【问题描述】:

我正在尝试从 IntelliJ IDEA 本地连接到 minikube 上 k8s 中的 kafka 并阅读一些消息。我可以列出消费者可用的主题,但无法阅读任何消息。

消费者代码:

public class TestConsumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-test");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());

    consumer.listTopics().forEach((key, value) -> System.out.println("topic = " + key));

    consumer.subscribe(Collections.singletonList("test"));

    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));

        for (ConsumerRecord<String, String> record : records)
          System.out.println(record.offset() + ": " + record.value());

        consumer.commitAsync();// doesn't work with or without this line
      }
    } finally {
      consumer.close();
    }
  }
}

为了在 k8s 中安装,我使用了这个 helm chart https://github.com/bitnami/charts/tree/master/bitnami/kafka,配置如下

replicaCount: 3
deleteTopicEnable: true
metrics:
  kafka:
    enabled: true

k8s 服务:

NAME                       TYPE           CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE

kafka                      ClusterIP      10.97.216.82     <none>        9092/TCP                     30m
kafka-headless             ClusterIP      None             <none>        9092/TCP,9093/TCP            30m
kafka-metrics              ClusterIP      10.107.104.199   <none>        9308/TCP                     30m
kafka-zookeeper            ClusterIP      10.101.103.6     <none>        2181/TCP,2888/TCP,3888/TCP   30m
kafka-zookeeper-headless   ClusterIP      None             <none>        2181/TCP,2888/TCP,3888/TCP   30m

k8s 豆荚:

NAME                                     READY   STATUS    RESTARTS   AGE

kafka-0                                  1/1     Running   4          32m
kafka-1                                  1/1     Running   4          32m
kafka-2                                  1/1     Running   2          32m
kafka-client                             1/1     Running   0          31m
kafka-exporter-6ccc69f8cc-tgnxh          1/1     Running   0          33m
kafka-zookeeper-0                        1/1     Running   0          33m

(只有 kafka-client pod 是手动创建的,不是由 helm chart 创建的)

我正在使用kafka的这个依赖compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.7.0'

作为我运行的设置的补充:

kubectl port-forward kafka-0 9092:9092

并将这一行添加到我的主机文件中

127.0.0.1 kafka-headless.default.svc.cluster.local kafka-headless

创建主题:

./kafka-topics.sh --create \
--zookeeper kafka-zookeeper:2181 \
--replication-factor 1 \
--partitions 1 \
--topic test

制作主题

./kafka-console-producer.sh --broker-list kafka:9092 --topic test

我可以通过控制台消费者阅读所有消息

./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning

感谢任何帮助和建议。

【问题讨论】:

  • “看不懂”是什么意思?你得到什么错误?
  • 端口转发不够confluent.io/blog/kafka-listeners-explained。 + 你的终端用kafka:9092连接,不是127.0.0.1:9092,所以这些其实是不同的连接
  • @daniu none,只是records每次迭代都是空的
  • 客户端或服务器上是否有异常消息?您通常需要在属性中设置键/值解码器(但不确定它是否默认为 ByteArrayDecoder)。

标签: java kubernetes apache-kafka kafka-consumer-api


【解决方案1】:

谢谢你们!我终于设法阅读了来自 kafka 的消息。

@OneCricketeer 指出了问题所在,我找到了一个类似的链接,它很好地解释了问题https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/

我所做的是在端口 9094 服务上转发 kafka-0-external 并将舵图的值更改为

replicaCount: 3
deleteTopicEnable: true
metrics:
  kafka:
    enabled: true
externalAccess:
  enabled: true
  service:
    type: NodePort
    port: 9094
  autoDiscovery:
    enabled: true
serviceAccount:
  create: true
rbac:
  create: true
extraEnvVars:
  - name: KAFKA_LISTENERS
    value: "CLIENT://:9092,INTERNAL://:9093,EXTERNAL://localhost:9094"
  - name: KAFKA_ADVERTISED_LISTENERS
    value: "CLIENT://:9092,INTERNAL://:9093,EXTERNAL://localhost:9094"
  - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
    value: "INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT"

【讨论】:

    猜你喜欢
    • 2018-01-12
    • 1970-01-01
    • 1970-01-01
    • 2018-01-09
    • 1970-01-01
    • 2021-09-28
    • 1970-01-01
    • 1970-01-01
    • 2019-01-11
    相关资源
    最近更新 更多