Spring Boot 集成 RabbitMQ 多个 Broker 发送、消费消息
- 1. Spring Boot 集成 RabbitMQ 多个 Broker 发送、消费消息
- 1.1. 版本说明
- 1.2. 概述
- 1.3. RabbitMQ 信息
- 1.4. Spring 配置
- 1.6. 定义配置属性
- 1.5. 定义常量
- 1.7. 定义两个 ConnectionFactory
- 1.8. 定义两个 RabbitTemplate
- 1.9. 定义两个 AmqpAdmin
- 1.10. 定义两个 SimpleRabbitListenerContainerFactory
- 1.11. 测试
1. Spring Boot 集成 RabbitMQ 多个 Broker 发送、消费消息
1.1. 版本说明
构件 | 版本 |
---|---|
spring-boot | 2.7.18 |
spring-boot-starter-amqp | 2.7.18 |
1.2. 概述
flowchart TB
subgraph app["Spring Boot 应用"]
direction TB
template1["第一个 RabbitTemplate"]
listener1["第一个监听器"]
template2["第二个 RabbitTemplate"]
listener2["第二个监听器"]
end
subgraph mq1["第一个 RabbitMQ"]
direction TB
exchange1["第一个交换机"]
--> queue1["第一个队列"]
end
subgraph mq2["第二个 RabbitMQ"]
direction TB
exchange2["第二个交换机"]
--> queue2["第二个队列"]
end
template1 --> exchange1
queue1 --> listener1
template2 --> exchange2
queue2 --> listener2
1.3. RabbitMQ 信息
IP:端口 | 用户名 | virtual host |
---|---|---|
127.0.0.1:5672 | first-user | /first-virtual-host |
127.0.0.1:5672 | second-user | /second-virtual-host |
1.4. Spring 配置
spring:
application:
name: spring-rabbit-multiple-broker-demo
rabbitmq:
first-broker:
addresses: 127.0.0.1:5672
username: first-user
password: first-user
virtual-host: /first-virtual-host
second-broker:
addresses: 127.0.0.1:5672
username: second-user
password: second-user
virtual-host: /second-virtual-host
1.6. 定义配置属性
@Data
@ConfigurationProperties(prefix = "rabbitmq")
public class SpringRabbitMultipleBrokerProperties {
private RabbitProperties firstBroker;
private RabbitProperties secondBroker;
}
1.5. 定义常量
public class SpringRabbitMultipleBrokerConstants {
public static final String FIRST_QUEUE = "spring-rabbit-multiple-broker-demo-first-queue";
public static final String FIRST_EXCHANGE = "spring-rabbit-multiple-broker-demo-first-exchange";
public static final String SECOND_QUEUE = "spring-rabbit-multiple-broker-demo-second-queue";
public static final String SECOND_EXCHANGE = "spring-rabbit-multiple-broker-demo-second-exchange";
}
1.7. 定义两个 ConnectionFactory
@Bean(name = "firstConnectionFactory")
@Primary
public ConnectionFactory firstConnectionFactory(SpringRabbitMultipleBrokerProperties multipleBrokerProperties) throws Throwable {
RabbitProperties rabbitProperties = multipleBrokerProperties.getFirstBroker();
return generateConnectionFactory(rabbitProperties);
}
@Bean(name = "secondConnectionFactory")
public ConnectionFactory secondConnectionFactory(SpringRabbitMultipleBrokerProperties multipleBrokerProperties) throws Throwable {
RabbitProperties rabbitProperties = multipleBrokerProperties.getSecondBroker();
return generateConnectionFactory(rabbitProperties);
}
private ConnectionFactory generateConnectionFactory(RabbitProperties rabbitProperties) throws Throwable {
RabbitConnectionFactoryBean connectionFactoryBean = new RabbitConnectionFactoryBean();
PropertyMapper map = PropertyMapper.get();
map.from(rabbitProperties.determineHost()).whenNonNull().to(connectionFactoryBean::setHost);
map.from(rabbitProperties::determinePort).to(connectionFactoryBean::setPort);
map.from(rabbitProperties::determineUsername).whenNonNull().to(connectionFactoryBean::setUsername);
map.from(rabbitProperties::determinePassword).whenNonNull().to(connectionFactoryBean::setPassword);
map.from(rabbitProperties::determineVirtualHost).whenNonNull().to(connectionFactoryBean::setVirtualHost);
connectionFactoryBean.afterPropertiesSet();
com.rabbitmq.client.ConnectionFactory factory = connectionFactoryBean.getObject();
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);
return connectionFactory;
}
1.8. 定义两个 RabbitTemplate
@Bean(name = "firstRabbitTemplate")
public RabbitTemplate firstRabbitTemplate(@Qualifier("firstConnectionFactory") ConnectionFactory firstConnectionFactory) {
RabbitTemplate template = new RabbitTemplate();
template.setConnectionFactory(firstConnectionFactory);
return template;
}
@Bean(name = "secondRabbitTemplate")
public RabbitTemplate secondRabbitTemplate(@Qualifier("secondConnectionFactory") ConnectionFactory secondConnectionFactory) {
RabbitTemplate template = new RabbitTemplate();
template.setConnectionFactory(secondConnectionFactory);
return template;
}
1.9. 定义两个 AmqpAdmin
@Bean(name = "firstAmqpAdmin")
public AmqpAdmin firstAmqpAdmin(@Qualifier("firstRabbitTemplate") RabbitTemplate firstRabbitTemplate) {
return new RabbitAdmin(firstRabbitTemplate);
}
@Bean(name = "secondAmqpAdmin")
public AmqpAdmin secondAmqpAdmin(@Qualifier("secondRabbitTemplate") RabbitTemplate secondRabbitTemplate) {
return new RabbitAdmin(secondRabbitTemplate);
}
1.10. 定义两个 SimpleRabbitListenerContainerFactory
@Bean(name = "firstRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory firstRabbitListenerContainerFactory(
@Qualifier("firstConnectionFactory") ConnectionFactory firstConnectionFactory
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(firstConnectionFactory);
return factory;
}
@Bean(name = "secondRabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory secondRabbitListenerContainerFactory(
@Qualifier("secondConnectionFactory") ConnectionFactory secondConnectionFactory
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(secondConnectionFactory);
return factory;
}
1.11. 测试
@Component
@Slf4j
public class SpringRabbitMultipleBrokerDemo implements ApplicationRunner, InitializingBean {
@Resource(name = "firstRabbitTemplate")
private RabbitTemplate firstRabbitTemplate;
@Resource(name = "secondRabbitTemplate")
private RabbitTemplate secondRabbitTemplate;
@Resource(name = "firstAmqpAdmin")
private AmqpAdmin firstAmqpAdmin;
@Resource(name = "secondAmqpAdmin")
private AmqpAdmin secondAmqpAdmin;
/**
* 定义交换机,队列
* @throws Exception
*/
@Override
public void afterPropertiesSet() throws Exception {
FanoutExchange firstExchange = ExchangeBuilder.fanoutExchange(FIRST_EXCHANGE)
.durable(true)
.build();
firstAmqpAdmin.declareExchange(firstExchange);
Queue firstQueue = QueueBuilder.durable(FIRST_QUEUE).build();
firstAmqpAdmin.declareQueue(firstQueue);
Binding firstBinding = BindingBuilder.bind(firstQueue).to(firstExchange);
firstAmqpAdmin.declareBinding(firstBinding);
FanoutExchange secondExchange = ExchangeBuilder.fanoutExchange(SECOND_EXCHANGE)
.durable(true)
.build();
secondAmqpAdmin.declareExchange(secondExchange);
Queue secondQueue = QueueBuilder.durable(SECOND_QUEUE).build();
secondAmqpAdmin.declareQueue(secondQueue);
Binding secondBinding = BindingBuilder.bind(secondQueue).to(secondExchange);
secondAmqpAdmin.declareBinding(secondBinding);
}
@Override
public void run(ApplicationArguments args) throws Exception {
String firstPayload = "first payload";
firstRabbitTemplate.convertAndSend(FIRST_EXCHANGE, null, firstPayload);
log.info("sent a message to first broker, payload: {}", firstPayload);
String secondPayload = "second payload";
secondRabbitTemplate.convertAndSend(SECOND_EXCHANGE, null, secondPayload);
log.info("sent a message to second broker, payload: {}", secondPayload);
}
@RabbitListener(
containerFactory = "firstRabbitListenerContainerFactory",
queues = FIRST_QUEUE
)
public void listenToFirstBroker(Message<String> message) {
log.info(
"received a message from first broker, payload: {}",
message.getPayload()
);
}
@RabbitListener(
containerFactory = "secondRabbitListenerContainerFactory",
queues = SECOND_QUEUE
)
public void listenToSecondBroker(Message<String> message) {
log.info(
"received a message from second broker, payload: {}",
message.getPayload()
);
}
}
启动程序,控制台将输出以下日志:
sent a message to first broker, payload: first payload
sent a message to second broker, payload: second payload
received a message from second broker, payload: second payload
received a message from first broker, payload: first payload