【发布时间】:2021-03-26 18:36:43
【问题描述】:
我有各种流,但有些流变得陈旧。为了避免它们变得陈旧,我想放置一些可以读取“显示流”属性“陈旧之后”的进程,如果只剩下 1 天,运行一个进程来刷新流。
【问题讨论】:
标签: snowflake-cloud-data-platform
我有各种流,但有些流变得陈旧。为了避免它们变得陈旧,我想放置一些可以读取“显示流”属性“陈旧之后”的进程,如果只剩下 1 天,运行一个进程来刷新流。
【问题讨论】:
标签: snowflake-cloud-data-platform
要实现您的目标,您必须捕获SHOW STREAMS https://docs.snowflake.com/en/sql-reference/sql/show-streams.html 的输出。您可以开始构建一个存储过程来运行它并使用TABLE(RESULT_SCAN(LAST_QUERY_ID())) 将其输出作为结果集返回,类似于以下内容,可以通过您要检查的时间窗口的参数来丰富它(您的“还剩 1 天)和随后的CREATE OR REPLACE STREAM。
请注意,这不是您问题的完整解决方案,而只是其中的一半,因为它不包括重新创建陈旧流所需的操作。
CREATE OR REPLACE PROCEDURE sp_show_stream_stale()
RETURNS VARIANT NOT NULL
LANGUAGE Javascript
EXECUTE AS Caller
AS
$$
var sql_command0 = snowflake.createStatement({ sqlText:`show streams in database`});
var sql_command1 = snowflake.createStatement({ sqlText:`SELECT "created_on"
, "name"
, "database_name"
, "schema_name"
, "owner"
, "comment"
, "table_name"
, "type"
, "stale"
, "mode"
, "stale_after"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))`});
try {
sql_command0.execute();
var db = sql_command1.execute();
var json_rows = {};
var array_of_rows = [];
var COLUMNS = ["created_on","name","database_name","schema_name","owner", "comment", "table_name", "type", "stale", "mode", "stale_after"];
var row_num = 1;
while (db.next()) {
json_rows = {};
for (var col_num = 0; col_num < COLUMNS.length; col_num = col_num + 1) {
var col_name = COLUMNS[col_num];
json_rows[col_name] = db.getColumnValue(col_num + 1);
}
array_of_rows.push(json_rows);
++row_num;
}
return array_of_rows;
}
catch (err) {
return "Failed: " + err;
}
$$;
由于结果集是单个 JSON,您可以运行存储过程并在以下 SELECT 语句后不久以表格格式获取结果集。
CALL sp_show_stream_stale();
SELECT value:created_on::datetime as "created_on",
value:name::string as "name",
value:database_name::string as "database_name",
value:schema_name::string as "schema_name",
value:owner::string as "owner",
value:comment::string as "comment",
value:table_name::string as "table_name",
value:type::string as "type",
value:stale::string as "stale",
value:mode::string as "mode",
value:stale_after::datetime as "stale_after"
FROM (SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())))
, LATERAL FLATTEN(Input => sp_show_stream_stale)
WHERE DATEDIFF(Day, current_timestamp, value:stale_after::datetime) <= 1 ;
【讨论】: