【问题标题】:How to generate/retrieve file in datalake using message queue?如何使用消息队列在数据湖中生成/检索文件?
【发布时间】:2021-06-13 08:20:00
【问题描述】:

我有一个 Azure 函数 QueueTrigger1,它执行函数 executeTemplateProcess 以在 Google Drive 上上传 tsv 文件并更新 Jira 票证。

我需要创建一个消息队列以在 datalake 上生成一个 tsv 文件,运行 python 代码,然后从 datalake 中检索 tsv 文件(位置)并将其添加到队列中。

我今天有一个队列的基本基础,但我不确定如何在数据湖上生成文件并检索其位置。我们需要将文件作为输入传递到 python 代码中,这就是为什么我认为我们需要将数据湖上的文件位置排入队列,但我不确定如何执行此操作。

这是QueueTrigger1executeTemplateProcess() 的命名空间

namespace DI
{
    public class DIProcess
    {
        public static void executeTemplateProcess(string jiraKey, string jiraIssueType, string jiraSummary, Component component, string jiraDescription)
        {
            if (rowCount > 0)
            {   //python code would run somewhere here following queue process before jira code executes below
                string dfileId = CopyTemplate(component.FileId, sheetName);

                // stop process if copy template not sucessfull
                if (string.IsNullOrEmpty(dfileId))
                    return;

                jira.AddComment("Google File copied.");

                // Update JIRA with the web link
                webLink = $"https://docs.google.com/spreadsheets/d/{dfileId}";
                jira.AddWebLink(webLink, sheetName);
                jira.AddComment("Jira weblink added.");
            }
            else
            {
                jira.UpdateStatus("Abandoned");
                jira.AddComment("Jira status updated to Abandoned.");
            }
        }
    }
}   

namespace companyxyzjira.QueueTrigger1
{
    public static class JiraQueueTrigger
    {
        [FunctionName("QueueTrigger1")]
        public static void Run([QueueTrigger("companyxyz-jira-dev-am", Connection = "storageaccountcompanyxyzji42f6_STORAGE")]string myQueueItem
            , ILogger log, ExecutionContext context)
        {
            dynamic jira;
            string jiraKey;
            string jiraIssueType;
            string jiraSummary;
            string jiraDescription;
            string[] jiraComponentNames;
            Component jiraComponent;

            log.LogInformation("Queue trigger function processing");

            jira = JsonConvert.DeserializeObject(myQueueItem);

            jiraKey = jira.issue.key;
            jiraIssueType = jira.issue.fields.issuetype.name;
            jiraSummary = jira.issue.fields.summary;
            jiraDescription = jira.issue.fields.description;

            try
            {
                DIProcess.executeTemplateProcess(jiraKey, jiraIssueType, jiraSummary, jiraComponent, jiraDescription);
            }
            catch (System.Exception e)
            {
                log.LogError(e.ToString());
                log.LogError(e.Message);
                log.LogError(e.StackTrace);
            }
        }
    }
}

我想这是我的思路,但我不确定如何与数据湖通信......

[FunctionName("HttpTriggerCSharp")]
public static async Task<IActionResult> Run(
    [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)]
    HttpRequest req, [Queue("companyxyz-jira-dev-pm-mapping-done")] ICollector<string> QueueItem, ILogger log)
{
    log.LogInformation("HTTP trigger function processed a request.");

    string name = req.Query["name"];
    
    string requestBody = String.Empty;
    using (StreamReader streamReader =  new  StreamReader(req.Body))
    {
        requestBody = await streamReader.ReadToEndAsync();
        QueueItem.Add(requestBody); //i think?
    }
    dynamic data = JsonConvert.DeserializeObject(requestBody);
    name = name ?? data?.name;
    
    return name != null
        ? (ActionResult)new OkObjectResult($"{name}")
        : new BadRequestObjectResult("Please pass a name on the query string or in the request body");
}

带有输入/输出文件的数据湖快照(手动上传,但这是我们从现在开始想要自动化的内容,因此我们需要如上所述从/向消息队列生成/检索这些工件)

【问题讨论】:

    标签: c# azure-functions azure-data-lake azure-blob-storage


    【解决方案1】:
    from azure.storage.filedatalake import DataLakeServiceClient
    import pandas as pd
    
    connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
        
    service_client = DataLakeServiceClient.from_connection_string(connect_str)
    #Put above code out of the body of function.
    
    file_system_client = service_client.get_file_system_client(file_system="test")
    directory_client = file_system_client.get_directory_client("test")
    file_client = directory_client.create_file("uploaded-file0316.txt")
    
    #Upload to datalake
    head = ["col1" , "col2" , "col3"]
    l = [[1 , 2 , 3],[4,5,6] , [8 , 7 , 9]]
    df = pd.DataFrame (l , columns = head)
    data = df.to_csv(index_label="idx", encoding = "utf-8")
    output = data.replace(',', '\t')
    print(output)
    file_client.append_data(data=output, offset=0, length=len(output))
    file_client.flush_data(len(output))
    
    #download from datalake
    download = file_client.download_file()
    content = download.readall()
    print(content)
    

    【讨论】:

    • 感谢鲍曼!所以你认为这应该直接从 python 代码运行?我考虑消息队列的原因是因为我们想要一种“文件观察器”来触发 executeTemplateProcess() 的其余部分,那么这个 python 代码是否像这样,消除了对队列的需求?另外,如果我们需要在 executeTemplateProcess 完成工作后检索 blob(例如 tsv)文件的位置,如何从 Azure 数据湖传递文件的位置?
    • @Cataster 你的意思是需要获取生成文件的位置并保存到队列中?如果是这样,我认为元数据应该有路径信息
    • 我的答案中的代码描述了如何使用 sdk 与 datalake 交互(生成文件并获取有关现有文件的信息)
    • 您需要 Python 还是 C# 来执行此操作?
    • @Cataster 不,第一个是获取容器,第二个是获取目录。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-23
    • 2011-01-13
    • 1970-01-01
    • 1970-01-01
    • 2018-01-09
    • 1970-01-01
    相关资源
    最近更新 更多