【问题标题】:Mongo Change Streams running multiple times (kind of): Node app running multiple instancesMongo Change Streams running multiple times (kind of): Node app running multiple instances
【发布时间】:2018-08-24 15:36:29
【问题描述】:

我的 Node 应用程序使用 Mongo 更改流,并且该应用程序在生产中运行 3 个以上的实例(最终会更多,因此随着它的增长,这将成为一个更大的问题)。因此,当更改流中发生更改时,功能会运行与进程一样多的次数。

如何设置以使更改流只运行一次?

这是我得到的:

const options = { fullDocument: "updateLookup" };

const filter = [
  {
    $match: {
      $and: [
        { "updateDescription.updatedFields.sites": { $exists: true } },
        { operationType: "update" }
      ]
    }
  }
];

const sitesStream = Client.watch(sitesFilter, options);

// Start listening to site stream
sitesStream.on("change", async change => {
  console.log("in site change stream", change);
  console.log(
    "in site change stream, update desc",
    change.updateDescription
  );

  // Do work...
  console.log("site change stream done.");
  return;
});

【问题讨论】:

    标签: node.js mongodb changestream


    【解决方案1】:

    只需使用 Mongodb 查询运算符即可轻松完成。您可以在 ID 字段上添加模查询,其中除数是您的应用程序实例的数量 (N)。余数则为 {0, 1, 2, ..., N-1} 的元素。如果您的应用实例按从零到 N-1 的升序编号,您可以这样编写过滤器:

    const filter = [
      {
        "$match": {
          "$and": [
            // Other filters
            { "_id": { "$mod": [<number of instances>, <this instance's id>]}}
          ]
        }
      }
    ];
    

    【讨论】:

    • 谢谢,这听起来比部署 MQTT 或 Kafka 更好
    • 如果实例自动扩展以满足需求,这将不起作用。
    【解决方案2】:

    在强有力的保证下做到这一点很困难,但并非不可能。我在这里写了一个解决方案的细节:https://www.alechenninger.com/2020/05/building-kafka-like-message-queue-with.html

    示例是用 Java 编写的,但重要的部分是算法。

    归结为一些技巧:

    • 每个进程都尝试获取锁
    • 每个锁(或每个更改)都有一个关联的fencing token
    • 处理每个更改必须是幂等的
    • 在处理更改时,令牌用于确保有序、有效的一次更新。

    更多详情请关注blog post

    【讨论】:

    • @Shubham 不,不幸的是它是一个 Java 库。但我怀疑算法和模型可以相对容易地移植到 JS 等其他语言。
    【解决方案3】:

    听起来您需要一种在实例之间分区更新的方法。你看过 Apache Kafka 吗?基本上,您要做的是拥有一个将更改数据写入分区 Kafka 主题的应用程序,并让您的节点应用程序成为 Kafka 消费者。这将确保只有一个应用程序实例收到更新。

    根据您的分区策略,您甚至可以确保同一记录的更新始终发送到同一个节点应用程序(如果您的应用程序需要维护自己的状态)。否则,您可以以循环方式分发更新。

    使用 Kafka 的最大好处是您可以添加和删除实例,而无需调整配置。例如,您可以启动一个实例,它会处理所有更新。然后,一旦您启动另一个实例,它们就会开始处理一半的负载。只要有分区,您就可以对尽可能多的实例继续这种模式(如果需要,您可以将主题配置为具有 1000 个分区),这就是 Kafka 消费者组的强大功能。缩小则相反。

    【讨论】:

      【解决方案4】:

      虽然 Kafka 选项听起来很有趣,但它是在我不熟悉的平台上进行的大量基础架构工作,所以我决定为我选择离家更近一些的东西,将 MQTT 消息发送到一个小站单独的应用程序,并让 MQTT 服务器监控消息的唯一性。

      siteStream.on("change", async change => {
        console.log("in site change stream);
        const mqttClient = mqtt.connect("mqtt://localhost:1883");
        const id = JSON.stringify(change._id._data);
        // You'll want to push more than just the change stream id obviously...
        mqttClient.on("connect", function() {
          mqttClient.publish("myTopic", id);
          mqttClient.end();
        });
      });
      

      我仍在制定 MQTT 服务器的最终版本,但评估消息唯一性的方法可能会在应用程序内存中存储一​​组更改流 ID,因为不需要持久化它们,并评估是否根据之前是否看到过该更改流 ID 继续进行。

      var mqtt = require("mqtt");
      var client = mqtt.connect("mqtt://localhost:1883");
      var seen = [];
      client.on("connect", function() {
        client.subscribe("myTopic");
      });
      client.on("message", function(topic, message) {
        context = message.toString().replace(/"/g, "");
        if (seen.indexOf(context) < 0) {
          seen.push(context);
          // Do stuff
        }
      });
      

      这不包括安全性等,但你明白了。

      【讨论】:

      • 这实际上是最终处理这个问题的最佳方法吗?我目前面临同样的问题,即多个实例正在搞乱我试图在手表功能中实现的数据库写入。这似乎给我最初认为的变更流的简单实现增加了很多复杂性。 :(
      • 此时,我的想法是运行另一个节点服务器,该节点服务器连接到处理所有观看的数据库,这显然仅限于单个实例?
      • @RyannGalea:大家好,我也遇到了同样的情况。 .我正在编写一个服务,负责监视更改并将更改发布到 rabbitmq 主题。但我仍然有一些担忧,例如此服务的单个实例可能会因 mongodb 写入、恢复更改等而过载。 Anw,您的最终解决方案如何?
      • 有一些方法可以将负载分配给多个生产者,例如基于实体的创建时间。更喜欢复数的course
      • 到目前为止一切都很好,但我认为它更多的是暂时的权宜之计。我相信在遇到扩展问题之前,您需要将大量不间断的消息发送到 RabbitMQ。将在复数视力的课程中寻找更长期的解决方案,感谢分享@pcuong
      【解决方案5】:

      是否会在 DB 中有一个名为 status 的字段,该字段将根据从更改流接收到的事件使用 findAnUpdate 进行更新。因此,假设您同时从更改流中获得 2 个事件。第一个事件将状态更新为start,如果状态为start,另一个将抛出错误。所以第二个事件不会处理任何业务逻辑。

      【讨论】:

      • 如果他们在“开始”状态的同一时间进行读取,然后他们都更改它......这是一个不好的解决方案
      • 所以只有一个能够更改它,因为状态字段将被更新为 ```end`,而另一个尝试更改它将失败。这是您可以在 MongoDB 中保证更新是原子的。
      • 这似乎是最好的方法。更改流的所有接收者都尝试翻转标志,并且唯一成功的接收者处理该事件。唯一需要考虑的是更改事件的_id 字段是否对于每个事件都是稳定的。
      【解决方案6】:

      我并不是说这些是坚如磐石的生产级解决方案,但我相信这样的事情可以奏效

      解决方案 1

      申请Read-Modify-Write:

      1. 在文档中添加version字段,所有创建的文档都有version=0
      2. 接收 ChangeStream 事件
      3. 阅读需要更新的文档
      4. 对模型执行更新
      5. 增量版本
      6. 更新 idversion 匹配的文档,否则丢弃更改

      是的,它会创建2 * n_application_replicas 无用的查询,所以还有另一种选择

      解决方案 2

      1. 在 mongo 中创建 ResumeToken 集合,用于存储集合 -> 令牌映射
      2. 在changeStream handler代码中,写入成功后,更新集合中的ResumeToken
      3. 创建一个功能切换,以禁用在您的应用程序中读取 ChangeStream
      4. 仅将应用程序的一个实例配置为“读取器”

      如果“阅读器”发生故障,您可以在另一个节点上启用阅读,或者重新部署“阅读器”节点。

      结果:可能会有无限数量的非阅读器副本,不会有任何无用的查询

      【讨论】:

        猜你喜欢
        • 2022-12-19
        • 1970-01-01
        • 2022-12-02
        • 2022-12-27
        • 2022-12-02
        • 2020-12-05
        • 2022-12-02
        • 2016-12-04
        • 2022-12-01
        相关资源
        最近更新 更多