【问题标题】:Azure data factory pipeline for-each activity not working sequential每个活动的 Azure 数据工厂管道不按顺序工作
【发布时间】:2019-12-07 14:41:48
【问题描述】:

我有 azure 数据工厂管道,我需要通过它从 blob 存储容器中提取所有 CSV 文件并将这些文件存储到 azure 数据湖容器。在将这些文件存储到数据湖之前,我需要对该文件的数据进行一些数据操作。

现在我需要按顺序而不是并行执行此过程。所以我使用 ForEach Activity->Settings->Sequential。

但它不能按顺序工作,而是作为并行进程工作。

下面是流水线代码


{
    "name":"PN_obfuscate_and_move",
    "properties":{
        "description":"move PN blob csv to adlgen2(obfuscated)",
        "activities":[
            {
                "name":"GetBlobFileName",
                "type":"GetMetadata",
                "dependsOn":[

                ],
                "policy":{
                    "timeout":"7.00:00:00",
                    "retry":0,
                    "retryIntervalInSeconds":30,
                    "secureOutput":false,
                    "secureInput":false
                },
                "userProperties":[

                ],
                "typeProperties":{
                    "dataset":{
                        "referenceName":"PN_Getblobfilename_Dataset",
                        "type":"DatasetReference"
                    },
                    "fieldList":[
                        "childItems"
                    ],
                    "storeSettings":{
                        "type":"AzureBlobStorageReadSetting",
                        "recursive":true
                    },
                    "formatSettings":{
                        "type":"DelimitedTextReadSetting"
                    }
                }
            },
            {
                "name":"ForEachBlobFile",
                "type":"ForEach",
                "dependsOn":[
                    {
                        "activity":"GetBlobFileName",
                        "dependencyConditions":[
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties":[

                ],
                "typeProperties":{
                    "items":{
                        "value":"@activity('GetBlobFileName').output.childItems",
                        "type":"Expression"
                    },
                    "isSequential":true,
                    "activities":[
                        {
                            "name":"Blob_to_SQLServer",
                            "description":"Copy PN blob files to sql server table",
                            "type":"Copy",
                            "dependsOn":[

                            ],
                            "policy":{
                                "timeout":"7.00:00:00",
                                "retry":0,
                                "retryIntervalInSeconds":30,
                                "secureOutput":false,
                                "secureInput":false
                            },
                            "userProperties":[
                                {
                                    "name":"Source",
                                    "value":"PNemailattachment//"
                                },
                                {
                                    "name":"Destination",
                                    "value":"[dbo].[PN]"
                                }
                            ],
                            "typeProperties":{
                                "source":{
                                    "type":"DelimitedTextSource",
                                    "storeSettings":{
                                        "type":"AzureBlobStorageReadSetting",
                                        "recursive":false,
                                        "wildcardFileName":"*.*",
                                        "enablePartitionDiscovery":false
                                    },
                                    "formatSettings":{
                                        "type":"DelimitedTextReadSetting"
                                    }
                                },
                                "sink":{
                                    "type":"AzureSqlSink"
                                },
                                "enableStaging":false
                            },
                            "inputs":[
                                {
                                    "referenceName":"PNBlob",
                                    "type":"DatasetReference"
                                }
                            ],
                            "outputs":[
                                {
                                    "referenceName":"PN_SQLServer",
                                    "type":"DatasetReference"
                                }
                            ]
                        },
                        {
                            "name":"Obfuscate_PN_SQLData",
                            "description":"mask specific columns",
                            "type":"SqlServerStoredProcedure",
                            "dependsOn":[
                                {
                                    "activity":"Blob_to_SQLServer",
                                    "dependencyConditions":[
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy":{
                                "timeout":"7.00:00:00",
                                "retry":0,
                                "retryIntervalInSeconds":30,
                                "secureOutput":false,
                                "secureInput":false
                            },
                            "userProperties":[

                            ],
                            "typeProperties":{
                                "storedProcedureName":"[dbo].[Obfuscate_PN_Data]"
                            },
                            "linkedServiceName":{
                                "referenceName":"PN_SQLServer",
                                "type":"LinkedServiceReference"
                            }
                        },
                        {
                            "name":"SQLServer_to_ADLSGen2",
                            "description":"move PN obfuscated data to azure data lake gen2",
                            "type":"Copy",
                            "dependsOn":[
                                {
                                    "activity":"Obfuscate_PN_SQLData",
                                    "dependencyConditions":[
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy":{
                                "timeout":"7.00:00:00",
                                "retry":0,
                                "retryIntervalInSeconds":30,
                                "secureOutput":false,
                                "secureInput":false
                            },
                            "userProperties":[

                            ],
                            "typeProperties":{
                                "source":{
                                    "type":"AzureSqlSource"
                                },
                                "sink":{
                                    "type":"DelimitedTextSink",
                                    "storeSettings":{
                                        "type":"AzureBlobFSWriteSetting"
                                    },
                                    "formatSettings":{
                                        "type":"DelimitedTextWriteSetting",
                                        "quoteAllText":true,
                                        "fileExtension":".csv"
                                    }
                                },
                                "enableStaging":false
                            },
                            "inputs":[
                                {
                                    "referenceName":"PN_SQLServer",
                                    "type":"DatasetReference"
                                }
                            ],
                            "outputs":[
                                {
                                    "referenceName":"PNADLSGen2",
                                    "type":"DatasetReference"
                                }
                            ]
                        },
                        {
                            "name":"Delete_PN_SQLData",
                            "description":"delete all data from table",
                            "type":"SqlServerStoredProcedure",
                            "dependsOn":[
                                {
                                    "activity":"SQLServer_to_ADLSGen2",
                                    "dependencyConditions":[
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy":{
                                "timeout":"7.00:00:00",
                                "retry":0,
                                "retryIntervalInSeconds":30,
                                "secureOutput":false,
                                "secureInput":false
                            },
                            "userProperties":[

                            ],
                            "typeProperties":{
                                "storedProcedureName":"[dbo].[Delete_PN_Data]"
                            },
                            "linkedServiceName":{
                                "referenceName":"PN_SQLServer",
                                "type":"LinkedServiceReference"
                            }
                        }
                    ]
                }
            }
        ],
        "folder":{
            "name":"PN"
        },
        "annotations":[

        ]
    },
    "type":"Microsoft.DataFactory/factories/pipelines"
}

【问题讨论】:

  • 你能解释一下复制活动中的源数据集吗?
  • 1.“Blob_to_SQLServer”复制数据活动中的源是具有 csv 文件路径的 blobstorage 容器,其中 @item().name 作为文件名。 2.“SQLServer_to_ADLSGen2”复制数据活动中的来源是 SQL Server 中的特定表。
  • 嗨 Manish,如果是的话,您的问题得到答案了吗,请分享,因为我也面临同样的问题。
  • 是的,上面提到的相同过程有效,当时ADF发布的更新中存在一些错误,微软后来解决了这个问题。就是这样。

标签: foreach pipeline azure-data-factory sequential


【解决方案1】:

默认情况下,Azure 数据工厂 (ADF) 中的 ForEach activity 最多可并行运行 20 个任务。您最多可以运行 50 个。如果您想强制它按顺序运行,即一个接一个,那么您可以在 ForEach UI 的“设置”部分设置“顺序”复选框(请参阅下面)或将 JSON 中 ForEach 活动的 isSequential 属性设置为 true,例如

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
...

不过,我会谨慎使用此设置。串行运行,即一个接一个会减慢速度。是否有其他方法可以设计您的工作流以利用 Azure 数据工厂这一真正强大的功能?那么您的工作将只需要最长的任务,而不是所有任务的总和。

假设我有一个任务要运行,它有 10 个任务,每个任务需要 1 秒。如果我串行运行这个作业需要 10 秒,但如果我并行运行它需要 1 秒。

SSIS 从来没有真正拥有过这个功能——您可以手动创建多个路径,也可以使用第三方组件,但它不是内置的。它确实是 ADF 的一项极好的功能,您应该尝试并加以利用。当然,在某些情况下您确实需要串行运行,这就是此选项可用的原因。

【讨论】:

  • 首先,感谢您的精彩解释。正如您在问题中看到的那样,我选中了该选项。但是,它仍然无法按预期工作。此外,更新管道完整代码的问题。
【解决方案2】:

我也遇到过类似情况。

我刚刚开始使用 ADF,所以可能是错误的,但我注意到默认情况下,每个活动都有一个批量大小设置。我确保将其设置为 1 以及设置顺序复选框,现在内部活动按我预期的顺序运行。

对于其他面临此问题的人:

  1. 取消选中顺序复选框,然后输入 1 作为批次限制
  2. 重新选中该框。请注意,我使用的是设计器,在代码视图中看不到任何与批量大小相关的内容,因此它可能被隐藏了。

【讨论】:

    猜你喜欢
    • 2021-12-25
    • 1970-01-01
    • 2019-04-24
    • 1970-01-01
    • 1970-01-01
    • 2018-10-02
    • 1970-01-01
    • 1970-01-01
    • 2019-10-24
    相关资源
    最近更新 更多