Spring Boot 集成 RabbitMQ 多个 Broker 发送、消费消息

Jason's Blog / 2024-10-12 / 原文

  • 1. Spring Boot 集成 RabbitMQ 多个 Broker 发送、消费消息
    • 1.1. 版本说明
    • 1.2. 概述
    • 1.3. RabbitMQ 信息
    • 1.4. Spring 配置
    • 1.6. 定义配置属性
    • 1.5. 定义常量
    • 1.7. 定义两个 ConnectionFactory
    • 1.8. 定义两个 RabbitTemplate
    • 1.9. 定义两个 AmqpAdmin
    • 1.10. 定义两个 SimpleRabbitListenerContainerFactory
    • 1.11. 测试

1. Spring Boot 集成 RabbitMQ 多个 Broker 发送、消费消息

1.1. 版本说明

构件 版本
spring-boot 2.7.18
spring-boot-starter-amqp 2.7.18

1.2. 概述

flowchart TB
    subgraph app["Spring Boot 应用"]
        direction TB
        template1["第一个 RabbitTemplate"]
        listener1["第一个监听器"]
        template2["第二个 RabbitTemplate"]
        listener2["第二个监听器"]
    end
    subgraph mq1["第一个 RabbitMQ"]
        direction TB
        exchange1["第一个交换机"] --> queue1["第一个队列"]
    end
    subgraph mq2["第二个 RabbitMQ"]
        direction TB
        exchange2["第二个交换机"] --> queue2["第二个队列"]
    end
    template1 --> exchange1
    queue1 --> listener1
    template2 --> exchange2
    queue2 --> listener2

1.3. RabbitMQ 信息

IP:端口 用户名 virtual host
127.0.0.1:5672 first-user /first-virtual-host
127.0.0.1:5672 second-user /second-virtual-host

1.4. Spring 配置

# Application identity.
spring:
  application:
    name: spring-rabbit-multiple-broker-demo
# Custom top-level section (note: not nested under "spring") with one entry per broker.
# Both entries below point at the same host:port and differ only in user and virtual host.
rabbitmq:
  # Connection settings for the first broker.
  first-broker:
    addresses: 127.0.0.1:5672
    username: first-user
    password: first-user
    virtual-host: /first-virtual-host
  # Connection settings for the second broker.
  second-broker:
    addresses: 127.0.0.1:5672
    username: second-user
    password: second-user
    virtual-host: /second-virtual-host

1.6. 定义配置属性

/**
 * Configuration properties bound under the {@code rabbitmq} prefix.
 *
 * <p>Holds one {@link RabbitProperties} instance per broker. Accessors are generated by
 * Lombok's {@code @Data}. Either field stays {@code null} unless something assigns it;
 * presumably the binder populates them from the {@code first-broker} / {@code second-broker}
 * keys — confirm against the application configuration.
 */
@Data
@ConfigurationProperties(prefix = "rabbitmq")
public class SpringRabbitMultipleBrokerProperties {
    /** Connection settings for the first broker. */
    private RabbitProperties firstBroker;
    /** Connection settings for the second broker. */
    private RabbitProperties secondBroker;
}

1.5. 定义常量

/**
 * Names of the exchanges and queues used by the multiple-broker demo.
 *
 * <p>This is a pure constant holder: it is {@code final} and has a private constructor so it
 * can be neither instantiated nor subclassed.
 */
public final class SpringRabbitMultipleBrokerConstants {
    /** Name of the queue used on the first broker. */
    public static final String FIRST_QUEUE = "spring-rabbit-multiple-broker-demo-first-queue";
    /** Name of the exchange used on the first broker. */
    public static final String FIRST_EXCHANGE = "spring-rabbit-multiple-broker-demo-first-exchange";
    /** Name of the queue used on the second broker. */
    public static final String SECOND_QUEUE = "spring-rabbit-multiple-broker-demo-second-queue";
    /** Name of the exchange used on the second broker. */
    public static final String SECOND_EXCHANGE = "spring-rabbit-multiple-broker-demo-second-exchange";

    private SpringRabbitMultipleBrokerConstants() {
        // Constant holder; no instances.
    }
}

1.7. 定义两个 ConnectionFactory

/**
 * Primary connection factory, built from the properties returned by
 * {@code getFirstBroker()}.
 *
 * @param multipleBrokerProperties holder of the per-broker connection settings
 * @return the connection factory produced by {@code generateConnectionFactory}
 * @throws Throwable whatever {@code generateConnectionFactory} propagates
 */
@Primary
@Bean(name = "firstConnectionFactory")
public ConnectionFactory firstConnectionFactory(SpringRabbitMultipleBrokerProperties multipleBrokerProperties) throws Throwable {
    return generateConnectionFactory(multipleBrokerProperties.getFirstBroker());
}

