【Question title】: How can I access the return value from twisted deferred callbacks using AsyncioSelectorReactor?
【Posted】: 2020-06-17 17:31:06
【Question】:

When I try the following with Python 3.7.7, Twisted 20.3.0 (and Scrapy 2.1.0)...

doc_link = await self.upload_reseller_document(doc_request, self.create_id(contract))

I get a Deferred instead of a string. My callbacks are not awaited either.

Expected: https://s3.amazonaws.com/some-bucket/some_file.csvNone

Received: <Deferred at 0x11ae61dd0 current result: None>

    async def conditional_upload(request):
        docs_bucket = 'some-bucket'
        key = f'some-prefix/some_file.csv'
        url = f'https://s3.amazonaws.com/{docs_bucket}/{key}'
        async def cb(obj):
            print('found key, returning url')
            return defer.success(url)

        async def upload_doc():
            print('called upload_doc')
            response = await self.crawler.engine.download(request, self)
            if response.status != 200:
                # Error happened, return item.
                print('could not download reseller csv')
                return defer.error(None)
            print('uploading to', docs_bucket, key)
            return threads.deferToThread(
                self.s3client.put_object,
                Bucket=docs_bucket,
                Key=key,
                Body=response.body)

        async def eb(failure):
            print('did not find key')
            if failure.type != ClientError:
                raise failure.value
            return upload_doc()

        return ensureDeferred(threads.deferToThread(
                self.s3client.head_object,
                Bucket=docs_bucket,
                Key=key).addCallbacks(cb, eb))

【Comments】:

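  • You are returning the result of calling ensureDeferred, a function whose job is to convert its argument into a Deferred. That is why you get a Deferred back from your function. Try return await ensureDeferred(...)
  • Also, since you have await at your disposal, you probably don't need addCallbacks in the first place; you can await the result of deferToThread directly.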

Tags: scrapy python-asyncio twisted


【Solution 1】:

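Internally, Twisted only deals with Deferreds and functions that return them. You cannot pass async functions as callbacks to Deferreds (when called, an async function returns a coroutine object). If you do, the callback will have no effect, and when the reactor stops you will get the warning "coroutine x was never awaited".
When using async functions, you should simply await Deferreds and handle their results, rather than attaching callbacks and returning them. The goal of async functions is to avoid callback hell.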
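
To make the "returns a coroutine object" point concrete, here is a minimal stdlib-only sketch. run_plain_callback is a hypothetical stand-in for any callback machinery that just calls the function it was given; it is not Twisted's actual implementation, and the URL is only a placeholder taken from the question.

```python
import asyncio
import inspect


async def cb(obj):
    # Same shape as the async callback in the question.
    return "https://s3.amazonaws.com/some-bucket/some-prefix/some_file.csv"


def run_plain_callback(callback, value):
    # Hypothetical stand-in: callback machinery that simply calls the
    # function and passes along whatever it returns, without awaiting it.
    return callback(value)


result = run_plain_callback(cb, None)

# The "result" is a coroutine object, not the URL string.
is_coroutine = inspect.iscoroutine(result)
print(is_coroutine)

# Only awaiting (here: driving it with asyncio.run) produces the value.
awaited = asyncio.run(result)
print(awaited)
```

If nothing ever awaits that coroutine object, Python emits the "never awaited" warning when the object is garbage collected.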

defer.ensureDeferred wraps a coroutine in a Deferred and lets Twisted schedule it to run. Use it when you need to call an async function from a non-async function.

Use try/except to handle exceptions (the equivalent of an errback, except that the exception is not wrapped in Twisted's Failure):

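# Imports this snippet relies on (assumed: the S3 client is boto3/botocore)
from botocore.exceptions import ClientError
from twisted.internet import threads

async def conditional_upload(request):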
    docs_bucket = 'some-bucket'
    key = f'some-prefix/some_file.csv'
    url = f'https://s3.amazonaws.com/{docs_bucket}/{key}'

    async def upload_doc():
        print('called upload_doc')
        response = await self.crawler.engine.download(request, self)
        if response.status != 200:
            # Error happened, return item.
            print('could not download reseller csv')
            raise Exception('could not download reseller csv')
        print('uploading to', docs_bucket, key)
        return await threads.deferToThread(
            self.s3client.put_object, Bucket=docs_bucket, Key=key, Body=response.body
        )

    # first check whether the object already exists
    try:
        await threads.deferToThread(self.s3client.head_object, Bucket=docs_bucket, Key=key)
        print('found key, returning url')
        return url
    except ClientError:
        print('did not find key, going to upload_doc ...')

    # if it does not exist, create it
    retry_attempts = 10 # avoid infinite loop
    for _ in range(retry_attempts):
        try:
            await upload_doc()
            print('Uploaded the key, returning url')
            return url
        except ClientError:
            print('Failed to upload the key, retrying...')

    print('Failed to upload the key, max attempts tried.')

【Comments】:
