模拟Spring自实现监听器

 

组件:

  事件(Event):即监听什么。如任务即将执行、任务执行完毕

  监听器(Listener):谁来监听,<E extends Event> :E,对什么时间感兴趣(监听特定类型的事件)

  广播器(Multicaster):发布事件、添加/移除监听器

  事件触发机制:事件什么时候发布

 

1、事件 抽象类Event,其他要监控的事件继承它

/**
 * 事件
 *
 * @author yangyongjie
 * @date 2020/9/30
 * @desc
 */
public abstract class Event<T> {
    /**
     * 事件自身资源,用来发布任务时传递一些参数信息
     */
    protected T source;
    /**
     * 事件发生时间
     */
    private final long timestamp;

    public Event(T source) {
        this.source = source;
        this.timestamp = System.currentTimeMillis();
    }

    public T getSource() {
        return source;
    }

    public void setSource(T source) {
        this.source = source;
    }

    public long getTimestamp() {
        return timestamp;
    }
}

 如流程结束事件:

/**
 * 审批流程结束事件
 *
 * @author yangyongjie
 * @date 2020/10/15
 * @desc
 */
public class AuditFlowFinishEvent extends Event<AuditFlow> {

    public AuditFlowFinishEvent(AuditFlow auditFlow) {
        super(auditFlow);
    }

}

 

 

2、监听器接口 Listener,自定义监听器实现它;泛型E表示要监听的事件 

/**
 * 监听器
 *
 * @author yangyongjie
 * @date 2020/9/30
 * @desc
 */
public interface Listener<E extends Event> {

    /**
     * 当事件发生做一些事情
     *
     * @param event
     */
    void onEvent(E event);
}

 如:审核流程结束监听器

/**
 * 审核流程结束监听器
 *
 * @author yangyongjie
 * @date 2020/10/15
 * @desc
 */
@Component
public class AuditFlowFinishListener implements Listener<AuditFlowFinishEvent> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AuditFlowFinishListener.class);

    @Override
    public void onEvent(AuditFlowFinishEvent event) {
        AuditFlow auditFlow = event.getSource();
        LOGGER.info("流程结束监听器工作了!");
    }
}

 

 

3、广播器,管理监听器,发布事件 (需注入到Spring容器中)

 V1.0:

 1)广播器接口:

/**
 * 事件广播器
 *
 * @author yangyongjie
 * @date 2020/9/30
 * @desc
 */
public interface EventMulticaster {

    /**
     * 添加监听器
     *
     * @param listener
     */
    void addListener(Listener<?> listener);

    /**
     * 移除监听器
     *
     * @param listener
     */
    void removeListener(Listener<?> listener);

    /**
     * 发布事件
     *
     * @param event
     */
    void multicastEvent(Event event);

}

 2)广播器实现类

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 广播器(事件发布器)
 *
 * @author yangyongjie
 * @date 2020/10/15
 * @desc
 */
