【问题标题】:how break a celery task inside task如何在任务中打破芹菜任务
【发布时间】:2021-10-12 21:31:15
【问题描述】:

我使用 celery 和 django.inside 一个 forloop ,每转一个值在 db 中设置一个关系

lawyers=Lawyer.objects.filter(consultation_status=True)
for idx,lawyer in enumerate(lawyers):
    if(consultation.lawyer):
        break
    change_offered_lawyer.apply_async((id,lawyer.id),countdown=idx*60)

任务内循环中的每一轮我都会检查条件,我的目标是如果条件终止则中断所有这些任务。

@app.task
def change_offered_lawyer(consulation_id,consulator_id):
    consulation=ConsultationOrder.objects.get(id=consulation_id)
    consulator=Lawyer.objects.get(id=consulator_id)
    if(consultation.lawyer):
        #break all tasks 
    consultation.offered_lawyer=consulator
    consultation.save()

【问题讨论】:

    标签: django celery


    【解决方案1】:

    为了在下面进行更简单的解释,我们将只使用 1-10 的数字列表,它应该在第 4 个数字之后中断,从而处理 1-3 并跳过 4-10。

    解决方案 1:使用链式任务

    设计总结:

    将每个任务彼此相邻。一个任务返回后,调用下一个任务,以此类推。任务必须返回一个值,该值将指示下一个任务是继续还是中断。

    制片人:

    from celery import chain
    
    from task import change_offered_lawyer
    
    tasks = []
    
    lawyers_count = 10
    for lawyer_id in range(1, lawyers_count + 1):
        if lawyer_id == 1:
            # If first item, manually set the initial value for should_continue
            tasks.append(change_offered_lawyer.s(True, lawyer_id))
        else:
            tasks.append(change_offered_lawyer.s(lawyer_id))
    
    chain(*tasks).apply_async()
    

    消费者:

    from celery import shared_task
    
    
    @shared_task
    def change_offered_lawyer(should_continue, lawyer_id):
        if not should_continue:
            print(f"{lawyer_id=} Break...")
            return False
    
        if lawyer_id == 4:
            print(f"{lawyer_id=} Break now!")
            return False
    
        print(f"{lawyer_id=} Continue...")
        return True
    

    日志:

    [2021-08-10 03:04:03,294: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b01490c6-59ee-473e-b15d-40d4f628444c] received
    [2021-08-10 03:04:03,296: WARNING/MainProcess] lawyer_id=1 Continue...
    [2021-08-10 03:04:03,296: WARNING/MainProcess] 
    
    [2021-08-10 03:04:03,299: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b01490c6-59ee-473e-b15d-40d4f628444c] succeeded in 0.003656674999547249s: True
    [2021-08-10 03:04:03,300: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[3ea4e578-4f56-491b-a774-b0a95b33c123] received
    [2021-08-10 03:04:03,301: WARNING/MainProcess] lawyer_id=2 Continue...
    [2021-08-10 03:04:03,301: WARNING/MainProcess] 
    
    [2021-08-10 03:04:03,302: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[3ea4e578-4f56-491b-a774-b0a95b33c123] succeeded in 0.001406519999363809s: True
    [2021-08-10 03:04:03,303: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[dca891ef-4bac-4fa0-85e3-9d7ddfe031d5] received
    [2021-08-10 03:04:03,305: WARNING/MainProcess] lawyer_id=3 Continue...
    [2021-08-10 03:04:03,305: WARNING/MainProcess] 
    
    [2021-08-10 03:04:03,307: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[dca891ef-4bac-4fa0-85e3-9d7ddfe031d5] succeeded in 0.0023091260000001057s: True
    [2021-08-10 03:04:03,308: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[ea57df15-88e7-4b65-8fc3-79df0a189652] received
    [2021-08-10 03:04:03,313: WARNING/MainProcess] lawyer_id=4 Break now!
    [2021-08-10 03:04:03,313: WARNING/MainProcess] 
    
    [2021-08-10 03:04:03,314: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[ea57df15-88e7-4b65-8fc3-79df0a189652] succeeded in 0.001986995999686769s: False
    [2021-08-10 03:04:03,315: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[1e8fa660-2125-4790-b44e-4f884b826b40] received
    [2021-08-10 03:04:03,318: WARNING/MainProcess] lawyer_id=5 Break...
    [2021-08-10 03:04:03,318: WARNING/MainProcess] 
    
    [2021-08-10 03:04:03,324: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[1e8fa660-2125-4790-b44e-4f884b826b40] succeeded in 0.006545270000060555s: False
    [2021-08-10 03:04:03,325: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b6825617-6162-4273-ad14-c128190e18be] received
    [2021-08-10 03:04:03,327: WARNING/MainProcess] lawyer_id=6 Break...
    [2021-08-10 03:04:03,327: WARNING/MainProcess] 
    
    [2021-08-10 03:04:03,328: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b6825617-6162-4273-ad14-c128190e18be] succeeded in 0.002028814999903261s: False
    [2021-08-10 03:04:03,329: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[5d8c1201-48c7-4238-8f92-b999adccfad9] received
    [2021-08-10 03:04:03,330: WARNING/MainProcess] lawyer_id=7 Break...
    [2021-08-10 03:04:03,330: WARNING/MainProcess] 
    
    [2021-08-10 03:04:03,331: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[5d8c1201-48c7-4238-8f92-b999adccfad9] succeeded in 0.0015183160003289231s: False
    [2021-08-10 03:04:03,332: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[8aedf4ad-b3d6-44f6-b5e8-8227bc02df8a] received
    [2021-08-10 03:04:03,333: WARNING/MainProcess] lawyer_id=8 Break...
    [2021-08-10 03:04:03,333: WARNING/MainProcess] 
    
    [2021-08-10 03:04:03,335: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[8aedf4ad-b3d6-44f6-b5e8-8227bc02df8a] succeeded in 0.0019754629993258277s: False
    [2021-08-10 03:04:03,336: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[28dfedc7-40d6-45ab-8d3b-972db47e99bf] received
    [2021-08-10 03:04:03,337: WARNING/MainProcess] lawyer_id=9 Break...
    [2021-08-10 03:04:03,337: WARNING/MainProcess] 
    
    [2021-08-10 03:04:03,338: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[28dfedc7-40d6-45ab-8d3b-972db47e99bf] succeeded in 0.0016348939998351852s: False
    [2021-08-10 03:04:03,339: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[288f6c3d-77d7-4161-9b77-45bcd815f14e] received
    [2021-08-10 03:04:03,341: WARNING/MainProcess] lawyer_id=10 Break...
    [2021-08-10 03:04:03,341: WARNING/MainProcess] 
    
    [2021-08-10 03:04:03,341: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[288f6c3d-77d7-4161-9b77-45bcd815f14e] succeeded in 0.00046298599954752717s: False
    

    参考资料:

    解决方案 2:使用公共存储来跟踪状态

    设计总结:

    使用数据库存储来跟踪执行是必须继续还是中断。在这里,我们将使用 django-caching 系统以方便使用。请注意,如果您打算在没有 countdown 的情况下并行运行任务,则此解决方案可能对 race conditions 有效。

    settings.py:

    ...
    CACHES = {
        "default": {
            "BACKEND": "django.core.cache.backends.db.DatabaseCache",
            "LOCATION": "my_cache_table",
        }
    }
    ...
    
    • 之后,运行python manage.py createcachetable

    制片人:

    from django.core.cache import cache
    
    from task import change_offered_lawyer, reset_cache
    
    cache.set(key='change_offered_lawyer', value='continue', timeout=None)
    
    lawyers_count = 10
    for lawyer_id in range(1, lawyers_count + 1):
        if lawyer_id == lawyers_count:
            # If last item, reset the cache
            change_offered_lawyer.apply_async((lawyer_id,), countdown=lawyer_id * 5, link=reset_cache.si())
        else:
            change_offered_lawyer.apply_async((lawyer_id,), countdown=lawyer_id * 5)
    

    消费者:

    from celery import shared_task
    from django.core.cache import cache
    
    
    @shared_task
    def change_offered_lawyer(lawyer_id):
        if cache.get('change_offered_lawyer') == 'break':
            print(f"{lawyer_id=} Break...")
            return
    
        if lawyer_id == 4:
            cache.set(key='change_offered_lawyer', value='break', timeout=None)
            print(f"{lawyer_id=} Break now!")
            return
    
        print(f"{lawyer_id=} Continue...")
    
    
    @shared_task
    def reset_cache():
        cache.delete('change_offered_lawyer')
    

    日志:

    [2021-08-10 03:10:32,333: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[604585ed-76aa-44b6-9e9c-2119f0e33da2] received
    [2021-08-10 03:10:32,335: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[6305c7d1-c5ff-4c40-915c-6e37ecf9c440] received
    [2021-08-10 03:10:32,337: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[3437dc9d-e3a5-4da3-8020-be52cf871aa1] received
    [2021-08-10 03:10:32,343: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[682a21e5-0531-443b-b829-17c40b9c6509] received
    [2021-08-10 03:10:32,344: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[0d7dbff6-0207-441e-a3e3-2840f028b393] received
    [2021-08-10 03:10:32,345: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[e4e7987f-1ebe-4fc6-b37d-a475783fa1b5] received
    [2021-08-10 03:10:32,346: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[a9dc6b82-c507-47f0-9bab-073d33e24a19] received
    [2021-08-10 03:10:32,347: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[ac8b57cd-2c15-4978-9a70-69d9f876d776] received
    [2021-08-10 03:10:32,349: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[97fc021e-5426-43e3-b6bf-0e83c4ff4eda] received
    [2021-08-10 03:10:32,349: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[c3bda008-6cda-436e-a454-0e9dc8900857] received
    [2021-08-10 03:10:37,258: WARNING/MainProcess] lawyer_id=1 Continue...
    [2021-08-10 03:10:37,258: WARNING/MainProcess] 
    
    [2021-08-10 03:10:37,258: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[604585ed-76aa-44b6-9e9c-2119f0e33da2] succeeded in 0.01066518100014946s: None
    [2021-08-10 03:10:42,347: WARNING/MainProcess] lawyer_id=2 Continue...
    [2021-08-10 03:10:42,347: WARNING/MainProcess] 
    
    [2021-08-10 03:10:42,347: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[6305c7d1-c5ff-4c40-915c-6e37ecf9c440] succeeded in 0.0018151690001104726s: None
    [2021-08-10 03:10:47,350: WARNING/MainProcess] lawyer_id=3 Continue...
    [2021-08-10 03:10:47,350: WARNING/MainProcess] 
    
    [2021-08-10 03:10:47,350: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[3437dc9d-e3a5-4da3-8020-be52cf871aa1] succeeded in 0.004217886999867915s: None
    [2021-08-10 03:10:52,340: WARNING/MainProcess] lawyer_id=4 Break now!
    [2021-08-10 03:10:52,340: WARNING/MainProcess] 
    
    [2021-08-10 03:10:52,341: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[682a21e5-0531-443b-b829-17c40b9c6509] succeeded in 0.013524283999686304s: None
    [2021-08-10 03:10:57,330: WARNING/MainProcess] lawyer_id=5 Break...
    [2021-08-10 03:10:57,330: WARNING/MainProcess] 
    
    [2021-08-10 03:10:57,330: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[0d7dbff6-0207-441e-a3e3-2840f028b393] succeeded in 0.004878013000052306s: None
    [2021-08-10 03:11:02,338: WARNING/MainProcess] lawyer_id=6 Break...
    [2021-08-10 03:11:02,338: WARNING/MainProcess] 
    
    [2021-08-10 03:11:02,338: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[e4e7987f-1ebe-4fc6-b37d-a475783fa1b5] succeeded in 0.00420587299959152s: None
    [2021-08-10 03:11:07,336: WARNING/MainProcess] lawyer_id=7 Break...
    [2021-08-10 03:11:07,336: WARNING/MainProcess] 
    
    [2021-08-10 03:11:07,337: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[a9dc6b82-c507-47f0-9bab-073d33e24a19] succeeded in 0.005373962000703614s: None
    [2021-08-10 03:11:12,327: WARNING/MainProcess] lawyer_id=8 Break...
    [2021-08-10 03:11:12,328: WARNING/MainProcess] 
    
    [2021-08-10 03:11:12,328: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[ac8b57cd-2c15-4978-9a70-69d9f876d776] succeeded in 0.004143920999922557s: None
    [2021-08-10 03:11:17,342: WARNING/MainProcess] lawyer_id=9 Break...
    [2021-08-10 03:11:17,342: WARNING/MainProcess] 
    
    [2021-08-10 03:11:17,343: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[97fc021e-5426-43e3-b6bf-0e83c4ff4eda] succeeded in 0.004410948999975517s: None
    [2021-08-10 03:11:22,327: WARNING/MainProcess] lawyer_id=10 Break...
    [2021-08-10 03:11:22,327: WARNING/MainProcess] 
    
    [2021-08-10 03:11:22,331: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[c3bda008-6cda-436e-a454-0e9dc8900857] succeeded in 0.007513158000620024s: None
    [2021-08-10 03:11:22,332: INFO/MainProcess] Task task.lawyer.reset_cache[d6222a10-dbda-4764-add1-88e6ed93927a] received
    [2021-08-10 03:11:22,338: INFO/MainProcess] Task task.lawyer.reset_cache[d6222a10-dbda-4764-add1-88e6ed93927a] succeeded in 0.004719080000540998s: None
    

    参考资料:

    解决方案 3:同步运行任务(不建议)

    设计总结:

    更新change_offered_lawyer 以返回一个指示符,指示是继续还是中断。然后在调用apply_async()之后,再调用get()同步接收结果,如果需要就中断。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2011-12-02
      • 2014-12-04
      • 1970-01-01
      • 1970-01-01
      • 2017-08-28
      • 2014-03-16
      • 1970-01-01
      相关资源
      最近更新 更多