【问题标题】:From Postgres to Kafka with changes tracking从 Postgres 到 Kafka,带有更改跟踪
【发布时间】:2020-06-14 22:08:36
【问题描述】:

这个问题跟随this one

主要任务是在 KSQL 端进行连接。下面的例子将说明它。事件消息到达 Kafka 主题。该消息的结构:

[
    {
        "name": "from_ts", 
        "type": "bigint"
    },
    {
        "name": "to_ts", 
        "type": "bigint"
    },
    {
        "name": "rulenode_id",
        "type": "int"
    }
]

还有一个 Postgres 表 rulenode:

id | name | description 

两个来源的数据都需要通过rulenode_id = rulenode.id字段连接,才能得到字段from_ts, to_ts, rulenode_id, rulenode_name, rulenode_description的单条记录。

我想通过 KSQL 来做到这一点,而不是像现在这样的后端。

现在来自 Postgres 表的数据通过 JdbcSourceConnector 传输到 Kafka。但是有一个小问题——你可以猜到 Postgres 表中的数据可能会改变。当然,这些更改也应该在 KSQL 流 OR 表中。

下面有人问我为什么选择 KTable 而不是 Kstream。好吧,请访问this page 并查看第一个 GIF。当新数据到达时,表的记录正在更新。我认为这种行为是我所需要的(而不是名称 Alice,Bob 我有 Postgres 表 rulenode 的主键 id)。这就是我选择 KTable 的原因。

JdbcSourceConnect 的批量模式复制所有表。如您所知,所有行都到达 Kafka 表中以获取以前的 Postgres 表快照。


按照建议,我创建了一个带有配置的连接器:

{
  "name": "from-pg",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "errors.log.enable": "true",
  "connection.url": "connection.url",
  "connection.user": "postgres",
  "connection.password": "*************",
  "table.whitelist": "rulenode",
  "mode": "bulk",
  "poll.interval.ms": "5000",
  "topic.prefix": "pg."
}

然后创建了一个流:

create stream rulenodes 
    with (kafka_topic='pg.rules_rulenode', value_format='avro', key='id');

现在正在尝试创建一个表:

create table rulenodes_unique 
    as select * from rulenodes;

但这并没有出现错误:

无效的结果类型。您的 SELECT 查询会生成一个 STREAM。请用 改为 CREATE STREAM AS SELECT 语句。

我读到表用于存储聚合信息。例如用 COUNT 函数聚合存储:

create table rulenodes_unique 
    as select id, count(*) from rulenodes order by id;

你能说一下如何处理这个错误吗?

【问题讨论】:

  • 您能否编辑您的问题以解释您为什么要创建一个表而不是来自该选择的流?正如错误所说,没有聚合的流的输出确实是一个流。
  • 嗨,罗宾。完成。

标签: apache-kafka apache-kafka-connect ksqldb


【解决方案1】:

不清楚是哪个语句抛出了错误,但是如果在表定义上会误导

您可以直接从主题创建表格。无需通过流

https://docs.confluent.io/current/ksql/docs/developer-guide/create-a-table.html

如果你也想使用流,就像文档说的那样

使用CREATE TABLE <b>AS SELECT</b> 语句创建一个表,其中包含来自现有表或流的查询结果。

您可能希望在语句中使用区分大小写的值

CREATE STREAM rulenodes WITH (
    KAFKA_TOPIC ='pg.rules_rulenode', 
    VALUE_FORMAT='AVRO', 
    KEY='id'
);


CREATE TABLE rulenodes_unique AS
    SELECT id, COUNT(*) FROM rulenodes 
    ORDER BY id;

【讨论】:

    【解决方案2】:

    您可以使用 ksqlDB 在 Kafka 主题之上创建 STREAMTABLE - 这与您希望如何建模数据有关。根据您的问题,很明显您需要将其建模为表格(因为您想加入 最新版本的密钥)。所以你需要这样做:

    create table rulenodes 
        with (kafka_topic='pg.rules_rulenode', value_format='avro');
    

    现在您还有要做一件事,那就是确保主题中的数据正确键入。您不能指定 key='id' 并且它会自动发生 - key 参数只是一个“提示”。您必须确保 Kafka 主题中的消息在 key 中有 id 字段。有关详细信息,请参阅ref doc

    您可以通过Single Message Transform in Kafka Connect 做到这一点:

    "transforms":"createKey,extractInt",
    "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields":"id",
    "transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractInt.field":"id"
    

    或者您可以在 ksqlDB 中执行此操作并更改密钥 - 因为我们想要处理每个 event 我们首先将其建模为 stream (!) 并声明重新键入主题的表格:

    create stream rulenodes_source 
        with (kafka_topic='pg.rules_rulenode', value_format='avro');
    
    CREATE STREAM RULENODES_REKEY AS SELECT * FROM rulenodes_source PARITION BY id;
    
    CREATE TABLE rulenodes WITH (kafka_topic='RULENODES_REKEY', value_format='avro');
    

    我会选择单一消息转换路线,因为它总体上更简洁。

    【讨论】:

      猜你喜欢
      • 2020-06-14
      • 2019-04-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-01-02
      相关资源
      最近更新 更多