设计模式之发布订阅模式(3) 深入Spring Events事件驱动模型

5,854 阅读9分钟

之前文章中我们讲解了 发布订阅模式的核心概念 ,并通过 Redis的 Pub/Sub 命令 演示了其分布式场景下的实现。相比后面要讲到的 Guava EventBus,可以说 Spring Events 的使用更加普遍,其功能也更加强大。

事件(Events)是框架中经常被忽略的、重要的功能,也是发布/订阅模式的一种常见实现。Spring框架本身就是事件驱动的。

下面我们就一起看一下Spring容器中的事件驱动模型,然后一起快速实现一个自定义的事件发布和监听,接着再分别探讨一下同步和异步的事件监听如何实现,再接着介绍一下如何定义监听器的顺序,最后提供一个基于SpEL(Spring Expression Language )实现的条件化的事件监听机制。

学完本节课程,将你会发现:

  1. 通过事件机制将代码解耦,会让自己的代码非常干净,且扩展性极强。
  2. 异步的事件处理机制,可以大大提高程序的响应速度,且内部的线程池会大大提高程序的并发效率。
  3. 条件化和泛型化的监听器可以让你减少很多显式的逻辑判断,从而让每个事件监听的原子性更强。

🚜 本文源码Github地址

Spring本身的事件驱动模型

Spring 容器与事件模型

Spring的事件机制主要提供了如下几个接口和类:

  • ApplicationContextEvent

Spring提供的事件抽象类,你可以继承它来实现自定义的事件。

  • ApplicationEventMulticaster

ApplicationEventMulticaster是一个事件广播器, 它的作用是把Applicationcontext发布的Event广播给所有的监听器。

  • ApplicationListener

ApplicationListener继承自EventListener, 所有的监听器都要实现这个接口。

这个接口只有一个onApplicationEvent()方法, 该方法接受一个ApplicationEvent或其子类对象作为参数, 在方法体中,可以通过不同对Event类的判断来进行相应的处理。

当事件触发时所有的监听器都会收到消息, 如果你需要对监听器的接收顺序有要求,可是实现该接口的一个实现SmartApplicationListener, 通过这个接口可以指定监听器接收事件的顺序。

  • ApplicationContext

实现事件机制需要三个部分:事件源、事件和事件监听器。 上面介绍的ApplicationEvent相当于事件, ApplicationListener相当于事件监听器, 这里的事件源说的就是ApplicationContext

ApplicationContext是Spring中的全局容器, 也叫"应用上下文", 它负责读取bean的配置, 管理bean的加载, 维护bean之间的依赖关系, 也就是负责管理bean的整个生命周期。

ApplicationContext就是我们平时所说的IOC容器。

  • ApplicationContextAware

当一个类实现了ApplicationContextAware接口之后,Aware接口的Bean在被初始之后,可以取得一些相对应的资源,这个类可以直接获取 spring 配置文件中所有注入的bean对象。

Spring提供了很多以Aware结尾的接口,通过实现这些接口,你就获得了获取Spring容器内资源的能力。

Spring本身实现了如下4个Event:

  • ContextStartedEvent (容器启动)
  • ContextStoppedEvent (容器停止)
  • ContextClosedEvent (容器关闭)
  • ContextRefreshedEvent (容器刷新)

自定义 Spring Events 实现

自定义Spring的事件模型需要三个角色:事件(Event)、发布者(Publisher)、监听者(Listerner)。

自定义Event

下面自定义了一个注册事件,这个Event的构造函数提供了两个参数,一个是发布源(source),一个是发布的消息(message)。

package net.ijiangtao.tech.designpattern.pubsub.spring.common;

import lombok.Getter;
import org.springframework.context.ApplicationEvent;

/**
 * 注册事件
 * @author ijiangtao
 * @create 2019-05-02 12:59
 **/
@Getter
public class RegisterEvent extends ApplicationEvent {

    private String message;

    public RegisterEvent(Object source, String message) {
        super(source);
        this.message = message;
    }

}

自定义Publisher

下面提供了一个发布自定义事件的发布器,我们通过ApplicationEventPublisher来把事件发布出去。

package net.ijiangtao.tech.designpattern.pubsub.spring.common;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

/**
 * 注册事件发布器
 * @author ijiangtao
 * @create 2019-05-02 13:01
 **/
@Component
@Slf4j
public class RegisterEventPublisher {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    public void publish(final String message) {
        log.info("publis a RegisterEvent,message:{}", message + " time: " + LocalTime.now());
        RegisterEvent registerEvent = new RegisterEvent(this, message);
        applicationEventPublisher.publishEvent(registerEvent);
    }
}

自定义Listener

下面提供几个自定义事件的监听器,它们都实现了ApplicationListener<RegisterEvent>接口,同时为了模拟处理事件的过程,这里让当前线程休眠了3秒。因为实现过程类似,这里仅提供一个实现。

package net.ijiangtao.tech.designpattern.pubsub.spring.common;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

/**
 * 发送注册成功邮件提醒
 *
 * @author ijiangtao
 * @create 2019-05-02 13:07
 **/
@Component
@Slf4j
public class SendRegisterEmailListener implements ApplicationListener<RegisterEvent> {
    @Override
    public void onApplicationEvent(RegisterEvent event) {
        try {
            Thread.sleep(3 * 1000);
        } catch (Exception e) {
            log.error("{}", e);
        }
        log.info("SendRegisterEmailListener message: " + event.getMessage()+" time: "+ LocalTime.now());
    }

}

测试自定义事件机制

下面通过一个单元测试发布了自定义事件。通过观察log输出,发现事件发布以后,每个监听器都依次输出了监听日志。

package net.ijiangtao.tech.designpattern.pubsub.spring;

import lombok.extern.slf4j.Slf4j;
import net.ijiangtao.tech.designpattern.pubsub.spring.common.RegisterEventPublisher;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Spring Events
 *
 * @author ijiangtao
 * @create 2019-05-02 12:53
 **/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringEventsCommonTests {

    @Autowired
    private RegisterEventPublisher registerEventPublisher;

    @Test
    public void test1(){
        registerEventPublisher.publish(" Danny is here.");
        try {
            Thread.sleep(10 * 1000);
        } catch (Exception e) {
            log.error("{}", e);
        }
    }

}

这样,一个基于Spring Events的事件监听器就实现了。

实现异步的 Spring Events

通过观察日志中打印的时间你会发现,上面注册的所有监听器,都是依次执行的,也就是Spring Events的事件处理默认是同步的。同步的事件监听耗时比较长,需要等待上一个监听处理结束,下一个监听器才能执行。

那么能不能改成异步监听呢?答案是肯定的。下面介绍两种实现方式。

配置

通过JDK提供的SimpleApplicationEventMulticaster将事件广播出去,就可以实现异步并发地让多个监听器同时执行事件监听动作。

package net.ijiangtao.tech.designpattern.pubsub.spring.async.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ApplicationEventMulticaster;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

/**
 * 异步事件监听配置
 *
 * @author ijiangtao
 * @create 2019-05-02 13:23
 **/
@Configuration
public class AsynchronousSpringEventsConfig {

    @Bean(name = "applicationEventMulticaster")
    public ApplicationEventMulticaster simpleApplicationEventMulticaster() {
        SimpleApplicationEventMulticaster eventMulticaster  = new SimpleApplicationEventMulticaster();
        eventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return eventMulticaster;
    }
}

通过SimpleApplicationEventMulticaster的源码可以看到它的multicastEvent方法会通过线程池并发执行事件发布动作。

public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
     ResolvableType type = eventType != null ? eventType : this.resolveDefaultEventType(event);
     Iterator var4 = this.getApplicationListeners(event, type).iterator();

     while(var4.hasNext()) {
         ApplicationListener<?> listener = (ApplicationListener)var4.next();
         Executor executor = this.getTaskExecutor();
         if (executor != null) {
             executor.execute(() -> {
                 this.invokeListener(listener, event);
             });
         } else {
             this.invokeListener(listener, event);
         }
     }

 }

注解

通过注解的方式发布事件,只需要在Listener上加上@Async,并且在发布事件的地方加上@EnableAsync注解即可。

package net.ijiangtao.tech.designpattern.pubsub.spring.async.annotation;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

/**
 * 发送优惠券
 *
 * @author ijiangtao
 * @create 2019-05-02 13:07
 **/
@Component
@Slf4j
public class UserActionListenerAsyncAnnotation implements ApplicationListener<RegisterEvent> {

    @Async
    @Override
    public void onApplicationEvent(RegisterEvent event) {
        try {
            Thread.sleep(3 * 1000);
        } catch (Exception e) {
            log.error("{}", e);
        }
        log.info("UserActionListener message: " + event.getMessage()+" time: "+ LocalTime.now());
    }

}
package net.ijiangtao.tech.designpattern.pubsub.spring;

import lombok.extern.slf4j.Slf4j;
import net.ijiangtao.tech.designpattern.pubsub.spring.async.annotation.RegisterEventPublisherAsyncAnnotation;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Spring Events
 *
 * @author ijiangtao
 * @create 2019-05-02 12:53
 **/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
@EnableAsync
public class SpringEventsAsyncAnnotationTests {

    @Autowired
    private RegisterEventPublisherAsyncAnnotation registerEventPublisherAsyncAnnotation;

    @Test
    public void test2() {
        registerEventPublisherAsyncAnnotation.publish(" Danny is here (Async).");
        try {
            Thread.sleep(10 * 1000);
        } catch (Exception e) {
            log.error("{}", e);
        }
    }

}

实现 Smart Listener

通过实现SmartApplicationListener接口,可以自定义监听器的执行顺序、支持的事件类型等。

package net.ijiangtao.tech.designpattern.pubsub.spring.smart;

import lombok.Getter;
import org.springframework.context.ApplicationEvent;

/**
 * event
 *
 * @author ijiangtao
 * @create 2019-05-02 15:33
 **/
@Getter
public class SmartEvent extends ApplicationEvent {

    private String message;

    public SmartEvent(Object source, String message) {
        super(source);
    }

}
package net.ijiangtao.tech.designpattern.pubsub.spring.smart;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

/**
 * SmartApplicationListener
 *
 * @author ijiangtao
 * @create 2019-05-02 15:32
 **/
@Component
@Slf4j
public class CustomSmartApplicationListener1 implements SmartApplicationListener {


    /**
     * 自定义支持的事件类型
     * @param eventType
     * @return
     */
    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
        return eventType == SmartEvent.class;
    }

    /**
     * 定义支持的事件源类型
     * @param sourceType
     * @return
     */
    @Override
    public boolean supportsSourceType(Class<?> sourceType) {
        return sourceType == String.class;
    }

    /**
     * 自定义优先级别
     * @return
     */
    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    @Override
    public void onApplicationEvent(ApplicationEvent applicationEvent) {
        log.info("CustomSmartApplicationListener {}",applicationEvent.getSource());
    }

}

条件化的事件监听

有时候我们希望一个监听器希望监听多个事件,例如一个系统安全监听器(SecurityEventListener),可以监听各种系统安全问题(NetWorkSecurityEvent、SQLSecurityEvent、AuthorizationSecurityEvent,等等),这个是时候你可以让监听器监听这些Event的父类SecurityEvent,这样你的监听器就可以监听到所有该Event的子类型。

