经过几天的努力,拜读了多位大牛高手的文章,终于对事件总线有所了解,特此记录下来,以免忘记。

1、定义相关的接口:

  A  事件接口

 /// <summary>
 /// Contract for a domain event: exposes when it occurred and a
 /// read flag that can be set once the event has been consumed.
 /// </summary>
 public interface IDomainEvent
    {
        /// <summary>
        /// Returns the time at which the event occurred.
        /// </summary>
        /// <returns>The occurrence timestamp of the event.</returns>
        DateTime OccurredOn();

        /// <summary>
        /// Marks the event as read.
        /// </summary>
        void Read();

        /// <summary>
        /// Gets whether the event has already been read.
        /// </summary>
        bool IsRead { get; }
    }

  B  事件订阅接口

    /// <summary>
    /// Contract for an object that subscribes to domain events of one type.
    /// </summary>
    public interface IDomainEventSubscriber
    {
        /// <summary>
        /// Returns the event type this subscriber is subscribed to.
        /// </summary>
        /// <returns>The subscribed event <see cref="Type"/>.</returns>
        Type SubscribedToEventType();

        /// <summary>
        /// Handles the given event. The event is passed untyped, so the
        /// implementation decides how to interpret it.
        /// </summary>
        /// <param name="domainEvent">The event instance to handle.</param>
        void Handle(object domainEvent);
    }

2、定义相关实现

  A  事件实现

 /// <summary>
 /// Base class for domain events. Each instance gets a GUID-based
 /// identifier, a creation timestamp and an initially unset read flag.
 /// </summary>
 public abstract class DomainEvent :  IDomainEvent
    {
        /// <summary>
        /// Identifier assigned at construction: a new GUID in its default string form.
        /// </summary>
        public string ID;

        /// <summary>
        /// Time captured from <see cref="DateTime.Now"/> when the event was constructed.
        /// </summary>
        public readonly DateTime OccurredOnTime;

        /// <summary>
        /// Initializes the read flag, the timestamp and the identifier.
        /// </summary>
        protected DomainEvent()
        {
            IsRead = false;
            OccurredOnTime = DateTime.Now;
            ID = Guid.NewGuid().ToString();
        }

        /// <summary>
        /// Gets whether <see cref="Read"/> has been called on this event.
        /// </summary>
        public bool IsRead { get; private set; }

        /// <summary>
        /// Returns the timestamp stored in <see cref="OccurredOnTime"/>.
        /// </summary>
        /// <returns>The construction-time timestamp.</returns>
        public DateTime OccurredOn()
        {
            DateTime occurredAt = OccurredOnTime;
            return occurredAt;
        }

        /// <summary>
        /// Marks the event as read by setting <see cref="IsRead"/> to true.
        /// </summary>
        public void Read()
        {
            IsRead = true;
        }
    }

  B  事件订阅实现

    /// <summary>
    /// Typed base class for subscribers: adapts the untyped
    /// <see cref="Handle"/> entry point to the strongly typed
    /// <see cref="HandleEvent"/> method.
    /// </summary>
    /// <typeparam name="T">The event type this subscriber handles.</typeparam>
    public abstract class DomainEventSubscriber<T> : IDomainEventSubscriber where T : IDomainEvent
    {
        /// <summary>
        /// Returns the subscribed event type, which is <c>typeof(T)</c>.
        /// </summary>
        /// <returns>The <see cref="Type"/> of <typeparamref name="T"/>.</returns>
        public Type SubscribedToEventType()
        {
            return typeof(T);
        }

        /// <summary>
        /// Handles an event that has already been verified to be of type <typeparamref name="T"/>.
        /// </summary>
        /// <param name="domainEvent">The typed event instance.</param>
        public abstract void HandleEvent(T domainEvent);

        /// <summary>
        /// Checks that the event is of type <typeparamref name="T"/> and forwards it to <see cref="HandleEvent"/>.
        /// </summary>
        /// <param name="domainEvent">The event instance to handle.</param>
        /// <exception cref="ArgumentNullException"><paramref name="domainEvent"/> is null.</exception>
        /// <exception cref="NotSupportedException">The event is not of type <typeparamref name="T"/>.</exception>
        public void Handle(object domainEvent)
        {
            // Without this guard a null event falls into the type-mismatch branch
            // and fails with a NullReferenceException while formatting the message.
            if (domainEvent == null)
            {
                throw new ArgumentNullException("domainEvent");
            }

            if (!(domainEvent is T))
            {
                throw new NotSupportedException(string.Format("当前订阅者支持的事件类型是:{0},当前事件是:{1}", typeof(T).FullName, domainEvent.GetType().FullName));
            }

            this.HandleEvent((T)domainEvent);
        }
    }

3、定义事件总线实现

 /// <summary>
 /// Event bus that keeps a process-wide registry of subscribers keyed by
 /// event type and synchronously distributes published events to them.
 /// </summary>
 public class DomainEventBus
    {
        /// <summary>
        /// Signature of the callback raised when a subscriber throws while handling an event.
        /// </summary>
        /// <param name="subscriber">The subscriber that threw.</param>
        /// <param name="domainEvent">The event being handled.</param>
        /// <param name="exception">The exception thrown by the subscriber.</param>
        public delegate void DistributeExceptionHandle(IDomainEventSubscriber subscriber, IDomainEvent domainEvent, Exception exception);

        /// <summary>
        /// Key: the event type; value: the subscribers registered for that type.
        /// The registry is static, so it is shared by every bus instance.
        /// </summary>
        private static readonly Dictionary<Type, List<IDomainEventSubscriber>> _subscribers = new Dictionary<Type, List<IDomainEventSubscriber>>();

        /// <summary>Guards every read and write of <see cref="_subscribers"/>.</summary>
        private static readonly object _lockObj = new object();

        /// <summary>
        /// Raised when a subscriber throws during distribution. If nothing is
        /// attached, the exception is dropped and distribution continues.
        /// </summary>
        public event DistributeExceptionHandle DistributeExceptionEvent;

        private static DomainEventBus _instance;

        /// <summary>
        /// Returns the shared bus instance, creating it on first use.
        /// </summary>
        /// <returns>The single shared <see cref="DomainEventBus"/>.</returns>
        public static DomainEventBus Instance()
        {
            var existing = _instance;
            if (existing != null)
                return existing;

            var candidate = new DomainEventBus();

            // CompareExchange returns the previous value: null means our candidate
            // was installed; otherwise another thread won and its instance is used.
            return Interlocked.CompareExchange(ref _instance, candidate, null) ?? candidate;
        }

        /// <summary>
        /// Distributes an unread event to the subscribers registered for its
        /// runtime type and to the subscribers registered for
        /// <see cref="IDomainEvent"/> itself, then marks the event as read.
        /// An event with no subscribers is still marked as read.
        /// </summary>
        /// <typeparam name="T">The static type of the event.</typeparam>
        /// <param name="aDomainEvent">The event to publish.</param>
        /// <exception cref="ArgumentNullException"><paramref name="aDomainEvent"/> is null.</exception>
        public void Publish<T>(T aDomainEvent) where T : IDomainEvent
        {
            if (aDomainEvent == null)
                throw new ArgumentNullException("aDomainEvent");

            if (aDomainEvent.IsRead)
                return;

            var domainEventType = aDomainEvent.GetType();

            // Iterate over a snapshot so that Subscribe calls made concurrently,
            // or from inside a handler, cannot invalidate the enumeration.
            foreach (var domainEventSubscriber in SnapshotSubscribers(domainEventType))
            {
                var subscribedTo = domainEventSubscriber.SubscribedToEventType();
                if (subscribedTo == domainEventType || subscribedTo == typeof(IDomainEvent))
                {
                    Distribute(domainEventSubscriber, aDomainEvent);
                }
            }

            aDomainEvent.Read();
        }

        /// <summary>
        /// Copies, under the registry lock, the subscribers registered for the
        /// given event type followed by those registered for <see cref="IDomainEvent"/>.
        /// </summary>
        /// <param name="domainEventType">The runtime type of the event being published.</param>
        /// <returns>A private copy of the matching subscribers; empty when there are none.</returns>
        private static IDomainEventSubscriber[] SnapshotSubscribers(Type domainEventType)
        {
            var catchAllType = typeof(IDomainEvent);
            var matches = new List<IDomainEventSubscriber>();

            lock (_lockObj)
            {
                List<IDomainEventSubscriber> bucket;
                if (_subscribers.TryGetValue(domainEventType, out bucket))
                    matches.AddRange(bucket);

                if (domainEventType != catchAllType && _subscribers.TryGetValue(catchAllType, out bucket))
                    matches.AddRange(bucket);
            }

            return matches.ToArray();
        }

        /// <summary>
        /// Invokes one subscriber and routes any exception it throws to
        /// <see cref="OnDistributeExceptionEvent"/> so the remaining subscribers still run.
        /// </summary>
        /// <param name="subscriber">The subscriber to invoke.</param>
        /// <param name="domainEvent">The event to hand over.</param>
        private void Distribute(IDomainEventSubscriber subscriber, IDomainEvent domainEvent)
        {
            try
            {
                subscriber.Handle(domainEvent);
            }
            catch (Exception ex)
            {
                OnDistributeExceptionEvent(subscriber, domainEvent, ex);
            }
        }

        /// <summary>
        /// Registers a subscriber under the event type it reports. A subscriber
        /// whose concrete type and subscribed event type match an existing
        /// registration is ignored, so identical subscriptions fire only once.
        /// </summary>
        /// <param name="aSubscriber">The subscriber to register.</param>
        /// <exception cref="ArgumentNullException"><paramref name="aSubscriber"/> is null.</exception>
        public void Subscribe(IDomainEventSubscriber aSubscriber)
        {
            if (aSubscriber == null)
                throw new ArgumentNullException("aSubscriber");

            lock (_lockObj)
            {
                var domainEventType = aSubscriber.SubscribedToEventType();
                List<IDomainEventSubscriber> subscribers;

                if (!_subscribers.TryGetValue(domainEventType, out subscribers))
                {
                    subscribers = new List<IDomainEventSubscriber>();
                    _subscribers.Add(domainEventType, subscribers);
                }

                // Hoisted out of the predicate: both names are loop-invariant.
                var eventTypeName = domainEventType.FullName;
                var subscriberTypeName = aSubscriber.GetType().FullName;

                if (subscribers.Any(ent => ent.SubscribedToEventType().FullName == eventTypeName && ent.GetType().FullName == subscriberTypeName))
                    return;

                subscribers.Add(aSubscriber);
            }
        }

        /// <summary>
        /// Raises <see cref="DistributeExceptionEvent"/> if any handler is attached.
        /// </summary>
        /// <param name="subscriber">The subscriber that threw.</param>
        /// <param name="domainEvent">The event being handled.</param>
        /// <param name="exception">The exception thrown by the subscriber.</param>
        protected virtual void OnDistributeExceptionEvent(IDomainEventSubscriber subscriber, IDomainEvent domainEvent, Exception exception)
        {
            // Copy to a local so a concurrent unsubscribe cannot null the delegate
            // between the check and the invocation.
            var handler = DistributeExceptionEvent;
            if (handler != null)
                handler(subscriber, domainEvent, exception);
        }
    }

