【问题标题】:How do I change these producer-consumer microservices to allow parallel processing?如何更改这些生产者-消费者微服务以允许并行处理?
【发布时间】:2015-03-06 13:08:52
【问题描述】:

我有几个微服务(用 ruby​​ 实现,尽管我怀疑这对我的问题很重要)。其中一个提供项目,另一个处理它们,然后将它们标记为已处理(通过 DELETE 调用)

提供者有一个/items 端点,它以 JSON 格式列出了一组用 id 标识的项目。它还有一个DELETE /items/id 端点,可以从列表中删除一项(可能是因为它已被处理)

“处理器”中的代码(非常简化)如下所示:

items = <GET provider/items>
items.each do |item|
  process item
  <DELETE provider/items/#{item.id}>
end

这有几个问题,但我想解决的一个是它不是线程安全的,因此我不能并行运行它。如果两个工人同时开始处理项目,他们将“踩到对方的脚趾”:他们将获得相同的项目列表,然后(尝试)处理并删除每个项目两次。

我可以更改此设置以允许并行处理的最简单方法是什么?

你可以假设我有 ruby​​ 可用。我宁愿将更改保持在最低限度,并且如果可能的话,宁愿不安装其他 gem。 Sidekiq 可用作消费者的排队系统。

【问题讨论】:

  • 如果 Sidekiq 已经启动并运行,为什么还要自己实现这样的任务排队系统?
  • @PatrickOscity 生产者和消费者在不同的服务器上

标签: ruby-on-rails ruby multithreading parallel-processing microservices


【解决方案1】:

一些替代方案(只是头脑风暴):

  1. 只需删除 HTTP 并将 pub-sub 与队列一起使用。让生产者对项目进行排队,许多消费者对其进行处理(并触发状态更改,在这种情况下使用 HTTP,如果你喜欢的话)。
  2. 如果你真的想要 HTTP,我认为有几个缺失的部分。如果您的项目的状态是 pendingprocessed,那么您的状态机中有一个隐藏/隐式状态:in_progress(或其他)。一旦你想到它,图片就会变得更清晰:你的GET /items 不是幂等的(因为它会将项目的状态从挂起更改为进行中),因此首先不应该是 GET。

    一个。另一种方法可能是添加一个新实体(例如批次),该实体通过 POST 创建并将一些项目分组并发送它们。已经退回的项目不会成为未来批次的一部分,然后您可以将整个批次标记为已完成(例如PUT /batches/X/done)。这变得非常快,因为您将开始重新实现排队系统和普通/显式(参见 c)HTTP 中已经存在的特性(ack、超时、错误)。

    b.一个稍微简单的替代方案:只需将/items 转换为POST/PUT(在这两种情况下都很奇怪)端点,该端点将项目标记为正在处理(并且不再返回它们,因为它只返回待处理的项目)。但同样的错误和超时问题也适用。

    c。让生产者明确并通过 PUT 向其他服务请求处理项目。您可以在正文中包含所有需要的数据,也可以将其用作 ping 并让处理器通过 GET 请求信息。您可以在任一侧添加异步处理(但在处理器中可能更好)。

我会诚实地做 1(除非有令人信服的理由)。

【讨论】:

  • 我最终做了比这更复杂的事情,但我认为你的 2.c 答案是解决我所问问题的答案(“最简单的解决方案,变化最少”)。从本质上讲,它使用 HTTP 协议作为某种队列,这很好。
  • 在我的情况下,我没有执行 2.c,因为消费者和生产者之间的请求数量会倒置,而消费者在性能方面是“弱者”。完成后他可以说“我完成了”,但制作人的很多 PUT 可能会让他失望。
  • 我最终在现实中做了什么,以防有人想知道:生产者上的 id 是连续的。我稍微更改了“DELETE”端点,使其意味着“删除直到”给定 ID。我现在读取消费者中的项目信息,将它们放入内部队列,并在一个请求中立即从生产者中删除所有项目。然后我从内部消费者队列中处理它们。
  • “items fetcher”仍然不是多线程的,但是“events processor”,这是慢的部分,可以用多个worker来完成就好了。此外,我还减少了服务器之间的流量,这也很好。
