【问题标题】:Snowflake upsert from temp stage来自临时阶段的雪花插入
【发布时间】:2021-08-18 16:07:53
【问题描述】:

我有一个json格式的数据:

api_data = [{'ID': '1314420000c28b88d31115b9d8530bb1', 'NAME': 'Dummy User', 'STATUS': 'ACTIVE'}]

我有这些专栏。

columns = ['ID', 'NAME', 'STATUS']

我正在将数据作为 csv 文件加载到雪花中的暂存区,并将数据作为合并命令插入到主表中。 Id 是我的主键。 以下是我的代码:

def test_merger(self, api_data):
        columns = [x for x in api_data[0].keys()]


        try:
            with TemporaryDirectory(prefix=f'test_USER_') as tmpdir:
                df = pd.DataFrame(api_data)


                df.to_csv(tmpdir + f'/test_USER.csv', sep='^', index=False, columns=columns)


                stage_name = f"USER_{snowflake_client.generate_random_string()}"
                create_stage = f"CREATE TEMPORARY STAGE {stage_name} COMMENT = 'TEMPORARY STAGE FOR USER DATA LOAD'"
                snowflake_client.run("ALTER SESSION SET TIMEZONE = 'UTC';")
                snowflake_client.run(create_stage)

                snowflake_client.run(f"put file://{tmpdir}/* @{stage_name} PARALLEL=16")

                snowflake_client.run(
                    f"MERGE INTO USER USING (SELECT $1  TID, $2  TNAME, $3  TSTATUS FROM @{stage_name})  TEMPSTAGE"
                    f"ON USER.ID = TEMPSTAGE.TID WHEN MATCHED THEN UPDATE SET USER.NAME = TEMPSTAGE.TNAME, USER.STATUS = TEMPSTAGE.TSTATUS "
                    "WHEN NOT MATCHED THEN INSERT (ID, NAME, STATUS) VALUES (TEMPSTAGE.TID, TEMPSTAGE.TNAME, TEMPSTAGE.TSTATUS);")

        except Exception as e:
            logger.error(f"ERROR {e}")
            raise e

正在正确创建 csv 文件:

ID^NAME^STATUS
1314420000c28b88d31115b9d8530bb1^Dummy User^ACTIVE

我的用户表被错误地填充为:

ID                      NAME                STATUS
ID^NAME^STATUS

我想要的是:

ID                                        NAME                STATUS
1314420000c28b88d31115b9d8530bb1          Dummy User          Active

我做错了什么?

【问题讨论】:

  • 请注意 - 这是 Snowpipe、Tasks 和 Streams 的一个很好的用例。您将作为 JSON 自动将数据摄取到 Snowflake 中,然后执行一项任务,在您接收数据时逐步合并数据。这样可以避免在到达 Snowflake 的途中通过数据框传递数据,并且所有执行都作为 Snowflake 中的计划任务处理。

标签: csv snowflake-cloud-data-platform snowflake-schema


【解决方案1】:

我假设您文件中的字段分隔符是 ^ - 基于您显示为正在加载的数据。如果是这种情况,那么您将需要使用针对您的阶段或在您的选择语句中定义的文件格式

File formats when querying stages

【讨论】:

  • 谢谢@NickW。我让它工作了。只是一个问题,我需要像docs.snowflake.com/en/sql-reference/sql/remove.html 中提到的那样删除暂存文件,还是在成功加载后自动删除它们? copy 提供 purge true 有没有这样的合并选项?
  • 据我所知,您必须手动删除它们。实际上,您所做的只是对数据运行选择,将阶段视为外部表,因此 Snowflake 没有理由删除它
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-11-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-06-07
相关资源
最近更新 更多