【问题标题】:Maintaining and updating large subsets of SQL results in an efficient manner以有效的方式维护和更新 SQL 的大型子集
【发布时间】:2016-10-28 13:59:45
【问题描述】:

在我们的网络堆栈中,我正在努力实现一项功能,该功能允许用户基本上指定几个过滤条件(现在称为 list),这些条件是从 Postgres 数据库中的表中获取的.用户可以拥有许多这样的列表,每个列表都具有一组列的“包含”或“等于”等条件。

其中涉及的逻辑相对简单,但问题开始是因为客户希望能够每天查看列表查询结果的更改/更新(因此基本上存储增量的每日快照)和其中一些过滤条件可能很慢并且在非索引列上运行大型表(有问题的表每个有 2-3 百万行)。

目前,我们使用 Redis 和 Postgres 作为我们的存储后端,我不完全确定表示甚至管理这些每日更新以及为每个用户的每个列表编制索引的最佳方式是什么。

  • 我看到的最大问题是确定查询返回的昨天(上次运行)和今天(当前运行)的结果之间的差异,我怀疑之前为每个列表存储的结果集效率低下。如果没有它,我不知道我们将如何弄清楚两个查询之间发生了什么变化(这些变化可能发生在系统中的许多地方,因此捕获所有这些并在应用程序代码中处理它们可能是大量的工作) .
  • 如果结果集之间的差异已知(需要知道每个字段及其新值),则相关表将填充更改列表

我确信很多处理分析数据的软件都可以解决类似的问题,但我不太熟悉如何以有效的方式解决这个问题,我不想尝试重新发明轮子,所以我想问一下是否有人对如何实现它有任何想法/建议(可能使用其他软件以及 PG 和 Redis)?

详细说明,必须每 12 小时(当前)对所有现有列表批量执行此操作,很可能使用调用更新程序的守护程序(或仅 cronjob)。

(对不起,如果这个问题看起来含糊不清,我试图概述我能想到的每一个可能的方面,但我不确定我做得是否足够好)

【问题讨论】:

  • 所以你的后端可能是用某种语言编写的——Python、Go、PHP、Perl、Java,所以你有办法使用 Redis 和某种消息队列、线程、池来使事情异步.
  • 列表是用户选择的(参数)?增量是列表中的增量,还是在执行它们的结果中?
  • 是的,它本质上是一组转换为查询的条件。只需要在两个查询结果之间计算增量 - 批处理作业上次获得的旧结果和当前批处理作业获得的新结果。然后,新结果将成为下一次执行作业的结果。

标签: database postgresql scalability


【解决方案1】:

我们所做的是使用触发器将所有更改存储在审计表中,将表的键存储为文本数组,并将更改的值(旧的和新的)存储为两个 jsonb 字段:

CREATE OR REPLACE FUNCTION text2intsafe(text) RETURNS int AS $$
  SELECT CASE WHEN $1 ~ '^ *-?\d{1,9} *$' THEN $1::int END
$$ LANGUAGE SQL IMMUTABLE RETURNS NULL ON NULL INPUT;

CREATE TABLE IF NOT EXISTS crm.audit (
  audit_id bigserial NOT NULL PRIMARY KEY,
  date timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP,
  username text NOT NULL DEFAULT crm.f_user(),
  tableclass regclass NOT NULL,
  action text NOT NULL CHECK (action IN ('I','U','D')),
  id text[] NOT NULL,
  old_data jsonb NULL,
  new_data jsonb NULL
);

CREATE INDEX ON audit (date);
CREATE INDEX ON audit (tableclass, id);
CREATE INDEX ON audit USING gin (old_data);
CREATE INDEX audit_tableclass_id_idx ON audit (tableclass, text2intsafe(id[1]));

