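【Question title】:Is there a way to trigger a Data Lake Analytics job by an event?
【Posted】:2017-10-02 14:36:27
【Question】:

Is there a way to trigger a Data Lake Analytics job on an event, for example: "when data or an event arrives at an Event Hub", the job is triggered?
It doesn't have to be Event Hub, but I want to trigger the job and somehow pass data to it (for example, JSON).

【Comments】:

    Tags: c# azure azure-data-lake azure-eventhub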


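    【Solution 1】:

    It doesn't have to be Event Hub, but I want to trigger the job and somehow pass data to it (for example, JSON).

    Based on your description, I suggest using Azure WebJobs (with either an Event Hub trigger or a queue trigger) together with the Azure Data Lake Analytics .NET SDK.

    Before you can use the Azure Data Lake Analytics .NET SDK, you need to register an Azure AD application so that your code can request a token (using the client ID and secret) to access ADLA.

    Register an application in Azure AD and create a service principal for it. For the detailed steps to register the app and obtain an access token, refer to the documentation.

    Note: don't forget to grant your AD application permission to access the Data Lake; for details, refer to the linked article.

    Once that is done, you can create a WebJob with the code below. It is triggered by a queue message (or an Event Hub event) and creates a new Data Lake Analytics job that runs your script.

    The code is as follows.

    Note: you need to install the following packages from NuGet: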

    Microsoft.Azure.Graph.RBAC (preview)
    Microsoft.Azure.Management.DataLake.Analytics
    Microsoft.Azure.Management.DataLake.Store
    Microsoft.IdentityModel.Clients.ActiveDirectory 
    Microsoft.Rest.ClientRuntime.Azure.Authentication
    

    Functions.cs:

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Rest;
    using System.Threading;
    using Microsoft.Rest.Azure.Authentication;
    
    using Microsoft.IdentityModel.Clients.ActiveDirectory;
    using Microsoft.Azure.Management.DataLake.Analytics;
    using Microsoft.Azure.Management.DataLake.Store;
    using Microsoft.Azure.Graph.RBAC;
    using Microsoft.Azure.Management.DataLake.Analytics.Models;
    using System.Security.Cryptography.X509Certificates;
    
    namespace WebJob1
    {
        public class Functions
        {
            // This function will get triggered/executed when a new message is written 
            // on an Azure Queue called queue.
            public static void ProcessQueueMessage([QueueTrigger("queue")] string message, TextWriter log)
            {
                string adlaAccountName = "adlaAccountName";
                string subscriptionId = "yoursubscriptionid";
    
                string domain = "tenantid";
                var armTokenAudience = new Uri(@"https://management.core.windows.net/");
                var adlTokenAudience = new Uri(@"https://datalake.azure.net/");
                var aadTokenAudience = new Uri(@"https://graph.windows.net/");
    
                // ----------------------------------------
                // Perform authentication to get credentials
                // ----------------------------------------
    
    
                // NON - INTERACTIVE WITH SECRET KEY
                string clientId = "clientId";
                string secretKey = "clientsecretKey";
                var armCreds = GetCredsServicePrincipalSecretKey(domain, armTokenAudience, clientId, secretKey);
                var adlCreds = GetCredsServicePrincipalSecretKey(domain, adlTokenAudience, clientId, secretKey);
                var aadCreds = GetCredsServicePrincipalSecretKey(domain, aadTokenAudience, clientId, secretKey);
    
                // INTERACTIVE WITH CACHE
                //var tokenCache = new TokenCache();
                //tokenCache.BeforeAccess = BeforeTokenCacheAccess;
                //tokenCache.AfterAccess = AfterTokenCacheAccess;
                //var armCreds = GetCredsInteractivePopup(domain, armTokenAudience, tokenCache, PromptBehavior.Auto);
                //var adlCreds = GetCredsInteractivePopup(domain, adlTokenAudience, tokenCache, PromptBehavior.Auto);
                //var aadCreds = GetCredsInteractivePopup(domain, aadTokenAudience, tokenCache, PromptBehavior.Auto);
    
                // INTERACTIVE WITHOUT CACHE
                // var armCreds = GetCredsInteractivePopup(domain, armTokenAudience, PromptBehavior.Auto);
                // var adlCreds = GetCredsInteractivePopup(domain, adlTokenAudience, PromptBehavior.Auto);
                // var aadCreds = GetCredsInteractivePopup(domain, aadTokenAudience, PromptBehavior.Auto);
    
    
                // NON-INTERACTIVE WITH CERT
                // string clientId = "<service principal / application client ID>";
                // var certificate = new X509Certificate2(@"<path to (PFX) certificate file>", "<certificate password>");
                // var armCreds = GetCredsServicePrincipalCertificate(domain, armTokenAudience, clientId, certificate);
                // var adlCreds = GetCredsServicePrincipalCertificate(domain, adlTokenAudience, clientId, certificate);
                // var aadCreds = GetCredsServicePrincipalCertificate(domain, aadTokenAudience, clientId, certificate);
    
                // ----------------------------------------
                // Create the REST clients using the credentials
                // ----------------------------------------
    
                var adlaAccountClient = new DataLakeAnalyticsAccountManagementClient(armCreds);
                adlaAccountClient.SubscriptionId = subscriptionId;
    
                var adlsAccountClient = new DataLakeStoreAccountManagementClient(armCreds);
                adlsAccountClient.SubscriptionId = subscriptionId;
    
                var adlaCatalogClient = new DataLakeAnalyticsCatalogManagementClient(adlCreds);
                var adlaJobClient = new DataLakeAnalyticsJobManagementClient(adlCreds);
                var adlsFileSystemClient = new DataLakeStoreFileSystemManagementClient(adlCreds);
    
                var graphClient = new GraphRbacManagementClient(aadCreds);
                graphClient.TenantID = domain;
    
                // ----------------------------------------
                // Perform operations with the REST clients
                // ----------------------------------------
    
                // Replace with your U-SQL script; the trigger payload is available in `message`.
                var script = @" your script ";
                var jobId = Guid.NewGuid();
                var properties = new USqlJobProperties(script);
                var parameters = new JobInformation("test1", JobType.USql, properties, priority: 1, degreeOfParallelism: 1, jobId: jobId);
                // Create and submit the new job
                var jobInfo = adlaJobClient.Job.Create(adlaAccountName, jobId, parameters);
                log.WriteLine($"Submitted ADLA job {jobId} for message: {message}");
            }
    
            // The interactive samples reuse Azure PowerShell's client ID
            // For production code you should use your own client ids
            private static string azure_powershell_clientid = "1950a258-227b-4e31-a9cf-717495945fc2";
    
            /*
             *  Interactive: User popup
             *  (no token cache to reuse/save session state)
             */
            private static ServiceClientCredentials GetCredsInteractivePopup(string domain, Uri tokenAudience, PromptBehavior promptBehavior = PromptBehavior.Auto)
            {
                SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
    
                // The client id comes from Azure PowerShell
                // for production code you should use your own client id
    
                var clientSettings = new ActiveDirectoryClientSettings
                {
                    ClientId = azure_powershell_clientid,
                    ClientRedirectUri = new Uri("urn:ietf:wg:oauth:2.0:oob"),
                    PromptBehavior = promptBehavior
                };
    
                var serviceSettings = ActiveDirectoryServiceSettings.Azure;
                serviceSettings.TokenAudience = tokenAudience;
    
                var creds = UserTokenProvider.LoginWithPromptAsync(domain, clientSettings, serviceSettings).GetAwaiter().GetResult();
    
                return creds;
            }
    
            /*
             *  Interactive: User popup
             *  (using a token cache to reuse/save session state)
             */
            private static ServiceClientCredentials GetCredsInteractivePopup(string domain, Uri tokenAudience, TokenCache tokenCache, PromptBehavior promptBehavior = PromptBehavior.Auto)
            {
                SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
    
                var clientSettings = new ActiveDirectoryClientSettings
                {
                    ClientId = azure_powershell_clientid,
                    ClientRedirectUri = new Uri("urn:ietf:wg:oauth:2.0:oob"),
                    PromptBehavior = promptBehavior
                };
    
                var serviceSettings = ActiveDirectoryServiceSettings.Azure;
                serviceSettings.TokenAudience = tokenAudience;
    
                var creds = UserTokenProvider.LoginWithPromptAsync(domain, clientSettings, serviceSettings, tokenCache).GetAwaiter().GetResult();
    
                return creds;
            }
    
    
    
            /*
             *  Interactive: Device code login
             *  NOT YET SUPPORTED by Azure's .NET SDK authentication library
             */
            private static ServiceClientCredentials GetCredsDeviceCode()
            {
                throw new NotImplementedException("Azure SDK's .NET authentication library doesn't support device code login yet.");
            }
    
            /*
             *  Non-interactive: Service principal / application using a secret key
             *  Setup: https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-group-authenticate-service-principal#create-service-principal-with-password
             */
            private static ServiceClientCredentials GetCredsServicePrincipalSecretKey(string domain, Uri tokenAudience, string clientId, string secretKey)
            {
                SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
    
                var serviceSettings = ActiveDirectoryServiceSettings.Azure;
                serviceSettings.TokenAudience = tokenAudience;
    
                var creds = ApplicationTokenProvider.LoginSilentAsync(domain, clientId, secretKey, serviceSettings).GetAwaiter().GetResult();
    
                return creds;
            }
    
            /*
             *  Non-interactive: Service principal / application using a certificate
             *  Setup: https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-group-authenticate-service-principal#create-service-principal-with-self-signed-certificate
             */
            private static ServiceClientCredentials GetCredsServicePrincipalCertificate(string domain, Uri tokenAudience, string clientId, X509Certificate2 certificate)
            {
                SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
    
                var clientAssertionCertificate = new ClientAssertionCertificate(clientId, certificate);
                var serviceSettings = ActiveDirectoryServiceSettings.Azure;
                serviceSettings.TokenAudience = tokenAudience;
    
                var creds = ApplicationTokenProvider.LoginSilentWithCertificateAsync(domain, clientAssertionCertificate, serviceSettings).GetAwaiter().GetResult();
    
                return creds;
            }
        }
    }
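
The function above submits a fixed script and never uses `message`, so the "pass data (for example JSON)" part of the question is still open. One possible approach, sketched below, is to prepend U-SQL `DECLARE` statements to the script so the payload arrives as a string variable. The `UsqlScriptBuilder` class and its method names are hypothetical helpers, not part of the SDK, and the sketch assumes the dictionary keys are already valid U-SQL variable names.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper (not part of the ADLA SDK): injects values into a
// U-SQL script as string variables by prepending DECLARE statements.
public static class UsqlScriptBuilder
{
    // Escapes a value so it can sit inside a double-quoted, C#-style
    // U-SQL string literal. Backslashes are escaped first so the later
    // replacements are not double-escaped.
    public static string EscapeLiteral(string value)
    {
        return value
            .Replace("\\", "\\\\")
            .Replace("\"", "\\\"")
            .Replace("\r", "\\r")
            .Replace("\n", "\\n");
    }

    // Returns the script body preceded by one DECLARE per parameter.
    // Assumption: each key is a valid U-SQL variable name (without '@').
    public static string WithParameters(string scriptBody, IDictionary<string, string> parameters)
    {
        var sb = new StringBuilder();
        foreach (var p in parameters)
        {
            sb.AppendLine($"DECLARE @{p.Key} string = \"{EscapeLiteral(p.Value)}\";");
        }
        sb.Append(scriptBody);
        return sb.ToString();
    }
}
```

In `ProcessQueueMessage`, the `script` variable could then be built with something like `UsqlScriptBuilder.WithParameters(scriptBody, new Dictionary<string, string> { { "payload", message } })`, after which the script can read `@payload`. For large payloads it may be more practical to write the data to the Data Lake Store first and pass only the file path this way.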
    