有时候我们需要根据同一个事件抛出的消息的某个值来决定用哪个监听器来处理。例如SecurityEvent有个安全级别level属性,你定义了5个level,每个level都有不同的处理机制。按照传统的实现方式需要通过条件判断(if/else或者switch/case等)来实现,代码的封装性不好。这种情况下,你可以在你的Listener的监听方法上增加@EventListener注解,并通过condition参数来指定过滤条件。例如 condition = "#event.success eq false")就是通过SpEL表示:当方法的参数event变量的success属性等于false的时候才执行监听方法。

下面我们就演示一下实现过程。

提供泛型的Event基类:

package net.ijiangtao.tech.designpattern.pubsub.spring.generic;

import lombok.Getter;

/**
 * GenericSpringEvent
 *
 * @author ijiangtao
 * @create 2019-05-02 13:47
 **/
@Getter
public class GenericSpringEvent<T> {

    private T what;

    protected boolean success;

    public GenericSpringEvent(T what, boolean success) {
        this.what = what;
        this.success = success;
    }

}

基于Event基类自定义Event实现

package net.ijiangtao.tech.designpattern.pubsub.spring.generic.checkout;

import lombok.Getter;
import net.ijiangtao.tech.designpattern.pubsub.spring.generic.GenericSpringEvent;

/**
 * GenericSpringEventCheckout
 *
 * @author ijiangtao
 * @create 2019-05-02 13:58
 **/
@Getter
public class GenericSpringEventCheckout extends GenericSpringEvent<Long> {

    private Long userId;

    public GenericSpringEventCheckout(Long userId, boolean success) {
        super(userId, success);
    }

}

提供条件化监听器

监听器监听基类Event的所有字类,并且通过@EventListener注解和SpEL定义监听的Event的过滤条件。

package net.ijiangtao.tech.designpattern.pubsub.spring.generic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

/**
 * @author ijiangtao
 * @create 2019-05-02 13:52
 **/
@Component
@Slf4j
public class GenericSpringEventSuccessListenerLong {

    @EventListener(condition = "#event.success")
    public void handle(GenericSpringEvent<Long> event) {
        log.info("Handling generic event Success (conditional). {}",event.getWhat());
    }

}
package net.ijiangtao.tech.designpattern.pubsub.spring.generic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

/**
 * @author ijiangtao
 * @create 2019-05-02 13:52
 **/
@Component
@Slf4j
public class GenericSpringEventFailListenerLong {

    @EventListener(condition = "#event.success eq false")
    public void handle(GenericSpringEvent<Long> event) {
        log.info("Handling generic event  Fail (conditional). {}",event.getWhat());
    }

}

自定义事件发布器

package net.ijiangtao.tech.designpattern.pubsub.spring.generic.checkout;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

/**
 * GenericSpringEventPublisher
 *
 * @author ijiangtao
 * @create 2019-05-02 13:55
 **/
@Component
@Slf4j
public class GenericSpringEventPublisherCheckout {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    public void publish(final Long userId, boolean success) {

        log.info("publis a GenericSpringEventPublisher, userId:{}", userId + " time: " + LocalTime.now());

        GenericSpringEventCheckout eventCheckout = new GenericSpringEventCheckout(userId, success);

        applicationEventPublisher.publishEvent(eventCheckout);
    }

}

单元测试

下面提供了一个测试方法,通过观察日志发现,不同的条件,触发了不同的监听器。

package net.ijiangtao.tech.designpattern.pubsub.spring;

import lombok.extern.slf4j.Slf4j;
import net.ijiangtao.tech.designpattern.pubsub.spring.generic.checkout.GenericSpringEventPublisherCheckout;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationEvent;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Spring Events
 *
 * @author ijiangtao
 * @create 2019-05-02 12:53
 **/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringEventsGenericTests {

    @Autowired
    private GenericSpringEventPublisherCheckout checkoutPubliser;


    @Test
    public void test1() {

        ApplicationEvent applicationEvent;
        checkoutPubliser.publish(101L, true);

        checkoutPubliser.publish(202L, false);
    }

}

Wechat-westcall

相关链接