@Component
public class SimpleEventMulticaster implements EventMulticaster, ApplicationContextAware {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleEventMulticaster.class);

    private final Set<Listener<?>> listeners = new LinkedHashSet<>();

    /**
     * 创建一个执行监听器线程池,核心线程数为1,最大线程数为5,线程空闲时间为60s,拒绝策略为打印日志并直接执行被拒绝的任务
     */
    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(1, 5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(50), new CustomThreadFactory("simpleEventMulticaster"), (r, executor) -> {
        LOGGER.error("Task:{},rejected from:{}", r.toString(), executor.toString());
        // 直接执行被拒绝的任务,JVM另起线程执行
        r.run();
    });

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取容器中实现Listener接口的监听器,并添加到listeners缓存中
        Map<String, Listener> listenerMap = applicationContext.getBeansOfType(Listener.class);
        for (Map.Entry<String, Listener> entry : listenerMap.entrySet()) {
            addListener(entry.getValue());
        }
    }

    @Override
    public void addListener(Listener<?> listener) {
        this.listeners.add(listener);
    }

    @Override
    public void removeListener(Listener<?> listener) {
        this.listeners.remove(listener);
    }

    @Override
    public void multicastEvent(final Event event) {
        multicastEvent(event, true);
    }

    /**
     * 是否异步,默认true
     *
     * @param event
     * @param async
     */
    public void multicastEvent(final Event event, boolean async) {
        Class eventClass = event.getClass();
        Set<Listener> interestedListeners = getInterestedListeners(eventClass);
        // 事件发生,异步调用监听器的事件方法
        if (async) {
            for (final Listener listener : interestedListeners) {
                EXECUTOR.execute(() -> listener.onEvent(event));
            }
        } else {
            for (final Listener listener : interestedListeners) {
                listener.onEvent(event);
            }
        }

    }

    /**
     * 获取对当前事件感兴趣的监听器
     *
     * @param eventClass
     */
    private Set<Listener> getInterestedListeners(Class eventClass) {
        // 存放监听对发布事件感兴趣的监听器
        Set<Listener> interestedListeners = new LinkedHashSet<>();
        for (Listener listener : listeners) {
            // 获取监听器的泛型类型
            ParameterizedType parameterizedType = (ParameterizedType) listener.getClass().getGenericInterfaces()[0];
            Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
            Listener filterListener = null;
            for (Type actualTypeArgument : actualTypeArguments) {
                if (StringUtils.equals(eventClass.getName(), actualTypeArgument.getTypeName())) {
                    filterListener = listener;
                }
            }
            if (filterListener != null) {
                interestedListeners.add(filterListener);
            }
        }
        return interestedListeners;
    }

}

  注1:1.8之前版本  actualTypeArgument.getTypeName() 替换成  ((Class) actualTypeArgument).getName() ;并替换掉 Lambda表达式

  注2:若在监听器的onEvent方法上加了@Transactional注解,那么获取到的监听器的对象就是代理对象,在获取监听器泛型的时候会有问题,需要拿到被代理的目标类对象,然后再去获取泛型类型:

  修改后的getInterestedListeners()方法的for循环第一行:

  ParameterizedType parameterizedType = (ParameterizedType) AopTargetUtil.getTarget(listener).getClass().getGenericInterfaces()[0];

 

  V2.0:

  1)广播器接口:

/**
 * 事件广播器
 *
 * @author yangyongjie
 * @date 2020/9/30
 * @desc
 */
public interface EventMulticaster {

    /**
     * 添加监听器
     *
     * @param listener
     */
    void addListener(Listener<?> listener);

    /**
     * 移除监听器
     *
     * @param listener
     */
    void removeListener(Listener<?> listener);

    /**
     * 发布事件,默认异步执行监听
     *
     * @param event
     */
    void multicastEvent(Event<?> event);

    /**
     * 发布事件,按参数确定异步还是同步执行监听
     *
     * @param event
     * @param async
     */
    void multicastEvent(Event<?> event, boolean async);

}

 

  2)广播器实现类:

 

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.ResolvableType;
import org.springframework.stereotype.Component;

import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 广播器(事件发布器)
 *
 * @author yangyongjie
 * @date 2021/1/28
 */
