【问题标题】:Sidekiq handling re-queue when processing large dataSidekiq 在处理大数据时处理重新排队
【发布时间】:2014-08-26 06:52:03
【问题描述】:

请参阅下面的更新问题。

原问题:

在我当前的 Rails 项目中,我需要解析大型 xml/csv 数据文件并将其保存到 mongodb。 现在我使用以下步骤:

  1. 接收用户上传的文件,将数据存入mongodb
  2. 使用sidekiq对mongodb中的数据进行异步处理。
  3. 处理完成后,删除原始数据。

对于localhost中的中小数据,上述步骤运行良好。但是在heroku中,我使用hirefire来动态地上下缩放worker dyno。当工人仍在处理大数据时,hirefire 看到空队列并缩小工人测功机。这会向进程发送终止信号,并使进程处于未完成状态。

我正在寻找一种更好的解析方式,允许解析进程随时被终止(接收终止信号时保存当前状态),并允许进程重新排队。

现在我正在使用 Model.delay.parse_file 并且它不会重新排队。

更新

读完sidekiq wiki,我发现article about job control。任何人都可以解释代码,它是如何工作的,以及它在接收到 SIGTERM 信号并且工作人员重新排队时如何保持其状态?

是否有任何替代方法来处理作业终止、保存当前状态并从最后一个位置继续?

谢谢,

【问题讨论】:

    标签: ruby-on-rails mongodb heroku sidekiq idempotent


    【解决方案1】:

    可能更容易解释过程和高级步骤,给出一个示例实现(我使用的一个精简版),然后讨论 throw 和 catch:

    1. 插入具有递增索引的原始 csv 行(以便以后能够从特定行/索引恢复)
    2. 通过检查 Sidekiq::Fetcher.done? 是否返回 true 来处理停止每个“块”的 CSV 以检查作业是否完成
    3. 当fetcher为done?时,将当前处理的item的索引存储在用户身上并返回,这样completes的job和控制权就交还给sidekiq。
    4. 请注意,如果某个作业在短暂超时(默认 20 秒)后仍在运行,则该作业将被终止。
    5. 然后当作业再次简单运行时,从上次中断的地方(或 0)开始

    例子:

        class UserCSVImportWorker
          include Sidekiq::Worker
    
          def perform(user_id)
            user = User.find(user_id)
    
            items = user.raw_csv_items.where(:index => {'$gte' => user.last_csv_index.to_i})
            items.each_with_index do |item, i|
              if (i+1 % 100) == 0 && Sidekiq::Fetcher.done?
                user.update(last_csv_index: item.index)
    
                return
              end
    
              # Process the item as normal
            end
          end
        end
    

    上面的类确保每 100 个项目我们检查 fetcher 没有完成(如果关闭已启动的代理),并结束作业的执行。但是,在执行结束之前,我们会使用已处理的最后一个 index 更新用户,以便我们可以从下次中断的地方开始。

    throw catch 是一种将上述功能实现得更简洁(也许)的方法,但有点像使用 Fibers,这是一个不错的概念,但很难理解。从技术上讲, throw catch 更像 goto,而不是大多数人通常习惯的。

    编辑

    此外,您不能调用Sidekiq::Fetcher.done? 并在每一行或处理的每一行块上记录last_csv_index,这样如果您的工人在没有机会记录last_csv_index 的情况下被杀死,您仍然可以恢复“接近”到你离开的地方。

    【讨论】:

    • 嗨@nort,我还有一个问题要问你。如何处理 Sidekiq::Fetcher.done?另一个类的内部循环?在表演中,我只有一个电话给ModelContainer.find(model_id).parse_data
    • 实际上应该仍然可以正常工作,尽管有点未封装......我在那里的逻辑我倾向于放入工人而不是模型以保持模型纤薄。
    【解决方案2】:

    您正在尝试解决幂等性的概念,即多次处理具有潜在不完整循环的事物不会导致问题的想法。 (https://github.com/mperham/sidekiq/wiki/Best-Practices#2-make-your-jobs-idempotent-and-transactional)

    前进的可能步骤

    1. 将文件拆分为多个部分,并使用每个部分的作业处理这些部分。
    2. 提高hirefire 的门槛,以便在工作可能完全完成(10 分钟)时进行扩展
    3. 不允许在工作运行时缩小hirefire(在开始时设置redis键,在完成时清除)
    4. 在作业处理过程中跟踪作业进度,如果作业被终止,则从中断处继续。

    【讨论】:

    • 谢谢@nort,我会试试你的方法
    猜你喜欢
    • 2015-07-06
    • 2013-05-26
    • 1970-01-01
    • 2012-09-28
    • 2015-07-03
    • 1970-01-01
    • 2018-09-27
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多