【发布时间】:2014-02-07 15:41:12
【问题描述】:
我有一些对象要发送到我的应用程序中的 celery 任务。这些对象显然不能使用默认的 json 库进行 json 序列化。有没有办法让 celery 使用自定义 JSON Encoder/Decoder 对这些对象进行序列化/反序列化?
【问题讨论】:
我有一些对象要发送到我的应用程序中的 celery 任务。这些对象显然不能使用默认的 json 库进行 json 序列化。有没有办法让 celery 使用自定义 JSON Encoder/Decoder 对这些对象进行序列化/反序列化?
【问题讨论】:
这里有点晚了,但是您应该能够通过在 kombu 序列化程序注册表中注册它们来定义自定义编码器和解码器,如文档中所示:http://docs.celeryproject.org/en/latest/userguide/calling.html#serializers。
例如,以下是 Django 的自定义日期时间序列化器/反序列化器(子类化python's builtin json module):
myjson.py(将其放在您的 settings.py 文件的同一文件夹中)
import json
from datetime import datetime
from time import mktime
class MyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return {
'__type__': '__datetime__',
'epoch': int(mktime(obj.timetuple()))
}
else:
return json.JSONEncoder.default(self, obj)
def my_decoder(obj):
if '__type__' in obj:
if obj['__type__'] == '__datetime__':
return datetime.fromtimestamp(obj['epoch'])
return obj
# Encoder function
def my_dumps(obj):
return json.dumps(obj, cls=MyEncoder)
# Decoder function
def my_loads(obj):
return json.loads(obj, object_hook=my_decoder)
settings.py
# Register your new serializer methods into kombu
from kombu.serialization import register
from .myjson import my_dumps, my_loads
register('myjson', my_dumps, my_loads,
content_type='application/x-myjson',
content_encoding='utf-8')
# Tell celery to use your new serializer:
CELERY_ACCEPT_CONTENT = ['myjson']
CELERY_TASK_SERIALIZER = 'myjson'
CELERY_RESULT_SERIALIZER = 'myjson'
【讨论】:
settings.py 中拔出register 调用(由于某种原因,它抛出了一个完全不相关的错误),但这有效。谢谢!