【问题标题】:Why min function is not recognized by ksql为什么 ksql 不能识别 min 函数
【发布时间】:2020-08-07 22:09:55
【问题描述】:

我正在使用 confluent 编写查询以获取 kafka 主题的 5 分钟窗口中的第一个时间戳。这是查询(我知道这不是很好的方法):

CREATE STREAM start_metric_value AS
select metric_value 
FROM dataaggregaion
WINDOW TUMBLING (SIZE 5 MINUTE)
where metric_datetime_utc = MIN(TIMESTAMPTOSTRING(metric_datetime_utc, 'yyyy-MM-dd HH:mm:ss')) LIMIT 1;

但我有这个错误:

谓词的代码生成失败:找不到任何函数 名称'MIN'。表达式:(METRIC_DATETIME_UTC = MIN(TIMESTAMPTOSTRING(METRIC_DATETIME_UTC, 'yyyy-MM-dd HH:mm:ss'))), 架构:ROWKEY 字符串键,ID 字符串,METRIC_NAME 字符串, METRIC_VALUE 字符串,METRIC_DATETIME_UTC BIGINT,METRIC_INDEX 字符串,IANA_TIMEZONE 字符串,PROCESSED_DATETIME_UTC BIGINT, DATA_TYPE 字符串,ASSET_TYPE 字符串,ROWTIME BIGINT,ROWKEY STRING 原因:找不到任何名为“MIN”的函数

谁能知道如何解决这个问题

【问题讨论】:

  • 如果您添加有关dataaggregaion 架构的详细信息以及一些示例和所需的输出,将会有所帮助。这将帮助人们了解您想要实现的目标。

标签: apache-kafka confluent-platform ksqldb


【解决方案1】:

不是 100% 清楚您要达到的目标。有关添加更多详细信息以帮助人们了解您要实现的目标的问题,请参阅上面的评论。

也就是说,我可以说....

Min 函数未被识别有两个原因:

  • 您将TIMESTAMPTOSTRING 的输出传递给MIN,但MIN 不接受字符串。
  • 您不能在 WHERE 子句中使用聚合函数。

您看到的错误消息看起来像一个错误。如果它仍然存在于最新版本的 ksqlDB 上,您可能需要raise an issue in the ksqlDB GitHub project

即使更正您查询的这两件事仍然会失败,因为 ksqlDB 中的窗口化需要聚合,因此您需要 GROUP BY

例如,如果您想为每 5 分钟的窗口捕获每 metric_value 的最小 metric_datetime_utc,您可以这样做:

CREATE TABLE start_metric_value AS
  SELECT
    metric_value,
    MIN(metric_datetime_utc) as minTs
  FROM dataaggregaion 
  WINDOW TUMBLING (SIZE 5 MINUTE)
  GROUP BY metric_value;

这将创建一个窗口表,即一个由metric_valueWINDOWSTART 时间组成的表。 minTs 将存储看到的最短日期时间。

让我们通过查询运行一些数据以了解发生了什么:

输入:

rowtime | metric_value  | metric_datetime_utc
--------|---------------|--------------------
 1      |  A            | 3
 2      |  A            | 4
 3      |  A            | 2
 4      |  B            | 5
 300000 |  A            | 6

START_METRIC_VALUE 主题的输出可能是(注意:metric_Value 和 windowStart 将存储在 Kafka 记录的键中,而 minTs 将存储在值中):

metric_value | windowStart | minTs 
-------------|-------------|------
 A           | 0           | 3
 A           | 0           | 3
 A           | 0           | 2
 B           | 0           | 5
 A           | 300000      | 6

实际输出到主题的内容取决于您的cache.max.bytes.buffering 值。将此设置为0,关闭缓冲,将看到上述输出。但是,启用缓冲后,一些中间结果可能不会输出到 Kafka,尽管每个窗口的最终结果将保持不变。您还可以使用即将推出的 SUPPRESS functionality 控制向 Kafka 输出的内容

上述解决方案为您提供了每个 metric_value 的最小时间戳。如果您希望每个窗口看到全局最小日期时间,那么您可以GROUP BY 一个常量。请注意,这会将所有事件路由到单个 ksqlDB 节点,因此它不能很好地作为解决方案进行扩展。如果缩放是一个问题,则有解决方案,例如比如首先计算最小值metric_value,然后对其进行后处理以找到全局最小值。

CREATE TABLE start_metric_value AS
  SELECT
    1 as Key,
    MIN(metric_datetime_utc) as minTs
  FROM dataaggregaion 
  WINDOW TUMBLING (SIZE 5 MINUTE)
  GROUP BY 1;

注意:ksqlDB 0.10 版本的语法是正确的。您可能需要针对其他版本进行调整。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-06-06
    • 1970-01-01
    • 2019-05-08
    • 2019-10-23
    • 2018-11-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多