【问题标题】:Using SQL Server as a DB queue with multiple clients将 SQL Server 用作具有多个客户端的数据库队列
【发布时间】:2011-04-08 04:31:33
【问题描述】:

给定一个充当队列的表,我怎样才能最好地配置表/查询,以便多个客户端同时从队列中处理?

例如,下表指示工作人员必须处理的命令。当worker完成后,它会将处理后的值设置为true。

| ID | COMMAND | PROCESSED |
|  1 | ...     | true      |
|  2 | ...     | false     |
|  3 | ...     | false     |

客户端可能会获得一个命令来处理,如下所示:

select top 1 COMMAND 
from EXAMPLE_TABLE 
with (UPDLOCK, ROWLOCK) 
where PROCESSED=false;

但是,如果有多个工作人员,每个工作人员都会尝试获取 ID=2 的行。只有第一个会获得悲观锁,其余的将等待。然后其中一个将获得第 3 行等。

什么查询/配置可以让每个工作客户端分别获取不同的行并同时处理它们?

编辑:

有几个答案建议使用表本身记录进程中状态的变化。我认为这在单笔交易中是不可能的。 (即,如果在提交 txn 之前没有其他工作人员看到它,那么更新状态有什么意义?)也许建议是:

# start transaction
update to 'processing'
# end transaction
# start transaction
process the command
update to 'processed'
# end transaction

这是人们通常处理这个问题的方式吗?在我看来,如果可能的话,DB 会更好地处理这个问题。

【问题讨论】:

  • 请指点我原来的,因为 SO 不推荐欺骗。
  • 既然 Microsoft 消息队列 (MSMQ) 的形式已经可用于任何 Windows 服务器,为什么还要费力重新创建所有这些功能?使用可用的东西 - 不要不断重新发明轮子!
  • 我不同意 marc_s。 MSMQ 为系统管理员和开发人员带来了极大的复杂性。数百万美元的项目失败了,因为他们使用了 MSMQ,但无法处理它的复杂性。如果没有令人信服的理由,您不应引入 MSMQ。
  • @marc_s:在不同的事务管理器中分离您的队列意味着在每个操作上进行两阶段提交以协调 SQL-MSMQ(即每秒数十/数百次操作与数万次单阶段提交)。将消息保存在 MSMQ 中并将状态保存在数据库中意味着 您不可能进行一致的备份。使用 MSMQ,您可以放松消息的可查询性。最后,MSMQ per store 有 2Gb 的限制,这在今天非常小,您可以轻松用完 MSMQ 存储空间,此时它只会滚动并死掉。
  • @Remus Rusanu:感谢那些有趣的见解——这些确实是 MSMQ 的一些严重缺陷.....

标签: sql sql-server database concurrency


【解决方案1】:

我建议你去Using tables as Queues。 正确实施的队列可以处理数千个并发用户和高达每分钟 1/2 百万入队/出队操作的服务。在 SQL Server 2005 之前,该解决方案很麻烦,涉及在单个事务中混合 SELECTUPDATE,并提供正确的锁定提示组合,如 gbn 链接的文章中所示。幸运的是,自从 SQL Server 2005 随着 OUTPUT 子句的出现,一个更优雅的解决方案可用,现在 MSDN 建议使用OUTPUT clause

您可以在应用程序中使用 OUTPUT 使用表作为队列,或持有 中间结果集。那就是 应用程序不断添加或 从表中删除行

基本上,您需要解决难题的 3 个部分,才能以高度并发的方式工作:

  1. 您需要自动出列。您必须在单个原子操作中找到该行,跳过任何锁定的行,并将其标记为“出队”,这就是 OUTPUT 子句发挥作用的地方:
    with CTE as (
      SELECT TOP(1) COMMAND, PROCESSED
      FROM TABLE WITH (READPAST)
      WHERE PROCESSED = 0)
    UPDATE CTE
      SET PROCESSED = 1
      OUTPUT INSERTED.*;
  1. 必须使用PROCESSED 列上最左边的聚集索引键来构建表。如果 ID 用作主键,则将其移动为聚集键中的第二列。是否在 ID 列上保留非聚集键的争论尚未结束,但我强烈支持不要在队列中使用任何辅助非聚集索引:
    CREATE CLUSTERED INDEX cdxTable on TABLE(PROCESSED, ID);
  1. 您不能通过任何其他方式查询此表,只能通过 Dequeue。尝试执行 Peek 操作或尝试将表同时用作队列作为存储将很可能导致死锁并显着降低吞吐量。

