smark

事件驱动模型相信对大家来说并不陌生,因为这是一套非常高效的逻辑处理模型,通过事件来驱动接下来需要完成的工作,而不像传统同步模型等待任务完成后再继续!虽然事件驱动有着这样的好处,但在传统设计上基于消息回调的处理方式在业务处理中相对比较麻烦整体设计成本也比较高,所以落地也不容易。EventNext是一个事件驱动的应用框架,它的事件驱动支持接口调用,在一系列的业务接口调用过程中通过事件驱动调用来完成;简单来说组件驱动的接口行为是由上一接口行为完成而触发执行,接下来介绍详细介绍一下EventNext和使用。

NextQueue

EventNext组件有一个核心的事件驱动队列NextQueue,NextQueue和传统的线程队列有着很大的区别;传统队列都是线程不停的执行消息,下一个消息都是线程等待上一个消息完成后再继续。但NextQueue的设计则不是,它的所有消息都基于上一个消息完成来驱动(不管上一个消息的逻辑是同步还是异步)。实际情况是NextQueue触发任务的消息是启用线程工作外,后面的消息都是基于上一个消息回调执行;NextQueue上的消息执行线程是不确定性也不需要等待,虽然队列里的消息执行线程不是唯一的,但执行顺序是一致的这也是NextQueue所带来的好处,在有序的情况下确保线程的利用率更高。

组件使用

在使用组前需要引用组件,Nuget安装如下

Install-Package EventNext

通过组件制定的业务必须以接口的方式来描述,而业务的调用也是通过接口的方式进行;虽然组件支持以消息回调的方式便不建议这样做,毕竟面向业务接口有着更好的易用性和可维护性。为了确保业务接口方式 的行为满足事件驱动队列的要求 ,所有业务行为方法必须以Task作为返回值;非Task返回值的行为方法都不能被组件注册和调用。

接口定义和实现

接口的定义有一定的规则,除了方法返回值是Task外,也不支持同一名称的函数进行重载,如果有需要可以使用特定的Attribute来标记对应的名称(out类型参数不被支持)。以下是一个简单的接口定义:

    public interface IUserService
    {

        Task<int> Income(int value);

        Task<int> Payout(int value);

        Task<int> Amount();

    }

业务实现:

    [Service(typeof(IUserService))]
    public class UserService :  IUserService
    {
        private int mAmount;

        public Task<int> Amount()
        {
            return Task.FromResult(mAmount);
        }

        public Task<int> Income(int value)
        {
            mAmount += value;
            return Task.FromResult(mAmount);
        }

        public Task<int> Payout(int value)
        {
            mAmount -= value;
            return Task.FromResult(mAmount);
        }

    }

需要通过ServiceAttribute来描这个类提供那些事件驱动的接口行为。

使用

组件通过一个EventCenter的对象来进行逻辑调用,创建该对象并注册相应业务功能的程序集即可:

EventCenter eventCenter = new EventCenter();
eventCenter.Register(typeof(Program).Assembly);

定义EventCenter加载逻辑后就可以创建代理接口调用

var service=EventCenter.Create<IUserService>();
await server.Payout(10);
await server.Income(10);

事件驱动队列分配

组件针对不同情况的需要,可以给接口实例或方法定义不同的事件队列配置,主要为以下几种情况

默认

由组件内部队列组进行负载情况进行配置,这种分配方式会导致同一接口的方法有可能分配在不同的队列上;在默认分配下接口实例的方法会存在多线程中同时的运行,因此这种模式的应用并不是线程安全。

Actor

Actor相信大家也很熟悉,一种高性能一致性的调度模型;组件支持这种模型的接口实例创建,只需要在创建接口代理的时候指定Actor名称即可

henry = EventCenter.Create<IUserService>("henry");

当指定Actor名称后,这个接口的所有方法调用都会一致性到对应实例的队列中,即所有功能方法线程调用的唯一性;在接口调用返回的时候也会再次切入到其他事件驱动队列,确保Actor内部的工作队列不受响后的应逻辑影响;当使用这种方式时整个Actor实例都是线程安全的。

ThreadPool

这种配置只适用于接口方法,描述方法无论什么情况都从线程池中执行相关代码,此行为的方法非线程安全

 [ThreadInvoke(ThreadType.ThreadPool)]