/**
 * Connection factory built from the properties returned by {@code getSecondBroker()}.
 *
 * @param multipleBrokerProperties holder of the per-broker connection settings
 * @return the connection factory produced by {@code generateConnectionFactory}
 * @throws Throwable whatever {@code generateConnectionFactory} propagates
 */
@Bean(name = "secondConnectionFactory")
public ConnectionFactory secondConnectionFactory(SpringRabbitMultipleBrokerProperties multipleBrokerProperties) throws Throwable {
    return generateConnectionFactory(multipleBrokerProperties.getSecondBroker());
}

/**
 * Builds a {@link CachingConnectionFactory} for a single broker.
 *
 * <p>Host, port, credentials and virtual host are copied onto a
 * {@link RabbitConnectionFactoryBean}, which produces the underlying RabbitMQ client factory.
 * The full address list is then applied to the caching factory, so a comma-separated
 * {@code addresses} value with several nodes is honoured instead of being reduced to the
 * single host/port pair that {@code determineHost()} / {@code determinePort()} yield.
 *
 * @param rabbitProperties connection settings of the broker; must not be {@code null}
 * @return a caching connection factory targeting the configured broker address(es)
 * @throws IllegalStateException if {@code rabbitProperties} is {@code null}, i.e. the broker
 *         section was never populated
 * @throws Exception if the underlying client connection factory cannot be initialised
 */
private ConnectionFactory generateConnectionFactory(RabbitProperties rabbitProperties) throws Exception {
    if (rabbitProperties == null) {
        // Fail with a readable message instead of an anonymous NullPointerException below.
        throw new IllegalStateException(
                "RabbitMQ broker properties are missing; check the 'rabbitmq.*' configuration");
    }

    RabbitConnectionFactoryBean connectionFactoryBean = new RabbitConnectionFactoryBean();
    PropertyMapper map = PropertyMapper.get();
    map.from(rabbitProperties::determineHost).whenNonNull().to(connectionFactoryBean::setHost);
    map.from(rabbitProperties::determinePort).to(connectionFactoryBean::setPort);
    map.from(rabbitProperties::determineUsername).whenNonNull().to(connectionFactoryBean::setUsername);
    map.from(rabbitProperties::determinePassword).whenNonNull().to(connectionFactoryBean::setPassword);
    map.from(rabbitProperties::determineVirtualHost).whenNonNull().to(connectionFactoryBean::setVirtualHost);
    connectionFactoryBean.afterPropertiesSet();

    com.rabbitmq.client.ConnectionFactory factory = connectionFactoryBean.getObject();
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);
    // Apply every configured address, not just the first host/port pair set above.
    connectionFactory.setAddresses(rabbitProperties.determineAddresses());
    return connectionFactory;
}

1.8. 定义两个 RabbitTemplate

    /**
     * Template bound to the {@code firstConnectionFactory} bean.
     *
     * @param firstConnectionFactory connection factory the template is bound to
     * @return a new template using that connection factory
     */
    @Bean(name = "firstRabbitTemplate")
    public RabbitTemplate firstRabbitTemplate(@Qualifier("firstConnectionFactory") ConnectionFactory firstConnectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(firstConnectionFactory);
        return rabbitTemplate;
    }

    /**
     * Template bound to the {@code secondConnectionFactory} bean.
     *
     * @param secondConnectionFactory connection factory the template is bound to
     * @return a new template using that connection factory
     */
    @Bean(name = "secondRabbitTemplate")
    public RabbitTemplate secondRabbitTemplate(@Qualifier("secondConnectionFactory") ConnectionFactory secondConnectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(secondConnectionFactory);
        return rabbitTemplate;
    }

1.9. 定义两个 AmqpAdmin

/**
 * Admin built on top of the {@code firstRabbitTemplate} bean.
 *
 * @param firstRabbitTemplate template the admin is constructed with
 * @return a {@link RabbitAdmin} wrapping that template
 */
@Bean(name = "firstAmqpAdmin")
public AmqpAdmin firstAmqpAdmin(@Qualifier("firstRabbitTemplate") RabbitTemplate firstRabbitTemplate) {
    final AmqpAdmin admin = new RabbitAdmin(firstRabbitTemplate);
    return admin;
}

/**
 * Admin built on top of the {@code secondRabbitTemplate} bean.
 *
 * @param secondRabbitTemplate template the admin is constructed with
 * @return a {@link RabbitAdmin} wrapping that template
 */
@Bean(name = "secondAmqpAdmin")
public AmqpAdmin secondAmqpAdmin(@Qualifier("secondRabbitTemplate") RabbitTemplate secondRabbitTemplate) {
    final AmqpAdmin admin = new RabbitAdmin(secondRabbitTemplate);
    return admin;
}