原子出队、READPAST 提示搜索元素出队和基于处理位的聚集索引上的最左键的组合确保了在高并发负载下的非常高的吞吐量。

【讨论】:

  • "Using tables as Queues" 文章对于我们这些必须实现不适合 Service Broker 的队列(例如待处理队列)的人来说是纯金。谢谢大佬。
  • 当message_handler想要回滚(在同一个事务中)时,如何重新安排待处理的消息以便稍后处理?
  • @dario-g:你不能故意回滚消息处理。
  • 感谢您的回复。这意味着我应该在处理程序中捕获这种情况并明确告诉队列它应该重新安排我的消息?
  • “正确实施的队列可以处理成千上万的并发用户和服务,每分钟可处理高达 1/2 百万的入队/出队操作。” - 这个说法有来源吗?
【解决方案2】:

我在这里的回答向您展示了如何将表格用作队列...SQL Server Process Queue Race Condition

您基本上需要“ROWLOCK、READPAST、UPDLOCK”提示

【讨论】:

  • 锁定提示对于保护选择和更新之间的更改很有用。如果您使用单个更新语句弹出队列,则它们不是必需的或有用的
  • @Andomar:这就是我们所需要的:读者和作者的 100% 安全并发......
【解决方案3】:

如果你想为多个客户端序列化你的操作,你可以简单地使用应用锁。

BEGIN TRANSACTION

EXEC  sp_getapplock @resource = 'app_token', @lockMode = 'Exclusive'

-- perform operation

EXEC  sp_releaseapplock @resource = 'app_token'

COMMIT TRANSACTION

【讨论】:

    【解决方案4】:

    您可以使用 int 来定义命令的状态,而不是为 Processed 使用布尔值:

    1 = not processed
    2 = in progress
    3 = complete
    

    然后每个工作人员将获得下一行,Processed = 1,将 Processed 更新为 2,然后开始工作。当 work in complete Processed 更新为 3。这种方法还允许扩展其他 Processed 结果,例如,您可以添加新状态“Completed Succesfully”和“Completed with Errors”,而不仅仅是定义工作人员已完成。 /p>

    【讨论】:

    • 谢谢。请看我的编辑。我对你的建议有误吗?
    • 您是对的,您需要单独的事务以允许其他工作人员看到更新,这应该是默认行为 - 为什么在工作人员处理命令时保持事务打开?我可以看到一个工人本身本质上是一个事务的奇偶性,但这几乎肯定比使用 Sql Server 事务更好地自己编码
    • @Macros 有什么建议,用它来处理“peek lock”吗?
    【解决方案5】:

    可能更好的选择是使用 trisSate 处理列和版本/时间戳列。然后,已处理列中的三个值将指示该行是正在处理、已处理还是未处理。

    例如

        CREATE TABLE Queue ID INT NOT NULL PRIMARY KEY,
        Command NVARCHAR(100), 
        Processed INT NOT NULL CHECK (Processed in (0,1,2) ), 
        Version timestamp)
    

    您抓取前 1 个未处理的行,将状态设置为处理中,并在事情完成后将状态设置回已处理。基于版本和主键列的更新状态。如果更新失败,则说明有人已经在那里了。

    您可能还想添加一个客户端标识符,这样如果客户端在处理它时死了,它可以重新启动,查看最后一行,然后从它所在的位置开始。

    【讨论】:

    • 谢谢。请看我的编辑。此外,为了实现持续可用性,我希望任何可用的客户端都能恢复失败的工作 - 而不仅仅是失败的工作。
    【解决方案6】:

    我不会弄乱桌子上的锁。只需创建两个额外的列,例如 IsProcessing(位/布尔值)和 ProcessingStarted(日期时间)。当一个工作人员崩溃或在超时后没有更新他的行时,您可以让另一个工作人员尝试处理数据。

    【讨论】:

    • 谢谢。请看我的编辑。此解决方案是否需要在主事务之外进行初始更新?
    • 为什么要使用事务?
    【解决方案7】:

    一种方法是使用单个更新语句标记行。如果您读取where 子句中的状态并在set 子句中对其进行更改,则不会有其他进程介于两者之间,因为该行将被锁定。例如:

    declare @pickup_id int
    set @pickup_id = 1
    
    set rowcount 1
    
    update  YourTable
    set     status = 'picked up'
    ,       @pickup_id = id
    where   status = 'new'
    
    set rowcount 0
    
    return @pickup_id
    

    这使用rowcount 最多更新一行。如果没有找到行,@pickup_id 将是 -1

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-12-06
      • 2019-01-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-07-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多