public Task<int> ThreadInvoke()
{
            mCount++;
            return mCount.ToTask();
}

SingleQueue

这种配置只适用于接口方法,用于描述方法不管那个实例都一致性到一个队列中,此行为的方法内线程安全,不保证对应实例是线程安全.

        [ThreadInvoke(ThreadType.SingleQueue)]
        public Task<int> GetID([ThreadUniqueID]string name)
        {
            if (!mValues.TryGetValue(name, out int value))
            {
                value = 1;
            }
            else
            {
                value++;
            }
            mValues[name] = value;
            return value.ToTask();
        }

在这配置下还可以再细分,如上面的[ThreadUniqueID]对不同参数做一致性对列,这个时候name的不同值会一致性到不同的事件队列中。

Actor性能对比

组件默认集成了Actor模型,可以通过它实现高并发无锁业务集成,EventNext最大的特点是以接口的方式集成应用,相对于akka.net基于消息接收的模式来说有着明显的应用优势。在性能上EventNext基于接口的ask机制也比akka.net基于消息receive的ask机制要高,以下是一个简单的对比测试

akak.net

 public class UserActor : ReceiveActor
    {
        public UserActor()
        {
            Receive<Income>(Income =>
            {
                mAmount += Income.Memory;
                this.Sender.Tell(mAmount);
            });
            Receive<Payout>(Outlay =>
            {
                mAmount -= Outlay.Memory;
                this.Sender.Tell(mAmount);
            });
            Receive<Get>(Outlay =>
            {
                this.Sender.Tell(mAmount);
            });
        }
        private decimal mAmount;
    }
    //invoke
    Income income = new Income { Memory = i };
    var result = await nbActor.Ask<decimal>(income);
    Payout payout = new Payout { Memory = i };
    var result = await nbActor.Ask<decimal>(payout);

Event Next

    [Service(typeof(IUserService))]
    public class UserService : IUserService
    {
        private int mAmount;   

        public Task<int> Amount()
        {
            return Task.FromResult(mAmount);
        }

        public Task<int> Income(int value)
        {
            mAmount += value;
            return Task.FromResult(mAmount);
        }

        public Task<int> Payout(int value)
        {
            mAmount -= value;
            return Task.FromResult(mAmount);
        }
    }
    //invoke
    var result = await nb.Income(i);
    var result = await nb.Payout(i);

详细测试代码https://github.com/IKende/EventNext/tree/master/samples/EventNext_AkkaNet 在默认配置下不同并发下的测试结果

 

Event Sourcing

由于事件驱动提倡的业务处理都是异步,这样就带来一个业务事务性的问题,如何确保不同接口方法业务处理一致性就比较关键了。由于不同的逻辑在不同线程中异步进行,所以相对比较好解决的就是在业务处理时引入Event Sourcing.以下就简单介绍一下组件这方面的应用,就不详细介绍了。毕竟 Event Sourcing设计和业务还有着一些关系

        public async Task<long> Income(int amount)
        {
            await EventCenter.WriteEvent(this, null, null, new { History = user.Amount, Change = amount, Value = user.Amount + amount });
            user.Amount += amount;
            return user.Amount;
        }

        public async Task<long> Pay(int amount)
        {
            await EventCenter.WriteEvent(this, null, null, new { History = user.Amount, Change = -amount, Value = user.Amount - amount });
            user.Amount -= amount;
            return user.Amount;
        }

组件提供事件信息的读写接口IEventLogHandler可以通过实现这个接口扩展自己的事件源处理。

使用注意事项

适应async/await

其实整个事件队列都是使用async/await,通过它大大简化了消息和回调函数间不同数据状态整合的难度。.Net也现有所异步API都支持async/wait

异步化设计你的逻辑

在实现接口逻辑的情况尽可能使和异步逻辑方法,在逻辑实施过程中禁用Task.Wait或一些线程相关Wait的方法,特别不带超时的Wait因为这种操作极容易导致事件驱动队列逻辑被挂起,导致队列无法正常工作;更糟糕的情况可能引起事件队列假死的情况。

传统异步API

由于各种原因,可能还存在旧的异步API不支持async/wait,出现这情况可以通过TaskCompletionSource来扩展已经有的异步方法支持async/wait

项目地址

https://github.com/IKende/EventNext

分类:

技术点:

相关文章: