【发布时间】:2017-01-10 09:10:06
【问题描述】:
我已经实现了来自 Oracle AQ 的消息的出列,并将其作为 IObservable 公开给系统。工作流程如下:-
- 应用程序收到来自 Oracle 的关于新消息的回调事件。
- 应用程序将消息出列并将其添加到 IObservable(消息出列作为事务的一部分,该事务在消息出列后立即提交)。
我意识到一个潜在的问题,那就是当消息出队时,事务会立即提交,而不是等待应用程序成功使用它。下面是我正在使用的代码,但需要建议在应用程序成功使用事务后在何处/如何提交事务。目前它在私有 Dequeue 方法中启动并提交/回滚事务。
public sealed class Queue<T> : IQueue<T> where T : IQueueDataType
{
private readonly OracleConnection _connection;
private readonly string _consumerName;
private readonly IQueueSetting _queueSetting;
private readonly IDbConnectionFactory _dbConnectionFactory;
private OracleAQQueue _queue;
private IObservable<T> _messages;
private bool _isDisposed;
public Queue(IDbConnectionFactory dbConnectionFactory, IDalSettings dalSettings, IQueueSetting queueSetting)
{
_dbConnectionFactory = dbConnectionFactory;
_connection = dbConnectionFactory.Create() as OracleConnection;
_consumerName = dalSettings.Consumer;
_queueSetting = queueSetting;
}
public void Connect()
{
_connection.Open();
_queue = new OracleAQQueue(_queueSetting.QueueName, _connection)
{
DequeueOptions = { Wait = 10, Visibility = OracleAQVisibilityMode.Immediate , ConsumerName = _consumerName, NavigationMode = OracleAQNavigationMode.FirstMessage, DequeueMode = OracleAQDequeueMode.Remove},
UdtTypeName = _queueSetting.QueueDataTypeName,
MessageType = OracleAQMessageType.Udt
};
_queue.NotificationConsumers = new[] { _consumerName };
_messages = Observable
.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs>(
h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
{
try
{
Log.Info("Msg received", "Queue", _queueSetting.QueueName);
OracleAQMessage msg = Dequeue();
Log.Info("Msg received id " + msg.MessageId, "Queue", _queueSetting.QueueName);
return (T)msg.Payload;
}
catch (Exception e)
{
}
}).Publish().RefCount();
}
private OracleAQMessage Dequeue()
{
using (var connection = _dbConnectionFactory.Create() as OracleConnection)
{
try
{
connection.Open();
using (OracleTransaction transaction = connection.BeginTransaction())
{
try
{
OracleAQMessage msg = _queue.Dequeue();
**transaction.Commit();**
return msg;
}
catch (Exception e)
{
**transaction.Rollback();**
throw;
}
}
}
catch (Exception e)
{
Log.Error(string.Format("Error occurred while connecting to database to dequeue new message. Error : {0}", e),
"Dequeue", GetType().FullName);
throw;
}
finally
{
connection.Close();
}
}
}
public IObservable<T> GetMessages()
{
return _messages;
}
public void Dispose()
{
if (!_isDisposed)
{
if (_queue != null)
{
_queue.Dispose();
}
_connection.Dispose();
_isDisposed = true;
}
}
}
如果不使用IObservable,我只是公开一个事件提交和回滚事务将非常容易,但我喜欢我可以用IObservable 做的事情,即我可以运行Linq,但不知道如何提交事务。
【问题讨论】:
标签: c# oracle transactions queue system.reactive