【问题标题】:Celery Worker don't execute cassandra queriesCelery Worker 不执行 cassandra 查询
【发布时间】:2018-09-30 23:08:27
【问题描述】:

我正在使用

celery == 4.1.0 (latentcall)
[cqlsh 5.0.1 | Cassandra 3.11.2 | CQL spec 3.4.4 | Native protocol v4]
Python 2.7.14

我正在尝试在 Celery 工作函数中执行 Cassandra Query。但是 Celery worker 收到了任务但没有执行 Query。

tasks.py

from cassandra.cluster import Cluster
from celery import Celery

app = Celery('<workername>', backend="rpc://", broker='redis://localhost:6379/0')
dbSession = Cluster().connect()


@app.tasks()
def get_data():
    query = "SELECT * FROM customers"
    CustomerObj = dbSession.execute(dbSession.prepare(query))

    return CustomerObj


get_data.delay()

我开始使用工人:

$ celery worker -A <worker_name> -l INFO -c 1

 -------------- celery@ubuntu v4.1.0 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-4.13.0-21-generic-x86_64-with-Ubuntu-17.10-artful 2018-04-20 14:31:41
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         Woker:0x7fa4a0e6f310
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . Worker.get_data

[2018-04-20 14:31:41,271: INFO/MainProcess] Connected to redis://localhost:6379/0
[2018-04-20 14:31:41,285: INFO/MainProcess] mingle: searching for neighbors
[2018-04-20 14:31:42,315: INFO/MainProcess] mingle: all alone
.............
[2018-04-20 14:31:42,332: INFO/MainProcess] celery@ubuntu ready.
[2018-04-20 14:31:43,823: INFO/MainProcess] Received task: <worker_name>.get_data[8de91fdf-1388-4d5c-bb22-8cb00c1c065e]  

工作进程刚刚停止在那里。它不会执行该 SELECT 查询并提供任何数据。

任何人都建议我如何运行此代码来执行 Cassandra 查询。

【问题讨论】:

    标签: python python-2.7 cassandra redis celery


    【解决方案1】:

    我认为你不能全局定义dbSession。 Celery 任务可以在不同的 worker 中运行,所以连接不能是全局的。

    我可以建议两个选项:

    1. 在任务中创建会话。它应该工作。优点是您将为每个任务创建新会话。也许lazy (@LazyProperty) 应该在这里提供帮助。

    2. 您可以在工作人员级别创建连接:尝试在工作人员启动时创建会话,可能使用worker_init 信号(ref)。这里的问题是您可以拥有concurrency level > 1(取决于您如何启动工作人员)-并且您需要会话池来一次服务多个芹菜任务(一次处理多个 Cassandra 会话)。

    顺便说一句,你应该在 python 中使用global 关键字。如果您正在运行一个实例,它也可能会修复。

    这是一个可能对您有所帮助的相关问题:Celery Worker Database Connection Pooling

    祝你好运!

    【讨论】:

      【解决方案2】:

      因为 celery 不使用应用程序的连接实例。在 celery 启动时启动一个新的连接。下面的 sn-p 是根据 Cassandra 的芹菜文档

      from celery import Celery
      from celery.signals import worker_process_init, beat_init
      from cassandra.cqlengine import connection
      from cassandra.cqlengine.connection import (
          cluster as cql_cluster, session as cql_session)
      
      def cassandra_init(**kwargs):
          """ Initialize a clean Cassandra connection. """
          if cql_cluster is not None:
              cql_cluster.shutdown()
          if cql_session is not None:
              cql_session.shutdown()
          connection.setup()
      
      # Initialize worker context for both standard and periodic tasks.
      worker_process_init.connect(cassandra_init)
      beat_init.connect(cassandra_init)
      
      app = Celery()
      

      这对我有用

      【讨论】:

        猜你喜欢
        • 2016-06-03
        • 2018-10-19
        • 2014-09-15
        • 2012-11-27
        • 2019-04-18
        • 1970-01-01
        • 2016-09-20
        • 2017-03-23
        • 2017-11-30
        相关资源
        最近更新 更多