【问题标题】:Flink Stream SQL order byFlink Stream SQL order by
【发布时间】:2018-08-17 21:09:30
【问题描述】:

我有一个流式输入,比如股票价格数据(包括多只股票),我想每 1 分钟按价格进行一次排名。排名基于所有股票的最新价格,无论前1分钟是否更新,都需要对所有股票进行排序。我尝试在 flink 流 SQL 中使用 ORDER BY。

我未能实现我的逻辑,我对两部分感到困惑:

  1. 为什么ORDER BY只能以时间属性为主,只支持ASC?如何执行价格等其他类型的订单?

  2. 下面的 SQL(来自 Flink 文档)是什么意思?没有窗口也没有窗口,所以我假设每个订单进来都会立即执行 SQL,在这种情况下,对一个元素进行排序看起来毫无意义。

[更新]:当我阅读 ProcimeSortProcessFunction.scala 的代码时,似乎 Flink 对接下来一毫秒内收到的元素进行了排序。

SELECT *
FROM Orders
ORDER BY orderTime

最后,有没有办法在 SQL 中实现我的逻辑?

【问题讨论】:

    标签: apache-flink flink-streaming flink-sql


    【解决方案1】:

    流式查询中的ORDER BY 很难计算,因为当我们必须发出一个需要转到结果表开头的结果时,我们不想更新整个结果。因此,如果我们可以保证结果具有(大致)增加的时间戳,我们只支持ORDER BY time-attribute

    在未来(Flink 1.6 或更高版本),我们还将支持ORDER BY x ASC LIMIT 10 等一些查询,这将导致更新表,其中包含具有最小x 值的10 个记录。

    无论如何,您不能(轻松)使用GROUP BY 翻转窗口计算每分钟的前 k 名排名。 GROUP BY 查询将组的记录(在 GROUP BY TUMBLE(rtime, INTERVAL '1' MINUTE) 的情况下也是窗口)聚合为单个记录。所以每分钟不会有多条记录,而只有一条。

    如果您希望查询以每分钟计算字段 a 的前 10 名,您需要一个类似于以下的查询:

    SELECT a, b, c 
    FROM (
      SELECT 
        a, b, c, 
        RANK() OVER (ORDER BY a PARTITION BY CEIL(t TO MINUTE) BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as rank 
      FROM yourTable)
    WHERE rank <= 10
    

    但是,Flink(1.4 版)尚不支持此类查询,因为时间属性用于PARTITION BY 子句,而不是OVER 窗口的ORDER BY 子句。

    【讨论】:

    • 感谢 Fabian,我错误地认为 ORDER BY 是针对窗口内的元素而不是窗口结果计算的,谢谢澄清。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-06-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多