【问题标题】:KSQL STRUCT Fields For TIMESTAMPTIMESTAMP 的 KSQL STRUCT 字段
【发布时间】:2019-06-03 19:34:52
【问题描述】:

有没有办法让这个工作

JSON 数据

"Header": {
    "StoreID": 10225,
    "BusinessDate": "2019-05-03",
    "PeriodBusinessDate": "2019-05-03",
    "ProcessMode": "Partial"
  }

我试试这个,但给我: 定义的架构中不存在 WITH 子句中提供的时间戳列名称 HEADER->BUSINESSDATE 的列。

CREATE STREAM test2 (HEADER STRUCT<StoreID int,BusinessDate VARCHAR>) WITH (KAFKA_TOPIC='hermes__output__tfrema__v1',VALUE_FORMAT='JSON',
timestamp='HEADER->BusinessDate',timestamp_format='yyyy-MMM-dd');

【问题讨论】:

    标签: ksqldb


    【解决方案1】:

    您不能在TIMESTAMP 参数中使用嵌套字段。您需要先提取它然后使用它。例如:

    CREATE STREAM X (COL1 INT, COL2 VARCHAR, HEADER STRUCT<StoreID int,BusinessDate VARCHAR>) 
      WITH (KAFKA_TOPIC='hermes__output__tfrema__v1',VALUE_FORMAT='JSON')
    
    CREATE STREAM Y AS 
      SELECT COL1, COL2, HEADER->BusinessDate AS BusinessDate, HEADER 
      FROM X;
    
    CREATE STREAM Z COL1 INT, COL2 VARCHAR, BusinessDate VARCHAR, HEADER STRUCT<StoreID int,BusinessDate VARCHAR>) 
      WITH (KAFKA_TOPIC='Y',VALUE_FORMAT='JSON',timestamp='BusinessDate',timestamp_format='yyyy-MMM-dd');)
    

    如果您使用 Avro,您可以简化事情,因为架构不需要重新声明:

    CREATE STREAM X (COL1 INT, COL2 VARCHAR, HEADER STRUCT<StoreID int,BusinessDate VARCHAR>) 
      WITH (KAFKA_TOPIC='hermes__output__tfrema__v1',VALUE_FORMAT='JSON')
    
    CREATE STREAM Y WITH (VALUE_FORMAT='AVRO') 
      AS SELECT COL1, COL2, HEADER->BusinessDate, HEADER FROM X;
    
    CREATE STREAM Z 
      WITH (KAFKA_TOPIC='Y',VALUE_FORMAT='JSON',timestamp='BusinessDate',timestamp_format='yyyy-MMM-dd');)
    

    【讨论】:

      猜你喜欢
      • 2015-01-10
      • 2022-01-26
      • 2017-06-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-11-09
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多