【发布时间】:2018-04-14 04:42:27
【问题描述】:
我正在使用 Kinesis Analytics,我正在尝试了解如何编写我的应用程序,以便在 24 小时内为我提供一个滑动窗口。我已经生成了正确的数据,但看起来它每次都会重新生成它,这可能是它应该做的事情,而我自己的无知阻止了我查看问题对吗?
我想做什么:
我有一些设备为 Kinesis Stream 提供数据,此 Kinesis 分析应用程序已连接到这些设备。
现在,当有记录进入时,我想做的是 SUM 过去 24 小时内的值并将其存储。因此,在 Kinesis Analytics 完成工作后,我将其连接到 Lambda 以完成一些事情。
我的问题是,当我模拟发送一些数据时,在这种情况下有 5 条记录,一切都运行,它运行多次,而不是 5 次。看起来每次有记录进入它都会重做窗口中的所有内容(预期)这会触发发出的每一行的 lambda。随着桌子的增长,这是个坏消息。我真正想要的只是来自NOW - 24 HOUR 的窗口中的最新值,带有"id" 字段,这样我就可以将该“id”加入到存储在其他地方的记录中。
我的应用程序如下所示:
CREATE OR REPLACE STREAM "DEVICE_STREAM" (
"id" VARCHAR(64),
"timestamp_mark" TIMESTAMP,
"device_id" VARCHAR(64),
"property_a_id" VARCHAR(64),
"property_b_id" VARCHAR(64),
"value" DECIMAL
);
CREATE OR REPLACE PUMP "DEVICE_PUMP" AS
INSERT INTO "DEVICE_STREAM"
SELECT STREAM "id",
"timestamp_mark",
"device_id",
"x_id",
"y_id",
SUM("value") OVER W1 AS "value",
FROM "SOURCE_SQL_STREAM_001"
WINDOW W1 AS (
PARTITION BY "device_id", "property_a_id", "property_b_id" ORDER BY "SOURCE_SQL_STREAM_001".ROWTIME
RANGE INTERVAL '24' HOUR PRECEDING
);
嗯.. 这可能是一个更好的主意,在子选择中进行聚合并从中选择。看来我需要第二个窗口(下面的W2)来确保我得到返回的每条记录。
CREATE OR REPLACE STREAM "DEVICE_STREAM" (
"id" VARCHAR(64),
"timestamp_mark" TIMESTAMP,
"device_id" VARCHAR(64),
"property_a_id" VARCHAR(64),
"property_b_id" VARCHAR(64),
"value" DECIMAL
);
CREATE OR REPLACE PUMP "DEVICE_PUMP" AS
INSERT INTO "DEVICE_STREAM"
SELECT STREAM s."id",
s."timestamp_mark",
s."device_id",
s."property_a_id",
s."property_b_id",
v."value"
FROM "SOURCE_SQL_STREAM_001" OVER W2 AS s, (
SELECT STREAM "SOURCE_SQL_STREAM_001"."ROWTIME", "id",
"timestamp_mark",
"device_id",
"property_a_id",
"property_b_id",
SUM("value") OVER W1 AS "value",
FROM "SOURCE_SQL_STREAM_001"
WINDOW W1 AS (
PARTITION BY "device_id", "property_a_id", "property_b_id" ORDER BY "SOURCE_SQL_STREAM_001".ROWTIME
RANGE INTERVAL '24' HOUR PRECEDING
)
) AS v
WHERE s."id" = v."id"
WINDOW W2 AS (
RANGE INTERVAL '1' SECOND PRECEDING
);
我还注意到,如果我重新启动 Kinesis Analytics 应用程序,SUM 值会重置,因此很明显它不会在重新启动后持续存在,这可能使其不适合此解决方案。我可能只需要设置一个 SQL 服务器并定期删除旧记录。
【问题讨论】:
标签: amazon-web-services amazon-kinesis