【问题标题】:Dynamic outputfilename in Data Flows in Azure Data Factory results in folders instead of filesAzure 数据工厂中数据流中的动态输出文件名生成文件夹而不是文件
【发布时间】:2020-07-14 10:16:22
【问题描述】:

我正在 ADF 中设置一个数据流,该数据流将 Azure 表数据集作为源,添加一个派生列,该列根据源架构中的数据字段添加一个名为“文件名”的列和一个动态值。

然后将输出发送到连接到附加到 Blob 存储(尝试 ADLS Gen2 和标准 Blob 存储)的 DataSet 的接收器。

但是,在执行管道之后,我没有在容器中找到多个文件,而是看到创建了名称为 filename=ABC123.csv 的文件夹,这些文件夹本身包含其他文件(这让我想到了镶木地板文件):

- filename=ABC123.csv
  + _started_UNIQUEID
  + part-00000-tid-UNIQUEID-guids.c000.csv

所以,我显然遗漏了一些东西,因为我需要在数据集容器中列出具有我在管道中指定的名称的单个文件。

这是管道的样子:

水槽形状的优化选项卡如下所示:

这里可以看到 Sink 形状的设置:

这是流水线的代码(不过有些部分被删掉了):

source(output(
        PartitionKey as string,
        RowKey as string,
        Timestamp as string,
        DeviceId as string,
        SensorValue as double
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    inferDriftedColumnTypes: true) ~> devicetable
devicetable derive(filename = Isin + '.csv') ~> setoutputfilename
setoutputfilename sink(allowSchemaDrift: true,
    validateSchema: false,
    rowUrlColumn:'filename',
    mapColumn(
        RowKey,
        Timestamp,
        DeviceId,
        SensorValue
    ),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> distributetofiles

有什么建议或提示吗? (我对 ADF 比较陌生,所以请耐心等待)

【问题讨论】:

  • 您能分享一下 Sink 的“设置”和“优化”选项卡的屏幕截图吗?有很多选项可以管理输出文件。
  • @JoelCochran,谢谢回复。我用两张截图更新了上面的描述
  • @Sam Vanhoutte 看起来你在 stackoverflow.com/questions/66775452/… 之前就这样做过

标签: azure-data-factory


【解决方案1】:

我最近遇到了与您的情况类似的事情(但不完全相同)。这里有很多选项和活动部件,所以这篇文章并不详尽。希望其中的某些内容能够引导您找到您所追求的解决方案。

第 1 步:源分区 在数据流中,您可以通过设置分区将类似的行组合在一起。许多选项之一是按键(源中的一列):

在此示例中,我们有 51 个美国州(50 个州 + DC),因此最终将有 51 个分区。

第 2 步:接收器设置 正如您所发现的,“作为列中的数据”选项会生成一个结构化的文件夹名称,例如 {columnName}={columnValue}。有人告诉我这是因为它是 Hadoop/Spark 类型环境中的标准。该文件夹内将包含一组文件,通常具有非人类友好的基于 GUID 的名称。

“默认”将给出与您当前的结果大致相同的结果,但没有基于列的文件夹名称。输出到单个文件”是非常不言自明的,离你所追求的解决方案最远的事情。如果你想控制最终的文件名,我发现的最好的选择是“模式”选项。这将生成文件( s) 具有指定的名称和变量编号 [n]。老实说,我不知道每个分区会生成什么,但它可能接近你所追求的结果,每列 1 个文件价值。

一些注意事项:

  • 文件夹名称是在接收器数据集中定义的,而不是在数据流中定义的。数据集参数真的很可能是“Step 0”。对于 Blob 类型的输出,您可能会硬编码文件夹名称,如“myfolder/fileName-[n]”。 YMMV。
  • 很遗憾,这些选项都不允许您使用派生列来生成文件名。 [如果你打开表达式 编辑器,您会发现“传入架构”未填充。]

第 3 步:接收器优化 您可以尝试的最后一块是 Optimize 选项卡下的 Sink Partitioning:

“使用当前分区”将根据源配置中设置的分区对结果进行分组。 “单个分区”会将所有结果分组到一个输出组中(几乎可以肯定不是您想要的)。 “设置分区”将允许您根据 Key 列对 Sink 数据进行重新分组。与 Sink 设置不同,这将允许您访问派生列名称,但我猜您最终会遇到与现在相同的文件夹命名问题。

目前,这就是我所知道的一切。我相信这些选项的组合会产生你想要的东西,或者接近它的东西。您可能需要通过多个步骤来解决此问题,例如将此流输出到错误命名的文件夹到暂存位置,然后使用另一个管道/流来处理每个文件夹并将结果折叠为所需的名称。

【讨论】:

  • 好的,Cochran 先生,您说服我本周录制了一段视频,介绍如何在数据流接收器中使用这些不同的分区和文件命名功能!
  • 所以,谢谢@joel。我在源中应用了 Key 分区,将我想要的列作为 PartitionKey。在我的接收器中,在文件名选项中的每个分区中指示(我将 {filename} 指定为文件名值)。我在接收器的优化设置中保留了“当前分区”。感谢您的建议,它似乎工作!
  • @Mark Kromer MSFT 看起来你以前这样做过。能帮忙吗?stackoverflow.com/questions/66775452/…
【解决方案2】:

您会在数据集文件夹路径中看到 Spark 进程留下的幽灵文件。当您使用“作为列中的数据”时,ADF 将使用从容器根目录开始的字段值写入文件。

您会在“带有文件名的列”属性中看到这一点:

因此,如果您导航到存储容器根目录,您应该会看到 ABC123.csv 文件。

现在,如果您想将该文件放在一个文件夹中,只需将该文件夹名称添加到您的派生列转换公式中,如下所示:

“输出/文件夹1/{Isin}.csv”

双引号激活 ADF 的字符串插值。您可以通过这种方式将文字文本与公式结合起来。

【讨论】:

猜你喜欢
  • 2021-05-26
  • 1970-01-01
  • 1970-01-01
  • 2021-01-01
  • 1970-01-01
  • 2020-05-01
  • 2022-08-22
  • 1970-01-01
  • 2023-02-06
相关资源
最近更新 更多