【问题标题】:BigQuery MERGE unexpected row duplicationBigQuery MERGE 意外的行重复
【发布时间】:2020-08-20 11:02:56
【问题描述】:

我正在使用标准 SQL MERGE 来更新基于源外部表的常规目标表,该源外部表是存储桶中的一组 CVS 文件。这是一个简化的输入文件:

$ gsutil cat gs://dolphin-dev-raw/demo/input/demo_20191125_20200505050505.tsv
"id"    "PortfolioCode" "ValuationDate" "load_checksum"
"1"     "CIMDI000TT"    "2020-03-28"    "checksum1"

MERGE 语句是:

MERGE xx_producer_conformed.demo T
USING xx_producer_raw.demo_raw S
ON
    S.id = T.id
WHEN NOT MATCHED THEN
    INSERT (id, PortfolioCode, ValuationDate, load_checksum, insert_time, file_name, extract_timestamp, wf_id)
    VALUES (id, PortfolioCode, ValuationDate, load_checksum, CURRENT_TIMESTAMP(), _FILE_NAME, REGEXP_EXTRACT(_FILE_NAME, '.*_[0-9]{8}_([0-9]{14}).tsv'),'scheduled__2020-08-19T16:24:00+00:00')
WHEN MATCHED AND S.load_checksum != T.load_checksum THEN UPDATE SET
    T.id = S.id, T.PortfolioCode = S.PortfolioCode, T.ValuationDate = S.ValuationDate, T.load_checksum = S.load_checksum, T.file_name = S._FILE_NAME, T.extract_timestamp = REGEXP_EXTRACT(_FILE_NAME, '.*_[0-9]{8}_([0-9]{14}).tsv'), T.wf_id = 'scheduled__2020-08-19T16:24:00+00:00'

如果我擦除目标表并重新运行 MERGE,我得到的行修改计数为 1:

bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql  |  awk 'ORS=" "')"
Waiting on bqjob_r288f8d33_000001740b413532_1 ... (0s) Current status: DONE
Number of affected rows: 1

这成功导致目标表更新:

$ bq query --format=csv --max_rows=10 --use_legacy_sql=false "select * from ta_producer_conformed.demo"
Waiting on bqjob_r7f6b6a46_000001740b5057a3_1 ... (0s) Current status: DONE
id,PortfolioCode,ValuationDate,load_checksum,insert_time,file_name,extract_timestamp,wf_id
1,CIMDI000TT,2020-03-28,checksum1,2020-08-20 09:44:20,gs://dolphin-dev-raw/demo/input/demo_20191125_20200505050505.tsv,20200505050505,scheduled__2020-08-19T16:24:00+00:00

如果我再次返回 MERGE,我得到的行修改计数为 0:

$ bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql  |  awk 'ORS=" "')"
Waiting on bqjob_r3de2f833_000001740b4161b3_1 ... (0s) Current status: DONE
Number of affected rows: 0

这不会导致目标表发生任何更改。所以一切正常。

问题是,当我在一个更复杂的示例上运行代码时,我将许多输入文件插入到一个空的目标表中,我最终会得到具有相同 id 的行,其中 count(id) 不等于 @987654328 @:

$ bq query --use_legacy_sql=false --max_rows=999999 --location=asia-east2 "select count(id) as total_records from xx_producer_conformed.xxx; select count(distinct id) as unique_records from xx_producer_conformed.xxx; "
Waiting on bqjob_r5df5bec8_000001740b7dfa50_1 ... (1s) Current status: DONE
select count(id) as total_records from xx_producer_conformed.xxx; -- at [1:1]
+---------------+
| total_records |
+---------------+
|         11582 |
+---------------+
select count(distinct id) as unique_records from xx_producer_conformed.xxx; -- at [1:78]
+----------------+
| unique_records |
+----------------+
|           5722 |
+----------------+

这让我感到惊讶,因为我的期望是底层逻辑将逐步遍历每个底层文件中的每一行,并且只插入第一个 id 然后在任何后续 id 上插入它会更新。所以我的期望是输入存储桶中的行数不能超过唯一的ids。

