【发布时间】:2020-06-14 22:08:36
【问题描述】:
这个问题跟随this one。
主要任务是在 KSQL 端进行连接。下面的例子将说明它。事件消息到达 Kafka 主题。该消息的结构:
[
{
"name": "from_ts",
"type": "bigint"
},
{
"name": "to_ts",
"type": "bigint"
},
{
"name": "rulenode_id",
"type": "int"
}
]
还有一个 Postgres 表 rulenode:
id | name | description
两个来源的数据都需要通过rulenode_id = rulenode.id字段连接,才能得到字段from_ts, to_ts, rulenode_id, rulenode_name, rulenode_description的单条记录。
我想通过 KSQL 来做到这一点,而不是像现在这样的后端。
现在来自 Postgres 表的数据通过 JdbcSourceConnector 传输到 Kafka。但是有一个小问题——你可以猜到 Postgres 表中的数据可能会改变。当然,这些更改也应该在 KSQL 流 OR 表中。
下面有人问我为什么选择 KTable 而不是 Kstream。好吧,请访问this page 并查看第一个 GIF。当新数据到达时,表的记录正在更新。我认为这种行为是我所需要的(而不是名称 Alice,Bob 我有 Postgres 表 rulenode 的主键 id)。这就是我选择 KTable 的原因。
JdbcSourceConnect 的批量模式复制所有表。如您所知,所有行都到达 Kafka 表中以获取以前的 Postgres 表快照。
按照建议,我创建了一个带有配置的连接器:
{
"name": "from-pg",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"errors.log.enable": "true",
"connection.url": "connection.url",
"connection.user": "postgres",
"connection.password": "*************",
"table.whitelist": "rulenode",
"mode": "bulk",
"poll.interval.ms": "5000",
"topic.prefix": "pg."
}
然后创建了一个流:
create stream rulenodes
with (kafka_topic='pg.rules_rulenode', value_format='avro', key='id');
现在正在尝试创建一个表:
create table rulenodes_unique
as select * from rulenodes;
但这并没有出现错误:
无效的结果类型。您的 SELECT 查询会生成一个 STREAM。请用 改为 CREATE STREAM AS SELECT 语句。
我读到表用于存储聚合信息。例如用 COUNT 函数聚合存储:
create table rulenodes_unique
as select id, count(*) from rulenodes order by id;
你能说一下如何处理这个错误吗?
【问题讨论】:
-
您能否编辑您的问题以解释您为什么要创建一个表而不是来自该选择的流?正如错误所说,没有聚合的流的输出确实是一个流。
-
嗨,罗宾。完成。
标签: apache-kafka apache-kafka-connect ksqldb