我正在使用当前版本的 nServiceBus (5)、VS2013 和 SQL Server 2008。我使用 this tutorial 创建了一个数据库更改侦听器,它使用 SQL Server 对象代理和 SQLDependency 来监视对特定表的更改。 (注意这可能在更高版本的 SQL Server 中被弃用)。
SQL 依赖项允许您使用所有基本 SQL 功能的广泛选择,尽管您需要注意 some restrictions。我稍微修改了教程中的代码以提供更好的错误信息:
void NotifyOnChange(object sender, SqlNotificationEventArgs e)
{
// Check for any errors
if (@"Subscribe|Unknown".Contains(e.Type.ToString())) { throw _DisplayErrorDetails(e); }
var dependency = sender as SqlDependency;
if (dependency != null) dependency.OnChange -= NotifyOnChange;
if (OnChange != null) { OnChange(); }
}
private Exception _DisplayErrorDetails(SqlNotificationEventArgs e)
{
var message = "useful error info";
var messageInner = string.Format("Type:{0}, Source:{1}, Info:{2}", e.Type.ToString(), e.Source.ToString(), e.Info.ToString());
if (@"Subscribe".Contains(e.Type.ToString()) && @"Invalid".Contains(e.Info.ToString()))
messageInner += "\r\n\nThe subscriber says that the statement is invalid - check your SQL statement conforms to specified requirements (http://stackoverflow.com/questions/7588572/what-are-the-limitations-of-sqldependency/7588660#7588660).\n\n";
return new Exception(messageMain, new Exception(messageInner));
}
我还创建了一个带有“数据库优先”实体框架数据模型的项目,以允许我对更改的数据进行处理。
[相关部分] 我的 nServiceBus 项目包含两个“作为主机运行”端点,其中一个发布事件消息。第二个端点处理消息。发布者已设置为 IWantToRunAtStartup,它实例化 DBListener 并将我想要作为更改监视器运行的 SQL 语句传递给它。 onChange() 函数被传递一个匿名函数来读取更改的数据并发布消息:
using statements
namespace Sample4.TestItemRequest
{
public partial class MyExampleSender : IWantToRunWhenBusStartsAndStops
{
private string NOTIFY_SQL = @"SELECT [id] FROM [dbo].[Test] WITH(NOLOCK) WHERE ISNULL([Status], 'N') = 'N'";
public void Start() { _StartListening(); }
public void Stop() { throw new NotImplementedException(); }
private void _StartListening()
{
var db = new Models.TestEntities();
// Instantiate a new DBListener with the specified connection string
var changeListener = new DatabaseChangeListener(ConfigurationManager.ConnectionStrings["TestConnection"].ConnectionString);
// Assign the code within the braces to the DBListener's onChange event
changeListener.OnChange += () =>
{
/* START OF EVENT HANDLING CODE */
//This uses LINQ against the EF data model to get the changed records
IEnumerable<Models.TestItems> _NewTestItems = DataAccessLibrary.GetInitialDataSet(db);
while (_NewTestItems.Count() > 0)
{
foreach (var qq in _NewTestItems)
{
// Do some processing, if required
var newTestItem = new NewTestStarted() { ... set properties from qq object ... };
Bus.Publish(newTestItem);
}
// Because there might be a number of new rows added, I grab them in small batches until finished.
// Probably better to use RX to do this, but this will do for proof of concept
_NewTestItems = DataAccessLibrary.GetNextDataChunk(db);
}
changeListener.Start(string.Format(NOTIFY_SQL));
/* END OF EVENT HANDLING CODE */
};
// Now everything has been set up.... start it running.
changeListener.Start(string.Format(NOTIFY_SQL));
}
}
}
重要 OnChange 事件触发会导致侦听器停止监视。它基本上是一个单一的事件通知器。处理完事件后,最后要做的是重新启动 DBListener。 (您可以在 END OF EVENT HANDLING 注释之前的行中看到这一点)。
您需要添加对 System.Data 的引用,可能还有 System.Data.DataSetExtensions。
目前该项目仍处于概念验证阶段,因此我很清楚以上内容可以有所改进。另请记住,我必须删除公司特定的代码,因此可能存在错误。将其视为模板,而不是工作示例。
我也不知道这是否是放置代码的正确位置——这也是我今天在 StackOverflow 上的部分原因;寻找更好的 ServiceBus 主机代码示例。无论我的代码有什么缺陷,该解决方案都非常有效 - 到目前为止 - 并且也能满足您的目标。
不要太担心 ServiceBroker 方面的事情。设置好之后,按照教程,SQLDependency 会为您处理细节。