【问题标题】:Run a Pipeline when another Pipeline completes on another Data factory当另一个管道在另一个数据工厂上完成时运行一个管道
【发布时间】:2020-10-24 17:52:46
【问题描述】:

我的 Azure 订阅上有两个独立的数据工厂,我们称它们为 DF-A 和另一个 DF-B 在数据工厂 DF-A 我有一个管道,当它完成后,我希望 DF-B 上的管道运行;我将如何实现这一目标?

谢谢

【问题讨论】:

  • 在 ADF 中无法实现这一点。您是否考虑使用逻辑应用程序?逻辑应用可以做到这一点。
  • 你有例子吗
  • 我们的答案有帮助吗?
  • 嗨@viperdenwo82,如果我的回答对您有帮助,您可以接受它作为答案(单击答案旁边的复选标记,将其从灰色切换为已填充。)。这对其他社区成员可能是有益的。谢谢。

标签: azure azure-data-factory azure-data-factory-2


【解决方案1】:

在逻辑应用设计器中,您可以创建两个管道运行步骤来触发不同数据工厂中的两个管道运行。
使用逻辑应用更容易实现这一点。

  1. 创建一个 Recurrence 触发器来安排执行和两个 Azure 数据工厂 操作来触发管道运行。

  1. Azure 数据工厂 操作中,选择 创建管道运行 操作。

  1. 摘要在这里:

【讨论】:

    【解决方案2】:

    虽然有可能,但它比一个管道在同一个 Azure 数据工厂中执行另一个管道要复杂得多。

    在 DF-A 中创建一个名为 ExecuteExternalPipeline 的管道,将以下 JSON 复制到“代码”选项卡中:

    {
        "name": "ExecuteExternalPipeline",
        "properties": {
            "description": "Executes an ADF pipeline in a different ADF",
            "activities": [
                {
                    "name": "StartPipelineThenWait",
                    "description": "Calls the ADF REST API to start a pipeline in another ADF running using the MSI of this current ADF. Then it waits on a webhook callback",
                    "type": "WebHook",
                    "dependsOn": [],
                    "userProperties": [],
                    "typeProperties": {
                        "url": {
                            "value": "@concat(\n 'https://management.azure.com/subscriptions/',\n pipeline().parameters.SubscriptionID,\n '/resourceGroups/',pipeline().parameters.ResourceGroup,\n '/providers/Microsoft.DataFactory/factories/',\n pipeline().parameters.DataFactory,\n '/pipelines/',\n pipeline().parameters.Pipeline,\n '/createRun?api-version=2018-06-01'\n)",
                            "type": "Expression"
                        },
                        "method": "POST",
                        "body": {
                            "value": "@json(\n concat(\n  '{\n   \"InputFileName\": \"', pipeline().parameters.InputFileName, '\"\n  }'\n )\n)\n",
                            "type": "Expression"
                        },
                        "timeout": "20:00:00",
                        "authentication": {
                            "type": "MSI",
                            "resource": "https://management.azure.com"
                        }
                    }
                },
                {
                    "name": "ThrowErrorIfFailure",
                    "type": "IfCondition",
                    "dependsOn": [
                        {
                            "activity": "StartPipelineThenWait",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "expression": {
                            "value": "@if(equals(activity('StartPipelineThenWait').status,'success'),true,json('throw an error!'))",
                            "type": "Expression"
                        }
                    }
                }
            ],
            "parameters": {
                "SubscriptionID": {
                    "type": "string",
                    "defaultValue": "12345abcd-468e-472a-9761-9da416b14c0d"
                },
                "ResourceGroup": {
                    "type": "string",
                    "defaultValue": "DF-B-RG"
                },
                "DataFactory": {
                    "type": "string",
                    "defaultValue": "DF-B"
                },
                "Pipeline": {
                    "type": "string",
                    "defaultValue": "ChildPipeline"
                },
                "InputFileName": {
                    "type": "string",
                    "defaultValue": "File1.txt"
                }
            },
            "annotations": []
        }
    }
    

    然后在DF-B中创建ChildPipeline,代码如下:

    {
        "name": "ChildPipeline",
        "properties": {
            "activities": [
                {
                    "name": "DoYourLogicHere",
                    "description": "",
                    "type": "WebActivity",
                    "policy": {
                        "timeout": "7.00:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "url": {
                            "value": "https://google.com",
                            "type": "Expression"
                        },
                        "method": "GET"
                    }
                },
                {
                    "name": "CallbackSuccess",
                    "description": "Do not remove this activity. It notifies the process which executed this pipeline that the pipeline is complete.",
                    "type": "WebActivity",
                    "dependsOn": [
                        {
                            "activity": "DoYourLogicHere",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "policy": {
                        "timeout": "7.00:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "url": {
                            "value": "@pipeline().parameters.callBackUri",
                            "type": "Expression"
                        },
                        "method": "POST",
                        "body": {
                            "value": "@json(concat('{\"status\": \"success\", \"pipelineRunId\": \"',pipeline().RunId,'\"}'))",
                            "type": "Expression"
                        }
                    }
                },
                {
                    "name": "CallbackFail",
                    "description": "Do not remove this activity. It notifies the process which executed this pipeline that the pipeline is complete.",
                    "type": "WebActivity",
                    "dependsOn": [
                        {
                            "activity": "DoYourLogicHere",
                            "dependencyConditions": [
                                "Failed"
                            ]
                        }
                    ],
                    "policy": {
                        "timeout": "7.00:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "url": {
                            "value": "@pipeline().parameters.callBackUri",
                            "type": "Expression"
                        },
                        "method": "POST",
                        "body": {
                            "value": "@json(concat('{\"status\": \"failure\", \"pipelineRunId\": \"',pipeline().RunId,'\"}'))",
                            "type": "Expression"
                        }
                    }
                }
            ],
            "parameters": {
                "callBackUri": {
                    "type": "string",
                    "defaultValue": "https://google.com"
                },
                "InputFileName": {
                    "type": "string",
                    "defaultValue": "File1.txt"
                }
            },
            "annotations": []
        }
    }
    

    将 DoYourLogicHere 活动替换为您自己的活动,但保留两个回调活动。

    然后您需要找到 DF-A 的 MSI(请参阅 Azure 门户中 DF-A 的属性选项卡),并将其设置为 DF-B 上的数据工厂贡献者,以便它可以在另一个中执行管道ADF。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-12-10
      • 1970-01-01
      • 1970-01-01
      • 2021-08-02
      • 2020-06-14
      • 2021-12-05
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多