4、定义具体的事件

/// <summary>
/// Domain event carrying the identifiers of a created order, the user
/// and the receiver, as supplied to the constructor.
/// </summary>
public class OrderCreated : DomainEventCore.DomainEvent
    {
        /// <summary>Gets the order identifier passed to the constructor.</summary>
        public string OrderId { get; private set; }

        /// <summary>Gets the user identifier passed to the constructor.</summary>
        public string UserId { get; private set; }

        /// <summary>Gets the receiver passed to the constructor.</summary>
        public string Receiver { get; private set; }

        /// <summary>
        /// Creates the event and stores the three values unchanged.
        /// </summary>
        /// <param name="orderId">Value stored in <see cref="OrderId"/>.</param>
        /// <param name="userId">Value stored in <see cref="UserId"/>.</param>
        /// <param name="receiver">Value stored in <see cref="Receiver"/>.</param>
        public OrderCreated(string orderId, string userId, string receiver)
        {
            Receiver = receiver;
            UserId = userId;
            OrderId = orderId;
        }
    }
View Code

相关文章:

  • 2022-12-23
  • 2022-12-23
  • 2021-07-01
  • 2021-06-22
  • 2021-10-30
  • 2021-09-06
  • 2021-11-11
  • 2021-11-17
猜你喜欢
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-03-05
  • 2021-09-30
  • 2021-07-05
  • 2021-06-15
相关资源
相似解决方案