【问题标题】:Trigger Azure Data Factory Pipeline from Logic App w/ Parameter从带有参数的逻辑应用程序触发 Azure 数据工厂管道
【发布时间】:2019-01-22 16:05:38
【问题描述】:

让我先声明一下,我是逻辑应用程序和数据工厂的菜鸟。无论如何,我目前正在进行集成,其中一部分是我需要从逻辑应用程序触发数据工厂中的管道。我已经成功地做到了,我似乎无法弄清楚的 one 部分是如何将参数传递给我的管道。我已经尝试在“参数”和“触发器”部分下更改 JSON,但到目前为止还没有任何东西可以点击。管道最终执行,但仅使用默认参数。

有人在这方面取得过成功吗?任何帮助表示赞赏。

【问题讨论】:

  • 您是否使用 REST API 来触发数据工厂管道运行? docs.microsoft.com/en-us/azure/data-factory/…。它可能对您的情况有用。逻辑应用连接器不支持参数?
  • 我可能需要尝试一下,也许使用逻辑应用程序中的 HTTP 操作?主要是想知道我是否真的能够在 ADF 连接器中使用参数,这听起来好像我做不到?
  • 是的,Http 操作应该可以工作
  • 嗨迈克,我设法通过结合天蓝色函数和逻辑应用程序来做到这一点。所以我写了一个运行管道的函数,使用 .net SDK 你可以传递参数。
  • @wBob 你可以检查我的答案,看看它是否适合你:)

标签: azure-data-factory azure-logic-apps


【解决方案1】:

您可以使用逻辑应用的“创建管道运行”操作的 body 属性将参数传递给管道。与往常一样,要小心,因为此操作不仅在预览中,而且我在任何 MS 文档中也找不到此解决方案。我只是根据其他类似动作的格式做了一个有根据的猜测。

例子:

"Run_my_pipeline": {
  "inputs": {
    "host": {
      "connection": {
        "name": "@parameters('$connections')['azuredatafactory']['connectionId']"
      }
    },
    "method": "post",
    "body": {
      "param1": "myParamValue",
      "param2": "myParamValue"
    },
    "path": "...",
    "queries": {
      "x-ms-api-version": "2017-09-01-preview"
    },
    "authentication": "@parameters('$authentication')"
  }
}

【讨论】:

  • 请注意,自此答案以来,“创建管道运行”操作添加了一个名为“参数”的开箱即用参数。该参数参数转换为我上面提到的同一个主体对象。因此,这个答案仍然有效,尽管您现在可以从门户视图添加 body 对象。郑重声明,此操作仍处于预览阶段。
【解决方案2】:

正如我在评论中所说,我使用 azure 函数创建了一个解决方法。 Azure 函数和逻辑应用可以很好地协同工作。 在此链接上,您可以看到如何使用 .net 创建和管理管道 https://docs.microsoft.com/en-us/azure/data-factory/quickstart-create-data-factory-dot-net

如果您已经拥有 ADF 和管道,您只想运行它(使用管道),那么您就可以

Dictionary<string, object> parameters = new Dictionary<string, object>
       {
           {"BoxSerialNumbers", req.BoxSerialNumbers},
           {"StartDate", req.StartDate },
           {"EndDate",req.EndDate },
           {"Recipient", req.Recipient }
       };//this is how you add initialaze parameters

        var client = Authenticate(); //Authentication with azure
        log.Info("Creating.");
        CreateRunResponse runResponse = client.Pipelines.CreateRun(resourceGroup, dataFactoryName, "pipeline1", parameters);//run pipeline, you can do this async (it's better)
        log.Info("Created.");
        var response = new HttpResponseMessage();
        if (client.PipelineRuns.Get(resourceGroup, dataFactoryName, runResponse.RunId).Status.Equals("InProgress"))
        {
            response = new HttpResponseMessage(HttpStatusCode.OK)
            {
                Content = new StringContent(runResponse.RunId, Encoding.UTF8)
            };
        }
        else
        {
            response = new HttpResponseMessage(HttpStatusCode.BadRequest)
            {
                Content = new StringContent("Pipeline didn't started", Encoding.UTF8)//just some validation for function
            };
        }
        return response;                                               


    public static DataFactoryManagementClient Authenticate()
    {
        var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
        ClientCredential cc = new ClientCredential(applicationID, authenticationKey);
        AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
        ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
        return new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionID };
    }

