【问题标题】:Kafka with SSL failed in producer带有 SSL 的 Kafka 在生产者中失败
【发布时间】:2021-08-10 08:34:31
【问题描述】:

我正在使用 SSL 上传 kafka 环境,到那时为止,没有问题...

它正常上升,但是当我创建一个 MySQL 连接器时,

生产者没有收到docker环境发送的配置!

有什么建议吗?

---
  # Docker Compose file (format version 3); every container is declared under `services`.
  version: '3'
  services:
    # ZooKeeper node. Its host/container name and client port come from the
    # ZK_HOST and ZK_PORT variables; keystore files are read from ./secrets.
    zookeeper:
      image: 'confluentinc/cp-zookeeper:latest'
      hostname: '${ZK_HOST}'
      container_name: '${ZK_HOST}'
      volumes:
        # Host directory that holds the .jks files referenced in `environment`.
        - './secrets:/etc/zookeeper/secrets'
      ports:
        - '${ZK_PORT}:${ZK_PORT}'
      environment:
        ZOOKEEPER_SERVER_ID: '1'
        ZOOKEEPER_CLIENT_PORT: '${ZK_PORT}'
        ZOOKEEPER_CLIENT_SECURE: 'true'
        # Keystore and truststore are both unlocked with SSL_SECRET.
        ZOOKEEPER_SSL_KEYSTORE_LOCATION: '/etc/zookeeper/secrets/kafka.keystore.jks'
        ZOOKEEPER_SSL_KEYSTORE_PASSWORD: '${SSL_SECRET}'
        ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: '/etc/zookeeper/secrets/kafka.truststore.jks'
        ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: '${SSL_SECRET}'

    # Kafka broker exposing a single SSL listener on ${BROKER_HOST}:${BROKER_PORT}.
    kafka-ssl:
      image: confluentinc/cp-kafka:latest
      container_name: ${BROKER_HOST}
      hostname: ${BROKER_HOST}
      ports:
        - "${BROKER_PORT}:${BROKER_PORT}"
      depends_on:
        # depends_on entries must be service keys (the names under `services:`),
        # which are not interpolated. Using ${ZK_HOST} here only worked while
        # that variable happened to equal `zookeeper`.
        - zookeeper
      environment:
        KAFKA_BROKER_ID: 1
        KAFKA_ZOOKEEPER_CONNECT: '${ZK_HOST}:${ZK_PORT}'
        KAFKA_ADVERTISED_LISTENERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
        # Keystore/truststore file names and the credential file (cert_creds)
        # are looked up in the mounted secrets directory below.
        KAFKA_SSL_KEYSTORE_FILENAME: kafka.keystore.jks
        KAFKA_SSL_KEYSTORE_CREDENTIALS: cert_creds
        KAFKA_SSL_KEY_CREDENTIALS: cert_creds
        KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.truststore.jks
        KAFKA_SSL_TRUSTSTORE_CREDENTIALS: cert_creds
        # Clients must present a certificate, so every client of this broker
        # needs a keystore in addition to a truststore.
        KAFKA_SSL_CLIENT_AUTH: 'required'
        KAFKA_SECURITY_PROTOCOL: SSL
        KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
        # Single-broker setup: internal offsets topic cannot be replicated.
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      volumes:
        - ./secrets:/etc/kafka/secrets
  
    # Schema Registry serving HTTPS on ${SR_PORT} and talking to the broker over SSL.
    schema-registry:
      # Explicit tag, matching the sibling services; this is the same image
      # Docker resolves when the tag is omitted.
      image: confluentinc/cp-schema-registry:latest
      container_name: ${SR_HOST}
      hostname: ${SR_HOST}
      depends_on:
        # depends_on entries must be service keys, which are not interpolated;
        # ${ZK_HOST}/${BROKER_HOST} only worked while they equalled these keys.
        - zookeeper
        - kafka-ssl
      ports:
        - "${SR_PORT}:${SR_PORT}"
      environment:
        SCHEMA_REGISTRY_HOST_NAME: ${SR_HOST}
        SCHEMA_REGISTRY_LISTENERS: 'https://0.0.0.0:${SR_PORT}'
        SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: '${ZK_HOST}:${ZK_PORT}'
        SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
        SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: SSL
        # Each SSL setting appears twice: the KAFKASTORE_ variant and the
        # unprefixed variant, both pointing at the same files and password.
        SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_LOCATION: /etc/schema-registry/secrets/kafka.keystore.jks
        SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: /etc/schema-registry/secrets/kafka.keystore.jks
        SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_KAFKASTORE_SSL_KEY_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_SSL_KEY_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_LOCATION: /etc/schema-registry/secrets/kafka.truststore.jks
        SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: /etc/schema-registry/secrets/kafka.truststore.jks
        SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
        SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: https
        SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
        SCHEMA_REGISTRY_SSL_CLIENT_AUTH: 'true'
      volumes:
        - ./secrets:/etc/schema-registry/secrets

    # Kafka Connect worker (distributed mode) built from the local Dockerfile,
    # serving its REST API over HTTPS on ${SR_CON_PORT}.
    connect:
      build:
        context: .
        dockerfile: Dockerfile
      # With `build` present, this is the name/tag given to the locally built image.
      image: chethanuk/kafka-connect:5.3.1
      hostname: ${SR_CON}
      container_name: ${SR_CON}
      depends_on:
        # depends_on entries must be service keys, which are not interpolated;
        # the ${...} variables only worked while they equalled these keys.
        - zookeeper
        - kafka-ssl
        - schema-registry
      ports:
        - "${SR_CON_PORT}:${SR_CON_PORT}"
      environment:
        CONNECT_LISTENERS: 'https://0.0.0.0:${SR_CON_PORT}'
        CONNECT_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS'
        CONNECT_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
        CONNECT_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
        CONNECT_REST_ADVERTISED_HOST_NAME: ${SR_CON}
        CONNECT_REST_PORT: ${SR_CON_PORT}
        CONNECT_GROUP_ID: compose-connect-group
        # Internal topics use replication factor 1 because there is one broker.
        CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
        CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
        CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
        CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
        CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
        CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
        CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: https://${SR_HOST}:${SR_PORT}
        CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
        CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
        CONNECT_ZOOKEEPER_CONNECT: '${ZK_HOST}:${ZK_PORT}'
        # NOTE(review): jar version 5.2.1 differs from the 5.3.1 image tag —
        # confirm this file actually exists inside the built image.
        CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.1.jar
        CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
        CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
        CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
        CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
        # Worker-level SSL settings (worker's own Kafka clients and REST listener).
        CONNECT_SSL_CLIENT_AUTH: 'true'
        CONNECT_SECURITY_PROTOCOL: SSL
        CONNECT_SSL_KEY_PASSWORD: ${SSL_SECRET}
        CONNECT_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.truststore.jks
        CONNECT_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
        CONNECT_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.keystore.jks
        CONNECT_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
        # Producer/consumer clients used by connector tasks take their settings
        # from the PRODUCER_/CONSUMER_ prefixed variables, not the worker-level
        # ones. The broker requires client certificates
        # (KAFKA_SSL_CLIENT_AUTH: 'required'), so each needs a keystore as well
        # as a truststore.
        CONNECT_PRODUCER_SECURITY_PROTOCOL: SSL
        CONNECT_PRODUCER_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
        CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.truststore.jks
        CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
        CONNECT_PRODUCER_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.keystore.jks
        CONNECT_PRODUCER_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
        CONNECT_PRODUCER_SSL_KEY_PASSWORD: ${SSL_SECRET}
        CONNECT_CONSUMER_SECURITY_PROTOCOL: SSL
        CONNECT_CONSUMER_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
        CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.truststore.jks
        CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
        CONNECT_CONSUMER_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.keystore.jks
        CONNECT_CONSUMER_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
        CONNECT_CONSUMER_SSL_KEY_PASSWORD: ${SSL_SECRET}
      volumes:
        - ./secrets:/etc/kafka/secrets

