【问题标题】:Upload data to the Azure ADLS Gen2 from on-premise using Python or Java使用 Python 或 Java 从本地将数据上传到 Azure ADLS Gen2
【发布时间】:2019-12-09 02:32:18
【问题描述】:

我有一个使用 Data Lake Gen2 的 Azure 存储帐户。我想使用 Python(或 Java)将数据从本地上传到 Lake Gen2 文件系统。

我找到了examples,了解如何与存储帐户中的文件共享进行交互,但我还没有找到如何上传到 Lake(而不是文件共享)。我还发现了如何为 Gen1 Lakes here 做这件事,但除了为 Gen2 关闭 requests 之外什么都没有。

我的问题是,到今天为止,Python 是否也能做到这一点;或者,如何使用 Java 将文件上传到 Gen2 Lake?非常感谢您提供演示上传 API 调用的代码 sn-p。

【问题讨论】:

    标签: java python azure azure-storage azure-data-lake


    【解决方案1】:

    根据官方教程Quickstart: Upload, download, and list blobs with Python,如下所示,如果您还没有注册multi-protocol access on Data Lake Storage的公共预览版,则不能直接使用Azure Storage SDK for Python在Azure Data Lake Store Gen 2中进行任何操作。

    注意

    只有在注册multi-protocol access on Data Lake Storage 的公共预览版后,具有分层命名空间的帐户才能使用本文中描述的功能。要查看限制,请参阅已知问题文章。

    所以向 ADLS Gen2 上传数据的唯一解决方案是使用 ADLS Gen2 的 REST API,请参考其参考资料Azure Data Lake Store REST API

    这是我在 Python 中将数据上传到 ADLS Gen2 的示例代码,它运行良好。

    import requests
    import json
    
    def auth(tenant_id, client_id, client_secret):
        print('auth')
        auth_headers = {
            "Content-Type": "application/x-www-form-urlencoded"
        }
        auth_body = {
            "client_id": client_id,
            "client_secret": client_secret,
            "scope" : "https://storage.azure.com/.default",
            "grant_type" : "client_credentials"
        }
        resp = requests.post(f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token", headers=auth_headers, data=auth_body)
        return (resp.status_code, json.loads(resp.text))
    
    def mkfs(account_name, fs_name, access_token):
        print('mkfs')
        fs_headers = {
            "Authorization": f"Bearer {access_token}"
        }
        resp = requests.put(f"https://{account_name}.dfs.core.windows.net/{fs_name}?resource=filesystem", headers=fs_headers)
        return (resp.status_code, resp.text)
    
    def mkdir(account_name, fs_name, dir_name, access_token):
        print('mkdir')
        dir_headers = {
            "Authorization": f"Bearer {access_token}"
        }
        resp = requests.put(f"https://{account_name}.dfs.core.windows.net/{fs_name}/{dir_name}?resource=directory", headers=dir_headers)
        return (resp.status_code, resp.text)
        
    def touch_file(account_name, fs_name, dir_name, file_name, access_token):
        print('touch_file')
        touch_file_headers = {
            "Authorization": f"Bearer {access_token}"
        }
        resp = requests.put(f"https://{account_name}.dfs.core.windows.net/{fs_name}/{dir_name}/{file_name}?resource=file", headers=touch_file_headers)
        return (resp.status_code, resp.text)
    
    def append_file(account_name, fs_name, path, content, position, access_token):
        print('append_file')
        append_file_headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "text/plain",
            "Content-Length": f"{len(content)}"
        }
        resp = requests.patch(f"https://{account_name}.dfs.core.windows.net/{fs_name}/{path}?action=append&position={position}", headers=append_file_headers, data=content)
        return (resp.status_code, resp.text)
        
    def flush_file(account_name, fs_name, path, position, access_token):
        print('flush_file')
        flush_file_headers = {
            "Authorization": f"Bearer {access_token}"
        }
        resp = requests.patch(f"https://{account_name}.dfs.core.windows.net/{fs_name}/{path}?action=flush&position={position}", headers=flush_file_headers)
        return (resp.status_code, resp.text)
    
    def mkfile(account_name, fs_name, dir_name, file_name, local_file_name, access_token):
        print('mkfile')
        status_code, result = touch_file(account_name, fs_name, dir_name, file_name, access_token)
        if status_code == 201:
            with open(local_file_name, 'rb') as local_file:
                path = f"{dir_name}/{file_name}"
                content = local_file.read()
                position = 0
                append_file(account_name, fs_name, path, content, position, access_token)
                position = len(content)
                flush_file(account_name, fs_name, path, position, access_token)
        else:
            print(result)
            
        
    if __name__ == '__main__':
        tenant_id = '<your tenant id>'
        client_id = '<your client id>'
        client_secret = '<your client secret>'
        
        account_name = '<your adls account name>'
        fs_name = '<your filesystem name>'
        dir_name = '<your directory name>'
        file_name = '<your file name>'
        local_file_name = '<your local file name>'
        
        # Acquire an Access token
        auth_status_code, auth_result = auth(tenant_id, client_id, client_secret)
        access_token = auth_status_code == 200 and auth_result['access_token'] or ''
        print(access_token)
        
        # Create a filesystem
        mkfs_status_code, mkfs_result = mkfs(account_name, fs_name, access_token)
        print(mkfs_status_code, mkfs_result)
        
        # Create a directory
        mkdir_status_code, mkdir_result = mkdir(account_name, fs_name, dir_name, access_token)
        print(mkdir_status_code, mkdir_result)
        
        # Create a file from local file
        mkfile(account_name, fs_name, dir_name, file_name, local_file_name, access_token)
    

    希望对你有帮助。

    【讨论】:

    • 是否也可以进行批量上传?例如,如果我有一个必须上传的文件的整个目录,或者一个本地路径列表?
    • 嗨,这也可以使用现有的 SAS 令牌吗?所以我不必将存储帐户的帐户密钥放在代码中
    猜你喜欢
    • 2021-01-30
    • 1970-01-01
    • 2021-11-25
    • 1970-01-01
    • 2020-10-17
    • 1970-01-01
    • 2022-08-02
    • 2020-09-15
    • 2020-04-20
    相关资源
    最近更新 更多