因此,在请求中,您可以从逻辑应用传递参数,使用 runId 可以检查状态。然后在逻辑应用程序中只需简单的 HTTP 请求即可调用此函数。希望这对某人有所帮助。

【讨论】:

  • 有趣的方法 - 我赞成。注意微软刚刚在Logic Apps feedback 站点上发布(2018 年 5 月 11 日),从逻辑应用程序本地运行数据工厂管道的操作已达到已完成状态。我猜它很快就会出现在产品中!
  • @wBob 这样我设法将参数从我们的 Web 应用程序传递到数据工厂和 u sql 脚本。此外,奇怪的是没有连接到 u sql 的连接器(好的,您也可以使用 .net 来管理它,但是......)。在逻辑应用程序中,它仍处于预览阶段,我相信他们会改变这一点。
【解决方案3】:

我使用 DraganB 的解决方案,但调用签名打开

CreateRunResponse runResponse = client.Pipelines.CreateRun(resourceGroup, dataFactoryName, "pipeline1", parameters);

变了。细微的修改使这项工作完美:

CreateRunResponse runResponse = client.Pipelines.CreateRun(resourceGroup, dataFactoryName, "pipeline1", parameters: parameters);

这是任何需要它的人的功能。

[FunctionName("DatafactoryShim")]
    public async static Task<HttpResponseMessage> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post")]
        HttpRequestMessage req,
        ExecutionContext context,
        TraceWriter log
    )
    {
        string messageBody = await req.Content.ReadAsStringAsync();

        BlobToDatalakeFactoryParameters postValues = JsonHelper.ToClass<BlobToDatalakeFactoryParameters>(messageBody);

        Dictionary<string, object> parameters = new Dictionary<string, object>
        {
            {"blobContainer", postValues.BlobContainer},
            {"blobFolder", postValues.BlobFolder },
            {"relativeDatalakeFolder", postValues.RelativeDatalakeFolder },
            {"modelType", postValues.ModelType }

        }; //this is how you add initialaze parameters

        var client = Authenticate(); //Authentication with azure

        string resourceGroup = ConfigurationManager.AppSettings["resourceGroup"];
        string dataFactoryName = ConfigurationManager.AppSettings["dataFactoryName"];
        string pipelineName = ConfigurationManager.AppSettings["pipelineName"];

        Console.WriteLine("Creating pipeline run...");
        CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(
            resourceGroup,
            dataFactoryName,
            pipelineName,
            parameters: parameters).Result.Body;
        Console.WriteLine("Pipeline run ID: " + runResponse.RunId);

        var response = new HttpResponseMessage();

        if (client.PipelineRuns.Get(ConfigurationManager.AppSettings["resourceGroup"],
            ConfigurationManager.AppSettings["dataFactoryName"], runResponse.RunId).Status.Equals("InProgress"))
        {
            response = new HttpResponseMessage(HttpStatusCode.OK)
            {
                Content = new StringContent(runResponse.RunId, Encoding.UTF8)
            };
        }
        else
        {
            response = new HttpResponseMessage(HttpStatusCode.BadRequest)
            {
                Content =
                    new StringContent("Pipeline didn't started", Encoding.UTF8) //just some validation for function
            };
        }

        return response;
    }

【讨论】:

    猜你喜欢
    • 2020-12-22
    • 1970-01-01
    • 2020-02-02
    • 1970-01-01
    • 1970-01-01
    • 2020-10-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多