如果我再次尝试运行 MERGE,它会告诉我目标表中有不止一行具有相同的 id:

$  bq query --use_legacy_sql=false --location=asia-east2 "$(cat merge.sql  |  awk 'ORS=" "')"
Waiting on bqjob_r2fe783fc_000001740b8271aa_1 ... (0s) Current status: DONE
Error in query string: Error processing job 'xxxx-10843454-datamesh-
dev:bqjob_r2fe783fc_000001740b8271aa_1': UPDATE/MERGE must match at most one
source row for each target row

我原以为当 MERGE 语句插入时不会有两行具有相同的“id”。

使用的所有表和查询都是从列出“业务列”的文件中生成的。因此,上面的简单演示示例在登录和 MERGE 语句中的连接方面与全面查询相同。

为什么上面的 MERGE 查询会导致“id”重复的行,我该如何解决这个问题?

【问题讨论】:

    标签: google-bigquery


    【解决方案1】:

    通过擦除目标表并复制一个相对较大的输入作为输入,这个问题很容易重复:

    AAAA_20200805_20200814200000.tsv
    AAAA_clone_20200805_20200814200000.tsv
    

    我相信其核心是并行性。许多文件的单个大型 MERGE 可以并行生成许多工作线程。任何两个并行运行的工作线程加载不同的文件以立即“看到”彼此的插入将非常慢。相反,我希望它们独立运行,而不是“看到”彼此写入单独的缓冲区。当缓冲区最终合并时,它会导致多个插入具有相同的id

    为了解决这个问题,我正在使用一些 CTE 根据 extract_timestamp 使用 ROW_NUMBER() OVER (PARTITION BY id ORDER BY extract_timestamp DESC) 来选择任何 id 的最新记录。然后我们可以按最低值过滤以选择最新版本的记录。完整的查询是:

    MERGE xx_producer_conformed.demo T
    USING (
        WITH cteExtractTimestamp AS (
            SELECT
                id, PortfolioCode, ValuationDate, load_checksum
                , _FILE_NAME
                , REGEXP_EXTRACT(_FILE_NAME, '.*_[0-9]{8}_([0-9]{14}).tsv') AS extract_timestamp
            FROM
                xx_producer_raw.demo_raw
        ),
        cteRanked AS (
            SELECT
                id, PortfolioCode, ValuationDate, load_checksum
                , _FILE_NAME
                , extract_timestamp
                , ROW_NUMBER() OVER (PARTITION BY id ORDER BY extract_timestamp DESC) AS row_num
            FROM 
                cteExtractTimestamp
        )
        SELECT 
            id, PortfolioCode, ValuationDate, load_checksum
            , _FILE_NAME
            , extract_timestamp
            , row_num
            , "{{ task_instance.xcom_pull(task_ids='get_run_id') }}" AS wf_id
        FROM cteRanked 
        WHERE row_num = 1
    ) S
    ON
        S.id = T.id
    WHEN NOT MATCHED THEN
        INSERT (id, PortfolioCode, ValuationDate, load_checksum, insert_time, file_name, extract_timestamp, wf_id)
        VALUES (id, PortfolioCode, ValuationDate, load_checksum, CURRENT_TIMESTAMP(), _FILE_NAME, extract_timestamp, wf_id)
    WHEN MATCHED AND S.load_checksum != T.load_checksum THEN UPDATE SET
        T.id = S.id, T.PortfolioCode = S.PortfolioCode, T.ValuationDate = S.ValuationDate, T.load_checksum = S.load_checksum, T.file_name = S._FILE_NAME, T.extract_timestamp = S.extract_timestamp, T.wf_id = S.wf_id
    

    这意味着克隆文件而不更改文件名中的 extract_timestamp 将随机选择两行之一。在正常运行中,我们期望具有更新数据的后续提取是具有新 extract_timetamp 的源文件。然后上面的查询将选择最新的记录合并到目标表中。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-09-30
      • 2014-07-27
      • 1970-01-01
      • 1970-01-01
      • 2014-09-25
      • 1970-01-01
      • 1970-01-01
      • 2020-08-15
      相关资源
      最近更新 更多