Spring Boot 集成 RabbitMQ 批量发送消息.md

Jason's Blog / 2024-09-26 / 原文

  • 1. Spring Boot 集成 RabbitMQ 批量发送消息
    • 1.1. 版本说明
    • 1.2. Spring 配置
    • 1.3. 定义常量
    • 1.4. 配置交换机和队列
    • 1.5. 定义 BatchingRabbitTemplate 工厂
    • 1.6. 测试

1. Spring Boot 集成 RabbitMQ 批量发送消息

1.1. 版本说明

构件 版本
spring-boot 2.7.18
spring-boot-starter-amqp 2.7.18

1.2. Spring 配置

spring:
  application:
    name: spring-rabbit-batch-demo
  rabbitmq:
    addresses: 127.0.0.1:5672
    username: admin
    password: admin
    virtual-host: /

1.3. 定义常量

public class RabbitBatchConstants {
    public static final String QUEUE_1 = "spring-rabbit-batch-demo-queue-1";
    public static final String EXCHANGE_1 = "spring-rabbit-batch-demo-exchange-1";
    public static final String QUEUE_2 = "spring-rabbit-batch-demo-queue-2";
    public static final String EXCHANGE_2 = "spring-rabbit-batch-demo-exchange-2";
}

1.4. 配置交换机和队列

@Configuration
@Slf4j
public class RabbitBatchConfiguration {

    @Bean
    public Queue queue1() {
        return QueueBuilder.durable(RabbitBatchConstants.QUEUE_1).build();
    }

    @Bean
    public FanoutExchange exchange1() {
        return ExchangeBuilder.fanoutExchange(RabbitBatchConstants.EXCHANGE_1).durable(true).build();
    }

    @Bean
    public Queue queue2() {
        return QueueBuilder.durable(RabbitBatchConstants.QUEUE_2).build();
    }

    @Bean
    public FanoutExchange exchange2() {
        return ExchangeBuilder.fanoutExchange(RabbitBatchConstants.EXCHANGE_2).durable(true).build();
    }

    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(exchange1());
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queue2()).to(exchange2());
    }
}

1.5. 定义 BatchingRabbitTemplate 工厂

org.springframework.amqp.rabbit.core.BatchingRabbitTemplate 提供了批量发送消息的 API,但是一个 BatchingRabbitTemplate 实例只能发送同一交换机或 Routing Key 的消息,因此需要一个工厂类,针对同一交换机生成一个 BatchingRabbitTemplate 实例。

@Component
public class BatchingRabbitTemplateFactory implements Lifecycle {

    @Resource
    private ConnectionFactory connectionFactory;

    private Map<String, BatchingRabbitTemplate> batchingRabbitTemplateMap = new ConcurrentHashMap<>();

    public BatchingRabbitTemplate getBatchingRabbitTemplate(String exchange) {
        return batchingRabbitTemplateMap.computeIfAbsent(exchange, exchangeName -> {
            ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
            threadPoolTaskScheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
            threadPoolTaskScheduler.setThreadNamePrefix("batch-rabbit-");
            threadPoolTaskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            threadPoolTaskScheduler.initialize();
            return new BatchingRabbitTemplate(connectionFactory, new SimpleBatchingStrategy(100, 1000, 3000L), threadPoolTaskScheduler);
        });
    }

    @Override
    public void start() {

    }

    @Override
    public void stop() {
        batchingRabbitTemplateMap.forEach((exchange, batchingRabbitTemplate) -> batchingRabbitTemplate.flush());
    }

    @Override
    public boolean isRunning() {
        return true;
    }
}

1.6. 测试

@Component
@Slf4j
public class SpringRabbitBatchDemo implements ApplicationRunner {

    @Resource
    private BatchingRabbitTemplateFactory batchingRabbitTemplateFactory;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        String payload1 = "hello, queue 1";
        batchingRabbitTemplateFactory.getBatchingRabbitTemplate(EXCHANGE_1).convertAndSend(EXCHANGE_1, null, payload1);
        log.info("send a messages, exchange: {}, payload: {}", EXCHANGE_1, payload1);

        String payload2 = "hello, queue 2";
        batchingRabbitTemplateFactory.getBatchingRabbitTemplate(EXCHANGE_2).convertAndSend(EXCHANGE_2, null, payload2);
        log.info("send a messages, exchange: {}, payload: {}", EXCHANGE_2, payload2);
    }

    @RabbitListener(queues = {RabbitBatchConstants.QUEUE_1})
    public void listen1(Message<String> message) {
        log.info(
                "received a message, queue: {}, exchange: {}, payload: {}",
                message.getHeaders().get(CONSUMER_QUEUE),
                message.getHeaders().get(RECEIVED_EXCHANGE),
                message.getPayload()
        );
    }

    @RabbitListener(queues = {RabbitBatchConstants.QUEUE_2})
    public void listen2(Message<String> message) {
        log.info(
                "received a message, queue: {}, exchange: {}, payload: {}",
                message.getHeaders().get(CONSUMER_QUEUE),
                message.getHeaders().get(RECEIVED_EXCHANGE),
                message.getPayload()
        );
    }
}

启动程序,控制台将输出:

send a messages, exchange: spring-rabbit-batch-demo-exchange-1, payload: hello, queue 1
send a messages, exchange: spring-rabbit-batch-demo-exchange-2, payload: hello, queue 2
received a message, queue: spring-rabbit-batch-demo-queue-2, exchange: spring-rabbit-batch-demo-exchange-2, payload: hello, queue 2
received a message, queue: spring-rabbit-batch-demo-queue-1, exchange: spring-rabbit-batch-demo-exchange-1, payload: hello, queue 1