【问题标题】:ksqlDB: Joining tables/streams with nested structure(not flattened)ksqlDB:使用嵌套结构连接表/流(未展平)
【发布时间】:2021-05-08 18:42:50
【问题描述】:

我有两个不同的主题,它们有自己的 avro 格式的架构,X 和 Y。这两个主题都有很多字段。 我想在它们之间创建一个表流连接关系,并将其输出到另一个主题,格式如下:

{
   id, // the id used to join them
   x_name : X,
   y_name: Y
}

换句话说,我想加入这两个嵌套的每个源。 我能够以正常方式加入它们,但是所有字段都变平了。 这可以用 KsqlDB 实现吗?我试图找到一种没有成功的好方法。

编辑:

添加更多信息和示例。 假设我有两个关于此类数据的主题。

产品供应

{
  "product_id": 1,
  "name": "name",
  "stock": 11
  "price": "141",
  "storage_ids": [1, 2, 3]
}

产品信息

{
  "product_id": 1,
  "description": "151",
  "manufacturer": "ABC"
  "Vendor_id": "5"
}

我想使用 KsqlDB 以非扁平方式加入这些表并发布到主题,如下所示:

{
  "product_id": 1,
  "product_information": {
      "product_id": 1,
      "description": "151",
      "manufacturer": "ABC"
      "Vendor_id": "5"
  }
  "product_supply": {
      "product_id": 1,
      "name": "name",
      "stock": 11
      "price": "141",
      "storage_ids": [1, 2, 3]
  }
}

我已经为每个主题添加了架构,如果可能的话,我希望使用这些架构而不必在 ksql 中显式定义每个字段。

【问题讨论】:

  • 欢迎来到 StackOverflow!您能否编辑您的问题以包含您尝试加入的两种消息类型的示例?
  • 嗨罗宾,谢谢!我添加了一个例子来证明我的意图。我希望它很清楚。

标签: apache-kafka confluent-platform ksqldb


【解决方案1】:

working with structured data in ksqlDB 上有一个很好的指南。基于此,我能够使其工作:

  • 创建样本数据

    CREATE STREAM PRODUCT_SUPPLY (PRODUCT_ID INT, NAME VARCHAR, STOCK INT, PRICE INT, STORAGE_IDS ARRAY<INT>) WITH (KAFKA_TOPIC='PRODUCT_SUPPLY', REPLICAS=1,PARTITIONS=6,VALUE_FORMAT='AVRO');
    CREATE TABLE PRODUCT_INFORMATION (PRODUCT_ID INT PRIMARY KEY, DESCRIPTION VARCHAR, MANUFACTURER VARCHAR, VENDOR_ID INT) WITH (KAFKA_TOPIC='PRODUCT_INFO', REPLICAS=1,PARTITIONS=6,VALUE_FORMAT='AVRO');
    INSERT INTO PRODUCT_SUPPLY VALUES(1,'NAME',11,141,ARRAY[1,2,3]);
    INSERT INTO PRODUCT_INFORMATION values (1,'151','abc',5);
    
  • 查询数据

    SET 'auto.offset.reset' = 'earliest';
    
    SELECT PS.PRODUCT_ID AS PRODUCT_ID,
          STRUCT(NAME        := PS.NAME,
                  STOCK       := PS.STOCK,
                  PRICE       := PS.PRICE,
                  STORAGE_IDS := PS.STORAGE_IDS) AS PRODUCT_SUPPLY,
          STRUCT(DESCRIPTION  := PI.DESCRIPTION,
                  MANUFACTURER := PI.MANUFACTURER,
                  VENDOR_ID    := PI.VENDOR_ID) AS PRODUCT_INFORMATION
      FROM PRODUCT_SUPPLY PS
          LEFT JOIN PRODUCT_INFORMATION PI
          ON PS.PRODUCT_ID=PI.PRODUCT_ID
    EMIT CHANGES LIMIT 1;
    
    +-------------------------+-------------------------+-------------------------+
    |PRODUCT_ID               |PRODUCT_SUPPLY           |PRODUCT_INFORMATION      |
    +-------------------------+-------------------------+-------------------------+
    |1                        |{NAME=NAME, STOCK=11, PRI|{DESCRIPTION=151, MANUFAC|
    |                         |CE=141, STORAGE_IDS=[1, 2|TURER=abc, VENDOR_ID=5}  |
    |                         |, 3]}                    |                         |
    Limit Reached
    Query terminated
    

【讨论】:

  • 你能告诉我为什么我得到以下错误。引起:第 1:40 行:输入不匹配 ':' 期待 {',', ')'} 引起:org.antlr.v4.runtime.InputMismatchException 错误在“:=" 引起
  • @PradipB 请开始一个新问题,详细说明如何重现您的问题。谢谢。
  • 感谢您的回复,我在这里发布了新问题,请您指导我stackoverflow.com/questions/67919311/…
猜你喜欢
  • 2014-05-24
  • 2013-11-23
  • 2013-11-17
  • 2022-07-10
  • 2017-06-17
  • 1970-01-01
  • 2015-03-25
  • 1970-01-01
  • 2019-01-21
相关资源
最近更新 更多