Spring Boot 集成 RabbitMQ 并发消费消息

Jason's Blog / 2024-10-11 / 原文

  • 1. Spring Boot 集成 RabbitMQ 并发消费消息
    • 1.1. 版本说明
    • 1.2. 概述
    • 1.3. Spring 配置
    • 1.4. 定义常量
    • 1.5. 测试

1. Spring Boot 集成 RabbitMQ 并发消费消息

1.1. 版本说明

构件 版本
spring-boot 2.7.18
spring-boot-starter-amqp 2.7.18

1.2. 概述

通过 @RabbitListener 注解指定 concurrency 并发数量,Spring 会自动生成相应数量的消费者从队列中消费消息。

1.3. Spring 配置

spring:
  application:
    name: spring-rabbit-listener-concurrency-demo
  rabbitmq:
    addresses: 127.0.0.1:5672
    username: admin
    password: admin
    virtual-host: /

1.4. 定义常量

public class SpringRabbitListenerConcurrencyConstants {
    public static final String QUEUE = "spring-rabbit-listener-concurrency-demo-queue";
    public static final String EXCHANGE = "spring-rabbit-listener-concurrency-demo-exchange";
}

1.5. 测试

这里通过 @RabbitListener 注解指定 5 个消费者从队列中消费消息。

@Component
@Slf4j
public class SpringRabbitListenerConcurrencyDemo implements ApplicationRunner {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        for (int i = 0; i < 5; i++) {
            rabbitTemplate.convertAndSend(EXCHANGE, null, "hello, queue");
        }
    }

    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(
                                    name = QUEUE,
                                    durable = "true",
                                    declare = "true"
                            ),
                            exchange = @Exchange(
                                    name = EXCHANGE,
                                    type = FANOUT,
                                    durable = "true",
                                    declare = "true"
                            )
                    )
            },
            concurrency = "5"
    )
    public void listen(Channel channel, Message<String> message) throws Throwable {
        log.info(
                "received a message, channel: {}, payload: {}",
                channel.hashCode(),
                message.getPayload()
        );
        Thread.sleep(1000L);
    }
}

启动程序,控制台将输出以下日志,可以看出从 5 个不同的 Channel 消费到消息:

received a message, channel: 2140403968, payload: hello, queue
received a message, channel: 1933452906, payload: hello, queue
received a message, channel: 1024230108, payload: hello, queue
received a message, channel: 1251934227, payload: hello, queue
received a message, channel: 353700673, payload: hello, queue