【问题标题】:Strimzi - Connecting external clientsStrimzi - 连接外部客户端
【发布时间】:2020-02-03 07:19:22
【问题描述】:

在讨论 here 之后,我使用以下步骤启用外部客户端(基于 kafkajs)连接到 OpenShift 上的 Strimzi。这些步骤来自here

启用外部路由

kafka-persistent-single.yaml编辑成如下图。

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.3.0
    replicas: 1
    listeners:
      plain: {}
      tls: {}
      external:
          type: route
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.3"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 5Gi
        deleteClaim: false
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 5Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

提取证书,

为了提取证书并在客户端使用它,我运行了以下命令:

kubectl get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -D > ca.crt

请注意,我必须在我的 macOS 上使用 base64 -D 而不是 base64 -d,如文档中所示。

Kafkajs 客户端

这是改编自他们的npm 页面和他们的documentation 的客户端。

const fs = require('fs')
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['my-cluster-kafka-bootstrap-messaging-os.192.168.99.100.nip.io'],
  ssl : { rejectUnauthorized: false,
    ca : [fs.readFileSync('ca.crt', 'utf-8')]
  }
})

const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {
  // Producing
  await producer.connect()
  await producer.send({
    topic: 'test-topic',
    messages: [
      { value: 'Hello KafkaJS user!' },
    ],
  })

  // Consuming
  await consumer.connect()
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })
}

run().catch(console.error)

问题

当我从具有ca.crt 的文件夹中运行node sample.js 时,我收到一条连接被拒绝的消息。

{"level":"ERROR","timestamp":"2019-10-05T03:22:40.491Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.99.100:9094","broker":"my-cluster-kafka-bootstrap-messaging-os.192.168.99.100.nip.io:9094","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.99.100:9094\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1113:14)"}

我错过了什么?

【问题讨论】:

    标签: node.js apache-kafka openshift kafka-consumer-api strimzi


    【解决方案1】:

    在与@ppatierno 进行了长时间的讨论后,我觉得 Strimzi 集群可以很好地与 Kafka 控制台客户端配合使用。另一方面,kafkajs 包一直以NOT_LEADER_FOR_PARTITION 失败。

    更新Python client 似乎工作正常;所以,我放弃了kafkajs

    【讨论】:

    • 如果主题是在飞行中创建的,这个错误应该是暂时的,然后客户端应该可以正常工作。
    • 如何查看“主题是否在飞行中创建”?
    • 我尝试使用kafka-topics 创建主题;我收到了Topic already exists. 的消息。
    【解决方案2】:

    我猜问题是您在代理地址上缺少正确的端口 443,因此您必须使用

    经纪人:['my-cluster-kafka-bootstrap-messaging-os.192.168.99.100.nip.io:443']

    否则它会尝试连接到 OpenShift 路由上的默认端口 80。

    【讨论】:

    • 好的,我正在尝试使用8443。通过443,我收到了这条消息,{"level":"ERROR","timestamp":"2019-10-05T05:21:44.986Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Closed connection","retryCount":0,"retryTime":325}
    • 似乎来自NodeJS及其配置。我会尝试禁用主机名验证以查看它是否有效。然后只需使用具有相同证书和路由地址的原始 openssl 客户端即可从该角度查看一切正常。如果可能的话,使用原始的 Kafka 控制台客户端做同样的事情。也看看这篇博文strimzi.io/2019/04/30/accessing-kafka-part-3.html
    • 我会休息一会儿,你稍后告诉我;)
    • 要禁用主机验证,我尝试了这个checkServerIdentity: () => undefined,如此处所述 - stackoverflow.com/a/47957605/919480 不走运。
    • 您是否尝试过使用 openssl 客户端测试证书,或者 kafka 控制台使用者是否能够建立连接。我们需要检查它是否是 NodeJS 问题,因为恕我直言,Strimzi 配置似乎没问题。
    猜你喜欢
    • 2015-05-04
    • 2015-11-24
    • 1970-01-01
    • 2015-02-09
    • 1970-01-01
    • 1970-01-01
    • 2011-08-04
    • 1970-01-01
    • 2018-12-29
    相关资源
    最近更新 更多