【解决方案2】:

我可以更改此设置以允许并行处理的最简单方法是什么?

如果你可以升级服务器上的代码,或者添加中间人代码,那么最简单的方法就是队列。

如果您只喜欢客户端,没有中间人,也没有客户对客户的谈话,并且偶尔有一些冗余是可以的,那么这里有一些想法。

  1. 使用 shuffle 减少冲突

    • 如果您的服务器可以接收不存在对象的 DELETE
    • 而且“工艺项”成本+时间比较小
    • 并且该过程与顺序无关
    • 然后你可以打乱项目以减少碰撞:

      items.shuffle.each do |item|
        process item
      
  2. 使用 HEAD 检查项目是否存在

    • 如果你的服务器有 HEAD 方法
    • 并且有办法查找一个项目
    • 与“进程项”相比,HTTP 连接便宜且快速
    • 如果该项目不存在,您可以跳过它:

      items.each do |item|
        next if !<HEAD provider/items/id>
      
  3. 使用轮询循环刷新项目

    • 如果项目类似于您对正在进行的工作池进行轮询
    • 并且与订单无关
    • 而且 GET 请求是幂等的,即可以多次请求所有项目
    • DELETE 请求返回一个结果,通知您该项目不存在
    • 然后您可以处理项目直到遇到冗余,然后刷新项目列表:

      loop do
        items = <GET provider/items>
        if items.blank?
          sleep 1
          next
        end
        items.each do |item|
          process item
          <DELETE provider/items/#{item.id}>
          break if DELETE returns a code that indicates "already deleted"
        end
      end
      
  4. 以上所有内容都使用轮询循环、随机播放和 HEAD 检查进行组合。

    • 这非常有效,因为没有队列、没有中间人,也没有客户对客户的谈话。
    • 当多个客户端检查一个项目是否存在然后开始处理它时,仍然会发生罕见的冗余“处理项目”;在实践中,这几乎是零概率,尤其是当有很多项目时。

      loop do
        items = <GET provider/items>
        if items.blank?
          sleep 1 
          next
        end
        items.shuffle do |item|
          break if !<HEAD provider/items/id>
          process item
          <DELETE provider/items/#{item.id}>
          break if DELETE returns a code that indicates "already deleted"
        end
      end
      

【讨论】:

  • 嗨,我最终做了一些与您的建议完全不同的事情(请参阅我的 cmets 到 @porras 解决方案),但对于其他应用程序,我会牢记 shuffle。 +1
【解决方案3】:

在我看来,并行化这个实现的问题是你认为每个线程都会调用:

&lt;GET provider/items&gt;

一种解决方案是先获取所有项目,然后进行异步处理。

我的 Ruby 不存在,但它可能看起来像这样:

class HardWorker
    include Sidekiq::Worker
    def perform(item)
        process item
        <DELETE provider/items/#{item.id}>
    end
end

items = <GET provider/items>

items.each do |item|
    HardWorker.perform_async(item)
end

这样你的“生产者”是循环,消费者是异步HardWorker

【讨论】:

  • 谢谢。恐怕只有解决方案是不够的。假设有 200 个项目。第一个 GET 获得 200 并开始通过 Hardworker 处理它们。当它处理了大约 100 个时,就会启动一个新的 GET 索引。它获得了 100 个待处理的项目,因此为这些项目重复了 Hardworkers。
  • 也就是说,我最终做的事情本质上与您的建议相似:我“读取项目”然后“在不同的工作人员中处理它们”。缺少的是“一次性删除所有已读项目”。有关详细信息,请参阅@porras 答案中的我的 cmets。无论如何,+1 表示在消费者内部使用工作者的想法。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-04-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多