[Posted]: 2018-10-15 11:24:48
[Question]:
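I am trying to parallelize a Python application with Azure Batch. The workflow I follow in the Python client script is:
1) Upload local files to an Azure Blob container (the input container) with the blobxfer utility.
2) After logging in with a service principal account through azure-cli, start the Batch service to process the files in the input container.
3) The Python application that Azure Batch distributes across the nodes uploads the processed files to the output container.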
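The problem I am running into is very similar to the one described here, although unfortunately that post has no solution: Nodes go into Unusable State
I will now provide the relevant information so that the error can be reproduced:
The image used for Azure Batch is a custom image.
1) Ubuntu Server 18.04 LTS was selected as the VM operating system, with the following ports opened: ssh, http, https. All other settings were left at their defaults in the Azure portal.
2) The following script was run once the server became available.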
sudo apt-get install build-essential checkinstall -y
sudo apt-get install libreadline-gplv2-dev libncursesw5-dev libssl-dev \
    libsqlite3-dev tk-dev libgdbm-dev libc6-dev libbz2-dev -y
cd /usr/src
sudo wget https://www.python.org/ftp/python/3.6.6/Python-3.6.6.tgz
sudo tar xzf Python-3.6.6.tgz
cd Python-3.6.6
sudo ./configure --enable-optimizations
sudo make altinstall
sudo pip3.6 install --upgrade pip
sudo pip3.6 install pymupdf==1.13.20
sudo pip3.6 install tqdm==4.19.9
sudo pip3.6 install sentry-sdk==0.4.1
sudo pip3.6 install blobxfer==1.5.0
sudo pip3.6 install azure-cli==2.0.47
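3) An image of this server was created using the procedure outlined in this link:
Creating VM Image in Azure Linux
Also, the user was not deleted during deprovisioning: sudo waagent -deprovision
4) The resource ID of the image was noted from the Azure portal. It is supplied as one of the parameters in the Python client script.
Packages installed on the client server, where the Python script that drives the batch processing runs: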
sudo pip3.6 install tqdm==4.19.9
sudo pip3.6 install sentry-sdk==0.4.1
sudo pip3.6 install blobxfer==1.5.0
sudo pip3.6 install azure-cli==2.0.47
sudo pip3.6 install pandas==0.22.0
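The resources used during Azure Batch were created as follows:
1) A service principal account with Contributor permissions was created with the command:
$ az ad sp create-for-rbac --name <SERVICE-PRINCIPAL-ACCOUNT>
2) The resource group, the Batch account, and the storage account associated with the Batch account were created as follows: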
$ az group create --name <RESOURCE-GROUP-NAME> --location eastus2
$ az storage account create --resource-group <RESOURCE-GROUP-NAME> --name <STORAGE-ACCOUNT-NAME> --location eastus2 --sku Standard_LRS
$ az batch account create --name <BATCH-ACCOUNT-NAME> --storage-account <STORAGE-ACCOUNT-NAME> --resource-group <RESOURCE-GROUP-NAME> --location eastus2
Client Python script that starts the upload and the processing: (Update 3)
import subprocess
import os
import time
import datetime
import tqdm
import pandas
import sys
import fitz
import parmap
import numpy as np
import sentry_sdk
import multiprocessing as mp
def batch_upload_local_to_azure_blob(azure_username,azure_password,azure_tenant,azure_storage_account,azure_storage_account_key,log_dir_path):
    try:
        subprocess.check_output(["az","login","--service-principal","--username",azure_username,"--password",azure_password,"--tenant",azure_tenant])
    except subprocess.CalledProcessError:
        sentry_sdk.capture_message("Invalid Azure Login Credentials")
        sys.exit("Invalid Azure Login Credentials")
    dir_flag=False
    while dir_flag==False:
        try:
            no_of_dir=input("Enter the number of directories to upload:")
            no_of_dir=int(no_of_dir)
            if no_of_dir<0:
                print("\nRetry:Enter an integer value")
            else:
                dir_flag=True
        except ValueError:
            print("\nRetry:Enter an integer value")
    dir_path_list=[]
    for dir in range(no_of_dir):
        path_exists=False
        while path_exists==False:
            dir_path=input("\nEnter the local absolute path of the directory no.{}:".format(dir+1))
            print("\n")
            dir_path=dir_path.replace('"',"")
            path_exists=os.path.isdir(dir_path)
            if path_exists==True:
                dir_path_list.append(dir_path)
            else:
                print("\nRetry:Enter a valid directory path")
    timestamp = time.time()
    timestamp_humanreadable= datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d-%H-%M-%S')
    input_azure_container="pdf-processing-input"+"-"+timestamp_humanreadable
    try:
        subprocess.check_output(["az","storage","container","create","--name",input_azure_container,"--account-name",azure_storage_account,"--auth-mode","login","--fail-on-exist"])
    except subprocess.CalledProcessError:
        sentry_sdk.capture_message("Invalid Azure Storage Credentials.")
        sys.exit("Invalid Azure Storage Credentials.")
    log_file_path=os.path.join(log_dir_path,"upload-logs"+"-"+timestamp_humanreadable+".txt")
    dir_upload_success=[]
    dir_upload_failure=[]
    for dir in tqdm.tqdm(dir_path_list,desc="Uploading Directories"):
        try:
            subprocess.check_output(["blobxfer","upload","--remote-path",input_azure_container,"--storage-account",azure_storage_account,\
                "--enable-azure-storage-logger","--log-file",\
                log_file_path,"--storage-account-key",azure_storage_account_key,"--local-path",dir])
            dir_upload_success.append(dir)
        except subprocess.CalledProcessError:
            sentry_sdk.capture_message("Failed to upload directory: {}".format(dir))
            dir_upload_failure.append(dir)
    return(input_azure_container)
def query_azure_storage(azure_storage_container,azure_storage_account,azure_storage_account_key,blob_file_path):
    try:
        blob_list=subprocess.check_output(["az","storage","blob","list","--container-name",azure_storage_container,\
            "--account-key",azure_storage_account_key,"--account-name",azure_storage_account,"--auth-mode","login","--output","tsv"])
        blob_list=blob_list.decode("utf-8")
        with open(blob_file_path,"w") as f:
            f.write(blob_list)
        blob_df=pandas.read_csv(blob_file_path,sep="\t",header=None)
        blob_df=blob_df.iloc[:,3]
        blob_df=blob_df.to_frame(name="container_files")
        blob_df=blob_df.assign(container=azure_storage_container)
        return(blob_df)
    except subprocess.CalledProcessError:
        sentry_sdk.capture_message("Invalid Azure Storage Credentials")
        sys.exit("Invalid Azure Storage Credentials.")
def analyze_files_for_tasks(data_split,azure_storage_container,azure_storage_account,azure_storage_account_key,download_folder):
    try:
        blob_df=data_split
        some_calculation_factor=2
        analyzed_azure_blob_df=pandas.DataFrame()
        analyzed_azure_blob_df=analyzed_azure_blob_df.assign(container="empty",container_files="empty",pages="empty",max_time="empty")
        for index,row in blob_df.iterrows():
            file_to_analyze=os.path.join(download_folder,row["container_files"])
            subprocess.check_output(["az","storage","blob","download","--container-name",azure_storage_container,"--file",file_to_analyze,"--name",row["container_files"],\
                "--account-name",azure_storage_account,"--auth-mode","key"]) #Why does login auth not work for this while we are multiprocessing
            doc=fitz.open(file_to_analyze)
            page_count=doc.pageCount
            analyzed_azure_blob_df=analyzed_azure_blob_df.append([{"container":azure_storage_container,"container_files":row["container_files"],"pages":page_count,"max_time":some_calculation_factor*page_count}])
            doc.close()
            os.remove(file_to_analyze)
        return(analyzed_azure_blob_df)
    except Exception as e:
        sentry_sdk.capture_exception(e)
def estimate_task_completion_time(azure_storage_container,azure_storage_account,azure_storage_account_key,azure_blob_df,azure_blob_downloads_file_path):
    try:
        cores=mp.cpu_count() #Number of CPU cores on your system
        partitions = cores-2
        timestamp = time.time()
        timestamp_humanreadable= datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d-%H-%M-%S')
        file_download_location=os.path.join(azure_blob_downloads_file_path,"Blob_Download"+"-"+timestamp_humanreadable)
        os.mkdir(file_download_location)
        data_split = np.array_split(azure_blob_df,indices_or_sections=partitions,axis=0)
        analyzed_azure_blob_df=pandas.concat(parmap.map(analyze_files_for_tasks,data_split,azure_storage_container,azure_storage_account,azure_storage_account_key,file_download_location,\
            pm_pbar=True,pm_processes=partitions))
        analyzed_azure_blob_df=analyzed_azure_blob_df.reset_index(drop=True)
        return(analyzed_azure_blob_df)
    except Exception as e:
        sentry_sdk.capture_exception(e)
        sys.exit("Unable to Estimate Job Completion Status")
def azure_batch_create_pool(azure_storage_container,azure_resource_group,azure_batch_account,azure_batch_account_endpoint,azure_batch_account_key,vm_image_name,no_nodes,vm_compute_size,analyzed_azure_blob_df):
    timestamp = time.time()
    timestamp_humanreadable= datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d-%H-%M-%S')
    pool_id="pdf-processing"+"-"+timestamp_humanreadable
    try:
        subprocess.check_output(["az","batch","account","login","--name", azure_batch_account,"--resource-group",azure_resource_group])
    except subprocess.CalledProcessError:
        sentry_sdk.capture_message("Unable to log into the Batch account")
        sys.exit("Unable to log into the Batch account")
    #Pool autoscaling formula would go in here
    try:
        subprocess.check_output(["az","batch","pool","create","--account-endpoint",azure_batch_account_endpoint, \
            "--account-key",azure_batch_account_key,"--account-name",azure_batch_account,"--id",pool_id,\
            "--node-agent-sku-id","batch.node.ubuntu 18.04",\
            "--image",vm_image_name,"--target-low-priority-nodes",str(no_nodes),"--vm-size",vm_compute_size])
        return(pool_id)
    except subprocess.CalledProcessError:
        sentry_sdk.capture_message("Unable to create a Pool corresponding to Container:{}".format(azure_storage_container))
        sys.exit("Unable to create a Pool corresponding to Container:{}".format(azure_storage_container))
def azure_batch_create_job(azure_batch_account,azure_batch_account_key,azure_batch_account_endpoint,pool_info):
    timestamp = time.time()
    timestamp_humanreadable= datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d-%H-%M-%S')
    job_id="pdf-processing-job"+"-"+timestamp_humanreadable
    try:
        subprocess.check_output(["az","batch","job","create","--account-endpoint",azure_batch_account_endpoint,"--account-key",\
            azure_batch_account_key,"--account-name",azure_batch_account,"--id",job_id,"--pool-id",pool_info])
        return(job_id)
    except subprocess.CalledProcessError:
        sentry_sdk.capture_message("Unable to create a Job on the Pool :{}".format(pool_info))
        sys.exit("Unable to create a Job on the Pool :{}".format(pool_info))
def azure_batch_create_task(azure_batch_account,azure_batch_account_key,azure_batch_account_endpoint,pool_info,job_info,azure_storage_account,azure_storage_account_key,azure_storage_container,analyzed_azure_blob_df):
    print("\n")
    for i in tqdm.tqdm(range(180),desc="Waiting for the Pool to Warm-up"):
        time.sleep(1)
    successful_task_list=[]
    unsuccessful_task_list=[]
    input_azure_container=azure_storage_container
    output_azure_container= "pdf-processing-output"+"-"+input_azure_container.split("-input-")[-1]
    try:
        subprocess.check_output(["az","storage","container","create","--name",output_azure_container,"--account-name",azure_storage_account,"--auth-mode","login","--fail-on-exist"])
    except subprocess.CalledProcessError:
        sentry_sdk.capture_message("Unable to create an output container") #fixed typo: was cpature_message
        sys.exit("Unable to create an output container")
    print("\n")
    pbar = tqdm.tqdm(total=analyzed_azure_blob_df.shape[0],desc="Creating and distributing Tasks")
    for index,row in analyzed_azure_blob_df.iterrows():
        try:
            task_info="mytask-"+str(index)
            subprocess.check_output(["az","batch","task","create","--task-id",task_info,"--job-id",job_info,"--command-line",\
                "python3 /home/avadhut/pdf_processing.py {} {} {}".format(input_azure_container,output_azure_container,row["container_files"])])
            pbar.update(1)
        except subprocess.CalledProcessError:
            #report the index of the task that failed (was the leftover warm-up counter i)
            sentry_sdk.capture_message("unable to create the Task: mytask-{}".format(index))
            pbar.update(1)
    pbar.close()
def wait_for_tasks_to_complete(azure_batch_account,azure_batch_account_key,azure_batch_account_endpoint,job_info,task_file_path,analyzed_azure_blob_df):
    try:
        print(analyzed_azure_blob_df)
        nrows_tasks_df=analyzed_azure_blob_df.shape[0]
        print("\n")
        pbar=tqdm.tqdm(total=nrows_tasks_df,desc="Waiting for task to complete")
        for index,row in analyzed_azure_blob_df.iterrows():
            task_list=subprocess.check_output(["az","batch","task","list","--job-id",job_info,"--account-endpoint",azure_batch_account_endpoint,"--account-key",azure_batch_account_key,"--account-name",azure_batch_account,\
                "--output","tsv"])
            task_list=task_list.decode("utf-8")
            with open(task_file_path,"w") as f:
                f.write(task_list)
            task_df=pandas.read_csv(task_file_path,sep="\t",header=None)
            task_df=task_df.iloc[:,21]
            active_task_list=[]
            for x in task_df:
                if x =="active":
                    active_task_list.append(x)
            if len(active_task_list)>0:
                time.sleep(row["max_time"]) #This time can be changed in accordance with the time taken to complete each task
                pbar.update(1)
                continue
            else:
                pbar.close()
                return("success")
        pbar.close()
        return("failure")
    except subprocess.CalledProcessError:
        sentry_sdk.capture_message("Error in retrieving task status")
def azure_delete_job(azure_batch_account,azure_batch_account_key,azure_batch_account_endpoint,job_info):
    try:
        subprocess.check_output(["az","batch","job","delete","--job-id",job_info,"--account-endpoint",azure_batch_account_endpoint,"--account-key",azure_batch_account_key,"--account-name",azure_batch_account,"--yes"])
    except subprocess.CalledProcessError:
        sentry_sdk.capture_message("Unable to delete Job-{}".format(job_info))
def azure_delete_pool(azure_batch_account,azure_batch_account_key,azure_batch_account_endpoint,pool_info):
    try:
        subprocess.check_output(["az","batch","pool","delete","--pool-id",pool_info,"--account-endpoint",azure_batch_account_endpoint,"--account-key",azure_batch_account_key,"--account-name",azure_batch_account,"--yes"])
    except subprocess.CalledProcessError:
        sentry_sdk.capture_message("Unable to delete Pool--{}".format(pool_info))
if __name__=="__main__":
    print("\n")
    print("-"*40+"Azure Batch processing POC"+"-"*40)
    print("\n")
    #Credentials and initializations
    sentry_sdk.init(<SENTRY-CREDENTIALS>) #Sign-up for a Sentry trial account
    azure_username=<AZURE-USERNAME>
    azure_password=<AZURE-PASSWORD>
    azure_tenant=<AZURE-TENANT>
    azure_resource_group=<RESOURCE-GROUP-NAME>
    azure_storage_account=<STORAGE-ACCOUNT-NAME>
    azure_storage_account_key=<STORAGE-KEY>
    azure_batch_account_endpoint=<BATCH-ENDPOINT>
    azure_batch_account_key=<BATCH-ACCOUNT-KEY>
    azure_batch_account=<BATCH-ACCOUNT-NAME>
    vm_image_name=<VM-IMAGE>
    vm_compute_size="Standard_A4_v2"
    no_nodes=2
    log_dir_path="/home/user/azure_batch_upload_logs/"
    azure_blob_downloads_file_path="/home/user/blob_downloads/"
    blob_file_path="/home/user/azure_batch_upload.tsv"
    task_file_path="/home/user/azure_task_list.tsv"
    input_azure_container=batch_upload_local_to_azure_blob(azure_username,azure_password,azure_tenant,azure_storage_account,azure_storage_account_key,log_dir_path)
    azure_blob_df=query_azure_storage(input_azure_container,azure_storage_account,azure_storage_account_key,blob_file_path)
    analyzed_azure_blob_df=estimate_task_completion_time(input_azure_container,azure_storage_account,azure_storage_account_key,azure_blob_df,azure_blob_downloads_file_path)
    pool_info=azure_batch_create_pool(input_azure_container,azure_resource_group,azure_batch_account,azure_batch_account_endpoint,azure_batch_account_key,vm_image_name,no_nodes,vm_compute_size,analyzed_azure_blob_df)
    job_info=azure_batch_create_job(azure_batch_account,azure_batch_account_key,azure_batch_account_endpoint,pool_info)
    azure_batch_create_task(azure_batch_account,azure_batch_account_key,azure_batch_account_endpoint,pool_info,job_info,azure_storage_account,azure_storage_account_key,input_azure_container,analyzed_azure_blob_df)
    task_status=wait_for_tasks_to_complete(azure_batch_account,azure_batch_account_key,azure_batch_account_endpoint,job_info,task_file_path,analyzed_azure_blob_df)
    if task_status=="success":
        azure_delete_job(azure_batch_account,azure_batch_account_key,azure_batch_account_endpoint,job_info)
        azure_delete_pool(azure_batch_account,azure_batch_account_key,azure_batch_account_endpoint,pool_info)
        print("\n\n")
        sys.exit("Job Complete")
    else:
        azure_delete_job(azure_batch_account,azure_batch_account_key,azure_batch_account_endpoint,job_info)
        azure_delete_pool(azure_batch_account,azure_batch_account_key,azure_batch_account_endpoint,pool_info)
        print("\n\n")
        sys.exit("Job Unsuccessful")
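A note on wait_for_tasks_to_complete() above: it reads the task state from a fixed TSV column (task_df.iloc[:,21]) and only looks for the value "active", so a task that is still running would not be counted as unfinished. As a minimal sketch, assuming the same task listing is requested with "--output","json" and that each task object carries a "state" field, the states could be counted by name instead. The helper names count_task_states and all_tasks_completed are hypothetical and not part of the script above.

```python
import json
from collections import Counter

def count_task_states(task_list_json):
    """Count tasks per state from the JSON text of a task listing.

    Assumes the text is a JSON array of objects, each with a "state" key.
    """
    tasks = json.loads(task_list_json)
    return Counter(task.get("state", "unknown") for task in tasks)

def all_tasks_completed(task_list_json):
    """True only if there is at least one task and every task is completed."""
    counts = count_task_states(task_list_json)
    return len(counts) > 0 and set(counts) == {"completed"}
```

In the client script, the decoded output of the task list call would be passed to all_tasks_completed() inside the polling loop, which also removes the need for the intermediate task_file_path file.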
Command used to create the zip file:
zip pdf_process_1.zip pdf_processing.py
The Python app that is packaged into the zip file and uploaded to Batch by the client script:
(Update 3)
import os
import fitz
import subprocess
import argparse
import time
from tqdm import tqdm
import sentry_sdk
import sys
import datetime
def azure_active_directory_login(azure_username,azure_password,azure_tenant):
    try:
        azure_login_output=subprocess.check_output(["az","login","--service-principal","--username",azure_username,"--password",azure_password,"--tenant",azure_tenant])
    except subprocess.CalledProcessError:
        sentry_sdk.capture_message("Invalid Azure Login Credentials")
        sys.exit("Invalid Azure Login Credentials")
def download_from_azure_blob(azure_storage_account,azure_storage_account_key,input_azure_container,file_to_process,pdf_docs_path):
    file_to_download=os.path.join(input_azure_container,file_to_process)
    try:
        subprocess.check_output(["az","storage","blob","download","--container-name",input_azure_container,"--file",os.path.join(pdf_docs_path,file_to_process),"--name",file_to_process,"--account-key",azure_storage_account_key,\
            "--account-name",azure_storage_account,"--auth-mode","login"])
    except subprocess.CalledProcessError:
        sentry_sdk.capture_message("unable to download the pdf file")
        sys.exit("unable to download the pdf file")
def pdf_to_png(input_folder_path,output_folder_path):
    pdf_files=[x for x in os.listdir(input_folder_path) if x.endswith((".pdf",".PDF"))]
    pdf_files.sort()
    for pdf in tqdm(pdf_files,desc="pdf--->png"):
        doc=fitz.open(os.path.join(input_folder_path,pdf))
        page_count=doc.pageCount
        for f in range(page_count):
            page=doc.loadPage(f)
            pix = page.getPixmap()
            if pdf.endswith(".pdf"):
                png_filename=pdf.split(".pdf")[0]+"___"+"page---"+str(f)+".png"
                pix.writePNG(os.path.join(output_folder_path,png_filename))
            elif pdf.endswith(".PDF"):
                png_filename=pdf.split(".PDF")[0]+"___"+"page---"+str(f)+".png"
                pix.writePNG(os.path.join(output_folder_path,png_filename))
def upload_to_azure_blob(azure_storage_account,azure_storage_account_key,output_azure_container,png_docs_path):
    try:
        subprocess.check_output(["az","storage","blob","upload-batch","--destination",output_azure_container,"--source",png_docs_path,"--account-key",azure_storage_account_key,\
            "--account-name",azure_storage_account,"--auth-mode","login"])
    except subprocess.CalledProcessError:
        sentry_sdk.capture_message("Unable to upload file to the container")
if __name__=="__main__":
    #Credentials
    sentry_sdk.init(<SENTRY-CREDENTIALS>)
    azure_username=<AZURE-USERNAME>
    azure_password=<AZURE-PASSWORD>
    azure_tenant=<AZURE-TENANT>
    azure_storage_account=<AZURE-STORAGE-NAME>
    azure_storage_account_key=<AZURE-STORAGE-KEY>
    try:
        parser = argparse.ArgumentParser()
        parser.add_argument("input_azure_container",type=str,help="Location to download files from")
        parser.add_argument("output_azure_container",type=str,help="Location to upload files to")
        parser.add_argument("file_to_process",type=str,help="file link in azure blob storage")
        args = parser.parse_args()
        timestamp = time.time()
        timestamp_humanreadable= datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d-%H-%M-%S')
        task_working_dir=os.getcwd()
        file_to_process=args.file_to_process
        input_azure_container=args.input_azure_container
        output_azure_container=args.output_azure_container
        pdf_docs_path=os.path.join(task_working_dir,"pdf_files"+"-"+timestamp_humanreadable)
        png_docs_path=os.path.join(task_working_dir,"png_files"+"-"+timestamp_humanreadable)
        os.mkdir(pdf_docs_path)
        os.mkdir(png_docs_path)
    except Exception as e:
        sentry_sdk.capture_exception(e)
    azure_active_directory_login(azure_username,azure_password,azure_tenant)
    download_from_azure_blob(azure_storage_account,azure_storage_account_key,input_azure_container,file_to_process,pdf_docs_path)
    pdf_to_png(pdf_docs_path,png_docs_path)
    upload_to_azure_blob(azure_storage_account,azure_storage_account_key,output_azure_container,png_docs_path)
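A side note on pdf_to_png() above: it builds each PNG name with pdf.split(".pdf")[0], which cuts the name at the first ".pdf" it finds, so a file such as "a.pdf.backup.pdf" would produce PNGs named after "a" only. A minimal sketch of an alternative that strips just the final extension with os.path.splitext and keeps the same "___page---N.png" naming is shown here. The helper names png_name_for_page and is_pdf are hypothetical.

```python
import os

def is_pdf(filename):
    """Case-insensitive check for a .pdf extension (.pdf, .PDF, .Pdf, ...)."""
    return filename.lower().endswith(".pdf")

def png_name_for_page(pdf_filename, page_index):
    """Build the PNG file name for one page of a PDF.

    Only the last extension is removed, so dots elsewhere in the
    name are preserved.
    """
    stem, _ext = os.path.splitext(pdf_filename)
    return "{}___page---{}.png".format(stem, page_index)
```

With these two helpers, the separate ".pdf" and ".PDF" branches in the page loop would collapse into a single call.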
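Update 1: I have resolved the error where the server nodes went into the unusable state. I solved it as follows:
1) I did not use the commands mentioned above to set up a Python 3.6 environment on Ubuntu, because Ubuntu 18.04 LTS ships with its own Python 3 environment. Initially I had googled "install Python 3 on Ubuntu" and followed the link I found. That step was skipped entirely during server setup this time. All I did was install these packages.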
sudo apt-get install -y python3-pip
sudo -H pip3 install tqdm==4.19.9
sudo -H pip3 install sentry-sdk==0.4.1
sudo -H pip3 install blobxfer==1.5.0
sudo -H pip3 install pandas==0.22.0
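The Azure CLI was installed on the machine using the commands from this link: Install Azure CLI with apt
2) A snapshot of the OS disk was created, then an image was created from that snapshot, and finally the image was referenced in the client script.
I am now facing a different problem. The stderr.txt file on the node tells me: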
python3: can't open file '$AZ_BATCH_APP_PACKAGE_pdfprocessingapp/pdf_processing.py': [Errno 2] No such file or directory
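Logging in to the server as a random user, I can see that the directory _azbatch has been created, but it has no contents.
I am fairly sure that the command line in the azure_batch_create_task() function is what is going wrong, but I cannot pin it down. I have done everything recommended in this document: Install app packages to Azure Batch Compute Nodes. Please review my client Python script and let me know what I am doing wrong!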
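One thing the stderr line suggests is that '$AZ_BATCH_APP_PACKAGE_pdfprocessingapp' reached python3 as literal text, i.e. the variable was never expanded. As far as I understand, a Batch task command line is not run under a shell, so environment variables are only expanded if the command invokes a shell explicitly. A minimal sketch of building such a command line follows. The helper name build_task_command_line is hypothetical, and the variable name is taken from the error message above; the exact name on a given node depends on the application id and version and is an assumption that should be checked on the node.

```python
import shlex

def build_task_command_line(app_dir_var, script_name, script_args):
    """Wrap a task command in an explicit shell so `$VAR` references expand.

    app_dir_var is the name (without `$`) of the environment variable that
    is expected to hold the application package directory on the node.
    """
    # Quote each argument so blob names containing spaces survive the shell.
    quoted_args = " ".join(shlex.quote(str(a)) for a in script_args)
    # Double quotes around the path: the shell still expands the variable,
    # but the expanded path is not split on whitespace.
    inner = 'python3 "${}/{}" {}'.format(app_dir_var, script_name, quoted_args)
    # Single-quoting the whole inner command keeps `$...` literal until
    # bash itself runs on the node.
    return "/bin/bash -c " + shlex.quote(inner)
```

The returned string would be passed as the value of "--command-line" in azure_batch_create_task(), in place of the bare python3 command.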
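Edit 3: This problem looks very similar to the one described in this post: Unable to pass app path to Tasks
Update 2:
I was able to get past the file/directory not found error with a dirty hack that I am not particularly fond of. I placed the Python app in the home directory of the user that was used to create the VM, and created all the directories needed for processing in the task's working directory.
I would still like to know how to run the workflow by deploying the app to the nodes as an application package.
Update 3
I have updated the client code and the Python app to reflect the latest changes. The important things are all the same.
I will comment on the points raised by @fparks.
The original Python application I intend to use with Azure Batch contains many modules, some configuration files, and a fairly long requirements.txt of Python packages. Azure also recommends using a custom image in such cases. In my case, downloading the Python modules for every task is also somewhat unreasonable, because 1 task equals one multi-page PDF and my expected workload is 25k multi-page PDFs. I use the CLI because the documentation for the Python SDK is sparse and hard to follow. The problem of nodes going into the unusable state has been resolved. I agree with you about the blobxfer error.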
[Comments]:
Tags: python-3.x azure-cli azure-batch