【问题标题】:Read SQL Server Broker messages and publish them using NServiceBus读取 SQL Server Broker 消息并使用 NServiceBus 发布它们
【发布时间】:2014-12-01 09:44:13
【问题描述】:

我对 NServiceBus 非常陌生,在我们的一个项目中,我们希望完成以下任务 -

  1. 每当在Sql server中修改表数据时,构造一条消息并插入到sql server broker queue中
  2. 使用 NServiceBus 读取代理队列消息
  3. 将消息作为另一个事件再次发布,以便其他订阅者 可以应付。

现在是第 2 点,我没有太多线索,如何完成它。

我参考了以下帖子,之后我可以在代理队列中输入消息,但无法在我们的项目中与 NServiceBus 集成,因为 NServiceBus 库版本较旧,并且使用的许多方法也已弃用。因此,将它们与当前版本一起使用会变得非常麻烦,或者如果我以不正确的方式使用它们。

http://www.nullreference.se/2010/12/06/using-nservicebus-and-servicebroker-net-part-2 https://github.com/jdaigle/servicebroker.net

任何有关如何正确执行此操作的帮助都是非常宝贵的。

谢谢。

【问题讨论】:

    标签: nservicebus


    【解决方案1】:

    我正在使用当前版本的 nServiceBus (5)、VS2013 和 SQL Server 2008。我使用 this tutorial 创建了一个数据库更改侦听器,它使用 SQL Server 对象代理和 SQLDependency 来监视对特定表的更改。 (注意这可能在更高版本的 SQL Server 中被弃用)。

    SQL 依赖项允许您使用所有基本 SQL 功能的广泛选择,尽管您需要注意 some restrictions。我稍微修改了教程中的代码以提供更好的错误信息:

        void NotifyOnChange(object sender, SqlNotificationEventArgs e)
        {
            // Check for any errors
            if (@"Subscribe|Unknown".Contains(e.Type.ToString())) { throw _DisplayErrorDetails(e); }
    
            var dependency = sender as SqlDependency;
            if (dependency != null) dependency.OnChange -= NotifyOnChange;
            if (OnChange != null) { OnChange(); }
        }
    
        private Exception _DisplayErrorDetails(SqlNotificationEventArgs e)
        {
            var message = "useful error info";
    
            var messageInner = string.Format("Type:{0}, Source:{1}, Info:{2}", e.Type.ToString(), e.Source.ToString(), e.Info.ToString());
    
            if (@"Subscribe".Contains(e.Type.ToString()) && @"Invalid".Contains(e.Info.ToString()))
                messageInner += "\r\n\nThe subscriber says that the statement is invalid - check your SQL statement conforms to specified requirements (http://stackoverflow.com/questions/7588572/what-are-the-limitations-of-sqldependency/7588660#7588660).\n\n";
    
            return new Exception(messageMain, new Exception(messageInner));
    
        }
    

    我还创建了一个带有“数据库优先”实体框架数据模型的项目,以允许我对更改的数据进行处理。

    [相关部分] 我的 nServiceBus 项目包含两个“作为主机运行”端点,其中一个发布事件消息。第二个端点处理消息。发布者已设置为 IWantToRunAtStartup,它实例化 DBListener 并将我想要作为更改监视器运行的 SQL 语句传递给它。 onChange() 函数被传递一个匿名函数来读取更改的数据并发布消息:

    using statements
    
    namespace Sample4.TestItemRequest
    {
        public partial class MyExampleSender : IWantToRunWhenBusStartsAndStops
        {
            private string NOTIFY_SQL = @"SELECT [id] FROM [dbo].[Test] WITH(NOLOCK) WHERE ISNULL([Status], 'N') = 'N'";
        public void Start() { _StartListening(); }
        public void Stop() { throw new NotImplementedException(); }
    
        private void _StartListening()
        {
            var db = new Models.TestEntities();
    
            // Instantiate a new DBListener with the specified connection string            
            var changeListener = new DatabaseChangeListener(ConfigurationManager.ConnectionStrings["TestConnection"].ConnectionString);
    
            // Assign the code within the braces to the DBListener's onChange event
            changeListener.OnChange += () =>
            {
                /* START OF EVENT HANDLING CODE  */
    
                //This uses LINQ against the EF data model to get the changed records
                IEnumerable<Models.TestItems> _NewTestItems = DataAccessLibrary.GetInitialDataSet(db);
    
                while (_NewTestItems.Count() > 0)
                {
                    foreach (var qq in _NewTestItems)
                    {
                        // Do some processing, if required
    
                        var newTestItem = new NewTestStarted() { ... set properties from qq object ... };
                        Bus.Publish(newTestItem);
                    }
    
                    // Because there might be a number of new rows added, I grab them in small batches until finished.
                    // Probably better to use RX to do this, but this will do for proof of concept
                    _NewTestItems = DataAccessLibrary.GetNextDataChunk(db);
    
                }
    
                changeListener.Start(string.Format(NOTIFY_SQL));
    
                /* END OF EVENT HANDLING CODE  */
    
            };
    
            // Now everything has been set up.... start it running.
            changeListener.Start(string.Format(NOTIFY_SQL));
    
            }
        }
    }
    

    重要 OnChange 事件触发会导致侦听器停止监视。它基本上是一个单一的事件通知器。处理完事件后,最后要做的是重新启动 DBListener。 (您可以在 END OF EVENT HANDLING 注释之前的行中看到这一点)。

    您需要添加对 System.Data 的引用,可能还有 System.Data.DataSetExtensions。

    目前该项目仍处于概念验证阶段,因此我很清楚以上内容可以有所改进。另请记住,我必须删除公司特定的代码,因此可能存在错误。将其视为模板,而不是工作示例。

    我也不知道这是否是放置代码的正确位置——这也是我今天在 StackOverflow 上的部分原因;寻找更好的 ServiceBus 主机代码示例。无论我的代码有什么缺陷,该解决方案都非常有效 - 到目前为止 - 并且也能满足您的目标。

    不要太担心 ServiceBroker 方面的事情。设置好之后,按照教程,SQLDependency 会为您处理细节。

    【讨论】:

    • 只是一件小事:删除 Stop 方法实现中的 NotImplementedException 否则基本上每次关闭端点都会崩溃
    • @Mauro - 当你“实现接口”时,它会由 VS 自动添加。当我清理代码以供发布时,我没有发现它可能仍在我的代码中,因为它仍在开发/测试中。谢谢你接我;我会忘记它,以后它会咬我!
    • @AnyoneStillReadingThis - 我的 NOTIFY_SQL 中的 WITH(NOLOCK) 与 SQLDependency 不兼容。我认为那来自一个初始测试版本,我已经更新了它。这条评论有望让您免去一些头疼的问题。
    • 嗨贾斯汀,+1 为您的回复。它设定了一个初始方向。
    【解决方案2】:

    据我所知,ServiceBroker Transport 非常陈旧,不再受支持。 一种可能的解决方案是使用 SqlDependency (http://msdn.microsoft.com/en-us/library/62xk7953(v=vs.110).aspx) 之类的东西从端点代码中“监控”有趣的表,然后将消息推送到相关队列中。

    .m

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2014-05-14
      • 2021-12-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多