CREATE OR REPLACE FUNCTION f_audit_get_value(p_tableclass regclass, p_date timestamptz, p_id text[], p_object text[]) RETURNS jsonb AS $$
SELECT COALESCE(
  (SELECT a.old_data#>$4
   FROM audit a
   WHERE a.tableclass = $1 AND a.date >= $2 AND id = $3 AND (a.old_data#>$4) IS NOT NULL
   ORDER BY a.date LIMIT 1),
  (SELECT a.new_data#>$4
   FROM audit a
   WHERE a.tableclass = $1 AND a.date < $2 AND id = $3 AND (a.new_data#>$4) IS NOT NULL
   ORDER BY a.date DESC LIMIT 1))
$$ LANGUAGE SQL SECURITY DEFINER STABLE STRICT;

CREATE OR REPLACE FUNCTION f_audit_get_text(p_tableclass regclass, p_date timestamptz, p_id text[], p_object text[]) RETURNS text AS $$
SELECT COALESCE(
  (SELECT a.old_data#>>$4
   FROM audit a
   WHERE a.tableclass = $1 AND a.date >= $2 AND id = $3 AND (a.old_data#>$4) IS NOT NULL
   ORDER BY a.date LIMIT 1),
  (SELECT a.new_data#>>$4
   FROM audit a
   WHERE a.tableclass = $1 AND a.date < $2 AND id = $3 AND (a.new_data#>$4) IS NOT NULL
   ORDER BY a.date DESC LIMIT 1))
$$ LANGUAGE SQL SECURITY DEFINER STABLE STRICT;

CREATE OR REPLACE FUNCTION f_jsonb_diff(jsonb, jsonb) RETURNS jsonb AS $$
-- return an object with attributes from $2 that differ from those of $1
SELECT json_object_agg(v2.key, CASE COALESCE(t2, t1)
                                WHEN 'object' THEN public.f_json_diff(CASE WHEN t1 IS NULL THEN '{}'::jsonb ELSE v1.value END, CASE WHEN t2 IS NULL THEN '{}'::jsonb ELSE v2.value END)
                                WHEN 'array'  THEN public.f_json_array_diff(CASE WHEN t1 = 'array' THEN v1.value ELSE '[]'::jsonb END, CASE WHEN t2 = 'array' THEN v2.value ELSE '[]'::jsonb END)
                                ELSE v2.value END)::jsonb
FROM      jsonb_each(COALESCE(CASE WHEN jsonb_typeof($2) = 'object' THEN $2 END, '{}'::jsonb)) v2
LEFT JOIN jsonb_each(COALESCE(CASE WHEN jsonb_typeof($1) = 'object' THEN $1 END, '{}'::jsonb)) v1 on (v1.key = v2.key)
LEFT JOIN LATERAL NULLIF(jsonb_typeof(v1.value), 'null') t1 ON (true)
LEFT JOIN LATERAL NULLIF(jsonb_typeof(v2.value), 'null') t2 ON (true)
WHERE v1.value IS DISTINCT FROM v2.value
$$ LANGUAGE sql IMMUTABLE;

CREATE OR REPLACE FUNCTION tf_audit() RETURNS TRIGGER AS $$
DECLARE
  _id text[];
  _key int2vector;
  _orow jsonb;
  _nrow jsonb;
  _odata jsonb;
  _ndata jsonb;
BEGIN
  _key := indkey FROM pg_index WHERE indrelid = TG_RELID AND indisunique ORDER BY indisprimary DESC LIMIT 1;

  IF TG_OP = 'INSERT' THEN
    _ndata := to_jsonb(NEW);
    _id := ARRAY(SELECT _ndata->>attname FROM pg_attribute WHERE attrelid = TG_RELID AND attnum = ANY(_key) ORDER BY attnum);
  ELSIF TG_OP = 'UPDATE' THEN
    _nrow := to_jsonb(NEW);
    _orow := to_jsonb(OLD);

    _odata := f_jsonb_diff(_nrow, _orow);
    _ndata := f_jsonb_diff(_orow, _nrow);

    _id := ARRAY(SELECT _nrow->>attname FROM pg_attribute WHERE attrelid = TG_RELID AND attnum = ANY(_key) ORDER BY attnum);
  ELSIF TG_OP = 'DELETE' THEN
    _odata := to_jsonb(OLD);
    _id := ARRAY(SELECT _odata->>attname FROM pg_attribute WHERE attrelid = TG_RELID AND attnum = ANY(_key) ORDER BY attnum);
  END IF;

  IF _odata <> '{}'::jsonb OR _ndata <> '{}'::jsonb THEN
    INSERT INTO audit (username, tableclass, action, id, old_data, new_data)
    VALUES (crm.f_user(), TG_RELID, substring(TG_OP FROM 1 FOR 1), _id, _odata, _ndata);
  END IF;

  RETURN CASE WHEN TG_OP = 'DELETE' THEN OLD ELSE NEW END;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

然后对于您要审核的每个表,只需添加:

CREATE TRIGGER t_table_audit BEFORE UPDATE OR DELETE ON table
  FOR EACH ROW EXECUTE PROCEDURE crm.tf_audit();

然后,您可以为任何已审计的表(从您实施审计之日起)在任意两个时间点之间生成准确的增量。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-08-22
    • 2022-01-25
    • 2022-01-22
    • 2011-06-04
    • 2023-03-14
    • 1970-01-01
    • 1970-01-01
    • 2011-03-31
    相关资源
    最近更新 更多