【问题标题】:BigQuery - Time Series and most efficient way to select the 'latest' recordBigQuery - 时间序列和选择“最新”记录的最有效方式
【发布时间】:2017-12-19 20:07:40
【问题描述】:

在我们的 BQ 设计中,我们有一个客户表(嵌套的原始数据),它是源自我们的微服务层(消耗 kinesis 流)的事件,其中每个事件都有事件所针对的实体的最新实体快照(后处理更改后的图像)。我猜是一种现代变更数据捕获。

每个事件中的最新快照是我们填充 BigQuery 的方式 - 它被提取并以仅附加模式加载到 biqQuery(通过 apache spark 结构化蒸汽连接器)。 (这与为给定 ID 改变和更新一行不同)

因此,鉴于这是仅附加的,因此该表的大小当然会随着时间的推移而增长 - 每个事件更改的条目。然而,它很好地是一个完整的、时间序列的客户状态和变化(我们的要求),并且是不可变的。例如,我们可以通过重播事件来重建完整的仓库......在上下文中已经足够了。

这样做的一个后果是加载到 BigQuery 中可能会导致重复(例如,如果 spark 错误并重试微批处理,则 BQ 在通过作业加载时不是幂等结构化流接收器,或者仅仅是由于分布式特性,它通常是可能的)。 SteamingInserts 可能需要稍后研究,因为它有助于重复数据删除.....

这种架构的结果是,我需要一个原始时间序列数据的视图(记住偶尔可能有重复),它在这些条件下返回 LATEST 记录。

最新由客户记录 (metadata.lastUpdated) 上的元数据结构字段确定 - 具有 MAX(metadata.lastUpdated) 的行是最新的。这是由我们的 MS 层保证的。

这也是一个真实的事件时间时间戳。表 id DAY 分区并有一个 _PARTITIONTIME 列,但这只是一个摄取时间,我不能使用它。当我可以指定要用作分区时间的列时,那就太好了! (愿望清单)。

重复将是具有相同客户“id”和“metadata.lastUpdated”的两行 - 所以 MAX(metadata.lastUpdated) 可以返回 2 行,所以我需要使用

