【问题标题】:Azure Data Factory - Insert Sql Row for Each File FoundAzure 数据工厂 - 为找到的每个文件插入 Sql 行
【发布时间】:2021-07-13 14:24:33
【问题描述】:

我需要一个数据工厂:

  • 检查 Azure blob 容器中的 csv 文件
  • 对于每个 csv 文件
    • 在 Azure Sql 表中插入一行,将文件名作为列值

blob 容器中只有一个 csv 文件,该文件包含五行。

到目前为止,我有以下操作:

在 for-each 操作中,我有一个复制操作。我确实给了它一个动态数据集的来源,该数据集的文件名设置为@Item().name 的参数。然而,结果是 5 行被插入到目标表中,而我期望只有 1 行。

for-each 循环只执行一次,但我不知道使用保存文件名和时间戳的变量的数据源?

【问题讨论】:

  • COPY 将复制内容,而不是元数据。您已经有了带有 @item().name 的 blob 名称 - 我将创建一个接受文件名作为参数的存储过程,并在 foreach 活动中调用它而不是 COPY。
  • 将数据流添加到您的管道。对于 Source,指向您的 blob 容器。对源转换启用采样并将行限制设置为 1。输入列名,即“用于存储文件名的列”的“myfilename”。最后,添加一个 Sink,它是您的 SQL 表。映射“我的文件名”列。这会将每个文件名存储为容器中的一行。

标签: azure-data-factory


【解决方案1】:

您正朝着正确的方向前进,但在 For each 中,您只需要一个存储过程活动,它将文件名(以及您可用的任何其他元数据)插入到 Azure DB 表中。

像这样:

以下是数据库中存储过程的示例:

CREATE Procedure Log.PopulateFileLog (@FileName varchar(100))

INSERT INTO Log.CvsRxFileLog
select
@FileName as FileName,
getdate() as ETL_Timestamp

编辑: 您还可以使用 For Each 中的 Lookup Activity 直接执行插入,如下所示:

编辑 2 这将展示如何在没有 for each 的情况下做到这一点

1,使用带有 Azure SQLDB 源和 Blob 存储 CSV 文件接收器的复制数据活动从查找/获取元数据活动中复制输出 Json 数组

--------来源:

--------下沉:

2、创建另一个复制数据Activity,Source是Blob Storage Json文件,Sink是Azure SQLDB

---------来源:

---------沉:

---------映射:

本质上,您将整个 json 输出保存到 Blob 中的文件中,然后使用 json 文件类型将该文件复制到 azure db。这样,即使您尝试从包含 500 个项目的数据集中插入,您也可以运行 3 个活动。

【讨论】:

  • 非常感谢特伦特的另一个详细回答。但是,如果可能的话,我宁愿避免使用存储过程,因为这是要管理的更多源代码。我对数据工厂很陌生,但我真的很喜欢与 DevOps git repos 和发布管道的轻松集成。如果我添加一个存储过程,那么我必须考虑如何通过开发、测试、产品等进行部署和提升。我将按照@Mark Kromer 的建议使用数据流研究解决方案,但如果失败,我将退回一个存储过程。
  • 旋转一个 Spark 集群来插入一行似乎有点过分,但它确实是可行的。您也可以在数据库中没有存储过程,只需通过查找将其作为直接插入查询执行。我将编辑答案以反映此选项。
  • 我还应该提到有一种方法可以在没有 For Each 的情况下做你想做的事,当我需要迭代数百个文件但不想运行 300 个文件时,我开发了这种方法For each 中的活动(因为您在使用 AutoResolveIR 时每 1000 次活动支付 1 美元,如果使用自托管 IR,则为 1.50 美元/1000 美元 *上次我查看价格)我将编辑我的答案以包含这种更有效的方法,而无需 For Each
【解决方案2】:

当然,做事的方法总是不止一种,但我认为你不需要 For Each 活动来完成这项任务。 LookupGet MetadataFilter 等活动将它们的结果输出为可以传递的 JSON。此 JSON 可以包含一个或多个项目,并且可以传递给 存储过程。示例模式:

这是早期 ADF 第 2 代(映射数据流之前)常见的 ELT 模式,它利用您的架构中已经使用的资源。您应该记住,您需要为 ADF 中的活动执行付费(例如,在不必要的 For Each 循环中进行多次迭代),并且通常在 Azure 中的计算成本很高且存储成本很低,因此在 ADF 中实现模式时请考虑这一点。如果您构建上述模式,您将拥有两种类型的计算:Azure SQL DB 背后的计算和 Azure 集成运行时,因此有两种类型的计算。如果您向其中添加数据流,您将有第三种类型的计算与其他两种同时运行,所以我个人只在特定条件下添加这些。

上述模式的一个示例实现:

注意我传递给示例日志记录过程的表达式:

@string(activity('Filter1').output.Value)

如果您想要一种低代码方法并且没有可用的计算资源来执行此处理,那么数据流非常适合。在您的情况下,您已经拥有一个非常有能力处理 JSON 的 Azure SQL DB,例如通过 OPENJSON、JSON_VALUE 和 JSON_QUERY 函数。

您提到不想部署我理解的其他代码,但是您的原始 SQL 表是从哪里来的?如果您绝对反对部署额外的代码,您可以简单地通过 Stored Proc 活动调用 sp_executesql 存储过程,使用插入记录的动态 SQL 语句,如下所示:

@concat( 'INSERT INTO dbo.myLog ( logRecord ) SELECT ''', activity('Filter1').output, ''' ')

在您的存储过程中或更高版本中粉碎 JSON,例如

SELECT y.[key] AS name, y.[value] AS [fileName]
FROM dbo.myLog
    CROSS APPLY OPENJSON( logRecord ) x
        CROSS APPLY OPENJSON( x.[value] ) y
WHERE logId = 16
   AND y.[key] = 'name';

【讨论】:

  • 另一个很好的答案 - 似乎 ADF 社区是一座金矿!为了回答您的问题“您的原始 SQL 表来自哪里?”,它通过实体框架核心迁移进行部署
猜你喜欢
  • 2021-12-03
  • 1970-01-01
  • 2016-10-02
  • 1970-01-01
  • 2020-11-26
  • 1970-01-01
  • 2019-06-05
  • 2021-12-24
  • 2022-11-04
相关资源
最近更新 更多