我还没有建造像你现在所做的那样庞大的东西,但我会给你一些对我有用的东西,以及一些一般性的意见......
-
我的偏好是避免 In-Memory OLTP 并将所有内容写入持久表并保持消息队列尽可能干净
-
在服务器中使用尽可能快的硬盘驱动器,写入速度相当于 NVMe 或使用 RAID 10 等更快。
-
我会在每条消息命中后立即将其从队列中取出并将其写入我命名为“mqMessagesReceived”的表中(参见下面的代码,我的通用 MQ 处理程序名为 mqAsyncQueueMessageOnCreate)
-
我在“mqMessagesReceived”表中使用了一个触发器,该触发器进行查找以查找要执行哪个 StoredProcedure 来处理每条唯一消息(请参见下面的代码)
-
每条消息都有一个标识符(在我的例子中,我使用将消息写入队列的原始表名)并且此标识符用作在 mqMessagesReceived 表的触发器内运行的查找查询的键,找出需要运行的后续存储过程,以正确处理收到的每条消息。
-
在 MQ 上发送消息之前,
可以从调用方创建一个通用变量(例如,如果触发器将消息放入 MQ)
SELECT @tThisTableName = OBJECT_NAME(parent_object_id) FROM sys.objects
WHERE sys.objects.name = OBJECT_NAME(@@PROCID)
AND SCHEMA_NAME(sys.objects.schema_id) = OBJECT_SCHEMA_NAME(@@PROCID);
- 配置表是用于将表名与需要运行的 StoredProcedure 匹配的查找数据,以处理到达并写入 mqMessagesReceived 表的 MQ 数据。
这是查找表的定义
CREATE TABLE [dbo].[mqMessagesConfig](
[ID] [int] IDENTITY(1,1) NOT NULL,
[tSourceTableReceived] [nvarchar](128) NOT NULL,
[tTriggeredStoredProcedure] [nvarchar](128) NOT NULL,
CONSTRAINT [PK_mqMessagesConfig] PRIMARY KEY CLUSTERED
(
[ID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
这是在消息进入队列时运行的激活存储过程
CREATE PROCEDURE [dbo].[mqAsyncQueueMessageOnCreate]
AS
BEGIN
SET NOCOUNT ON
DECLARE
@h UNIQUEIDENTIFIER,
@t sysname,
@b varbinary(200),
@hand VARCHAR(36),
@body VARCHAR(2000),
@sqlcleanup nvarchar(MAX)
-- Get all of the messages on the queue
-- the WHILE loop is infinite, until BREAK is received when we get a null handle
WHILE 1=1
BEGIN
SET @h = NULL
--Note the semicolon..!
;RECEIVE TOP(1)
@h = conversation_handle,
@t = message_type_name,
@b = message_body
FROM mqAsyncQueue
--No message found (handle is now null)
IF @h IS NULL
BEGIN
-- all messages are now processed, but we still have the @hand variable saved from processing the last message
SET @sqlcleanup = 'EXEC [mqConversationsClearOne] @handle = N' + char(39) + @hand + char(39) + ';';
EXECUTE(@sqlcleanup);
BREAK
END
--mqAsyncMessage message type received
ELSE IF @t = 'mqAsyncMessage'
BEGIN
SET @hand = CONVERT(varchar(36),@h);
SET @body = CONVERT(varchar(2000),@b);
INSERT mqMessagesReceived (tMessageType, tMessageBody, tMessageBinary, tConversationHandle)
VALUES (@t, @body, @b, @hand);
END
--unknown message type was received that we dont understand
ELSE
BEGIN
INSERT mqMessagesReceived (tMessageBody, tMessageBinary)
VALUES ('Unknown message type received', CONVERT(varbinary(MAX), 'Unknown message type received'))
END
END
END
CREATE PROCEDURE [dbo].[mqConversationsClearOne]
@handle varchar(36)
AS
-- Note: you can check the queue by running this query
-- SELECT * FROM sys.conversation_endpoints
-- SELECT * FROM sys.conversation_endpoints WHERE NOT([State] = 'CO')
-- CO = conversing [State]
DECLARE @getid CURSOR
,@sql NVARCHAR(MAX)
,@conv_id NVARCHAR(100)
,@conv_handle NVARCHAR(100)
-- want to create a chain of statements like this, one per conversation
-- END CONVERSATION 'FE851F37-218C-EA11-B698-4CCC6AD00AE9' WITH CLEANUP;
-- END CONVERSATION 'A4B4F603-208C-EA11-B698-4CCC6AD00AE9' WITH CLEANUP;
SET @getid = CURSOR FOR
SELECT [conversation_id], [conversation_handle]
FROM sys.conversation_endpoints
WHERE conversation_handle = @handle;
OPEN @getid
FETCH NEXT
FROM @getid INTO @conv_id, @conv_handle
WHILE @@FETCH_STATUS = 0
BEGIN
SET @sql = 'END CONVERSATION ' + char(39) + @conv_handle + char(39) + ' WITH CLEANUP;'
EXEC sys.sp_executesql @stmt = @sql;
FETCH NEXT
FROM @getid INTO @conv_id, @conv_handle --, @conv_service
END
CLOSE @getid
DEALLOCATE @getid
并且名为“mqMessagesReceived”的表有这个触发器
CREATE TRIGGER [dbo].[mqMessagesReceived_TriggerUpdate]
ON [dbo].[mqMessagesReceived]
AFTER INSERT
AS
BEGIN
DECLARE
@strMessageBody nvarchar(4000),
@strSourceTable nvarchar(128),
@strSourceKey nvarchar(128),
@strConfigStoredProcedure nvarchar(4000),
@sqlRunStoredProcedure nvarchar(4000),
@strErr nvarchar(4000)
SELECT @strMessageBody= ins.tMessageBody FROM INSERTED ins;
SELECT @strSourceTable = (select txt_Value from dbo.fn_ParseText2Table(@strMessageBody,'|') WHERE Position=2);
SELECT @strSourceKey = (select txt_Value from dbo.fn_ParseText2Table(@strMessageBody,'|') WHERE Position=3);
-- look in mqMessagesConfig to find the name of the final stored procedure
-- to run against the SourceTable
-- e.g. @strConfigStoredProcedure = mqProcess-tblLabDaySchedEventsMQ
SELECT @strConfigStoredProcedure =
(select tTriggeredStoredProcedure from dbo.mqMessagesConfig WHERE tSourceTableReceived = @strSourceTable);
SET @sqlRunStoredProcedure = 'EXEC [' + @strConfigStoredProcedure + '] @iKey = ' + @strSourceKey + ';';
EXECUTE(@sqlRunStoredProcedure);
INSERT INTO [mqMessagesProcessed]
(
[tMessageBody],
[tSourceTable],
[tSourceKey],
[tTriggerStoredProcedure]
)
VALUES
(
@strMessageBody,
@strSourceTable,
@strSourceKey,
@sqlRunStoredProcedure
);
END
另外,我发现我还必须做一些一般性的 SQL Server 调优建议(用于处理繁忙的数据库)
默认情况下,每个 SQL Server 只有一个 TempDB 文件,而 TempDB 的初始大小为 8MB
但是,每次服务器重新启动时,TempDB 都会重置回初始的 8MB 大小,并且该公司每个周末都通过 cron/taskscheduler 重新启动服务器。
我们看到的问题是缓慢的数据库和大量的记录锁,但这只是周一早上的第一件事,当时每个人都在开始他们的工作周时立即敲击数据库。
当 TempDB 自动调整大小时,它被“锁定”,因此根本没有人可以使用该单个 TempDB(这就是 SQL Server 经常变得无响应的原因)
到周五,TempDB 已增长到超过 300MB。
所以...为了解决以下最佳实践建议,我为每个 vCPU 创建了一个 TempDB 文件,因此我创建了 8 个 TempDB 文件,并将它们分布在该服务器上的两个可用硬盘驱动器上,最重要的是,设置它们的初始大小超过我们需要的大小(我选择了每个 200MB)。
这解决了每周一早上遇到的 SQL Server 减速和记录锁定问题。