ROW_NUMBER() OVER (PARTITION BY .... 所以我可以选择 rowNum=1

在我看来,也只选择有重复的 1 行。

好的,足够多的单词/上下文(对不起),下面是我的视图 SQL 以获得最新的。它适用于我的测试,但我不确定当表的大小/行数变大时,这是实现我的结果的最有效方法,并且想知道是否有任何 BigQuery 研究人员可能有更高效/更聪明SQL来做到这一点?为什么 SQL 可以,但绝不是性能调优方面的专家,尤其是 BQ 性能调优的最佳 SQL 方法。

我只是希望能够将所有数据放在一个表中,并依靠 dremel 引擎的强大功能来查询它,而不是需要有多个表或做任何过于复杂的事情。

所以我的 SQL 在下面。注意 - 我的时间戳是作为字符串提取的,因此也需要在视图中解析它。

WITH
  cus_latest_watermark AS (
  SELECT
    id,
    MAX(PARSE_TIMESTAMP("%Y-%m-%dT%H:%M:%E*S%Ez", metadata.lastUpdated)) AS maxLastUpdatedTimestampUTC
  FROM
    `project.dataset.customer_refdata`
  GROUP BY
    id ),
  cust_latest_rec_dup AS (
  SELECT
    cus.*,
    ROW_NUMBER() OVER (PARTITION BY cus.id ORDER BY cus.id) AS rowNum
  FROM
    `project.dataset.customer_refdata` cus
  JOIN
    cus_latest_watermark
  ON
    cus.id = cus_latest_watermark.id
    AND PARSE_TIMESTAMP("%Y-%m-%dT%H:%M:%E*S%Ez", cus.metadata.lastUpdated) = cus_latest_watermark.maxLastUpdatedTimestampUTC)
SELECT
  cust_latest_rec_dup.* EXCEPT(rowNum)
FROM
  cust_latest_rec_dup
WHERE
  rowNum = 1

谢谢!

【问题讨论】:

    标签: google-bigquery


    【解决方案1】:

    我是 Mikhail 的忠实粉丝,我们一直在做类似 OVER(ORDER) 的查询 - 但让我提出一个由于#standardSQL 而成为可能的替代方案。

    此查询失败,因为它在单个分区中有太多需要 ORDER BY 的元素:

    #standardSQL
    SELECT *
    FROM (
      SELECT repo.name, type, actor.id as actor, payload, created_at
        , ROW_NUMBER() OVER(PARTITION BY actor.id ORDER BY created_at DESC) rn
      FROM `githubarchive.month.201706` 
    )
    WHERE rn=1
    ORDER BY created_at
    LIMIT 100
    
    
    "Error: Resources exceeded during query execution."
    

    同时此查询在 15 秒内运行:

    #standardSQL
    SELECT actor, event
    FROM (
      SELECT actor.id actor, 
        ARRAY_AGG(
          STRUCT(type, repo.name, payload, created_at) 
          ORDER BY created_at DESC LIMIT 1
        ) events
      FROM `githubarchive.month.201706` 
      GROUP BY 1
    ), UNNEST(events) event
    ORDER BY event.created_at
    LIMIT 100
    

    这是因为允许 ORDER BY 删除每个 GROUP BY 上的所有内容 - 除了最高记录。

    如果你想要所有记录,相当于 SELECT *(感谢 Elliott):

    #standardSQL
    SELECT event.* FROM (
      SELECT ARRAY_AGG(
        t ORDER BY t.created_at DESC LIMIT 1
      )[OFFSET(0)]  event
      FROM `githubarchive.month.201706` t 
      GROUP BY actor.id
    )
    ORDER BY created_at
    LIMIT 100
    

    【讨论】:

    • 谢谢 Felipe :o) 仅供参考:我决定不在我的答案中包含此选项的原因是因为用户要求:I still need to select all the columns 我认为这会使此选项更加复杂调整。但同意这是一个很好的选择,并且可能有更好的机会在海量数据上取得成功!
    • 真的 - 我们不能 * 这个 STRUCT() :)
    • 如果要将结构体中的所有字段都包含在ARRAY_AGG中,可以在为表指定别名t后使用ARRAY_AGG((SELECT AS STRUCT t.*) ...
    • 您的输出会生成包含所有原始字段的记录 - 因此原始结构被破坏。可能有更好的方法来解决这个问题 - 但我只是做了一个外部查询 - 所以最终输出与原始模式完全相同
    • 你可以忽略limit 100 - 它只是为了限制输出,因为它只是一个例子。
    【解决方案2】:

    试试下面的 BigQuery 标准 SQL

    #standardSQL
    WITH cus_watermark AS (
      SELECT
        *,
        PARSE_TIMESTAMP("%Y-%m-%dT%H:%M:%E*S%Ez", metadata.lastUpdated) AS UpdatedTimestampUTC
      FROM `project.dataset.customer_refdata`
    ),
    cust_latest_rec_dup AS (
      SELECT 
        *,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY UpdatedTimestampUTC DESC) AS rowNum
      FROM cus_watermark
    )
    SELECT * EXCEPT(rowNum)
    FROM cust_latest_rec_dup
    WHERE rowNum = 1  
    

    您可以使用以下虚拟数据来玩/测试这种方法

    #standardSQL
    WITH `project.dataset.customer_refdata` AS (
      SELECT 1 AS id, '2017-07-14 16:47:27' AS lastUpdated UNION ALL
      SELECT 1, '2017-07-14 16:47:27' UNION ALL
      SELECT 1, '2017-07-14 17:47:27' UNION ALL
      SELECT 1, '2017-07-14 18:47:27' UNION ALL
      SELECT 2, '2017-07-14 16:57:27' UNION ALL
      SELECT 2, '2017-07-14 17:57:27' UNION ALL
      SELECT 2, '2017-07-14 18:57:27' 
    ),
    cus_watermark AS (
      SELECT
        *,
        PARSE_TIMESTAMP("%Y-%m-%d %T", lastUpdated) AS UpdatedTimestampUTC
      FROM `project.dataset.customer_refdata`
    ),
    cust_latest_rec_dup AS (
      SELECT 
        *,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY UpdatedTimestampUTC DESC) AS rowNum
      FROM cus_watermark
    )
    SELECT * EXCEPT(rowNum)
    FROM cust_latest_rec_dup
    WHERE rowNum = 1
    

    【讨论】:

    • Mikhail 感谢 heaps 的回复(再次)将尝试这个 tweek 可以看到您在分区中使用 ORDER BY DESC - 一件事,我仍然需要从 project.dataset.customer_refdata 表本身中选择所有列作为最终的投影。感谢您的意见!
    • 是的 - 所有列,这就是它的工作方式。只需在 cus_watermark 子选择中将 id 替换为 *
    • 另外,如果您只想保留原始列 - 您应该将 UpdatedTimestampUTC 字段添加到 EXCEPT 列表中
    • @MikhailBerlyant - 在stackoverflow.com/a/45112050/132438提出 ARRAY_AGG 替代方案
    • 刚刚看到 - 显然不错。看来我今天有点懒了:o)
    猜你喜欢
    • 2014-02-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-07-31
    • 1970-01-01
    • 2012-03-19
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多