【问题标题】:socket.error: timed out (Celery & RabbitMQ running in docker containers)socket.error: 超时(在 docker 容器中运行的 Celery 和 RabbitMQ)
【发布时间】:2016-10-20 10:39:57
【问题描述】:

试图调出Celery(和RabbitMQ)官方docker容器。

docker run -d --hostname my-rabbit --name some-rabbit rabbitmq
docker run --link some-rabbit:rabbit --name some-celery -d celery

我检查了日志以确保一切正常:

# docker logs some-celery

[2016-10-20 11:05:50,357: WARNING/MainProcess] /usr/local/lib/python3.5/site-packages/celery/apps/worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.


  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
[2016-10-20 11:05:50,419: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbit:5672//: [Errno 111] Connection refused.
Trying again in 2.00 seconds...

[2016-10-20 11:05:52,430: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbit:5672//: [Errno 111] Connection refused.
Trying again in 4.00 seconds...

[2016-10-20 11:05:57,611: WARNING/MainProcess] celery@93b7b6c0b40b ready.

现在,在主机中我使用的是tasks.py

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://guest:guest@172.17.0.2/')

@app.task(name='tasks.add')
def add(x, y):
    return x + y

我从主机命令行运行 python 并得到:

>>> from tasks import add
>>> res=add.delay(4,4)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Library/Python/2.7/site-packages/celery/app/task.py", line 461, in delay
    return self.apply_async(args, kwargs)
  File "/Library/Python/2.7/site-packages/celery/app/task.py", line 573, in apply_async
    **dict(self._get_exec_options(), **options)
  File "/Library/Python/2.7/site-packages/celery/app/base.py", line 354, in send_task
    reply_to=reply_to or self.oid, **options
  File "/Library/Python/2.7/site-packages/celery/app/amqp.py", line 310, in publish_task
    **kwargs
  File "/Library/Python/2.7/site-packages/kombu/messaging.py", line 172, in publish
    routing_key, mandatory, immediate, exchange, declare)
  File "/Library/Python/2.7/site-packages/kombu/connection.py", line 470, in _ensured
    interval_max)
  File "/Library/Python/2.7/site-packages/kombu/connection.py", line 382, in ensure_connection
    interval_start, interval_step, interval_max, callback)
  File "/Library/Python/2.7/site-packages/kombu/utils/__init__.py", line 246, in retry_over_time
    return fun(*args, **kwargs)
  File "/Library/Python/2.7/site-packages/kombu/connection.py", line 250, in connect
    return self.connection
  File "/Library/Python/2.7/site-packages/kombu/connection.py", line 756, in connection
    self._connection = self._establish_connection()
  File "/Library/Python/2.7/site-packages/kombu/connection.py", line 711, in _establish_connection
    conn = self.transport.establish_connection()
  File "/Library/Python/2.7/site-packages/kombu/transport/pyamqp.py", line 116, in establish_connection
    conn = self.Connection(**opts)
  File "/Library/Python/2.7/site-packages/amqp/connection.py", line 165, in __init__
    self.transport = self.Transport(host, connect_timeout, ssl)
  File "/Library/Python/2.7/site-packages/amqp/connection.py", line 186, in Transport
    return create_transport(host, connect_timeout, ssl)
  File "/Library/Python/2.7/site-packages/amqp/transport.py", line 299, in create_transport
    return TCPTransport(host, connect_timeout)
  File "/Library/Python/2.7/site-packages/amqp/transport.py", line 95, in __init__
    raise socket.error(last_err)
socket.error: timed out

我看到了这个Question,但那里的错误似乎不同..

编辑: 我已经添加了 RabbitMQ 容器日志(相关部分):

    docker logs some-rabbit


              RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: tty
  ######  ##        tty
  ##########
              Starting broker...

=INFO REPORT==== 20-Oct-2016::10:22:41 ===
Starting RabbitMQ 3.6.5 on Erlang 19.0.7
Copyright (C) 2007-2016 Pivotal Software, Inc.
Licensed under the MPL.  See http://www.rabbitmq.com/

