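While it is possible, it is much more complicated than having one pipeline execute another pipeline in the same Azure Data Factory.
In DF-A, create a pipeline named ExecuteExternalPipeline and copy the following JSON into its Code tab: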
{
"name": "ExecuteExternalPipeline",
"properties": {
"description": "Executes an ADF pipeline in a different ADF",
"activities": [
{
"name": "StartPipelineThenWait",
"description": "Calls the ADF REST API to start a pipeline in another ADF, using the MSI of the current ADF, and then waits for a webhook callback",
"type": "WebHook",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"url": {
"value": "@concat(\n 'https://management.azure.com/subscriptions/',\n pipeline().parameters.SubscriptionID,\n '/resourceGroups/',pipeline().parameters.ResourceGroup,\n '/providers/Microsoft.DataFactory/factories/',\n pipeline().parameters.DataFactory,\n '/pipelines/',\n pipeline().parameters.Pipeline,\n '/createRun?api-version=2018-06-01'\n)",
"type": "Expression"
},
"method": "POST",
"body": {
"value": "@json(\n concat(\n '{\n \"InputFileName\": \"', pipeline().parameters.InputFileName, '\"\n }'\n )\n)\n",
"type": "Expression"
},
"timeout": "20:00:00",
"authentication": {
"type": "MSI",
"resource": "https://management.azure.com"
}
}
},
{
"name": "ThrowErrorIfFailure",
"type": "IfCondition",
"dependsOn": [
{
"activity": "StartPipelineThenWait",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"expression": {
"value": "@if(equals(activity('StartPipelineThenWait').output.status,'success'),true,json('throw an error!'))",
"type": "Expression"
}
}
}
],
"parameters": {
"SubscriptionID": {
"type": "string",
"defaultValue": "12345abcd-468e-472a-9761-9da416b14c0d"
},
"ResourceGroup": {
"type": "string",
"defaultValue": "DF-B-RG"
},
"DataFactory": {
"type": "string",
"defaultValue": "DF-B"
},
"Pipeline": {
"type": "string",
"defaultValue": "ChildPipeline"
},
"InputFileName": {
"type": "string",
"defaultValue": "File1.txt"
}
},
"annotations": []
}
}
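To make the request concrete, here is a minimal Python sketch of what the StartPipelineThenWait activity effectively sends. The function names are hypothetical and the callback URL in the usage lines is a placeholder. The sketch relies on the WebHook activity adding a callBackUri property to the request body on its own, which is how ChildPipeline ends up receiving it as a parameter.

```python
import json

MANAGEMENT_ENDPOINT = "https://management.azure.com"


def build_create_run_url(subscription_id, resource_group, data_factory, pipeline):
    """Mirror the @concat(...) expression used for the WebHook URL."""
    return (
        f"{MANAGEMENT_ENDPOINT}/subscriptions/{subscription_id}"
        f"/resourceGroups/{resource_group}"
        f"/providers/Microsoft.DataFactory/factories/{data_factory}"
        f"/pipelines/{pipeline}/createRun?api-version=2018-06-01"
    )


def build_create_run_body(input_file_name, callback_uri):
    """Body of the createRun call: each key becomes a pipeline parameter.

    InputFileName comes from the activity's body expression; callBackUri is
    the property the WebHook activity appends (placeholder value here).
    """
    return json.dumps({"InputFileName": input_file_name, "callBackUri": callback_uri})


if __name__ == "__main__":
    # Placeholder values taken from the parameter defaults above.
    print(build_create_run_url("<subscription-id>", "DF-B-RG", "DF-B", "ChildPipeline"))
    print(build_create_run_body("File1.txt", "https://example.invalid/callback"))
```

This only builds the URL and body; it does not authenticate or send anything. In the pipeline itself, authentication is handled by the MSI setting on the activity.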
Then create ChildPipeline in DF-B with the following code:
{
"name": "ChildPipeline",
"properties": {
"activities": [
{
"name": "DoYourLogicHere",
"description": "",
"type": "WebActivity",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"url": {
"value": "https://google.com",
"type": "Expression"
},
"method": "GET"
}
},
{
"name": "CallbackSuccess",
"description": "Do not remove this activity. It notifies the process which executed this pipeline that the pipeline is complete.",
"type": "WebActivity",
"dependsOn": [
{
"activity": "DoYourLogicHere",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"url": {
"value": "@pipeline().parameters.callBackUri",
"type": "Expression"
},
"method": "POST",
"body": {
"value": "@json(concat('{\"status\": \"success\", \"pipelineRunId\": \"',pipeline().RunId,'\"}'))",
"type": "Expression"
}
}
},
{
"name": "CallbackFail",
"description": "Do not remove this activity. It notifies the process which executed this pipeline that the pipeline is complete.",
"type": "WebActivity",
"dependsOn": [
{
"activity": "DoYourLogicHere",
"dependencyConditions": [
"Failed"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"url": {
"value": "@pipeline().parameters.callBackUri",
"type": "Expression"
},
"method": "POST",
"body": {
"value": "@json(concat('{\"status\": \"failure\", \"pipelineRunId\": \"',pipeline().RunId,'\"}'))",
"type": "Expression"
}
}
}
],
"parameters": {
"callBackUri": {
"type": "string",
"defaultValue": "https://google.com"
},
"InputFileName": {
"type": "string",
"defaultValue": "File1.txt"
}
},
"annotations": []
}
}
Replace the DoYourLogicHere activity with your own activities, but keep the two callback activities.
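The callback contract between the two pipelines can be sketched in Python as follows. This is only an illustration of the payload the callback activities post and of the check that ThrowErrorIfFailure is meant to perform; the function names are hypothetical and are not part of ADF.

```python
import json


def build_callback_body(status, pipeline_run_id):
    """Shape of the JSON that CallbackSuccess / CallbackFail post to callBackUri."""
    return json.dumps({"status": status, "pipelineRunId": pipeline_run_id})


def check_callback(raw_body):
    """Return the child run ID on success, raise if the child reported failure."""
    payload = json.loads(raw_body)
    if payload.get("status") != "success":
        raise RuntimeError(
            f"Child pipeline run {payload.get('pipelineRunId')} reported "
            f"status {payload.get('status')!r}"
        )
    return payload["pipelineRunId"]


if __name__ == "__main__":
    # "run-123" is a made-up run ID used only for illustration.
    print(check_callback(build_callback_body("success", "run-123")))
    try:
        check_callback(build_callback_body("failure", "run-123"))
    except RuntimeError as exc:
        print(exc)
```

Whatever replaces DoYourLogicHere, exactly one of the two callbacks should fire per run. If neither does, the calling WebHook activity keeps waiting until its timeout expires.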
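Then you need to find the MSI of DF-A (see the Properties tab of DF-A in the Azure portal) and assign it the Data Factory Contributor role on DF-B so that it can execute pipelines in the other ADF.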
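If you would rather script the role assignment than use the portal, the following Python sketch builds an Azure CLI command for it. It assumes the Azure CLI is installed and logged in, and that you have already looked up the object ID of DF-A's managed identity; the helper names and placeholder values are hypothetical. The sketch only prints the command and does not run it.

```python
import shlex


def factory_resource_id(subscription_id, resource_group, data_factory):
    """ARM resource ID of a data factory, used as the role assignment scope."""
    return (
        f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
        f"/providers/Microsoft.DataFactory/factories/{data_factory}"
    )


def role_assignment_command(principal_id, scope):
    """argv for `az role assignment create`, granting Data Factory Contributor."""
    return [
        "az", "role", "assignment", "create",
        "--assignee", principal_id,
        "--role", "Data Factory Contributor",
        "--scope", scope,
    ]


if __name__ == "__main__":
    # Placeholders: substitute your own subscription and DF-A's MSI object ID.
    scope = factory_resource_id("<subscription-id>", "DF-B-RG", "DF-B")
    print(shlex.join(role_assignment_command("<df-a-msi-object-id>", scope)))
```

Scoping the assignment to the DF-B factory itself, rather than to its resource group, keeps DF-A's permissions limited to that one factory.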