Ahoo-Wang

目录

  1. 设计重点
  2. 流程图
  3. 伪代码
    2.1. PublishEvent
    2.2. SubscribeEvent
    2.3. Publisher
    2.4. Subscriber
  4. 微服务 强一致性
    3.1 Publisher
    3.2 Subscriber
  5. 事件总线 - 跨服务 最终一致性
    4.1 Publisher & Subscriber 都开启了本地事务,保证了强一致性
    4.2 问题场景一:当 ③ 发布失败怎么办?
    4.3 问题场景二:当 ③ 发布成功,但 ④ 更新事件状态失败怎么办?
    4.4 问题场景三:Publisher 端Ok,Subscriber 消费出错

0. 设计重点

  1. Publisher 本地化 PublishEvent 保证事件发布可靠性
  2. Subscriber 本地化 SubscribeEvent 保证事件订阅可靠性
  3. SubscribeEvent 通过 EventId & HandlerType 组合约束 保证不重复消费事件
  4. 事件*控制台 处理 Publisher & Subscriber 事件重试

1. 执行流程图

执行流程图


2. 伪代码

2.1 PublishEvent

    public abstract class Event
    {
        public Event()
        {
            Id = Guid.NewGuid();
            CreationTime = DateTime.UtcNow;
        }

        public Guid Id { get; set; }
        public DateTime CreationTime { get; set; }
    }

    public class PublishEvent : Event
    {
        public PublishEvent(Event @event)
        {
            Id = @event.Id;
            CreationTime = @event.CreationTime;
            Type = @event.GetType().FullName;
            Data = JsonConvert.SerializeObject(@event);
            Status = PublishEventStatus.NotPublished;
        }

        public String Type { get; set; }
        public String Data { get; set; }
        public PublishEventStatus Status { get; set; }
    }

    public enum PublishEventStatus
    {
        NotPublished = 0,
        Published = 1,
        PublishedFailed = 2
    }

2.2 SubscribeEvent

    public class SubscribeEvent
    {
        public SubscribeEvent(Event @event, IEventHandler handler)
        {
            EventId = @event.Id;
            EventCreationTime = @event.CreationTime;
            EventType = @event.GetType().FullName;
            EventData = JsonConvert.SerializeObject(@event);
            HandlerType = handler.GetType().FullName;
            HandlingStatus = HandlingStatus.HandleSucceeded;
            HandlingTime = DateTime.Now;
        }
        public Guid EventId { get; set; }
        public String EventType { get; set; }
        public String EventData { get; set; }
        public DateTime EventCreationTime { get; set; }
        public String HandlerType { get; set; }
        public DateTime HandlingTime { get; set; }
        public HandlingStatus HandlingStatus { get; set; }
    }
    public enum HandlingStatus
    {
        HandleSucceeded = 0,
        HandleFailed = 1
    }

2.3 Publisher

    try
    {
        BeginTransaction(); // ①
        //Biz Flow
        EventRepository.PubilshEvent(@event);// ②
        CommitTransaction();
    }
    catch(Exception ex){
        RollbackTransaction();
        throw ex;
    }
    EventBus.Publish(@event); // ③
    EventResitory.EventPublished(@event.ToString()); // ④

2.4 Subscriber

    try
    {
        BeginTransaction();
        //Biz Flow
        EventRepository.SubscribeEvent(@event , eventHandler); // ⑤
        CommitTransaction();
    }
    catch(Exception ex){
        RollbackTransaction();
        throw ex;
    }

3. 微服务 强一致性

3.1 Publisher

  1. 开启本地事务达到强一致性
  2. 执行本地业务代码
  3. 本地事务内部保存事件 预发布 状态
  4. 发布事件到事件总线
  5. 修改事件发布状态为已发布

3.2 Subscriber

  1. 开启本地事务达到强一致性
  2. 执行本地业务代码
  3. 保存订阅事件到本地仓库

4 事件总线 - 跨服务 最终一致性

4.1 Publisher & Subscriber 都开启了本地事务,保证了强一致性


4.2 问题场景一:当 ③ 发布失败怎么办?

  1. 发布失败,意味着抛出异常,则 不执行,那么事件状态依然保持 预发布状态
  2. 后续 事件重试 重新发布该事件,并更新事件状态为 已发布

4.3 问题场景二:当 ③ 发布成功,但 ④ 更新事件状态失败怎么办?

4.3.1 场景二·一 Subscriber 订阅成功

  1. 发布成功,但 更新事件状态失败,事件状态依然是 预发布状态
  2. Subscriber 订阅到该事件后成功执行完业务代码
  3. Subscriber 将订阅事件保存到本地订阅事件仓库
    该场景存在的问题: Publisher 会通过 事件重试 再次发布 预发布 状态的事件,那么此时Subscriber 将重复消费该事件
    方案:该问题我们可以通过将 SubscribeEvent EventId & HandlerType 组合唯一约束,来避免重复消费

4.3.2 场景二·二 Subscriber 订阅失败

  1. 发布成功,但 更新事件状态失败,事件状态依然是 预发布状态
  2. Subscriber 执行消费失败
  3. Subscriber 回滚本地事务
    该场景不存在任何问题,因为 Publisher 会通过 事件重试 再次发布 预发布 状态的事件 。

4.4 问题场景三:Publisher 端Ok,Subscriber 消费出错

  1. Publisher 端处理顺利
  2. Subscriber 消费失败,回滚本地事务,此时 SubscribeEvent 未存储到本地仓库
    该场景存在的问题:
    Publisher 发送成功,并且本地 PublishEvent 事件为已发布,那么意味着从Publisher端是无法知道Subscriber消费失败需要重新消费
    解决方案:
  3. 通过检测 PublishEvent & SubscribeEvent 获得需要 事件重试PublishEvent
  4. PublishEvent 重新发布Subscriber

5. 通过Nuget安装组件支持以上编程模型

Install-Package SmartEventBus.RabbitMQImpl
Install-Package SmartEventBus.Repository

6. ORM:SmartSql 广而告之

SmartSql = Dapper + MyBatis + Cache(Memory | Redis) + ZooKeeper + R/W Splitting + ......

相关文章: