【问题标题】:Having issues getting back results for Celery tasks在获取 Celery 任务的结果时遇到问题
【发布时间】:2018-12-04 22:00:55
【问题描述】:

所以,我设置了一个 Celery 系统,我在其中为每个任务动态创建一个云 VM 实例,一旦任务完成,VM 实例将自行删除。为此,我正在创建一个新队列并将新创建的实例上的工作人员分配给该队列,以便可以将任务发送到特定实例。这适用于 1 或 2 个同时执行的任务,但如果我尝试更多,那么 celery 的 result.get 方法只会无限期地等待。我使用的是 Celery 4.2.1 版(windowlicker)。

这是我的 Celery config.py 文件:

"""A module that configures Celery"""
from os import environ

from utils.loggerFactory import make_logger

LOGGER = make_logger(__name__)

LOGGER.info('Celery initalizing...')
REDIS_BACKEND_HOST = None
if 'RedisDNS' in environ:
    REDIS_BACKEND_HOST = environ['RedisDNS']
    LOGGER.info('Set Redis instance hostname to {}'.format(REDIS_BACKEND_HOST))
else:
    LOGGER.warning('Couldn\'t fetch RedisDNS, defaulting to localhost...')
    REDIS_BACKEND_HOST = 'localhost'

BROKER_URL = 'redis://{}'.format(REDIS_BACKEND_HOST)
CELERY_RESULT_BACKEND = 'redis://{}'.format(REDIS_BACKEND_HOST)
CELERY_TRACK_STARTED = True
CELERY_TASK_CREATE_MISSING_QUEUES = True
CELERY_TASK_IGNORE_RESULT = False
LOGGER.info('Init complete')

这里是执行任务的主要代码:

if ENV != 'development':
        # Create a new compute instance
        try:
            created_instance_name = create_worker_compute_instance(
                task_info['computeInstanceType'])
        except Exception as exc:
            LOGGER.error(
                '[{}] Couldn\'t create compute instance: {}'.format(request_id, str(exc)))
            try:
                LOGGER.info(
                    '[{}] Saving exception into redis...'.format(request_id))
                result = json.loads(REDIS_CLIENT.get(request_id))
                result['response'] = generate_response(
                    'Error: Couldn\'t create compute instance: {}'.format(str(exc)), None, 500)
                result['code'] = 500
                result['canDel'] = True
                REDIS_CLIENT.set(request_id, json.dumps(result))
            except Exception as exc:
                LOGGER.error(
                    '[{}] Couldn\'t save exception into redis: {}'.format(request_id, str(exc)))
                report_exception(ENV, exc)
            report_exception(ENV, exc)
            return

        celery_queue_name = 'queue-{}'.format(created_instance_name)

        LOGGER.info('[{}] Adding new Celery queue {}'.format(
            request_id, celery_queue_name))
        try:
            APP.control.add_consumer(celery_queue_name, reply=False, destination=[
                'worker1@{}'.format(created_instance_name)])
        except Exception as exc:
            LOGGER.error('[{}] Couldn\'t add queue {}: {}'.format(
                request_id, celery_queue_name, str(exc)))
            try:
                LOGGER.info('[{}] Saving exception into redis...'.format(request_id))
                result = json.loads(REDIS_CLIENT.get(request_id))
                result['response'] = generate_response(
                    'Error: Couldn\'t add queue {}: {}'.format(celery_queue_name, str(exc)), None, 500)
                result['code'] = 500
                result['canDel'] = True
                REDIS_CLIENT.set(request_id, json.dumps(result))
            except Exception as exc:
                LOGGER.error(
                    '[{}] Couldn\'t save exception into redis: {}'.format(request_id, str(exc)))
                report_exception(ENV, exc)
            report_exception(ENV, exc)
            return
        LOGGER.info('[{}] Queue added'.format(request_id))
    else:
        celery_queue_name = 'celery'

    # Execute the task
    LOGGER.info('[{}] Executing task...'.format(request_id))
    async_result = run_task.apply_async(
        args=(data, task_info, SERVICE_ACCOUNT_FILE_DATA), queue=celery_queue_name)

    LOGGER.info('[{}] Waiting for task to complete...'.format(request_id))
    task_result = None
    try:
        task_result = async_result.get()
    except Exception as exc:
        LOGGER.error(
            '[{}] Couldn\'t execute task {}: {}'.format(request_id, task, str(exc)))
        try:
            LOGGER.info('[{}] Saving exception into redis...'.format(request_id))
            result = json.loads(REDIS_CLIENT.get(request_id))
            result['response'] = generate_response('Error: Couldn\'t execute task {}: {}'.format(
                task, str(exc)), None, 500)
            result['code'] = 500
            result['canDel'] = True
            REDIS_CLIENT.set(request_id, json.dumps(result))
        except Exception as exc:
            LOGGER.error(
                '[{}] Couldn\'t save exception into redis: {}'.format(request_id, str(exc)))
            report_exception(ENV, exc)
        report_exception(ENV, exc)
        return

    LOGGER.info('[{}] Task executed successfully'.format(request_id))
    task_result['message'] = 'Ok, task {} executed successfully'.format(
        task)
    try:
        LOGGER.info('[{}] Saving result into redis...'.format(request_id))
        result = json.loads(REDIS_CLIENT.get(request_id))
        result['response'] = generate_response(
            None, task_result, 0)
        result['code'] = 200
        result['canDel'] = True
        REDIS_CLIENT.set(request_id, json.dumps(result))
    except Exception as exc:
        LOGGER.error(
            '[{}] Couldn\'t save result into redis: {}'.format(request_id, str(exc)))
        report_exception(ENV, exc)
        return

编辑:

下面是系统概览的小图:

【问题讨论】:

    标签: python celery


    【解决方案1】:

    好的,看来问题出在 APP.control.add_consumer(celery_queue_name, reply=False, destination=['worker1@{}'.format(created_instance_name)]) 上。即使该命令成功返回,worker 仍然没有被添加到队列中。

    我设法通过在带有 -Q 参数的 worker 启动命令中包含队列名称来解决此问题。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-03-07
      • 1970-01-01
      • 1970-01-01
      • 2011-02-09
      • 2016-12-25
      • 1970-01-01
      • 2023-03-26
      • 2015-10-28
      相关资源
      最近更新 更多