【问题标题】:ksql - CREATE TABLE results in table with null values even though kafka topic is populatedksql - 即使填充了 kafka 主题,CREATE TABLE 也会生成具有空值的表
【发布时间】:2021-03-12 05:04:48
【问题描述】:

使用 ksqlDB,我创建了一个带有自定义查询的 JDBC 连接器。然后,从生成的 kafka 主题中,我创建了一个表。但是,从表中选择只返回 PRIMARY KEY 的数据,而返回所有其他值的 null。我正在连接的 postgres 数据库的销售表不断更新新数据,我正在尝试使用 ksql 进行流式传输。

ksql> CREATE SOURCE CONNECTOR con WITH (
  'connector.class'      ='io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url'       = '....',
  'topic.prefix'         = 'sales',
  ...
  'key'                  = 'id',
  'query'                = 'SELECT id, time, price FROM sales');

Message

Created connector CON

ksql> print sales limit 1;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/11/30 09:07:55.109 Z, key: [123], value: {"schema":{"type":"struct","fields":[{"type":"string","optional":alse,"field":"id"},{"type":"int64","optional":true,"field":"time"},{"type":"float","optional":true,"field":"price"}],"optional":false},"payload":{"id":"123","time":1,"price":10.0}}
Topic printing ceased

ksql> CREATE TABLE sales_table (id VARCHAR PRIMARY KEY, time INT, price DOUBLE) WITH (kafka_topic='sales', partitions=1, value_format='JSON');

Message

Table created

ksql> SELECT * FROM sales_table EMIT CHANGES LIMIT 1;
+-----+-----+-----+
|ID   |TIME |PRICE|
+-----+-----+-----+
|123  |null |null |
Limit Reached
Query terminated

如您所见,kafka 主题在时间和价格字段中具有适当值的条目。但是,当基于该主题创建表时,从表中选择会产生空时间和价格字段。只有 id(即 PRIMARY KEY 列)打印正确。

知道为什么会这样吗?

【问题讨论】:

    标签: apache-kafka apache-kafka-connect ksqldb


    【解决方案1】:

    您在连接器中使用org.apache.kafka.connect.json.JsonConverter 转换器与schemas.enable=true,因此您的架构不是(id VARCHAR PRIMARY KEY, time INT, price DOUBLE),因此您得到NULL 值。

    更好的是在源连接器中使用io.confluent.connect.avro.AvroConverter(或Protobuf,或JSON Schema),因为这样您甚至不必为CREATE STREAM 输入架构,您只需

    CREATE TABLE sales_table  WITH (kafka_topic='sales', value_format='AVRO');
    

    您可以这样指定替代转换器:

    CREATE SOURCE CONNECTOR SOURCE_01 WITH (
    …
        'key.converter'= 'org.apache.kafka.connect.storage.StringConverter',
        'value.converter'= 'io.confluent.connect.avro.AvroConverter',
        'value.converter.schema.registry.url'= 'http://schema-registry:8081'
        );
    

    但如果您必须使用 JSON,请在源连接器中禁用模式:

    CREATE SOURCE CONNECTOR SOURCE_01 WITH (
    …
        'value.converter.schemas.enable'= 'false'
        );
    

    参考:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-02-13
      • 1970-01-01
      • 2014-01-20
      • 1970-01-01
      • 1970-01-01
      • 2020-09-20
      相关资源
      最近更新 更多