Spring Boot Series, Part 15: Source Code Analysis of the Spring Boot RabbitMQ Integration

Original link: blog.csdn.net

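1. Overview

The earlier article in this series, Spring Boot Series, Part 13: Integrating RabbitMQ with Spring Boot, showed how to use RabbitMQ from Spring Boot. This article walks through the source code to show how Spring Boot performs that integration.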

2. Entry point

The spring.factories file inside spring-boot-autoconfigure.jar contains the following entry, which tells Spring Boot to process RabbitAutoConfiguration at startup:

… 
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration,\
…
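
As a rough illustration of how such an entry can be read, the sketch below parses properties-style text and splits the comma-separated value into class names. It is a simplified model that uses only the JDK, not Spring Boot's actual loading code; the class FactoriesSketch and the name com.example.HypotheticalAutoConfiguration are made up for the example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Simplified model only; Spring Boot's real loading logic is more involved.
class FactoriesSketch {

    static final String KEY =
        "org.springframework.boot.autoconfigure.EnableAutoConfiguration";

    // Parses properties-style text and returns the class names listed under KEY.
    static List<String> autoConfigurationNames(String factoriesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(factoriesText));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty(KEY, "").split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // A trailing backslash continues the value on the next line, as in spring.factories.
        String text = KEY + "=\\\n"
            + "org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration,\\\n"
            + "com.example.HypotheticalAutoConfiguration\n";
        System.out.println(autoConfigurationNames(text));
    }
}
```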

3. RabbitProperties

The application_*.yml properties file:

spring:
  # RabbitMQ configuration
  rabbitmq:
    host: 10.240.80.134
    username: spring-boot
    password: spring-boot
    virtual-host: spring-boot-vhost

The properties above are bound to the RabbitProperties class:

@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
    …
}

4. RabbitAutoConfiguration

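4.1. Annotations on the class

This is a configuration class. At startup it initializes the RabbitProperties object described above and then imports another configuration class, RabbitAnnotationDrivenConfiguration, which deals with message listening and is covered later. The class has three inner classes, all of them configuration classes, which conditionally create the beans that RabbitMQ needs.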

@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class }) 
// Initializes RabbitProperties.class
@EnableConfigurationProperties(RabbitProperties.class)
// Imports the @Configuration class RabbitAnnotationDrivenConfiguration
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
…

}

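4.2. Inner class RabbitConnectionFactoryCreator

The inner class RabbitConnectionFactoryCreator uses the values in RabbitProperties to create a CachingConnectionFactory instance (an implementation of ConnectionFactory), which serves as the connection pool for RabbitMQ.
CachingConnectionFactory wraps the com.rabbitmq.client.ConnectionFactory provided by the official RabbitMQ client and caches the connections and com.rabbitmq.client.Channel objects obtained from it. It has two cache modes:
1. With CacheMode#CHANNEL, every call to createConnection() returns the same Connection. By default there is a single Connection and a single cached Channel (the channel cache size is configurable, so more Channels can be cached). Several Channels can therefore exist, but they all share the one Connection.
2. With CacheMode#CONNECTION, both the number of Connections and the number of Channels can be configured. createConnection() takes an available Connection from the cache; if none is available and the limit has not been reached, it creates a new one. Channels are handled the same way.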
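
The CHANNEL mode described above can be pictured with a small JDK-only model: one shared connection, plus a bounded cache of idle channels. This is a sketch of the idea and not Spring AMQP's implementation; ChannelCacheSketch and its nested Connection and Channel classes are hypothetical stand-ins.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of CHANNEL cache mode; not Spring AMQP's implementation.
class ChannelCacheSketch {

    static final class Connection { }            // stands in for a broker connection

    static final class Channel {                 // stands in for an AMQP channel
        final Connection connection;
        Channel(Connection connection) { this.connection = connection; }
    }

    private final int channelCacheSize;
    private final Deque<Channel> idleChannels = new ArrayDeque<>();
    private Connection sharedConnection;

    ChannelCacheSketch(int channelCacheSize) { this.channelCacheSize = channelCacheSize; }

    // Always returns the same Connection, creating it lazily on first use.
    synchronized Connection createConnection() {
        if (sharedConnection == null) {
            sharedConnection = new Connection();
        }
        return sharedConnection;
    }

    // Reuses an idle channel when one is cached, otherwise opens a new one.
    synchronized Channel acquireChannel() {
        Channel cached = idleChannels.poll();
        return cached != null ? cached : new Channel(createConnection());
    }

    // Returns a channel to the cache; returns false when the cache is already full.
    synchronized boolean releaseChannel(Channel channel) {
        if (idleChannels.size() < channelCacheSize) {
            idleChannels.push(channel);
            return true;
        }
        return false;   // a real implementation would physically close the channel here
    }

    public static void main(String[] args) {
        ChannelCacheSketch factory = new ChannelCacheSketch(1);
        Channel first = factory.acquireChannel();
        Channel second = factory.acquireChannel();
        System.out.println("same connection: " + (first.connection == second.connection));
    }
}
```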

@Configuration
@ConditionalOnMissingBean(ConnectionFactory.class)
protected static class RabbitConnectionFactoryCreator {

    @Bean
    public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config)
        throws Exception {
        // Configure the RabbitMQ connection factory from RabbitProperties
        RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
        if (config.determineHost() != null) {
            factory.setHost(config.determineHost());
        }
        …
        factory.afterPropertiesSet();
        // The caching connection factory
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
            factory.getObject());
        connectionFactory.setAddresses(config.determineAddresses());
        connectionFactory.setPublisherConfirms(config.isPublisherConfirms());
        connectionFactory.setPublisherReturns(config.isPublisherReturns());
        …
        return connectionFactory;
    }

}

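4.3. Inner class RabbitTemplateConfiguration

The constructor of the inner class RabbitTemplateConfiguration stores the RabbitProperties values and the MessageConverter in the corresponding fields. Its rabbitTemplate() and amqpAdmin() methods then use the CachingConnectionFactory created by RabbitConnectionFactoryCreator to build the RabbitTemplate and the RabbitAdmin.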

@Configuration
// Imports RabbitConnectionFactoryCreator
@Import(RabbitConnectionFactoryCreator.class)
protected static class RabbitTemplateConfiguration {
    private final ObjectProvider<MessageConverter> messageConverter;
    private final RabbitProperties properties;

    // Inject MessageConverter and RabbitProperties
    public RabbitTemplateConfiguration(
        ObjectProvider<MessageConverter> messageConverter,
        RabbitProperties properties) {
        this.messageConverter = messageConverter;
        this.properties = properties;
    }

    // Initialize the RabbitTemplate bean
    @Bean
    @ConditionalOnSingleCandidate(ConnectionFactory.class)
    @ConditionalOnMissingBean(RabbitTemplate.class)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        // Create the RabbitTemplate
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        MessageConverter messageConverter = this.messageConverter.getIfUnique();
        if (messageConverter != null) {
            // Set the MessageConverter
            rabbitTemplate.setMessageConverter(messageConverter);
        }
        // Other settings omitted
        …
        return rabbitTemplate;
    }

    // Initialize the AmqpAdmin bean
    @Bean
    @ConditionalOnSingleCandidate(ConnectionFactory.class)
    @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
    @ConditionalOnMissingBean(AmqpAdmin.class)
    public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        // Create the RabbitAdmin
        return new RabbitAdmin(connectionFactory);
    }

}

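4.4. Inner configuration class MessagingTemplateConfiguration

The inner configuration class MessagingTemplateConfiguration has a rabbitMessagingTemplate() method that receives the RabbitTemplate created above and uses it to create the RabbitMessagingTemplate.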

@Configuration
@ConditionalOnClass(RabbitMessagingTemplate.class)
@ConditionalOnMissingBean(RabbitMessagingTemplate.class)
// Imports the RabbitTemplateConfiguration configuration class
@Import(RabbitTemplateConfiguration.class)
protected static class MessagingTemplateConfiguration {
    // Creates the RabbitMessagingTemplate; the RabbitTemplate comes from RabbitTemplateConfiguration
    @Bean
    @ConditionalOnSingleCandidate(RabbitTemplate.class)
    public RabbitMessagingTemplate rabbitMessagingTemplate(
        RabbitTemplate rabbitTemplate) {
        return new RabbitMessagingTemplate(rabbitTemplate);
    }

}

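The configuration above completes the initialization of the sender-side beans, so RabbitTemplate can be used to send messages and RabbitAdmin to manage queues, exchanges, and bindings. Listening for RabbitMQ messages needs the additional configuration below, which is more complex.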

5. RabbitAnnotationDrivenConfiguration

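RabbitAutoConfiguration imports this class, which creates the beans involved in message listening. Let us look at it in detail.

5.1. Constructor

The constructor receives the MessageConverter, MessageRecoverer, and RabbitProperties instances needed for listening and stores them as fields.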

@Configuration
@ConditionalOnClass(EnableRabbit.class)
class RabbitAnnotationDrivenConfiguration {

    private final ObjectProvider<MessageConverter> messageConverter;

    private final ObjectProvider<MessageRecoverer> messageRecoverer;

    private final RabbitProperties properties;

    RabbitAnnotationDrivenConfiguration(ObjectProvider<MessageConverter> messageConverter,
        ObjectProvider<MessageRecoverer> messageRecoverer,
        RabbitProperties properties) {
        this.messageConverter = messageConverter;
        this.messageRecoverer = messageRecoverer;
        this.properties = properties;
    }
    …
} 

5.2. The rabbitListenerContainerFactoryConfigurer() method

Creates a SimpleRabbitListenerContainerFactoryConfigurer, which holds the MessageConverter, MessageRecoverer, and RabbitProperties instances needed to create a RabbitListenerContainer.

// Creates the SimpleRabbitListenerContainerFactoryConfigurer and sets its MessageConverter, MessageRecoverer, and RabbitMQ properties
@Bean
@ConditionalOnMissingBean
public SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() {
    SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer();
    configurer.setMessageConverter(this.messageConverter.getIfUnique());
    configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
    configurer.setRabbitProperties(this.properties);
    return configurer;
}

5.3. The rabbitListenerContainerFactory() method

Creates a SimpleRabbitListenerContainerFactory (an implementation of RabbitListenerContainerFactory). The SimpleRabbitListenerContainerFactoryConfigurer comes from the method above, and the ConnectionFactory comes from RabbitAutoConfiguration, as explained earlier.

@Bean
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
    SimpleRabbitListenerContainerFactoryConfigurer configurer,
    ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    return factory;
}

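5.4. Enabling @EnableRabbit

The important part of this inner class is its @EnableRabbit annotation. The annotation uses the RabbitListenerContainer configuration above, creates the other related beans, and starts listening for messages. The next section covers @EnableRabbit in detail.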

@EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableRabbitConfiguration {

}

6. @EnableRabbit

Imports the configuration class RabbitBootstrapConfiguration:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
// Imports the configuration class RabbitBootstrapConfiguration
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}

7. RabbitBootstrapConfiguration

This configuration class creates the RabbitListenerAnnotationBeanPostProcessor and the RabbitListenerEndpointRegistry.

@Configuration
public class RabbitBootstrapConfiguration {

    // Creates the RabbitListenerAnnotationBeanPostProcessor; incoming messages are dispatched to methods annotated with @RabbitListener or @RabbitHandler
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
        return new RabbitListenerAnnotationBeanPostProcessor();
    }

    // Creates the RabbitListenerEndpointRegistry, where listener endpoints are registered
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }

}

8. RabbitListenerAnnotationBeanPostProcessor

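Implements BeanPostProcessor. After Spring creates each bean, it picks up every method annotated with @RabbitListener or @RabbitHandler.

8.1. The afterSingletonsInstantiated() method

This method runs once all singleton beans have been instantiated and performs the initialization. The key steps are:
1. Obtain the RabbitListenerEndpointRegistry instance and set it on the RabbitListenerEndpointRegistrar.
2. Set containerFactoryBeanName on the RabbitListenerEndpointRegistrar; its value is rabbitListenerContainerFactory.
3. Call RabbitListenerEndpointRegistrar.afterPropertiesSet() to initialize it; that method is covered later.

// Create the registrar instance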
private final RabbitListenerEndpointRegistrar registrar = new RabbitListenerEndpointRegistrar();

@Override
public void afterSingletonsInstantiated() {
    …

    // Obtain the RabbitListenerEndpointRegistry and set it on the RabbitListenerEndpointRegistrar
    if (this.registrar.getEndpointRegistry() == null) {
        if (this.endpointRegistry == null) {
            Assert.state(this.beanFactory != null,
                "BeanFactory must be set to find endpoint registry by bean name");
            this.endpointRegistry = this.beanFactory.getBean(
                RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                RabbitListenerEndpointRegistry.class);
        }
        this.registrar.setEndpointRegistry(this.endpointRegistry);
    }

    // Set containerFactoryBeanName (rabbitListenerContainerFactory) on the RabbitListenerEndpointRegistrar
    if (this.containerFactoryBeanName != null) {
        this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
    }

    // Set the custom handler method factory once resolved by the configurer
    MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
    if (handlerMethodFactory != null) {
        this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
    }

    // Actually register all listeners: initializes the RabbitListenerEndpointRegistrar
    this.registrar.afterPropertiesSet();

}

8.2. The postProcessAfterInitialization() method

postProcessAfterInitialization() runs after a bean has been initialized and picks up every method annotated with @RabbitListener or @RabbitHandler.
1. If @RabbitListener is placed on a method, processAmqpListener() is called, and a MethodRabbitListenerEndpoint wraps the target method.
2. If @RabbitListener is placed on a class and the class has methods annotated with @RabbitHandler, processMultiMethodListeners() is called, and a MultiMethodRabbitListenerEndpoint wraps the target methods.

MultiMethodRabbitListenerEndpoint is a subclass of MethodRabbitListenerEndpoint, and both are RabbitListenerEndpoint implementations.
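
The two cases can be mimicked with plain reflection. The sketch below is only a model of the scanning idea: the annotations Listener and Handler are hypothetical stand-ins for @RabbitListener and @RabbitHandler, and the returned strings stand in for the endpoint objects. It yields one "method" entry per annotated method and one "multi" entry for an annotated class that has handler methods.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ListenerScanSketch {

    // Hypothetical stand-ins for @RabbitListener and @RabbitHandler.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @interface Listener { String queue(); }

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Handler { }

    // Returns one description per endpoint that would be created for the bean.
    static List<String> scan(Object bean) {
        Class<?> type = bean.getClass();
        List<String> endpoints = new ArrayList<>();
        List<String> handlers = new ArrayList<>();
        for (Method method : type.getDeclaredMethods()) {
            Listener listener = method.getAnnotation(Listener.class);
            if (listener != null) {
                // Method-level annotation: one endpoint per annotated method.
                endpoints.add("method:" + listener.queue() + ":" + method.getName());
            }
            if (method.isAnnotationPresent(Handler.class)) {
                handlers.add(method.getName());
            }
        }
        Listener classLevel = type.getAnnotation(Listener.class);
        if (classLevel != null && !handlers.isEmpty()) {
            // Class-level annotation: one endpoint that covers all handler methods.
            Collections.sort(handlers);   // reflection order is not guaranteed
            endpoints.add("multi:" + classLevel.queue() + ":" + handlers);
        }
        return endpoints;
    }

    static class OrderListener {
        @Listener(queue = "orders")
        public void onOrder(String body) { }
    }

    @Listener(queue = "events")
    static class EventListener {
        @Handler public void onText(String body) { }
        @Handler public void onNumber(Integer body) { }
    }

    public static void main(String[] args) {
        System.out.println(scan(new OrderListener()));
        System.out.println(scan(new EventListener()));
    }
}
```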


@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    …
    // Process every method annotated with @RabbitListener
    for (ListenerMethod lm : metadata.listenerMethods) {
        for (RabbitListener rabbitListener : lm.annotations) {
            processAmqpListener(rabbitListener, lm.method, bean, beanName);
        }
    }

    // Process every method annotated with @RabbitHandler
    if (metadata.handlerMethods.length > 0) {
        processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
    }
    return bean;
}

private void processMultiMethodListeners(RabbitListener[] classLevelListeners, Method[] multiMethods,
    Object bean, String beanName) {
    …
    for (RabbitListener classLevelListener : classLevelListeners) {
        // Create the endpoint for a class with several handler methods
        MultiMethodRabbitListenerEndpoint endpoint = new MultiMethodRabbitListenerEndpoint(checkedMethods, bean);
        endpoint.setBeanFactory(this.beanFactory);
        processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
    }
}

protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
    Method methodToUse = checkProxy(method, bean);
    // Create the endpoint for a single listener method
    MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
    endpoint.setMethod(methodToUse);
    endpoint.setBeanFactory(this.beanFactory);
    processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}

8.3. The processListener() method

Both processAmqpListener() and processMultiMethodListeners() end up in processListener(), which does the following:
1. Uses the settings of the @RabbitListener annotation to configure the queues, priority, exclusivity, and so on for the MethodRabbitListenerEndpoint.
2. Obtains the RabbitAdmin instance and sets it on the MethodRabbitListenerEndpoint.
3. Looks up the RabbitListenerContainerFactory named by the containerFactory() attribute of @RabbitListener; the attribute is empty by default.
4. Calls the helper class RabbitListenerEndpointRegistrar to register the RabbitListenerEndpoint with the RabbitListenerEndpointRegistry. RabbitListenerEndpointRegistrar is explained later.

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
    Object adminTarget, String beanName) {

    // The queues, priority, exclusivity, and so on of the endpoint are set here
    …

    // Obtain the RabbitAdmin instance and set it on the MethodRabbitListenerEndpoint
    String rabbitAdmin = resolve(rabbitListener.admin());
    if (StringUtils.hasText(rabbitAdmin)) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
        try {
            endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
        }
        catch (NoSuchBeanDefinitionException ex) {
            throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
                adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +
                rabbitAdmin + "' was found in the application context", ex);
        }
    }

    // Look up the RabbitListenerContainerFactory named by containerFactory() on @RabbitListener
    RabbitListenerContainerFactory<?> factory = null;
    String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
    if (StringUtils.hasText(containerFactoryBeanName)) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
        try {
            factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
        }
        catch (NoSuchBeanDefinitionException ex) {
            throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
                adminTarget + "] for bean " + beanName + ", no " +
                RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
                containerFactoryBeanName + "' was found in the application context", ex);
        }
    }

    // Use the RabbitListenerEndpointRegistrar helper to register the RabbitListenerEndpoint with the RabbitListenerEndpointRegistry; the helper is explained below
    this.registrar.registerEndpoint(endpoint, factory);
}

9. RabbitListenerEndpointRegistrar

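The helper class that registers the RabbitListenerEndpoint described above with the RabbitListenerEndpointRegistry.

9.1. Initialization: afterPropertiesSet()

This initialization method is called from afterSingletonsInstantiated() in RabbitListenerAnnotationBeanPostProcessor. Its main steps are:

  1. Loop over the AmqpListenerEndpointDescriptor entries and register each MethodRabbitListenerEndpoint together with its RabbitListenerContainerFactory by calling the RabbitListenerEndpointRegistry.
  2. Set startImmediately to true, so that endpoints registered afterwards are registered and started immediately.

private RabbitListenerEndpointRegistry endpointRegistry;
...

// Initialization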
@Override
public void afterPropertiesSet() {
    registerAllEndpoints();
}

protected void registerAllEndpoints() {
    synchronized (this.endpointDescriptors) {
        // An AmqpListenerEndpointDescriptor holds a RabbitListenerEndpoint and its RabbitListenerContainerFactory
        for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
            // Register the endpoint and its RabbitListenerContainerFactory with the endpointRegistry
            this.endpointRegistry.registerListenerContainer(
                descriptor.endpoint, resolveContainerFactory(descriptor));
        }
        // Switch to immediate registration from now on
        this.startImmediately = true;  // trigger immediate startup
    }
}

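// Resolves the RabbitListenerContainerFactory instance
/**
    If the endpoint was registered with a RabbitListenerContainerFactory, that one is used (it comes from the containerFactory() attribute of @RabbitListener).
    Otherwise the default RabbitListenerContainerFactory is used. If no default is set, the RabbitListenerContainerFactory bean named by containerFactoryBeanName is fetched from the Spring container and becomes the default.
    As shown earlier, RabbitListenerAnnotationBeanPostProcessor sets this name to rabbitListenerContainerFactory in afterSingletonsInstantiated().
**/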
private RabbitListenerContainerFactory<?> resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) {
    if (descriptor.containerFactory != null) {
        return descriptor.containerFactory;
    }
    else if (this.containerFactory != null) {
        return this.containerFactory;
    }
    else if (this.containerFactoryBeanName != null) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
        this.containerFactory = this.beanFactory.getBean(
            this.containerFactoryBeanName, RabbitListenerContainerFactory.class);
        return this.containerFactory;  // Consider changing this if live change of the factory is required
    }
    else {
        throw new IllegalStateException("Could not resolve the " +
            RabbitListenerContainerFactory.class.getSimpleName() + " to use for [" +
            descriptor.endpoint + "] no factory was given and no default is set.");
    }
}
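
The lookup order in resolveContainerFactory() can be reduced to a small fallback chain. The following sketch models it with plain strings in place of factory objects and a map in place of the bean factory; the class FactoryResolutionSketch and its method names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the lookup order; factories are plain strings here.
class FactoryResolutionSketch {

    private final Map<String, String> beans = new HashMap<>(); // stand-in for the bean factory
    private String defaultFactory;      // cached default, initially unset
    private String factoryBeanName;     // bean name of the default factory

    void registerBean(String name, String factory) { beans.put(name, factory); }

    void setFactoryBeanName(String name) { this.factoryBeanName = name; }

    String resolve(String endpointFactory) {
        if (endpointFactory != null) {
            return endpointFactory;                 // 1. factory given with the endpoint
        }
        if (defaultFactory != null) {
            return defaultFactory;                  // 2. default that was already resolved
        }
        if (factoryBeanName != null) {
            String found = beans.get(factoryBeanName);
            if (found == null) {
                throw new IllegalStateException("No bean named '" + factoryBeanName + "'");
            }
            defaultFactory = found;                 // 3. look up by name and cache it
            return found;
        }
        throw new IllegalStateException("No factory was given and no default is set");
    }

    public static void main(String[] args) {
        FactoryResolutionSketch registrar = new FactoryResolutionSketch();
        registrar.registerBean("rabbitListenerContainerFactory", "defaultFactory");
        registrar.setFactoryBeanName("rabbitListenerContainerFactory");
        System.out.println(registrar.resolve(null));
        System.out.println(registrar.resolve("customFactory"));
    }
}
```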

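9.2. The registerEndpoint() method

This method is called from processListener() in RabbitListenerAnnotationBeanPostProcessor. The earlier analysis showed that initialization sets startImmediately to true, so only the true branch is analyzed here. In that case the method calls registerListenerContainer() on the RabbitListenerEndpointRegistry and passes startImmediately=true.

// Called by RabbitListenerAnnotationBeanPostProcessor to register an endpoint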

private RabbitListenerEndpointRegistry endpointRegistry;

public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
    …
    // Factory may be null, we defer the resolution right before actually creating the container
    AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
    synchronized (this.endpointDescriptors) {
        if (this.startImmediately) { // Register and start immediately
            // The earlier analysis showed startImmediately=true, so only the true branch is analyzed
            // Register with the endpointRegistry
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                resolveContainerFactory(descriptor), true);
        }
        else {
            this.endpointDescriptors.add(descriptor);
        }
    }
}
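
The interplay between registerEndpoint() and afterPropertiesSet() is a queue-then-flush pattern: endpoints are queued until initialization has run and registered directly afterwards. The sketch below models only that switch, with strings in place of endpoints and a list in place of the registry; DeferredRegistrationSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the startImmediately switch; not Spring AMQP's code.
class DeferredRegistrationSketch {

    private final List<String> pending = new ArrayList<>();     // queued descriptors
    private final List<String> registered = new ArrayList<>();  // stand-in for the registry
    private boolean startImmediately;

    void registerEndpoint(String endpointId) {
        synchronized (pending) {
            if (startImmediately) {
                registered.add(endpointId);   // register right away
            } else {
                pending.add(endpointId);      // wait for afterPropertiesSet()
            }
        }
    }

    void afterPropertiesSet() {
        synchronized (pending) {
            registered.addAll(pending);       // flush everything queued so far
            startImmediately = true;          // later registrations skip the queue
        }
    }

    List<String> registered() {
        synchronized (pending) {
            return new ArrayList<>(registered);
        }
    }

    public static void main(String[] args) {
        DeferredRegistrationSketch registrar = new DeferredRegistrationSketch();
        registrar.registerEndpoint("early");
        System.out.println("before init: " + registrar.registered());
        registrar.afterPropertiesSet();
        registrar.registerEndpoint("late");
        System.out.println("after init: " + registrar.registered());
    }
}
```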

10. RabbitListenerEndpointRegistry

Purpose: creates a MessageListenerContainer for each registered RabbitListenerEndpoint, initializes the container, and finally calls its start() method. This class also manages the lifecycle of the listener containers.

10.1. The registerListenerContainer() method

  1. Create a MessageListenerContainer for the RabbitListenerEndpoint using the RabbitListenerContainerFactory.
  2. As shown above, startImmediately is true, so startIfNecessary() is called to start the MessageListenerContainer.

// Creates a listener container for the RabbitListenerEndpoint
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
    boolean startImmediately) {
    Assert.notNull(endpoint, "Endpoint must not be null");
    Assert.notNull(factory, "Factory must not be null");

    String id = endpoint.getId();
    Assert.hasText(id, "Endpoint id must not be empty");
    synchronized (this.listenerContainers) {
        Assert.state(!this.listenerContainers.containsKey(id),
            "Another endpoint is already registered with id '" + id + "'");
        // Create a listener container for the endpoint using the factory; see the method below
        MessageListenerContainer container = createListenerContainer(endpoint, factory);
        this.listenerContainers.put(id, container);
        // Add the container to its group bean, if the endpoint names a group
        if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
            List<MessageListenerContainer> containerGroup;
            if (this.applicationContext.containsBean(endpoint.getGroup())) {
                containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
            }
            else {
                containerGroup = new ArrayList<MessageListenerContainer>();
                this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
            }
            containerGroup.add(container);
        }
        if (startImmediately) {
            // Start the MessageListenerContainer; the value passed in is known to be true
            startIfNecessary(container);
        }
    }
}

10.2. The createListenerContainer() method

  1. Call createListenerContainer() on the RabbitListenerContainerFactory configured for the RabbitListenerEndpoint to create the MessageListenerContainer. By default the factory is a SimpleRabbitListenerContainerFactory, which creates a SimpleMessageListenerContainer.
  2. Because SimpleMessageListenerContainer implements InitializingBean, its initialization is executed. That initialization is covered later.

// Creates a listener container for the endpoint using the factory
protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
    RabbitListenerContainerFactory<?> factory) {
    // Create the MessageListenerContainer with the endpoint's RabbitListenerContainerFactory. By default a SimpleRabbitListenerContainerFactory creates a SimpleMessageListenerContainer
    MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
    // Initialize the container
    if (listenerContainer instanceof InitializingBean) {
        try {
            ((InitializingBean) listenerContainer).afterPropertiesSet();
        }
        catch (Exception ex) {
            throw new BeanInitializationException("Failed to initialize message listener container", ex);
        }
    }
    …
    return listenerContainer;
}

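10.3. Starting the container

Calls the start() method of the MessageListenerContainer to start the container.

// Start the container
private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
        // Containers auto-start by default, so the following call is executed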
        listenerContainer.start();
    }
}
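
Taken together, sections 10.1 and 10.3 describe a registry that rejects duplicate ids and starts a container only when the context has already been refreshed or the container is marked for auto-startup. A minimal JDK-only model of that behaviour, with a hypothetical Container class in place of MessageListenerContainer, might look like this:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of the registry's id check and start decision.
class RegistrySketch {

    static final class Container {       // stand-in for a listener container
        final boolean autoStartup;
        boolean running;
        Container(boolean autoStartup) { this.autoStartup = autoStartup; }
        void start() { running = true; }
    }

    private final Map<String, Container> containers = new LinkedHashMap<>();
    private boolean contextRefreshed;    // would be set once the application context is refreshed

    void setContextRefreshed(boolean contextRefreshed) { this.contextRefreshed = contextRefreshed; }

    void register(String id, Container container, boolean startImmediately) {
        synchronized (containers) {
            if (containers.containsKey(id)) {
                throw new IllegalStateException(
                    "Another endpoint is already registered with id '" + id + "'");
            }
            containers.put(id, container);
            if (startImmediately) {
                startIfNecessary(container);
            }
        }
    }

    private void startIfNecessary(Container container) {
        if (contextRefreshed || container.autoStartup) {
            container.start();
        }
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        Container container = new Container(true);
        registry.register("orders", container, true);
        System.out.println("running: " + container.running);
    }
}
```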

11. SimpleRabbitListenerContainerFactory

This class is an implementation of RabbitListenerContainerFactory.

11.1. The createContainerInstance() method

Creates the SimpleMessageListenerContainer instance.

// Creates the SimpleMessageListenerContainer instance
@Override
protected SimpleMessageListenerContainer createContainerInstance() {
    return new SimpleMessageListenerContainer();
}

11.2. The createListenerContainer() method

This method calls createContainerInstance() to create the SimpleMessageListenerContainer instance, applies the settings carried by the RabbitListenerEndpoint to it, and finally calls initializeContainer() to initialize it.

@Override
public C createListenerContainer(RabbitListenerEndpoint endpoint) {
    // Create the instance
    C instance = createContainerInstance();

    // Set the container's initial values
    if (this.connectionFactory != null) {
        instance.setConnectionFactory(this.connectionFactory);
    }
    // Other code that configures the container from this factory's fields is omitted
    …
    instance.setListenerId(endpoint.getId());

    endpoint.setupListenerContainer(instance);

    // Initialize the container
    initializeContainer(instance);

    return instance;
}

11.3. The initializeContainer() method

Initializes the SimpleMessageListenerContainer instance just created by copying this factory's fields onto it.

// Initializes the container from the factory's settings
@Override
protected void initializeContainer(SimpleMessageListenerContainer instance) {
    super.initializeContainer(instance);

    if (this.applicationContext != null) {
        instance.setApplicationContext(this.applicationContext);
    }
    if (this.taskExecutor != null) {
        instance.setTaskExecutor(this.taskExecutor);
    }
    if (this.transactionManager != null) {
        instance.setTransactionManager(this.transactionManager);
    }
    if (this.txSize != null) {
        instance.setTxSize(this.txSize);
    }
    // Other code that configures the SimpleMessageListenerContainer from this factory's fields is omitted
    …
}

12. SimpleMessageListenerContainer

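RabbitListenerEndpointRegistry calls the start() method of SimpleMessageListenerContainer.

12.1. The start() method

If the container has not been initialized yet, it runs its initialization first.

// Initialization; the main work is in doStart()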
@Override
public void start() {
    if (!this.initialized) {
        synchronized (this.lifecycleMonitor) {
            if (!this.initialized) {
                afterPropertiesSet();
                this.initialized = true;
            }
        }
    }
    try {
        if (logger.isDebugEnabled()) {
            logger.debug("Starting Rabbit listener container.");
        }
        // Calls the subclass method
        doStart();
    }
    catch (Exception ex) {
        throw convertRabbitAccessException(ex);
    }
}
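
The initialization guard in start() is the double-checked locking idiom: check the flag, take the lock, check again, then initialize once. A minimal JDK-only sketch of the idiom follows. It assumes the flag is declared volatile, which the excerpt above does not show, and uses counters in place of the real afterPropertiesSet() and doStart() work; LazyStartSketch is a made-up name.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the double-checked initialization idiom used by start().
class LazyStartSketch {

    private final Object lifecycleMonitor = new Object();
    private volatile boolean initialized;          // volatile is an assumption of this sketch
    final AtomicInteger initCalls = new AtomicInteger();
    final AtomicInteger startCalls = new AtomicInteger();

    void start() {
        if (!initialized) {                        // first check, without the lock
            synchronized (lifecycleMonitor) {
                if (!initialized) {                // second check, under the lock
                    afterPropertiesSet();
                    initialized = true;
                }
            }
        }
        doStart();
    }

    private void afterPropertiesSet() { initCalls.incrementAndGet(); }

    private void doStart() { startCalls.incrementAndGet(); }

    public static void main(String[] args) throws InterruptedException {
        LazyStartSketch container = new LazyStartSketch();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(container::start);
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        System.out.println(container.initCalls.get() + " initialization, "
            + container.startCalls.get() + " starts");
    }
}
```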

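12.2. The doStart() method

The main steps are:
Step 1: obtain the RabbitAdmin instance.
Step 2: initialize the RabbitAdmin, declaring newly added queues, exchanges, and bindings on RabbitMQ.
Step 3: call the parent class's doStart() method.
Step 4: start the consumers on the thread pool so they begin consuming messages.
Step 5: check whether each consumer started successfully and throw an exception if one failed.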

protected void doStart() throws Exception {
    if (getMessageListener() instanceof ListenerContainerAware) {
        // Verify that the queues the listener expects match the container's queues
        ….
        Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
        if (expectedQueueNames != null) {
            String[] queueNames = getQueueNames();
            Assert.state(expectedQueueNames.size() == queueNames.length,
                "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                    + Arrays.asList(queueNames));
            boolean found = false;
            for (String queueName : queueNames) {
                if (expectedQueueNames.contains(queueName)) {
                    found = true;
                }
                else {
                    found = false;
                    break;
                }
            }
            Assert.state(found, "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                + Arrays.asList(queueNames));
        }
    }
    // Obtain the RabbitAdmin
    if (this.rabbitAdmin == null && this.getApplicationContext() != null) {
        Map<String, RabbitAdmin> admins = this.getApplicationContext().getBeansOfType(RabbitAdmin.class);
        if (admins.size() == 1) {
            this.rabbitAdmin = admins.values().iterator().next();
        }
        else {
            ….
        }
    }
    // Initialize the RabbitAdmin: declare newly added queues, exchanges, and bindings on RabbitMQ
    checkMismatchedQueues();
    // Call the parent class method
    super.doStart();
    synchronized (this.consumersMonitor) {
        // Initialize the consumers; this creates the BlockingQueueConsumer instances
        int newConsumers = initializeConsumers();
        ….
        Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
        // Start the consumers on the thread pool: AsyncMessageProcessingConsumer is a Runnable that calls BlockingQueueConsumer.start() to receive and process messages
        for (BlockingQueueConsumer consumer : this.consumers) {
            AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            // Run the AsyncMessageProcessingConsumer on the thread pool
            this.taskExecutor.execute(processor);
            if (this.applicationEventPublisher != null) {
                this.applicationEventPublisher.publishEvent(new AsyncConsumerStartedEvent(this, consumer));
            }
        }
        // Check whether each consumer started successfully and throw an exception if one failed
        for (AsyncMessageProcessingConsumer processor : processors) {
            FatalListenerStartupException startupException = processor.getStartupException();
            if (startupException != null) {
                throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
            }
        }
    }
}
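
The queue check at the top of doStart() boils down to this: the number of expected queues must equal the number of container queues, and every container queue must be among the expected ones. The sketch below restates that logic as a standalone boolean method so it can be tried in isolation; QueueCheckSketch and matches() are illustrative names, and the real code raises an assertion failure instead of returning false.

```java
import java.util.Arrays;
import java.util.Collection;

// Standalone restatement of the queue-name check in doStart().
class QueueCheckSketch {

    static boolean matches(Collection<String> expectedQueueNames, String[] queueNames) {
        if (expectedQueueNames.size() != queueNames.length) {
            return false;                          // sizes must agree
        }
        boolean found = false;
        for (String queueName : queueNames) {
            if (expectedQueueNames.contains(queueName)) {
                found = true;
            } else {
                return false;                      // an unexpected queue fails the check
            }
        }
        return found;                              // false when there are no queues at all
    }

    public static void main(String[] args) {
        System.out.println(matches(Arrays.asList("a", "b"), new String[] {"b", "a"}));
        System.out.println(matches(Arrays.asList("a", "b"), new String[] {"a", "c"}));
    }
}
```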

13. RabbitAdmin

RabbitAdmin initialization: declares newly added queues, exchanges, and bindings on RabbitMQ.

public void initialize() {

    …
    // Collect all exchanges, queues, and bindings
    Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
        this.applicationContext.getBeansOfType(Exchange.class).values());
    Collection<Queue> contextQueues = new LinkedList<Queue>(
        this.applicationContext.getBeansOfType(Queue.class).values());
    Collection<Binding> contextBindings = new LinkedList<Binding>(
        this.applicationContext.getBeansOfType(Binding.class).values());

    … 
    // Declare them on RabbitMQ, creating the corresponding exchanges, queues, and bindings
    this.rabbitTemplate.execute(new ChannelCallback<Object>() {
        @Override
        public Object doInRabbit(Channel channel) throws Exception {
            declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
            declareQueues(channel, queues.toArray(new Queue[queues.size()]));
            declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
            return null;
        }
    });


}

14. AsyncMessageProcessingConsumer

Implements Runnable; its run() method calls the start() method of BlockingQueueConsumer.

private final class AsyncMessageProcessingConsumer implements Runnable {

    private final BlockingQueueConsumer consumer;

    private volatile FatalListenerStartupException startupException;

    private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
        this.consumer = consumer;
        this.start = new CountDownLatch(1);
    }

    @Override
    public void run() {

        ….
        try {
            if (SimpleMessageListenerContainer.this.autoDeclare) {
                SimpleMessageListenerContainer.this.redeclareElementsIfNecessary();
            }
            // Calls start() on the BlockingQueueConsumer
            this.consumer.start();
            ..
        }
        …
...
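
The CountDownLatch and the startupException field in the excerpt suggest a handshake between the consumer thread and the container: the thread records any startup failure and releases the latch, and the container waits on the latch before reading the result. The sketch below shows that handshake with JDK classes only. It is a guess at the pattern and not the Spring AMQP code; Processor, awaitStartupException(), and startsCleanly() are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of a startup handshake between a consumer thread and its container.
class ConsumerStartupSketch {

    static final class Processor implements Runnable {
        private final Runnable consumerStart;
        private final CountDownLatch started = new CountDownLatch(1);
        private volatile RuntimeException startupException;

        Processor(Runnable consumerStart) { this.consumerStart = consumerStart; }

        @Override
        public void run() {
            try {
                consumerStart.run();               // stands in for consumer.start()
            } catch (RuntimeException ex) {
                startupException = ex;             // remember why startup failed
            } finally {
                started.countDown();               // signal that the attempt is over
            }
        }

        // Waits for the startup attempt to finish and returns its failure, if any.
        RuntimeException awaitStartupException(long timeoutMillis) throws InterruptedException {
            started.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return startupException;
        }
    }

    // Runs the consumer on a worker thread and reports whether it started without error.
    static boolean startsCleanly(Runnable consumerStart) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Processor processor = new Processor(consumerStart);
            executor.execute(processor);
            return processor.awaitStartupException(5000) == null;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(startsCleanly(() -> { }));
        System.out.println(startsCleanly(() -> { throw new IllegalStateException("queue missing"); }));
    }
}
```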

15. BlockingQueueConsumer

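BlockingQueueConsumer calls basicConsume in start() to subscribe to messages. At this point Spring is calling classes from the client JAR provided by RabbitMQ (channel.basicConsume), which means we have reached the bottom layer, so the source code analysis ends here.

15.1. The start() method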

void start() throws AmqpException {

    try {
        for (String queueName : this.queues) {
            // Subscribe to every queue that is not missing
            if (!this.missingQueues.contains(queueName)) {
                consumeFromQueue(queueName);
            }
        }
    }
    catch (IOException e) {
        throw RabbitExceptionTranslator.convertRabbitAccessException(e);
    }
}

private void consumeFromQueue(String queue) throws IOException {
    // Subscribe by calling basicConsume; from here on Spring calls into the RabbitMQ client JAR (channel.basicConsume)
    String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
        (this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), false, this.exclusive,
        this.consumerArgs, this.consumer);
    if (consumerTag != null) {
        this.consumerTags.put(consumerTag, queue);
    }
    ...
}