您需要创建您的自定义编码器和解码器,它们基本上会将您的 time.time_struct 对象转换为可序列化的对象(一个 dict),然后按照 docs 中的描述在 kombu 序列化程序注册表中注册它们,以便让celery 在其任务中使用您的新序列化程序。
import json
import time
import types
import datetime
class FeedContentEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, time_struct):
epoch = int(time.mktime(time_struct))
return {'__type__': '__time__', 'time': epoch}
else:
return json.FeedContentEncoder.default(self, obj)
def decode_feed_content(obj):
if isinstance(obj, types.DictionaryType) and '__type__' in obj:
if obj['__type__'] == '__time__':
return datetime.datetime.fromtimestamp(obj['time']).timetuple()
return obj
您需要通过将新序列化注册到序列化程序注册表中来通知 kombu。
from kombu.serialization import register
def feed_content_json_dumps(obj):
return json.dumps(obj, cls=FeedContentEncoder)
def feed_content_json_loads(obj):
return json.loads(obj, object_hook=decode_feed_content)
register('feedcontentjson',
feed_content_json_dumps,
feed_content_json_loads,
content_type='application/x-feedcontent-json',
content_encoding='utf-8')
最后,你应该告诉 celery 使用新的序列化器来序列化任务,就像celery docs;您应该使用 serializer 参数调用您的任务。
parse_link.apply_async(args=[content, link, json_id, news_category], queue= news_source, serializer='feedcontentjson')
希望这会有所帮助。