【问题标题】:Kafka: creating stream from topic with values in separate columnsKafka:从主题创建流,其值位于单独的列中
【发布时间】:2020-07-13 11:35:31
【问题描述】:

我刚刚使用 postgres 源连接器将我的 kafka 连接到 postgres。现在,当我打印主题时,我得到以下输出:

rowtime: 4/1/20 4:16:12 PM UTC, key: <null>, value: {"userid": 4, "id": 5, "title": "lorem", "body": "dolor sit amet, consectetur"}
rowtime: 4/1/20 4:16:12 PM UTC, key: <null>, value: {"userid": 5, "id": 6, "title": "ipsum", "body": "cupidatat non proident"}

我如何从这个主题创建一个流,以便将值分离到它们自己的列中,就像它们最初在数据库表中一样?

额外问题:有没有办法在 jdbc-connector 中指定在创建源连接器时将列分隔到主题中?

我的连接器如下所示:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_postgres_02",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:postgresql://postgres:5432/kafka",
                "connection.user": "bob",
                "connection.password": "builder",
                "topic.prefix": "post_",
                "mode":"bulk",
                "table.whitelist" : "kafka_t.users_t",
                "poll.interval.ms" : 500
                }
        }'

【问题讨论】:

  • 你能解释一下你所说的“分离到自己的列”是什么意思吗?您正在显示一条反序列化的消息(从 ksqlDB 的外观来看)。如果您从源连接器使用 Avro,则架构已经存在且具有不同的列
  • 我编辑了我设置连接器的方式。我没有碰过任何东西 Avro。我的最终目标是在 ksqldb 中创建包含该主题的用户 ID、ID、标题、正文列的表,以便稍后我可以在其他地方下沉
  • 您能否为您的 Kafka Connect 工作人员添加配置?

标签: postgresql jdbc apache-kafka apache-kafka-connect


【解决方案1】:

我如何从这个主题创建一个流,以便将值分离到它们自己的列中,就像它们最初在数据库表中一样?

不是 100% 确定您的意思。例如,如果您使用 Kafka Streams,您可以创建一个带有自定义 Columns 类型(或仅使用 JSON 作为值类型)的 KStream&lt;KeyType, Columns&gt;,以获取数据的“列视图”。

同样,您可以将 ksqlDB 与 CREATE STREAM 命令一起使用——它可以自动将 JSON 值解析为相应的列。

额外问题:有没有办法在 jdbc-connector 中指定在创建源连接器时将列分隔到主题中?

你这是什么意思? Kafka主题具有键值数据模型,因此如果您将任何数据存储在主题中,它必须进入键或值。如果您有更结构化的类型,例如 DB 元组,则 Kafka 代理中没有原生支持,但您需要将其放入键值模型中。

【讨论】:

  • 是的,实际上我想做的是使用CREATE STREAM 命令将列从主题值字段中分离到流中的相应列中。我陷入了困境,因为在 CREATE STREAM 之后我无法从 strem 中查询任何内容,所以我想从 @Matthias J. Sax 的主题中对其进行故障排除
  • I couldnt query anything from the strem --- 可能检查 kslqDB 服务器日志,看看为什么你没有得到数据—— IIRC,损坏的行将被删除并仔细检查你的 CREATE STREAM 语句是否定义了正确的名称和数据类型根据值中的 JSON 数据。
  • 我实际上是通过另一个问题解决了这个问题。感谢您的分析!
猜你喜欢
  • 2018-12-26
  • 1970-01-01
  • 2020-03-31
  • 2023-01-25
  • 1970-01-01
  • 2016-11-30
  • 2022-12-24
  • 2021-09-09
  • 1970-01-01
相关资源
最近更新 更多