【问题标题】:ksqlDB server fails to start when connecting to existing kafka broker连接到现有 kafka 代理时,ksqlDB 服务器无法启动
【发布时间】:2021-04-29 21:53:07
【问题描述】:

我在 :9092 有一个现有的 kafka 代理。该代理正在运行 Apache Kafka 版本 2.2.0。我想使用 ksqlDB 对来自该 kafka 代理的主题的数据进行一些流处理。因此,根据https://docs.confluent.io/platform/current/installation/versions-interoperability.html 的兼容性表,我使用的是 Confluent Platform 5.2 版的 ksqlDB。

我已经在ksql/ksql-server.properties 中设置了bootstrap.servers=<myurl>:9092

但是,当我尝试通过运行ksql-server-start etc/ksql/ksql-server.properties 启动 ksql-server 时,我收到以下错误:

ERROR Failed to start KSQL (io.confluent.ksql.rest.server.KsqlServerMain:53)
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition _confluent-ksql-default__command_topic-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.confluent.ksql.rest.server.computation.Command`, problem: `java.lang.NullPointerException`
 at [Source: (byte[])"("statement":"CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>>) WITH(KAFKA_TOPIC='default_ksql_processing_log', VALUE_FORMAT='JSON');","originalProperties":{"ksql.extension.dir":"ext","ksql.streams.ca"[truncated 3011 bytes]; line: 1, column: 3511]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.confluent.ksql.rest.server.computation.Command`, problem: `java.lang.NullPointerException`
 at [Source: (byte[])"("statement":"CREATE STREAM KSQL_PROCESSING_LOG (logger VARCHAR, level VARCHAR, time BIGINT, message STRUCT<type INT, deserializationError STRUCT<errorMessage VARCHAR, recordB64 VARCHAR, cause ARRAY<VARCHAR>, `topic` VARCHAR>, recordProcessingError STRUCT<errorMessage VARCHAR, record VARCHAR, cause ARRAY<VARCHAR>>, productionError STRUCT<errorMessage VARCHAR>>) WITH(KAFKA_TOPIC='default_ksql_processing_log', VALUE_FORMAT='JSON');","originalProperties":{"ksql.extension.dir":"ext","ksql.streams.ca"[truncated 3011 bytes]; line: 1, column: 3511]
...

如果我使用本地代理并设置bootstrap.servers=localhost:9092,ksql-server 启动没有任何问题。

如何解决这个空记录/反序列化问题,以便将 ksqldb 服务器连接到我现有的 kafka 代理?

【问题讨论】:

  • 如果没有其他人在该集群上使用 KSQL,您可以删除 _confluent-ksql-default__command_topic。否则,更改 ksql 配置中的命令主题名称
  • 感谢@OneCricketeer,删除主题有效!

标签: apache-kafka confluent-platform ksqldb


【解决方案1】:

正如@OneCricketeer 所指出的,问题是由集群上现有的命令主题引起的。您可以通过更改 ksqlDB 服务器属性中的 ksql.service.id 来使用新的命令主题。见Configuring ksqlDB server

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-10-29
    • 1970-01-01
    • 1970-01-01
    • 2020-03-12
    • 2015-01-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多