@Component
public class SimpleEventMulticaster implements EventMulticaster, ApplicationContextAware {

    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleEventMulticaster.class);

    /**
     * 所有监听器的集合
     */
    private final Set<Listener<?>> listeners = new LinkedHashSet<>();

    /**
     * 创建一个执行监听器线程池,核心线程数为2,最大线程数为5,线程空闲时间为60s,拒绝策略为打印日志并直接执行被拒绝的任务
     * 线程数和队列依具体业务而定
     */
    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
            2,
            5,
            60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(50),
            new CustomThreadFactory("simpleEventMulticaster"),
            (r, executor) -> {
                LOGGER.error("Task:{},rejected from:{}", r.toString(), executor.toString());
                // 直接执行被拒绝的任务,JVM另起线程执行
                r.run();
            });

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取容器中实现Listener接口的监听器,并添加到listeners缓存中
        Map<String, Listener> listenerMap = applicationContext.getBeansOfType(Listener.class);
        for (Map.Entry<String, Listener> entry : listenerMap.entrySet()) {
            addListener(entry.getValue());
        }
    }

    @Override
    public void addListener(Listener<?> listener) {
        this.listeners.add(listener);
    }

    @Override
    public void removeListener(Listener<?> listener) {
        this.listeners.remove(listener);
    }

    /**
     * 发布事件
     *
     * @param event
     */
    @Override
    public void multicastEvent(Event<?> event) {
        multicastEvent(event, true);
    }

    @Override
    public void multicastEvent(Event<?> event, boolean async) {
        Class eventClass = event.getClass();
        Set<Listener<?>> interestedListeners = getInterestedListeners(eventClass);
        // 事件发生,异步调用监听器的事件方法
        for (Listener<?> listener : interestedListeners) {
            if (async) {
                EXECUTOR.execute(() -> invokeListener(listener, event));
            } else {
                invokeListener(listener, event);
            }
        }
    }

    /**
     * 触发监听器事件处理
     *
     * @param listener
     * @param event
     */
    private void invokeListener(Listener listener, Event event) {
        try {
            listener.onEvent(event);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

    /**
     * 获取对当前事件感兴趣的监听器(监听了当前事件或当前事件的父事件)
     *
     * @param eventClass
     */
    private Set<Listener<?>> getInterestedListeners(Class eventClass) {
        // 存放监听对发布事件感兴趣的监听器
        Set<Listener<?>> interestedListeners = new LinkedHashSet<>();
        for (Listener<?> listener : listeners) {
            // 避免listener被aop代理,因此获取其被代理的listener
            Class<?> targetClass = AopUtils.getTargetClass(listener);
            // 获取监听器的泛型类型(即监听器监听的事件类型)
            ResolvableType genericEventType = ResolvableType.forClass(targetClass).as(Listener.class).getGeneric();
            // 监听器监听的事件类型是否是发布事件类型的父类 或与发布的事件类型相同
            if (genericEventType.isAssignableFrom(eventClass)) {
                interestedListeners.add(listener);
            }
        }
        return interestedListeners;
    }

}

 

 

 

4、使用广播器发布事件(在事件源中发布事件)

    @Autowired
    private EventMulticaster eventMulticaster;

    // 发布事件
    eventMulticaster.multicastEvent(new AuditFlowFinishEvent(auditFlow));

 

 

 附录

 1)AopTargetUtil:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AdvisedSupport;
import org.springframework.aop.framework.AopProxy;
import org.springframework.aop.support.AopUtils;

import java.lang.reflect.Field;

/**
 * 通过代理对象获取被代理的目标对象
 *
 * @author yangyongjie
 * @date 2020/12/10
 * @desc
 */
public class AopTargetUtil {

    private static final Logger LOGGER = LoggerFactory.getLogger(AopTargetUtil.class);

    private AopTargetUtil() {
    }

    /**
     * 获取 目标对象
     *
     * @param proxy 代理对象
     * @return
     * @throws Exception
     */
    public static Object getTarget(Object proxy) {

        // 不是代理对象,即没有被aop代理,直接返回
        if (!AopUtils.isAopProxy(proxy)) {
            return proxy;
        }
        try {
            // cglibProxy
            if (AopUtils.isCglibProxy(proxy)) {
                return getCglibProxyTargetObject(proxy);
                // jdkDynamicProxy
            } else {
                return getJdkDynamicProxyTargetObject(proxy);
            }
        } catch (Exception e) {
            LOGGER.error("获取被代理类对象异常:" + e.getMessage(), e);
            return proxy;
        }
    }


    /**
     * CGLIB方式被代理类的获取
     *
     * @param proxy
     * @return
     * @throws Exception
     */
    private static Object getCglibProxyTargetObject(Object proxy) throws Exception {
        Field h = proxy.getClass().getDeclaredField("CGLIB$CALLBACK_0");
        h.setAccessible(true);
        Object dynamicAdvisedInterceptor = h.get(proxy);

        Field advised = dynamicAdvisedInterceptor.getClass().getDeclaredField("advised");
        advised.setAccessible(true);

        Object target = ((AdvisedSupport) advised.get(dynamicAdvisedInterceptor)).getTargetSource().getTarget();

        return target;
    }

    /**
     * JDK动态代理方式被代理类的获取
     *
     * @param proxy
     * @return
     * @throws Exception
     */
    private static Object getJdkDynamicProxyTargetObject(Object proxy) throws Exception {
        Field h = proxy.getClass().getSuperclass().getDeclaredField("h");
        h.setAccessible(true);
        AopProxy aopProxy = (AopProxy) h.get(proxy);

        Field advised = aopProxy.getClass().getDeclaredField("advised");
        advised.setAccessible(true);

        Object target = ((AdvisedSupport) advised.get(aopProxy)).getTargetSource().getTarget();

        return target;
    }

}
View Code

相关文章: