【问题标题】:Python- How to pass Feedparser object to a celery task?Python-如何将 Feedparser 对象传递给 celery 任务?
【发布时间】:2015-04-12 22:18:00
【问题描述】:

我使用feedparser 模块来解析RSS 提要。我需要将 feedparser 对象传递给 celery 任务。

在尝试传递对象时,我收到一条错误消息time.struct_time(tm_year=2015, tm_mon=2, tm_mday=12, tm_hour=8, tm_min=19, tm_sec=11, tm_wday=3, tm_yday=43, tm_isdst=0) is not JSON serializable

如何将 feedparser 对象传递给 celery 任务?

这是我的代码:-

rss_content = feedparser.parse(rss_link)
content_entries = rss_content['entries']
for content in content_entries:
    parse_link.apply_async(args=[content, link, json_id, news_category], queue= news_source) #celery task

我该怎么做?

【问题讨论】:

    标签: python celery feedparser jsonserializer


    【解决方案1】:

    您需要创建您的自定义编码器和解码器,它们基本上会将您的 time.time_struct 对象转换为可序列化的对象(一个 dict),然后按照 docs 中的描述在 kombu 序列化程序注册表中注册它们,以便让celery 在其任务中使用您的新序列化程序。

    import json
    import time
    import types
    import datetime
    
    class FeedContentEncoder(json.JSONEncoder):   
        def default(self, obj):
            if isinstance(obj, time_struct):
                epoch = int(time.mktime(time_struct))
                return {'__type__': '__time__', 'time': epoch}
            else:
                return json.FeedContentEncoder.default(self, obj)
    
    def decode_feed_content(obj):
        if isinstance(obj, types.DictionaryType) and '__type__' in obj:
            if obj['__type__'] == '__time__':
                return datetime.datetime.fromtimestamp(obj['time']).timetuple()
        return obj
    

    您需要通过将新序列化注册到序列化程序注册表中来通知 kombu。

    from kombu.serialization import register
    
    def feed_content_json_dumps(obj):
        return json.dumps(obj, cls=FeedContentEncoder)
    
    def feed_content_json_loads(obj):
        return json.loads(obj, object_hook=decode_feed_content)
    
    register('feedcontentjson', 
             feed_content_json_dumps, 
             feed_content_json_loads, 
             content_type='application/x-feedcontent-json', 
             content_encoding='utf-8') 
    

    最后,你应该告诉 celery 使用新的序列化器来序列化任务,就像celery docs;您应该使用 serializer 参数调用您的任务。

    parse_link.apply_async(args=[content, link, json_id, news_category], queue= news_source, serializer='feedcontentjson')
    

    希望这会有所帮助。

    【讨论】:

    • 我在放置celeryconfig.py 的同一目录中创建了一个serializer.py 文件。我在哪里注册我的新序列化?在哪个文件中?以及 celery 将如何了解我的新序列化?如何连接这些文件?
    • 我通过酸洗 Feedparser 对象解决了这个问题。不管怎么说,多谢拉。 :)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-02-28
    • 2012-03-22
    • 2018-10-19
    • 2019-11-13
    • 2021-11-22
    • 1970-01-01
    • 2014-03-07
    相关资源
    最近更新 更多