1.10. 定义两个 SimpleRabbitListenerContainerFactory

/**
 * Listener container factory bound to the {@code firstConnectionFactory} bean; referenced by
 * name from {@code @RabbitListener(containerFactory = ...)}.
 *
 * @param firstConnectionFactory connection factory handed to the container factory
 * @return a container factory using that connection factory
 */
@Bean(name = "firstRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory firstRabbitListenerContainerFactory(
        @Qualifier("firstConnectionFactory") ConnectionFactory firstConnectionFactory) {
    final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
    containerFactory.setConnectionFactory(firstConnectionFactory);
    return containerFactory;
}

/**
 * Listener container factory bound to the {@code secondConnectionFactory} bean; referenced by
 * name from {@code @RabbitListener(containerFactory = ...)}.
 *
 * @param secondConnectionFactory connection factory handed to the container factory
 * @return a container factory using that connection factory
 */
@Bean(name = "secondRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory secondRabbitListenerContainerFactory(
        @Qualifier("secondConnectionFactory") ConnectionFactory secondConnectionFactory) {
    final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
    containerFactory.setConnectionFactory(secondConnectionFactory);
    return containerFactory;
}

1.11. 测试

/**
 * End-to-end demo: declares a fanout exchange, a durable queue and their binding on each of
 * the two brokers, publishes one message to each exchange at startup, and logs whatever the
 * two listeners receive.
 */
@Component
@Slf4j
public class SpringRabbitMultipleBrokerDemo implements ApplicationRunner, InitializingBean {

    @Resource(name = "firstRabbitTemplate")
    private RabbitTemplate firstRabbitTemplate;

    @Resource(name = "firstAmqpAdmin")
    private AmqpAdmin firstAmqpAdmin;

    @Resource(name = "secondRabbitTemplate")
    private RabbitTemplate secondRabbitTemplate;

    @Resource(name = "secondAmqpAdmin")
    private AmqpAdmin secondAmqpAdmin;

    /**
     * Declares the exchanges and queues on both brokers.
     *
     * @throws Exception declared by {@link InitializingBean#afterPropertiesSet()}
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        declareFanoutTopology(firstAmqpAdmin, FIRST_EXCHANGE, FIRST_QUEUE);
        declareFanoutTopology(secondAmqpAdmin, SECOND_EXCHANGE, SECOND_QUEUE);
    }

    /**
     * Declares a durable fanout exchange, a durable queue, and the binding between them,
     * in that order, through the given admin.
     */
    private static void declareFanoutTopology(AmqpAdmin admin, String exchangeName, String queueName) {
        FanoutExchange exchange = ExchangeBuilder.fanoutExchange(exchangeName)
                .durable(true)
                .build();
        admin.declareExchange(exchange);

        Queue queue = QueueBuilder.durable(queueName).build();
        admin.declareQueue(queue);

        Binding binding = BindingBuilder.bind(queue).to(exchange);
        admin.declareBinding(binding);
    }

    /**
     * Publishes one text message to each broker's exchange and logs the payload sent.
     *
     * @param args application arguments (unused)
     * @throws Exception declared by {@link ApplicationRunner#run(ApplicationArguments)}
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        final String payloadForFirst = "first payload";
        // Routing key is passed as null; both exchanges are declared as fanout exchanges.
        firstRabbitTemplate.convertAndSend(FIRST_EXCHANGE, null, payloadForFirst);
        log.info("sent a message to first broker, payload: {}", payloadForFirst);

        final String payloadForSecond = "second payload";
        secondRabbitTemplate.convertAndSend(SECOND_EXCHANGE, null, payloadForSecond);
        log.info("sent a message to second broker, payload: {}", payloadForSecond);
    }

    /** Logs the payload of every message received from the first broker's queue. */
    @RabbitListener(
            queues = FIRST_QUEUE,
            containerFactory = "firstRabbitListenerContainerFactory"
    )
    public void listenToFirstBroker(Message<String> message) {
        log.info("received a message from first broker, payload: {}", message.getPayload());
    }

    /** Logs the payload of every message received from the second broker's queue. */
    @RabbitListener(
            queues = SECOND_QUEUE,
            containerFactory = "secondRabbitListenerContainerFactory"
    )
    public void listenToSecondBroker(Message<String> message) {
        log.info("received a message from second broker, payload: {}", message.getPayload());
    }
}

启动程序,控制台将输出以下日志:

sent a message to first broker, payload: first payload
sent a message to second broker, payload: second payload
received a message from second broker, payload: second payload
received a message from first broker, payload: first payload