【问题标题】:SQL Server Service Broker - Ways to improve SQL execution frameworkSQL Server Service Broker - 改进 SQL 执行框架的方法
【发布时间】:2020-09-16 03:14:13
【问题描述】:

下面是我一直在玩的使用Service Broker 的SQL 执行框架设计的概要。我已经概述了流程并通过(使用块引用突出显示)提出了一些问题,并且有兴趣听取有关设计的任何建议。

概述

我有一个 ETL 操作需要从 5 个数据库中取出数据,并使用 select/insert 语句或 stored procedures 将其移动到 150 个数据库中。结果是大约 2,000 个单独的查询,每个查询需要 1 秒到 1 小时。

每个 SQL 查询只插入数据。无需返回数据。

操作可以分为3个步骤:

  • ETL 前
  • ETL
  • ETL 后

每个步骤中的查询可以按任意顺序执行,但步骤必须保持顺序。

方法

我正在使用Service Broker 进行异步/并行执行。

关于如何调整服务代理的任何建议(例如,查看或指导设置队列工作人员数量的任何特定选项?

服务代理设计

发起者

initiator 将包含 SQL 查询的XML 消息发送到Unprocessed 队列,并带有一个名为ProcessUnprocessedQueue 的激活存储过程。这个过程被包裹在一个事务中的try/catch中,当出现异常时回滚事务。

ProcessUnpressedQueue

ProcessUnprocessedQueue 将 XML 传递给过程ExecSql

ExecSql - SQL 执行和日志记录

ExecSql 然后处理 SQL 执行和日志记录:

  • 解析 XML 以及有关将要记录的执行的任何其他数据
  • 在执行之前,会插入一个日志条目
    • 如果事务是在initiator中启动的,如果initiator中的外部事务被回滚,是否可以确保日志条目插入始终提交?

    • SAVE TRANSACTION 之类的内容在这里无效,对吗?

    • 我是否应该在这里操作事务,在try/catch 中执行查询,如果它进入catch,插入异常日志条目和throw 异常,因为它正在交易中?

  • 查询已执行
替代日志记录解决方案?

我需要登录:

  • 执行的 SQL 查询
  • 关于操作的元数据
  • 每个进程完成所需的时间
    • 这就是为什么我在进程的开头插入一行,在结束时插入一行
  • 任何异常(如果存在)

有一个包含查询信息的In-Memory OLTP 表会更好吗?所以,我会在操作开始前有INSERT 一行,然后执行UPDATEINSERT 来记录异常和执行时间。批处理完成后,我会将数据归档到存储到磁盘的表中,以防止表变得太大。

ProcessUnprocessedQueue - 手动处理结果

执行后,ProcessUnprocessedQueue 取回 XML 的更新版本(以确定执行是否成功,或有关事务的其他数据,用于后处理),然后将该消息发送到 @987654345 @,没有激活过程,所以可以手动处理(我需要知道一批查询什么时候执行完毕)。

处理查询

由于 ETL 可以分为 3 个步骤,因此我创建了 3 个XML 变量,我将在其中添加 ETL 操作中所需的所有查询,因此我将有如下内容:

  • @preEtlQueue xml
    • 200 个查询
  • @etlQueue xml
    • 1500 个查询
  • @postEtlQueue xml
    • 300 个查询

为什么选择 XML?

XML 队列变量作为OUTPUT 参数在不同的存储过程之间传递,该参数更新其值和/或向其添加SQL 查询。该变量需要读写,因此可以使用全局临时表或持久表之类的替代方法。

然后我处理XML 变量:

  • 使用cursor 循环查询并将它们发送到服务代理服务。
    • XML 变量中包含的每组查询都在同一个 conversation_group_id 下发送。
    • to/from 服务、消息类型等值都存储在XML 变量中。
  • 消息发送到Service Broker后,使用while循环不断检查ProcessedQueue,直到所有消息都处理完。
    • 这实现了超时以避免无限循环
    • 我正在考虑重新设计它。我是否应该在ProcessedQueue 上添加一个激活程序,然后让该程序将处理后的结果插入到物理表中?如果我这样做,我将无法使用RECEIVE 而不是WHILE 循环来检查已处理的项目。这样做有什么缺点吗?

【问题讨论】:

    标签: sql-server tsql etl service-broker


    【解决方案1】:

    我还没有建造像你现在所做的那样庞大的东西,但我会给你一些对我有用的东西,以及一些一般性的意见......

    • 我的偏好是避免 In-Memory OLTP 并将所有内容写入持久表并保持消息队列尽可能干净

    • 在服务器中使用尽可能快的硬盘驱动器,写入速度相当于 NVMe 或使用 RAID 10 等更快。

    • 我会在每条消息命中后立即将其从队列中取出并将其写入我命名为“mqMessagesReceived”的表中(参见下面的代码,我的通用 MQ 处理程序名为 mqAsyncQueueMessageOnCreate)

    • 我在“mqMessagesReceived”表中使用了一个触发器,该触发器进行查找以查找要执行哪个 StoredProcedure 来处理每条唯一消息(请参见下面的代码)

    • 每条消息都有一个标识符(在我的例子中,我使用将消息写入队列的原始表名)并且此标识符用作在 mqMessagesReceived 表的触发器内运行的查找查询的键,找出需要运行的后续存储过程,以正确处理收到的每条消息。

    • 在 MQ 上发送消息之前,

    可以从调用方创建一个通用变量(例如,如果触发器将消息放入 MQ)

    SELECT @tThisTableName = OBJECT_NAME(parent_object_id) FROM sys.objects 
    WHERE sys.objects.name = OBJECT_NAME(@@PROCID)
    AND SCHEMA_NAME(sys.objects.schema_id) = OBJECT_SCHEMA_NAME(@@PROCID);
    
    • 配置表是用于将表名与需要运行的 StoredProcedure 匹配的查找数据,以处理到达并写入 mqMessagesReceived 表的 MQ 数据。

    这是查找表的定义

    CREATE TABLE [dbo].[mqMessagesConfig](
        [ID] [int] IDENTITY(1,1) NOT NULL,
        [tSourceTableReceived] [nvarchar](128) NOT NULL,
        [tTriggeredStoredProcedure] [nvarchar](128) NOT NULL,
     CONSTRAINT [PK_mqMessagesConfig] PRIMARY KEY CLUSTERED 
    (
        [ID] ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
    ) ON [PRIMARY]
    GO
    

    这是在消息进入队列时运行的激活存储过程

    CREATE PROCEDURE [dbo].[mqAsyncQueueMessageOnCreate]
    AS
    BEGIN
        SET NOCOUNT ON
     
        DECLARE 
            @h UNIQUEIDENTIFIER,
            @t sysname,
            @b varbinary(200),
            @hand VARCHAR(36),
            @body VARCHAR(2000),
            @sqlcleanup nvarchar(MAX)
    
            
     
        -- Get all of the messages on the queue
        -- the WHILE loop is infinite, until BREAK is received when we get a null handle
        WHILE 1=1
        BEGIN
            SET @h = NULL
     
            --Note the semicolon..!
            ;RECEIVE TOP(1) 
                @h = conversation_handle,
                @t = message_type_name,
                @b = message_body
            FROM mqAsyncQueue
    
    
            --No message found (handle is now null)
            IF @h IS NULL
            BEGIN
                -- all messages are now processed, but we still have the @hand variable saved from processing the last message
                SET @sqlcleanup = 'EXEC [mqConversationsClearOne]  @handle = N' + char(39) + @hand + char(39) + ';';
                EXECUTE(@sqlcleanup);
                BREAK
            END
    
    
            --mqAsyncMessage message type received
            ELSE IF @t = 'mqAsyncMessage'
            BEGIN
                SET @hand = CONVERT(varchar(36),@h);
                SET @body = CONVERT(varchar(2000),@b);
                
                INSERT mqMessagesReceived  (tMessageType, tMessageBody, tMessageBinary, tConversationHandle)
                VALUES (@t, @body, @b, @hand);
            
            END
            
            
            --unknown message type was received that we dont understand
            ELSE
            BEGIN
                INSERT mqMessagesReceived (tMessageBody, tMessageBinary)  
                VALUES ('Unknown message type received', CONVERT(varbinary(MAX), 'Unknown message type received'))
            END
    
        END        
    
    END
    
    
    
    
    
    
    CREATE PROCEDURE [dbo].[mqConversationsClearOne]
    
        @handle varchar(36)
    
    AS
    
        -- Note: you can check the queue by running this query
        -- SELECT * FROM sys.conversation_endpoints 
        -- SELECT * FROM sys.conversation_endpoints WHERE NOT([State] = 'CO') 
        -- CO = conversing [State]
    
    
        DECLARE @getid CURSOR
                ,@sql NVARCHAR(MAX)
                ,@conv_id NVARCHAR(100)
                ,@conv_handle NVARCHAR(100)
    
    
        -- want to create a chain of statements like this, one per conversation
        -- END CONVERSATION 'FE851F37-218C-EA11-B698-4CCC6AD00AE9' WITH CLEANUP;
        -- END CONVERSATION 'A4B4F603-208C-EA11-B698-4CCC6AD00AE9' WITH CLEANUP;    
    
        SET @getid = CURSOR FOR
                        SELECT [conversation_id], [conversation_handle]  
                        FROM   sys.conversation_endpoints
                        WHERE conversation_handle = @handle;
    
    
        OPEN @getid
    
            FETCH NEXT
            FROM @getid INTO @conv_id, @conv_handle 
            WHILE @@FETCH_STATUS = 0
                BEGIN
                    SET @sql = 'END CONVERSATION ' + char(39) + @conv_handle + char(39) + ' WITH CLEANUP;'
                    EXEC sys.sp_executesql @stmt = @sql;
                    FETCH NEXT
                    FROM @getid INTO @conv_id, @conv_handle  --, @conv_service
                END
    
        CLOSE @getid
        DEALLOCATE @getid
    

    并且名为“mqMessagesReceived”的表有这个触发器

    CREATE TRIGGER [dbo].[mqMessagesReceived_TriggerUpdate]
    
    ON [dbo].[mqMessagesReceived]
    
    AFTER INSERT
    
    AS
      BEGIN
    
    
        DECLARE     
    
        @strMessageBody nvarchar(4000),
        @strSourceTable nvarchar(128),
        @strSourceKey nvarchar(128),
        @strConfigStoredProcedure nvarchar(4000),
        @sqlRunStoredProcedure nvarchar(4000),
        @strErr nvarchar(4000)
    
    
        SELECT @strMessageBody= ins.tMessageBody FROM INSERTED ins; 
    
        SELECT @strSourceTable = (select txt_Value from dbo.fn_ParseText2Table(@strMessageBody,'|') WHERE Position=2);
    
        SELECT @strSourceKey =  (select txt_Value from dbo.fn_ParseText2Table(@strMessageBody,'|') WHERE Position=3);
    
        
        -- look in mqMessagesConfig to find the name of the final stored procedure 
        -- to run against the SourceTable
        -- e.g.  @strConfigStoredProcedure = mqProcess-tblLabDaySchedEventsMQ
    
        SELECT @strConfigStoredProcedure =
        (select tTriggeredStoredProcedure from dbo.mqMessagesConfig WHERE tSourceTableReceived = @strSourceTable);
    
        
    
        SET @sqlRunStoredProcedure = 'EXEC [' + @strConfigStoredProcedure + ']  @iKey = ' + @strSourceKey + ';';
        EXECUTE(@sqlRunStoredProcedure);
    
    
        INSERT INTO [mqMessagesProcessed]
        ( 
            [tMessageBody],
            [tSourceTable],
            [tSourceKey],
            [tTriggerStoredProcedure]
        )
    
        VALUES  
        (
            @strMessageBody,
            @strSourceTable,
            @strSourceKey,
            @sqlRunStoredProcedure
        );
    
    
      END
    

    另外,我发现我还必须做一些一般性的 SQL Server 调优建议(用于处理繁忙的数据库)

    默认情况下,每个 SQL Server 只有一个 TempDB 文件,而 TempDB 的初始大小为 8MB

    但是,每次服务器重新启动时,TempDB 都会重置回初始的 8MB 大小,并且该公司每个周末都通过 cron/taskscheduler 重新启动服务器。

    我们看到的问题是缓慢的数据库和大量的记录锁,但这只是周一早上的第一件事,当时每个人都在开始他们的工作周时立即敲击数据库。

    当 TempDB 自动调整大小时,它被“锁定”,因此根本没有人可以使用该单个 TempDB(这就是 SQL Server 经常变得无响应的原因)

    到周五,TempDB 已增长到超过 300MB。

    所以...为了解决以下最佳实践建议,我为每个 vCPU 创建了一个 TempDB 文件,因此我创建了 8 个 TempDB 文件,并将它们分布在该服务器上的两个可用硬盘驱动器上,最重要的是,设置它们的初始大小超过我们需要的大小(我选择了每个 200MB)。

    这解决了每周一早上遇到的 SQL Server 减速和记录锁定问题。

    【讨论】:

      猜你喜欢
      • 2010-11-21
      • 2013-09-07
      • 2012-08-01
      • 1970-01-01
      • 2014-03-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-09-14
      相关资源
      最近更新 更多