=INFO REPORT==== 20-Oct-2016::10:22:41 ===
node           : rabbit@my-rabbit
home dir       : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.config
cookie hash    : AlUJAQFic5TGBPlUjyyIOw==
log            : tty
sasl log       : tty
database dir   : /var/lib/rabbitmq/mnesia/rabbit@my-rabbit

=INFO REPORT==== 20-Oct-2016::10:22:42 ===
Memory limit set to 799MB of 1999MB total.

=INFO REPORT==== 20-Oct-2016::10:22:42 ===
Disk free limit set to 50MB

=INFO REPORT==== 20-Oct-2016::10:22:42 ===
Limiting to approx 1048476 file handles (943626 sockets)

=INFO REPORT==== 20-Oct-2016::10:22:42 ===
FHC read buffering:  OFF
FHC write buffering: ON

=INFO REPORT==== 20-Oct-2016::10:22:42 ===
Database directory at /var/lib/rabbitmq/mnesia/rabbit@my-rabbit is empty. Initialising from scratch...

=INFO REPORT==== 20-Oct-2016::10:22:42 ===
    application: mnesia
    exited: stopped
    type: temporary

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
Priority queues enabled, real BQ is rabbit_variable_queue

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
Adding vhost '/'

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
Creating user 'guest'

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
Setting user tags for user 'guest' to [administrator]

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
Setting permissions for 'guest' in '/' to '.*', '.*', '.*'

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
msg_store_transient: using rabbit_msg_store_ets_index to provide index

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
msg_store_persistent: using rabbit_msg_store_ets_index to provide index

=WARNING REPORT==== 20-Oct-2016::10:22:43 ===
msg_store_persistent: rebuilding indices from scratch

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
started TCP Listener on [::]:5672
 completed with 0 plugins.

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
Server startup complete; 0 plugins started.

=INFO REPORT==== 20-Oct-2016::10:22:47 ===
accepting AMQP connection <0.353.0> (172.17.0.3:41166 -> 172.17.0.2:5672)

=INFO REPORT==== 20-Oct-2016::10:22:47 ===
accepting AMQP connection <0.360.0> (172.17.0.3:41168 -> 172.17.0.2:5672)

【问题讨论】:

  • 可以直接访问rabbitmq服务器吗?
  • @Jason 我已将 rabbitmq 日志添加到问题中......
  • 我注意到您正在使用 rabbit-mq 作为后端。如果您从 celery 配置中删除 backend='amqp,是否还会出现这种情况?我想知道任务是否执行,但它会将响应保存到引发异常的后端。当您将登录放入任务时会发生什么?
  • @Jason 我删除了后端,在add 函数中添加print "BLA" - 并获得相同的超时。 (芹菜和兔子容器的)日志中没有任何内容
  • 关于我能想到的唯一另一件事是包含 DEBUG 标志,以便与您的 celery worker 一起记录。当我遇到与您类似的问题时,诚然不是 docker,这是因为我的 URL 和/或访问搞砸了。调试输出通常包含线索。

标签: python docker rabbitmq celery amqp


【解决方案1】:

已修复。 我关注了这个question。 我将 tasks.py 代理 ip 地址更改为 localhost (因为 RabbitMQ 端口暴露给主机并且行为类似于 localhost 进程):

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost')

@app.task(name='tasks.add')
def add(x, y):
    return x + y

然后,我运行容器:

docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management

docker run -v /Users/user/hostpath:/home/user --link some-rabbit:rabbit --name some-celery -d celery

并将celeryconfig.py文件添加到/Users/user/hostpath

CELERY_IMPORTS = ('tasks')
CELERY_IGNORE_RESULT = False
CELERY_RESULT_BACKEND = 'amqp'
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

这对我有用:)

【讨论】:

  • 很高兴您找到了解决方案!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-09-13
  • 1970-01-01
  • 1970-01-01
  • 2017-05-20
  • 1970-01-01
相关资源
最近更新 更多