【发布时间】:2021-01-09 18:24:03
【问题描述】:
我们有一个“微服务”平台,我们正在使用 debezium 从这些平台上的数据库中捕获变更数据,效果很好。
现在,我们想让我们更轻松地加入这些主题并将结果流式传输到一个新主题中,供多个服务使用。
免责声明:这假定 v0.11 ksqldb 和 cli(似乎其中大部分可能不适用于旧版本)
来自两个数据库实例的两个表流入 Kafka 主题的示例:
-- source identity microservice (postgres)
CREATE TABLE public.user_entity (
id varchar(36) NOT NULL,
first_name varchar(255) NULL,
PRIMARY KEY (id)
);
-- ksql stream
CREATE STREAM stream_user_entity WITH (KAFKA_TOPIC='cdc.identity.public.user_entity', value_format='avro');
-- source organization microservice (postgres)
CREATE TABLE public.user_info (
id varchar(36) NOT NULL,
user_entity_id varchar(36) NOT NULL,
business_unit varchar(255) NOT NULL,
cost_center varchar(255) NOT NULL,
PRIMARY KEY (id)
);
-- ksql stream
CREATE STREAM stream_user_info WITH (KAFKA_TOPIC='cdc.application.public.user_info', value_format='avro');
选项 1:流
CREATE STREAM stream_user_info_by_user_entity_id
AS SELECT * FROM stream_user_info
PARTITION BY user_entity_id
EMIT CHANGES;
SELECT
user_entity_id,
first_name,
business_unit,
cost_center
FROM stream_user_entity ue
LEFT JOIN stream_user_info_by_user_entity_id ui WITHIN 365 DAYS ON ue.id = ui.user_entity_id
EMIT CHANGES;
注意WITHIN 365 DAYS,从概念上讲,这些表可能会持续很长时间而不会被更改,因此这个窗口在技术上会无限大。这看起来很可疑,似乎暗示这不是一个好方法。
选项 2:表格
CREATE TABLE ktable_user_info_by_user_entity_id (
user_entity_id,
first_name,
business_unit,
cost_center
)
with (KAFKA_TOPIC='stream_user_info_by_user_entity_id', value_format='avro');
SELECT
user_entity_id,
first_name,
business_unit,
cost_center
FROM stream_user_entity ue
LEFT JOIN ktable_user_info_by_user_entity_id ui ON ue.id = ui.user_entity_id
EMIT CHANGES;
我们不再需要窗口WITHIN 365 DAYS,所以这感觉更正确。 然而这只会在消息被发送到流而不是表时发出变化。
在这个例子中: 用户更新 first_name -> 发出更改 用户更新 business_unit -> 未发出任何更改
也许有一种方法可以创建一个由 user_entity_id 分区的合并流,并加入到将保持当前状态的子表,这导致我......
选项 3:合并流和表
-- "master" change stream with merged stream output
CREATE STREAM stream_user_changes (user_entity_id VARCHAR)
WITH (KAFKA_TOPIC='stream_user_changes', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes SELECT id as user_entity_id FROM stream_user_entity;
INSERT INTO stream_user_changes SELECT user_entity_id FROM stream_user_info;
CREATE STREAM stream_user_entity_by_id
AS SELECT * FROM stream_user_entity
PARTITION BY id
EMIT CHANGES;
CREATE TABLE ktable_user_entity_by_id (
id VARCHAR PRIMARY KEY,
first_name VARCHAR
) with (KAFKA_TOPIC='stream_user_entity_by_id', value_format='avro');
SELECT
uec.user_entity_id,
ue.first_name,
ui.business_unit,
ui.cost_center
FROM stream_user_entity_changes uec
LEFT JOIN ktable_user_entity_by_id ue ON uec.user_entity_id = ue.id
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uec.user_entity_id = ui.user_entity_id
EMIT CHANGES;
这个看起来最好,但似乎每个表都有很多移动组件,我们有 2 个流、1 个插入查询、1 个 ktable。此处的另一个潜在问题可能是隐藏的竞争条件,其中流在表更新之前发出更改。
选项 4:更多合并的表和流
CREATE STREAM stream_user_entity_changes_enriched
AS SELECT
ue.id AS user_entity_id,
ue.first_name,
ui.business_unit,
ui.cost_center
FROM stream_user_entity_by_id ue
LEFT JOIN ktable_user_info_by_user_entity_id ui ON uec.user_entity_id = ui.user_entity_id
EMIT CHANGES;
CREATE STREAM stream_user_info_changes_enriched
AS SELECT
ui.user_entity_id,
ue.first_name,
ui.business_unit,
ui.cost_center
FROM stream_user_info_by_user_entity_id ui
LEFT JOIN ktable_user_entity_by_id ue ON ui.user_entity_id = ue.id
EMIT CHANGES;
CREATE STREAM stream_user_changes_enriched (user_entity_id VARCHAR, first_name VARCHAR, business_unit VARCHAR, cost_center VARCHAR)
WITH (KAFKA_TOPIC='stream_user_changes_enriched', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='avro');
INSERT INTO stream_user_changes_enriched SELECT * FROM stream_user_entity_changes_enriched;
INSERT INTO stream_user_changes_enriched SELECT * FROM stream_user_info_changes_enriched;
这在概念上与之前的相同,但“合并”发生在连接之后。可以想象,这可能会消除任何潜在的竞争条件,因为我们主要从流而不是表中进行选择。
缺点是复杂性甚至比选项 3 还要糟糕,并且为具有两个以上表的任何连接编写和跟踪所有这些流将有点让人麻木......
问题: 什么方法最适合这个用例和/或我们是否正在尝试做一些不应该使用 ksql 的事情?我们最好将其卸载到传统的 RDBMS 或 spark 替代品上吗?
【问题讨论】:
-
我认为选项 2 是预期的行为。尽管表中的更改不会立即发出事件,但此后对流的任何后续更改都会在输出流上发出带有
the new information from the table的事件。如果更改是对现有行的更新,我不确定如果表中的更改立即发出事件应该是什么预期行为,我们可能需要返回输出流中的旧事件进行更新,但是由于流是不可变的,这不应该发生。 -
是的,我同意这种预期的行为。我正在寻找一种基本上可以解决这种行为的解决方案。
-
你觉得table-table(ksql表)更适合这种情况吗?你想要的输出在这里听起来不像一个流。
-
我想要一个流,我希望其他应用程序能够使用该流并收到这两个组合流的任何更新的通知。例如,我可以将它们加入消费者,但如果我有多个需要了解这些的服务,则最好在 kafka 端加入一个流,消费者不需要了解所涉及的逻辑.
-
连接表在保留历史记录的意义上不是一个流。如果我是正确的,使用连接表的应用程序仍然会通过连接记录的最新状态通知两个流的任何更新。这听起来像你想要的
标签: apache-kafka ksqldb debezium