错误:

[2021-05-21 05:13:50,157] INFO Requested thread factory for connector MySqlConnector, id = myql named = db-history-config-check (io.debezium.util.Threads)
[2021-05-21 05:13:50,160] INFO ProducerConfig values: 
    acks = 1
    batch.size = 32768
    bootstrap.servers = [broker:29092]
    buffer.memory = 1048576
    client.dns.lookup = default
    client.id = myql-dbhistory
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 10000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 1
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
 (org.apache.kafka.clients.producer.ProducerConfig)
[2021-05-21 05:13:50,162] WARN Couldn't resolve server broker:29092 from bootstrap.servers as DNS resolution failed for broker (org.apache.kafka.clients.ClientUtils)
[2021-05-21 05:13:50,162] INFO [Producer clientId=myql-dbhistory] Closing the Kafka producer with timeoutMillis = 0 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2021-05-21 05:13:50,162] INFO WorkerSourceTask{id=zabbix-hosts-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-05-21 05:13:50,162] INFO WorkerSourceTask{id=zabbix-hosts-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-05-21 05:13:50,163] ERROR WorkerSourceTask{id=zabbix-hosts-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at io.debezium.relational.history.KafkaDatabaseHistory.start(KafkaDatabaseHistory.java:235)
    at io.debezium.relational.HistorizedRelationalDatabaseSchema.<init>(HistorizedRelationalDatabaseSchema.java:40)
    at io.debezium.connector.mysql.MySqlDatabaseSchema.<init>(MySqlDatabaseSchema.java:90)
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:94)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:130)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:407)
    ... 14 more
[2021-05-21 05:13:50,164] ERROR WorkerSourceTask{id=zabbix-hosts-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

变量

  • SSL_SECRET=
  • ZK_HOST=动物园管理员
  • ZK_PORT=2181
  • BROKER_HOST=kafka-ssl
  • BROKER_PORT=9092
  • SR_HOST=schema-registry
  • SR_PORT=8181
  • SR_CON=连接
  • SR_CON_PORT=8083
  • 主机=本地主机

【问题讨论】:

    标签: ssl apache-kafka apache-kafka-connect kafka-producer-api


    【解决方案1】:

    buildimage 不应一起使用。你没有展示你的 Dockerfile,所以不清楚你在做什么,但它可以解释为什么实际上没有加载任何变量


    bootstrap.servers = [broker:29092]
    

    在您的 Connect 配置中,您没有使用 kafka-ssl:9092 作为连接字符串

    请注意,您的键和值序列化程序使用的是字符串,而不是 Avro 设置...拦截器列表为空,似乎未应用 SSL 设置等

    要缩小范围,我认为您不需要_PRODUCER_BOOTSTRAP_SERVERS 或消费者。

    您应该执行到您的容器中并查看创建的模板化 connect-distributed.properties 文件

    请注意,Debezium 映像带有 mysql 连接器类,所以也许您不需要自己的映像?

    【讨论】:

    • 问题出在这个mysql连接database.history.kafka.bootstrap.servers
    猜你喜欢
    • 2019-10-23
    • 2018-02-11
    • 2022-07-06
    • 1970-01-01
    • 2016-06-26
    • 2014-04-09
    • 1970-01-01
    • 1970-01-01
    • 2018-04-28
    相关资源
    最近更新 更多