【问题标题】:Get rows inserted since last check?自上次检查以来插入的行?
【发布时间】:2018-07-26 11:44:35
【问题描述】:

我正在实现CQRS 模式,其中一个或多个进程将记录插入数据库,而一个或多个进程以不同的速度提取它们。

我希望消费者进程轮询数据库以查找自上次检查以来插入的新记录,但我不确定如何(安全地)实现这一点。

您可以假设行一旦插入就不会改变。似乎每行只有一个唯一的 id 和一个指示插入时间的时间戳是不够的。

如果我查询时间戳大于我看到的最后一行的记录,那么如果同时插入多条记录(具有相同的时间戳),我就会遇到问题。

如果我查询 id 大于我看到的最后一行的记录,那么我会遇到并发事务可能以非递增顺序提交 ID 的问题(例如,postgreSQL 会话提前分配和缓存序列 ID 以提高性能) .

理想情况下,我正在寻找一种与 DBMS 无关的解决方案,并且能够尽可能接近实时地使用数据。有什么想法吗?

澄清:每一行应该被消费多次,每个消费者一次。意思是,仅仅因为一个消费者处理一行不应该阻止其他消费者这样做。每个消费者都会对相同的数据做不同的事情。

【问题讨论】:

  • 因为你有多个消费者,你要么需要一种方法让他们拥有一个共享存储,以便能够弄清楚他们已经阅读和处理了什么。一种方法是通过在处理时更新某些列来维护源表上的内容。如果无法更改表结构,您可以创建一个新表或采用其他形式来维护每个消费者处理的记录,例如在内存中、新表或某个共享文件中。
  • @clinomaniac 我澄清了这个问题:每一行应该被多次消费,每个消费者一次。
  • 这使事情变得更简单,因为您不需要消费者确定每一行是否已被处理。现在唯一的问题是每个消费者的独特性。如果时间戳不够唯一,一种非常简单的方法是始终获取不包括当前的数据(可以是相对的)。根据我的假设,这应该会有所帮助,即您不会在数据中获得较旧的时间戳。当消费者再次投票时,您将获得最后一个。基本上这意味着忽略过去 10 秒左右进入的数据以避免重复。
  • @clinomaniac 如问题中所述:如果同时提交两行,但您在上一个查询中只读取其中一个,那么下次运行查询时,您将错过第二个行。
  • 您会得到 10 秒(或一定时间)前编写的所有内容。您可以一次读取多行。

标签: sql database cqrs


【解决方案1】:

由于您有大量数据传入,并且可能有多个记录最后一个时间戳,因此您需要一种方法来跟踪读取的数据。以下是几种不同的方法,各有优缺点:

  1. 您可以等待数据进入以获得时间戳。你可以通过不读取MAX(timestamp) 来做到这一点,这样你就可以从表中获取所有数据,除了最后一个数据可能仍在进来的数据。

专业:简单的设计

缺点:不是实时处理

  1. 您可以将每次读取的 id 存储为最后一个时间戳。获取数据时,可以使用(timestamp = lasttimestamp and id not in (set of ids)) or timestamp > lasttimestamp)之类的查询

专业版:几乎是实时的

缺点:需要额外的存储空间

【讨论】:

  • 如果服务器的时间更新了怎么办?例如,它可能会被设置回 1 秒。
  • @ConstantinGalbenu 问题指出“您可以假设行一旦插入就不会改变。”如果不是这样,您可以添加一个数据库设置的时间戳列,指示该行上次更新的时间。然后,您将选择此列而不是用户设置的列。
  • @Gili 你不明白我的评论。当服务器将更新其系统时间(即通过使用 NTP)时,行将在过去中有时间戳。
  • @ConstantinGalbenu 对,那会有问题。那你有什么推荐的?理想情况下,我们希望在每一行旁边标记一个不断增加的时间或版本号。
  • 没关系,因为时间戳仍然相同。我们没有使用服务器时间进行任何计算。只要保持时间戳,就不会成为问题。
【解决方案2】:

如果您不使用分片或类似功能:

您可以使用乐观锁定

为此,您可以创建一个order 列,在记录表(日志)上具有唯一索引。在每次插入之前,生产者查询日志中最大的order,并将其递增并插入带有此order 的下一条记录。

如果发生并发异常(即Duplicate entry '12345' for key order),则重试整个过程(查询、递增、插入)。

如果您使用分片或类似功能:

然后您将需要一个额外的服务/表,它会在每次被要求时生成一个新的、唯一的、始终递增的order 整数。

这样做的缺点是必须管理另一部分,即必须具有高可用性的单点故障。

附言

  • “分片或类似”意味着您不能在整个表上拥有唯一索引,因为您使用分片或写入多个表。
  • 您不能依赖时间戳或与物理时间相关的任何内容,因为系统时间可能会通过自动服务 (NTP) 或人工操作员进行调整。

【讨论】:

  • 您的sequence 方法要求性能极差。阅读postgresql.org/docs/9.4/static/sql-createsequence.html中的Unexpected results might be obtained if a cache setting greater than one is used for a sequence object that will be used concurrently by multiple sessions.部分
  • @Gili 它被命名为sequence 只是一个巧合,它不是 Postgresql SEQUENCE。
  • @Gili 非分片表上整数值的唯一索引不是“严重的”。我在我的 MongoDB 事件存储实现中使用了一个唯一索引,其中包含数百万个文档,而且速度非常快。
  • 我不是在谈论唯一索引。我说的是不能保证序列以与提交事务相同的顺序插入数字的事实。示例:T1 插入第 1 行(提取序列 #1)。 T2 插入第 2 行(拉序列 #2)。 T2 在 T1 之前提交。消费者看到序列#2 是最新的。 T1 插入第 3 行(拉序列 #3)。 T1 提交。现在数据库包含序列为2, 1, 3 的行,消费者将跳过序列为#1 的行。
  • @Gili T2 没有拉 #2,因为 #1 尚未提交。
猜你喜欢
  • 2017-12-24
  • 2012-04-22
  • 1970-01-01
  • 1970-01-01
  • 2012-08-14
  • 2011-10-03
  • 2015-02-03
  • 2019-05-16
  • 1970-01-01
相关资源
最近更新 更多