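- 1. Spring Boot Integration with RabbitMQ: Sending Messages in Batches
- 1.1. Versions
- 1.2. Spring Configuration
- 1.3. Defining Constants
- 1.4. Configuring Exchanges and Queues
- 1.5. Defining a BatchingRabbitTemplate Factory
- 1.6. Testing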
1. Spring Boot Integration with RabbitMQ: Sending Messages in Batches
1.1. Versions
Artifact | Version |
---|---|
spring-boot | 2.7.18 |
spring-boot-starter-amqp | 2.7.18 |
1.2. Spring Configuration
spring:
  application:
    name: spring-rabbit-batch-demo
  rabbitmq:
    addresses: 127.0.0.1:5672
    username: admin
    password: admin
    virtual-host: /
1.3. Defining Constants
// Names of the queues and exchanges used by the demo.
public class RabbitBatchConstants {
    public static final String QUEUE_1 = "spring-rabbit-batch-demo-queue-1";
    public static final String EXCHANGE_1 = "spring-rabbit-batch-demo-exchange-1";
    public static final String QUEUE_2 = "spring-rabbit-batch-demo-queue-2";
    public static final String EXCHANGE_2 = "spring-rabbit-batch-demo-exchange-2";
}
1.4. Configuring Exchanges and Queues
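import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitBatchConfiguration {

    // Durable queue and fanout exchange for the first destination.
    @Bean
    public Queue queue1() {
        return QueueBuilder.durable(RabbitBatchConstants.QUEUE_1).build();
    }

    @Bean
    public FanoutExchange exchange1() {
        return ExchangeBuilder.fanoutExchange(RabbitBatchConstants.EXCHANGE_1).durable(true).build();
    }

    // Durable queue and fanout exchange for the second destination.
    @Bean
    public Queue queue2() {
        return QueueBuilder.durable(RabbitBatchConstants.QUEUE_2).build();
    }

    @Bean
    public FanoutExchange exchange2() {
        return ExchangeBuilder.fanoutExchange(RabbitBatchConstants.EXCHANGE_2).durable(true).build();
    }

    // A fanout exchange ignores the routing key, so the bindings need none.
    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(exchange1());
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queue2()).to(exchange2());
    }
}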
1.5. Defining a BatchingRabbitTemplate Factory
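org.springframework.amqp.rabbit.core.BatchingRabbitTemplate
provides an API for sending messages in batches. With SimpleBatchingStrategy, however, all messages in one batch must share the same exchange and routing key, so a single BatchingRabbitTemplate
instance should serve only one destination. A factory class is therefore needed to create one BatchingRabbitTemplate
instance per exchange. The demo uses fanout exchanges with no routing key, so keying the factory by exchange name is sufficient here.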
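import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import javax.annotation.Resource;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.BatchingRabbitTemplate;
import org.springframework.context.Lifecycle;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;

@Component
public class BatchingRabbitTemplateFactory implements Lifecycle {
    @Resource
    private ConnectionFactory connectionFactory;
    // One template per exchange, created lazily on first use.
    private final Map<String, BatchingRabbitTemplate> batchingRabbitTemplateMap = new ConcurrentHashMap<>();

    public BatchingRabbitTemplate getBatchingRabbitTemplate(String exchange) {
        return batchingRabbitTemplateMap.computeIfAbsent(exchange, exchangeName -> {
            // Scheduler that releases a partial batch once the timeout expires.
            ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
            threadPoolTaskScheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
            threadPoolTaskScheduler.setThreadNamePrefix("batch-rabbit-");
            threadPoolTaskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            threadPoolTaskScheduler.initialize();
            // batchSize = 100 messages, bufferLimit = 1000 bytes, timeout = 3000 ms
            return new BatchingRabbitTemplate(connectionFactory, new SimpleBatchingStrategy(100, 1000, 3000L), threadPoolTaskScheduler);
        });
    }

    @Override
    public void start() {
    }

    // Flush partial batches when the application context shuts down.
    @Override
    public void stop() {
        batchingRabbitTemplateMap.forEach((exchange, batchingRabbitTemplate) -> batchingRabbitTemplate.flush());
    }

    // Always report "running" so that stop() is invoked when the context closes.
    @Override
    public boolean isRunning() {
        return true;
    }
}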
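The three arguments of SimpleBatchingStrategy(100, 1000, 3000L) are the batch size (message count), the buffer limit (bytes), and the timeout (milliseconds). The sketch below is a minimal stand-alone model, in plain Java so that it runs without a broker, of how the count and byte thresholds can interact and why one batch cannot mix destinations. The class BatchBufferSketch and its method names are hypothetical, and the 4-byte overhead per message is an assumption of this model; it is not a copy of Spring AMQP's implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Simplified model of a count- and byte-limited batch buffer (hypothetical, not Spring's class). */
class BatchBufferSketch {
    private final int batchSize;    // maximum number of messages per batch
    private final int bufferLimit;  // maximum accumulated bytes per batch
    private final List<byte[]> pending = new ArrayList<>();
    private String exchange;        // destination of the batch being built
    private String routingKey;
    private int pendingBytes;

    BatchBufferSketch(int batchSize, int bufferLimit) {
        this.batchSize = batchSize;
        this.bufferLimit = bufferLimit;
    }

    /** Adds one body; returns the released batch, or an empty list if nothing was released. */
    List<byte[]> add(String exch, String routKey, byte[] body) {
        if (pending.isEmpty()) {
            exchange = exch;
            routingKey = routKey;
        } else if (!Objects.equals(exchange, exch) || !Objects.equals(routingKey, routKey)) {
            throw new IllegalArgumentException("one batch cannot mix destinations");
        }
        int cost = Integer.BYTES + body.length; // assumed 4-byte length prefix per message
        List<byte[]> released = new ArrayList<>();
        if (!pending.isEmpty() && pendingBytes + cost > bufferLimit) {
            released = release(); // adding would overflow: release what is pending first
        }
        pending.add(body);
        pendingBytes += cost;
        if (released.isEmpty() && (pending.size() >= batchSize || pendingBytes >= bufferLimit)) {
            released = release(); // a threshold was reached by this message
        }
        return released;
    }

    /** Releases whatever is pending, for example on a timeout or at shutdown. */
    List<byte[]> release() {
        List<byte[]> batch = new ArrayList<>(pending);
        pending.clear();
        pendingBytes = 0;
        return batch;
    }

    public static void main(String[] args) {
        BatchBufferSketch buffer = new BatchBufferSketch(100, 1000);
        byte[] body = "hello, queue 1".getBytes(StandardCharsets.UTF_8);
        int adds = 0;
        List<byte[]> batch;
        do {
            batch = buffer.add("exchange-1", "", body);
            adds++;
        } while (batch.isEmpty());
        System.out.println("released " + batch.size() + " messages after " + adds + " adds");
    }
}
```

In this model, 14-byte payloads such as "hello, queue 1" reach the 1000-byte limit well before the 100-message limit: the 56th add releases a batch of 55. If larger batches are intended, the buffer limit may need to be raised together with the batch size.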
1.6. Testing
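import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class SpringRabbitBatchDemo implements ApplicationRunner {
    @Resource
    private BatchingRabbitTemplateFactory batchingRabbitTemplateFactory;

    // Sends one message to each exchange through its own batching template.
    @Override
    public void run(ApplicationArguments args) throws Exception {
        String payload1 = "hello, queue 1";
        batchingRabbitTemplateFactory.getBatchingRabbitTemplate(RabbitBatchConstants.EXCHANGE_1)
                .convertAndSend(RabbitBatchConstants.EXCHANGE_1, null, payload1);
        log.info("send a messages, exchange: {}, payload: {}", RabbitBatchConstants.EXCHANGE_1, payload1);
        String payload2 = "hello, queue 2";
        batchingRabbitTemplateFactory.getBatchingRabbitTemplate(RabbitBatchConstants.EXCHANGE_2)
                .convertAndSend(RabbitBatchConstants.EXCHANGE_2, null, payload2);
        log.info("send a messages, exchange: {}, payload: {}", RabbitBatchConstants.EXCHANGE_2, payload2);
    }

    @RabbitListener(queues = {RabbitBatchConstants.QUEUE_1})
    public void listen1(Message<String> message) {
        log.info(
                "received a message, queue: {}, exchange: {}, payload: {}",
                message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE),
                message.getHeaders().get(AmqpHeaders.RECEIVED_EXCHANGE),
                message.getPayload()
        );
    }

    @RabbitListener(queues = {RabbitBatchConstants.QUEUE_2})
    public void listen2(Message<String> message) {
        log.info(
                "received a message, queue: {}, exchange: {}, payload: {}",
                message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE),
                message.getHeaders().get(AmqpHeaders.RECEIVED_EXCHANGE),
                message.getPayload()
        );
    }
}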
Start the application; the console prints:
send a messages, exchange: spring-rabbit-batch-demo-exchange-1, payload: hello, queue 1
send a messages, exchange: spring-rabbit-batch-demo-exchange-2, payload: hello, queue 2
received a message, queue: spring-rabbit-batch-demo-queue-2, exchange: spring-rabbit-batch-demo-exchange-2, payload: hello, queue 2
received a message, queue: spring-rabbit-batch-demo-queue-1, exchange: spring-rabbit-batch-demo-exchange-1, payload: hello, queue 1
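Only one message is sent per template in this test, so neither the count nor the byte threshold is reached; delivery presumably relies on the 3000 ms timeout configured in the factory, or on flush() at shutdown. The following is a minimal sketch of such a timeout-driven flush in plain Java. TimeoutFlushSketch, its methods, and the 300 ms timeout are hypothetical and only model the idea; they are not Spring AMQP's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Stand-alone model of a timeout-driven batch flush (hypothetical, not Spring's class). */
class TimeoutFlushSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final List<String> pending = new ArrayList<>();
    private final long timeoutMillis;
    private final Consumer<List<String>> sender; // receives each released batch
    private ScheduledFuture<?> flushTask;

    TimeoutFlushSketch(long timeoutMillis, Consumer<List<String>> sender) {
        this.timeoutMillis = timeoutMillis;
        this.sender = sender;
    }

    /** Buffers a message and restarts the idle timer. */
    synchronized void send(String message) {
        if (flushTask != null) {
            flushTask.cancel(false); // a newer message postpones the flush
        }
        pending.add(message);
        flushTask = scheduler.schedule(this::flush, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Hands the pending messages to the sender, if there are any. */
    synchronized void flush() {
        if (!pending.isEmpty()) {
            sender.accept(new ArrayList<>(pending));
            pending.clear();
        }
    }

    /** Flushes the remainder and discards any timer that is still scheduled. */
    void shutdown() {
        flush();
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        TimeoutFlushSketch template = new TimeoutFlushSketch(300, batch ->
                System.out.println("flushed " + batch + " after about "
                        + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) + " ms"));
        template.send("hello, queue 1"); // no size threshold is reached, only the timer fires
        Thread.sleep(600);
        template.shutdown();
    }
}
```

Under this model a lone message is delayed by roughly the timeout before it is sent, which is the trade-off batching makes: lower per-message overhead in exchange for added latency on sparse traffic.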