【问题标题】:Oracle AQ call back with Reactive Extension and transaction ODP.NET使用响应式扩展和事务 ODP.NET 进行 Oracle AQ 回调
【发布时间】:2017-01-10 09:10:06
【问题描述】:

我已经实现了来自 Oracle AQ 的消息的出列,并将其作为 IObservable 公开给系统。工作流程如下:-

  1. 应用程序收到来自 Oracle 的关于新消息的回调事件。
  2. 应用程序将消息出列并将其添加到 IObservable(消息出列作为事务的一部分,该事务在消息出列后立即提交)。

我意识到一个潜在的问题,那就是当消息出队时,事务会立即提交,而不是等待应用程序成功使用它。下面是我正在使用的代码,但需要建议在应用程序成功使用事务后在何处/如何提交事务。目前它在私有 Dequeue 方法中启动并提交/回滚事务。

public sealed class Queue<T> : IQueue<T> where T : IQueueDataType
{
    private readonly OracleConnection _connection;

    private readonly string _consumerName;

    private readonly IQueueSetting _queueSetting;

    private readonly IDbConnectionFactory _dbConnectionFactory;

    private OracleAQQueue _queue;

    private IObservable<T> _messages;

    private bool _isDisposed;

    public Queue(IDbConnectionFactory dbConnectionFactory, IDalSettings dalSettings, IQueueSetting queueSetting)
    {
        _dbConnectionFactory = dbConnectionFactory;
        _connection = dbConnectionFactory.Create() as OracleConnection;
        _consumerName = dalSettings.Consumer;
        _queueSetting = queueSetting;

    }

    public void Connect()
    {            
        _connection.Open();
        _queue = new OracleAQQueue(_queueSetting.QueueName, _connection)
        {
            DequeueOptions = { Wait = 10, Visibility = OracleAQVisibilityMode.Immediate , ConsumerName = _consumerName, NavigationMode = OracleAQNavigationMode.FirstMessage, DequeueMode = OracleAQDequeueMode.Remove},
            UdtTypeName = _queueSetting.QueueDataTypeName,
            MessageType = OracleAQMessageType.Udt
        };

        _queue.NotificationConsumers = new[] { _consumerName };

        _messages = Observable
            .FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs>(
                h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h)
            .Where(x => x.EventArgs.AvailableMessages > 0)
            .Select(x =>
            {
                try
                {
                Log.Info("Msg received", "Queue", _queueSetting.QueueName);

                    OracleAQMessage msg = Dequeue();

                    Log.Info("Msg received id " + msg.MessageId, "Queue", _queueSetting.QueueName);
                    return (T)msg.Payload;
                }
                catch (Exception e)
                {

                }
            }).Publish().RefCount();

    }

    private OracleAQMessage Dequeue()
    {
        using (var connection = _dbConnectionFactory.Create() as OracleConnection)
        {
            try
            {
                connection.Open();
                using (OracleTransaction transaction = connection.BeginTransaction())
                {
                    try
                    {
                        OracleAQMessage msg = _queue.Dequeue();
                        **transaction.Commit();**
                        return msg;
                    }
                    catch (Exception e)
                    {                           
                        **transaction.Rollback();**
                        throw;
                    }
                }
            }
            catch (Exception e)
            {
                Log.Error(string.Format("Error occurred while connecting to database to dequeue new message. Error : {0}", e),
                    "Dequeue", GetType().FullName);
                throw;
            }
            finally
            {
                connection.Close();
            }
        }
    }

    public IObservable<T> GetMessages()
    {
        return _messages;
    }

    public void Dispose()
    {           
        if (!_isDisposed)
        {
            if (_queue != null)
            {
                _queue.Dispose();    
            }

            _connection.Dispose();
            _isDisposed = true;
        }
    }
}

如果不使用IObservable,我只是公开一个事件提交和回滚事务将非常容易,但我喜欢我可以用IObservable 做的事情,即我可以运行Linq,但不知道如何提交事务。

【问题讨论】:

    标签: c# oracle transactions queue system.reactive


    【解决方案1】:

    我认为这里没有简单的解决方法。如果我理解正确:

    1. 从 Oracle 推送一个事件,
    2. 您想通过IObservable 流公开事件,
    3. 应用程序“处理”它,
    4. 如果处理成功则要提交,否则回滚。

    问题在于IObservable 是一种单向机制。一旦你发布了一条消息(在我们的例子中,你从这个 Oracle 队列中得到了一些东西),目的不是跟踪它,而是稍后决定是否提交/回滚。因此,您的选择几乎是将您的应用程序逻辑填充到某种形式的处理程序中:

    Func<OracleMessage, bool> isMessageCommitable; //...application handling logic here
    
    var appHandledMessages = oracleSourceMessages
        .Select(m => Tuple.Create(m, isMessageCommitable(m)))
        .Publish()
        .RefCount();
    
    appHandledMessages
        .Where(t => t.Item2)
        .Subscribe(t => Commit(t.Item1));
    
    appHandledMessages
        .Where(t => !t.Item2)
        .Subscribe(t => Rollback(t.Item1));
    

    ...或者设置IObservables 指向另一种方式,该方式将从应用程序推回队列,哪些消息应该被提交/回滚。您可能需要两个,一个用于提交,一个用于回滚,这些可能应该传递给Queue&lt;T&gt; 的构造函数。

    祝你好运。

    【讨论】:

    • 谢谢。您能否提供第二种情况的示例“IObservables 指向另一个方向......”
    猜你喜欢
    • 2013-09-26
    • 1970-01-01
    • 2023-03-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多