【问题标题】:How to get only the latest row from a window如何仅从窗口中获取最新行
【发布时间】:2018-04-14 04:42:27
【问题描述】:

我正在使用 Kinesis Analytics,我正在尝试了解如何编写我的应用程序,以便在 24 小时内为我提供一个滑动窗口。我已经生成了正确的数据,但看起来它每次都会重新生成它,这可能是它应该做的事情,而我自己的无知阻止了我查看问题对吗?

我想做什么:

我有一些设备为 Kinesis Stream 提供数据,此 Kinesis 分析应用程序已连接到这些设备。

现在,当有记录进入时,我想做的是 SUM 过去 24 小时内的值并将其存储。因此,在 Kinesis Analytics 完成工作后,我将其连接到 Lambda 以完成一些事情。

我的问题是,当我模拟发送一些数据时,在这种情况下有 5 条记录,一切都运行,它运行多次,而不是 5 次。看起来每次有记录进入它都会重做窗口中的所有内容(预期)这会触发发出的每一行的 lambda。随着桌子的增长,这是个坏消息。我真正想要的只是来自NOW - 24 HOUR 的窗口中的最新值,带有"id" 字段,这样我就可以将该“id”加入到存储在其他地方的记录中。

我的应用程序如下所示:

CREATE OR REPLACE STREAM "DEVICE_STREAM" (
    "id" VARCHAR(64),
    "timestamp_mark" TIMESTAMP,
    "device_id" VARCHAR(64),
    "property_a_id" VARCHAR(64),
    "property_b_id" VARCHAR(64),
    "value" DECIMAL
);

CREATE OR REPLACE PUMP "DEVICE_PUMP" AS
    INSERT INTO "DEVICE_STREAM"
    SELECT STREAM "id",
        "timestamp_mark",
        "device_id",
        "x_id",
        "y_id",
        SUM("value") OVER W1 AS "value",
    FROM "SOURCE_SQL_STREAM_001"
    WINDOW W1 AS (
        PARTITION BY "device_id", "property_a_id", "property_b_id" ORDER BY "SOURCE_SQL_STREAM_001".ROWTIME
        RANGE INTERVAL '24' HOUR PRECEDING
    );

嗯.. 这可能是一个更好的主意,在子选择中进行聚合并从中选择。看来我需要第二个窗口(下面的W2)来确保我得到返回的每条记录。

CREATE OR REPLACE STREAM "DEVICE_STREAM" (
    "id" VARCHAR(64),
    "timestamp_mark" TIMESTAMP,
    "device_id" VARCHAR(64),
    "property_a_id" VARCHAR(64),
    "property_b_id" VARCHAR(64),
    "value" DECIMAL
);

CREATE OR REPLACE PUMP "DEVICE_PUMP" AS
    INSERT INTO "DEVICE_STREAM"
    SELECT STREAM s."id",
        s."timestamp_mark",
        s."device_id",
        s."property_a_id",
        s."property_b_id",
        v."value"
    FROM "SOURCE_SQL_STREAM_001" OVER W2 AS s, (
        SELECT STREAM "SOURCE_SQL_STREAM_001"."ROWTIME", "id",
            "timestamp_mark",
            "device_id",
            "property_a_id",
            "property_b_id",
            SUM("value") OVER W1 AS "value",
            FROM "SOURCE_SQL_STREAM_001"
            WINDOW W1 AS (
                PARTITION BY "device_id", "property_a_id", "property_b_id" ORDER BY "SOURCE_SQL_STREAM_001".ROWTIME
                RANGE INTERVAL '24' HOUR PRECEDING
            )
        ) AS v
    WHERE s."id" = v."id"
    WINDOW W2 AS (
        RANGE INTERVAL '1' SECOND PRECEDING
    );

我还注意到,如果我重新启动 Kinesis Analytics 应用程序,SUM 值会重置,因此很明显它不会在重新启动后持续存在,这可能使其不适合此解决方案。我可能只需要设置一个 SQL 服务器并定期删除旧记录。

【问题讨论】:

    标签: amazon-web-services amazon-kinesis


    【解决方案1】:

    一般来说,当您需要根据事件中的数据而不是像挂钟时间这样的外部数据来做某事时,建议您使用 Streaming Analytics 解决方案(尤其是 Kinesis Analytics)。

    原因很简单:如果您需要每 24 小时执行一次操作,您可以创建一个作业,将数据从存储 (DB) 中取出一次,执行您的任务,然后再“休眠”24 小时 - 没有复杂性,易于管理高架。现在,如果您需要根据数据执行某些操作(例如,当跨多个事件的某个字段的总和超过 X 时)您在使用传统解决方案时遇到了麻烦,因为没有简单的标准来确定何时运行。如果您定期运行它,它可能会被多次调用,直到满足数据驱动的标准,从而产生明显的开销。

    在最新情况下,流分析解决方案将按设计使用,并在需要时触发您的逻辑,从而最大限度地减少开销。

    如果您更喜欢使用 Streaming Analytics(根据您对问题的描述,我个人不建议您这样做),但在 Kinesis Analytics 语法方面遇到了困难,您可以考虑使用 Drools Kinesis Analytics。它的功能包括cronscollectors,它们为您提供了一种非常简单的方法来按时触发作业。

    • 请注意,由于我是 Streamx 的 CTO,因此我的回答存在偏见。

    【讨论】:

      猜你喜欢
      • 2019-08-20
      • 2021-10-16
      • 2012-12-07
      • 1970-01-01
      • 1970-01-01
      • 2012-01-19
      • 1970-01-01
      • 2019-09-22
      • 1970-01-01
      相关资源
      最近更新 更多