【发布时间】:2021-08-18 16:07:53
【问题描述】:
我有一个json格式的数据:
api_data = [{'ID': '1314420000c28b88d31115b9d8530bb1', 'NAME': 'Dummy User', 'STATUS': 'ACTIVE'}]
我有这些专栏。
columns = ['ID', 'NAME', 'STATUS']
我正在将数据作为 csv 文件加载到雪花中的暂存区,并将数据作为合并命令插入到主表中。 Id 是我的主键。 以下是我的代码:
def test_merger(self, api_data):
columns = [x for x in api_data[0].keys()]
try:
with TemporaryDirectory(prefix=f'test_USER_') as tmpdir:
df = pd.DataFrame(api_data)
df.to_csv(tmpdir + f'/test_USER.csv', sep='^', index=False, columns=columns)
stage_name = f"USER_{snowflake_client.generate_random_string()}"
create_stage = f"CREATE TEMPORARY STAGE {stage_name} COMMENT = 'TEMPORARY STAGE FOR USER DATA LOAD'"
snowflake_client.run("ALTER SESSION SET TIMEZONE = 'UTC';")
snowflake_client.run(create_stage)
snowflake_client.run(f"put file://{tmpdir}/* @{stage_name} PARALLEL=16")
snowflake_client.run(
f"MERGE INTO USER USING (SELECT $1 TID, $2 TNAME, $3 TSTATUS FROM @{stage_name}) TEMPSTAGE"
f"ON USER.ID = TEMPSTAGE.TID WHEN MATCHED THEN UPDATE SET USER.NAME = TEMPSTAGE.TNAME, USER.STATUS = TEMPSTAGE.TSTATUS "
"WHEN NOT MATCHED THEN INSERT (ID, NAME, STATUS) VALUES (TEMPSTAGE.TID, TEMPSTAGE.TNAME, TEMPSTAGE.TSTATUS);")
except Exception as e:
logger.error(f"ERROR {e}")
raise e
正在正确创建 csv 文件:
ID^NAME^STATUS
1314420000c28b88d31115b9d8530bb1^Dummy User^ACTIVE
我的用户表被错误地填充为:
ID NAME STATUS
ID^NAME^STATUS
我想要的是:
ID NAME STATUS
1314420000c28b88d31115b9d8530bb1 Dummy User Active
我做错了什么?
【问题讨论】:
-
请注意 - 这是 Snowpipe、Tasks 和 Streams 的一个很好的用例。您将作为 JSON 自动将数据摄取到 Snowflake 中,然后执行一项任务,在您接收数据时逐步合并数据。这样可以避免在到达 Snowflake 的途中通过数据框传递数据,并且所有执行都作为 Snowflake 中的计划任务处理。
标签: csv snowflake-cloud-data-platform snowflake-schema