Spring Boot 集成 RabbitMQ 并发消费消息
- 1. Spring Boot 集成 RabbitMQ 并发消费消息
- 1.1. 版本说明
- 1.2. 概述
- 1.3. Spring 配置
- 1.4. 定义常量
- 1.5. 测试
1. Spring Boot 集成 RabbitMQ 并发消费消息
1.1. 版本说明
构件 | 版本 |
---|---|
spring-boot | 2.7.18 |
spring-boot-starter-amqp | 2.7.18 |
1.2. 概述
通过 @RabbitListener
注解指定 concurrency
并发数量,Spring 会自动生成相应数量的消费者从队列中消费消息。
1.3. Spring 配置
spring:
application:
name: spring-rabbit-listener-concurrency-demo
rabbitmq:
addresses: 127.0.0.1:5672
username: admin
password: admin
virtual-host: /
1.4. 定义常量
public class SpringRabbitListenerConcurrencyConstants {
public static final String QUEUE = "spring-rabbit-listener-concurrency-demo-queue";
public static final String EXCHANGE = "spring-rabbit-listener-concurrency-demo-exchange";
}
1.5. 测试
这里通过 @RabbitListener
注解指定 5 个消费者从队列中消费消息。
@Component
@Slf4j
public class SpringRabbitListenerConcurrencyDemo implements ApplicationRunner {
@Resource
private RabbitTemplate rabbitTemplate;
@Override
public void run(ApplicationArguments args) throws Exception {
for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend(EXCHANGE, null, "hello, queue");
}
}
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue(
name = QUEUE,
durable = "true",
declare = "true"
),
exchange = @Exchange(
name = EXCHANGE,
type = FANOUT,
durable = "true",
declare = "true"
)
)
},
concurrency = "5"
)
public void listen(Channel channel, Message<String> message) throws Throwable {
log.info(
"received a message, channel: {}, payload: {}",
channel.hashCode(),
message.getPayload()
);
Thread.sleep(1000L);
}
}
启动程序,控制台将输出以下日志,可以看出从 5 个不同的 Channel 消费到消息:
received a message, channel: 2140403968, payload: hello, queue
received a message, channel: 1933452906, payload: hello, queue
received a message, channel: 1024230108, payload: hello, queue
received a message, channel: 1251934227, payload: hello, queue
received a message, channel: 353700673, payload: hello, queue