【问题标题】:Run a class function in background and retrieve its status在后台运行类函数并检索其状态
【发布时间】:2018-05-01 20:10:35
【问题描述】:
import threading
from azure.storage.blob import BlockBlobService

def do_other_stuff():
    print("so much stuff to do")

class ABlob:
    def __init__(self, account_name, account_key, container_name, blob_name, file_path):
        self.account_name = account_name
        self.account_key = account_key
        self.container_name = container_name
        self.blob_name = blob_name
        self.file_path = file_path
        self.blob_service = BlockBlobService(account_name=self.account_name, account_key=self.account_key)

    def get_blob(self):
        download_thread = threading.Thread(
            target=self.blob_service.get_blob_to_path,
            args=(self.container_name, self.blob_name, self.file_path))
        download_thread.start()

    def get_blob_name(self):
        print(self.blob_name)


first_blob = ABlob(account_name='account_name',
                   account_key='key',
                   container_name='container', blob_name='something.csv',
                   file_path='path')


first_blob.get_blob()
first_blob.get_blob_name()
do_other_stuff()

我有需要下载和上传的 Azure Blob(未显示)。我不想等待他们完成他们的过程,因为我还有其他事情要做。但在某些时候,我需要确认它们是否已成功下载或上传。

在我当前的代码中,我使用了线程库。如果上传或下载过程中发生错误,处理事务的线程将退出并返回错误。我没有办法通知主线程完成和完成的状态。

我需要做什么才能获得get_blob 的状态?是否有另一个图书馆以一种不那么危险的方式来处理这种情况?我引用了以下线程,但无法弄清楚如何组合它们的不同方法。

Catch a thread's exception in the caller thread in Python

python multiprocessing pool retries

How to call a async function contained in a class

background function in Python

【问题讨论】:

    标签: python multithreading asynchronous python-multithreading python-asyncio


    【解决方案1】:

    我需要做什么才能获得get_blob的状态?

    您可以将get_blob 包装在一个函数中,该函数将存储有关它是否成功的信息,并存储返回值(如果有)。你可以写target=self._get_blob_background,而不是target=self.blob_service.get_blob_to_path。新的_get_blob_background方法可以调用self.result = self.blob_service.get_blob_to_path并使用tryexcept Exception as e捕获所有异常,遇到异常则执行self.result_exception = e,以便主线程区分结果和异常。

    更好的是,您可以使用concurrent.futures 库为您完成所有这些工作:

    pool = concurrent.futures.ThreadPoolExecutor()
    
    def get_blob(self):
        return pool.submit(self.blob_service.get_blob_to_path,
                           self.container_name, self.blob_name, self.file_path)
    

    现在get_blob() 将在后台运行,就像在您的代码中一样,但在这里它将返回一个Future 对象,您可以查询该对象是否已完成,以及它是如何完成的。

    【讨论】:

    • 我曾考虑过您的第一种方法,但认为它可能过于草率。 concurrent_futures 库绝对是我想要的。我的班级的每个实例都有一个池对象有什么后果吗?或者,池应该比每个实例更高。
    • @supertommy 执行器维护一个线程或进程池,按需生成和销毁它们。如果您为每个实例创建一个执行器,您将负责在执行器上调用close() 以确保其线程被删除。此外,每次调用都会产生一个新线程,线程池通常旨在防止这种情况发生以提高效率。
    猜你喜欢
    • 2023-03-27
    • 1970-01-01
    • 2023-03-18
    • 1970-01-01
    • 2020-08-29
    • 2015-08-16
    • 1970-01-01
    • 2015-10-29
    • 2019-08-30
    相关资源
    最近更新 更多