【问题标题】:Docker, Celery, method failing on composeDocker,Celery,组合方法失败
【发布时间】:2020-12-26 12:18:00
【问题描述】:

我正在尝试使用 celery、redis 和 rabitMQ 作为后台任务的基于 FastAPI 的 API。 在执行docker-compose up 时,redis、rabbit 和 Flower 部分工作时,我可以访问 Flower 仪表板。

但它会卡在芹菜部分。

错误:

 rabbitmq_1       | 2020-09-08 06:32:38.552 [info] <0.716.0> connection <0.716.0> (172.22.0.6:49290 -> 172.22.0.2:5672): user 'user' authenticated and granted access to vhost '/'
celery-flower_1  | [W 200908 06:32:41 control:44] 'stats' inspect method failed
celery-flower_1  | [W 200908 06:32:41 control:44] 'active_queues' inspect method failed
celery-flower_1  | [W 200908 06:32:41 control:44] 'registered' inspect method failed
celery-flower_1  | [W 200908 06:32:41 control:44] 'scheduled' inspect method failed
celery-flower_1  | [W 200908 06:32:41 control:44] 'active' inspect method failed
celery-flower_1  | [W 200908 06:32:41 control:44] 'reserved' inspect method failed
celery-flower_1  | [W 200908 06:32:41 control:44] 'revoked' inspect method failed
celery-flower_1  | [W 200908 06:32:41 control:44] 'conf' inspect method failed

我的 docker-compose 文件:

version: "3.7"

services:
  rabbitmq:
    image: "bitnami/rabbitmq:3.7"
    ports:
      - "4000:4000"
      - "5672:5672"
    volumes:
      - "rabbitmq_data:/bitnami"

  redis:
    image: "bitnami/redis:5.0.4"
    environment:
      - REDIS_PASSWORD=password123
    ports:
      - "5000:5000"
    volumes:
      - "redis_data:/bitnami/redis/data"

  celery-flower:
    image: gregsi/latest-celery-flower-docker:latest
    environment:
      - AMQP_USERNAME=user
      - AMQP_PASSWORD=bitnami
      - AMQP_ADMIN_USERNAME=user
      - AMQP_ADMIN_PASSWORD=bitnami
      - AMQP_HOST=rabbitmq
      - AMQP_PORT=5672
      - AMQP_ADMIN_HOST=rabbitmq
      - AMQP_ADMIN_PORT=15672
      - FLOWER_BASIC_AUTH=user:test
    ports:
      - "5555:5555"
    depends_on:
      - rabbitmq
      - redis

  fastapi:
    build: .
    ports:
      - "8000:8000"
    depends_on:
      - rabbitmq
      - redis
    volumes:
      - "./:/app"
    command: "poetry run uvicorn app/app/main:app --bind 0.0.0.0:8000"

  worker:
    build: .
    depends_on:
      - rabbitmq
      - redis
    volumes:
      - "./:/app"
    command: "poetry run celery worker -A app.app.worker.celery_worker -l info -Q test-queue -c 1"

volumes:
  rabbitmq_data:
    driver: local
  redis_data:
    driver: local

我的芹菜应用:

celery_app = Celery(
    "worker",
    backend="redis://:password123@redis:6379/0",
    broker="amqp://user:bitnami@rabbitmq:5672//"
)

celery_app.conf.task_routes = {
    "app.app.worker.celery_worker.compute_stock_indicators": "stocks-queue"
}

celery_app.conf.update(task_track_started=True)

芹菜工人:

@celery_app.task(acks_late=True)
def compute_stock_indicators(stocks: list, background_task):
    stocks_with_indicators = {}
    for stock in stocks:
        current_task.update_state(state=Actions.STARTED,
                                  meta={f"starting to fetch {stock}'s indicators"})

        stock_indicators = fetch_stock_indicators(stock)  # Fetch the stock most recent indicators
        current_task.update_state(state=Actions.FINISHED,
                                  meta={f"{stock}'s indicators fetched"})

        stocks_with_indicators.update({stock: stock_indicators})

        current_task.update_state(state=Actions.PROGRESS,
                                  meta={f"predicting {stocks}s..."})

快速 API 功能:

log = logging.getLogger(__name__)
rabbit = RabbitMQHandler(host='localhost', port=5672, level="DEBUG")
log.addHandler(rabbit)


def celery_on_message(body):
    """
    Logs the initiation of the function
    """
    log.warning(body)


def background_on_message(task):
    """
    logs the function when it is added to queue
    """
    log.warning(task.get(on_message=celery_on_message, propagate=False))


app = FastAPI(debug=True)


@app.post("/")
async def initiator(stocks: FrozenSet, background_task: BackgroundTasks, ):
    """
    :param stocks: stocks to be analyzed
    :type stocks: set
    :param background_task: initiate the tasks queue
    :type background_task: starlette.background.BackgroundTasks
    """
    log.warning(msg=f'beginning analysis on: {stocks}')
    task_name = "app.app.worker.celery_worker.compute_stock_indicators"

    task = celery_app.send_task(task_name, args=[stocks, background_task])
    background_task.add_task(background_on_message, task)
    return {"message": "Stocks indicators successfully calculated,stocks sent to prediction"}

【问题讨论】:

    标签: python docker rabbitmq fastapi flower


    【解决方案1】:

    docker-composeworker 部分,command 内容如下:

    command: "poetry run celery worker -A app.app.worker.celery_worker -l info -Q test-queue -c 1"
    

    所以本质上你是在要求工作人员“监视”一个名为 test-queue 的队列。
    但是在celery_app,在以下部分:

    celery_app.conf.task_routes = {
        "app.app.worker.celery_worker.compute_stock_indicators": "stocks-queue"
    }
    

    您正在定义一个名为 stocks-queue 的队列。

    更改docker-composecelery_app 的队列名称以匹配另一个。

    【讨论】:

    • 将 docker-compose 更改为 :command: "poetry run celery worker -A app.app.worker.celery_worker -l info -Q stocks-queue -c 1" 做同样的事情
    • @yovelcohen 您可以尝试在celery-flowerdepends_on 部分添加worker 并告诉我接下来会发生什么吗?
    【解决方案2】:

    如果您在 windows 上使用 Docker Toolbox,则应将端口 5555 添加到 VM virtualBOX 网络:

    首先在 cmd 上运行以下命令:

    docker-machine stop default
    

    然后打开VM virtualBOX,进入设置>网络>高级>端口转发 >添加一行5555端口并留下名称字段

    单击确定,然后在 cmd 上运行以下命令:

    docker-machine start default
    

    【讨论】:

      猜你喜欢
      • 2021-06-24
      • 2017-01-09
      • 1970-01-01
      • 2019-05-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-01-16
      • 1970-01-01
      相关资源
      最近更新 更多