Spring Boot RabbitMQ Integration: Custom MessageConverter

Jason's Blog / 2024-10-13 / Original post

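  • 1. Spring Boot RabbitMQ Integration: Custom Message Converter
    • 1.1. Versions
    • 1.2. Overview
    • 1.3. Spring Configuration
    • 1.4. Defining Constants
    • 1.5. Configuring the Exchange and Queue
    • 1.6. Configuring the ObjectMapper
    • 1.7. Configuring the MessageConverter
    • 1.8. Testing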

1. Spring Boot RabbitMQ Integration: Custom Message Converter

1.1. Versions

Artifact                   Version
spring-boot                2.7.18
spring-boot-starter-amqp   2.7.18
spring-boot-starter-json   2.7.18
hutool-all                 5.8.27

1.2. Overview

This example uses JSON, a common choice, as the message transport format, and customizes a Jackson2JsonMessageConverter to convert messages.

1.3. Spring Configuration

spring:
  application:
    name: spring-rabbit-message-converter-demo
  rabbitmq:
    addresses: 127.0.0.1:5672
    username: admin
    password: admin
    virtual-host: /

1.4. Defining Constants

public class SpringRabbitMessageConverterDemoConstants {
    public static final String QUEUE = "spring-rabbit-message-converter-demo-queue";
    public static final String EXCHANGE = "spring-rabbit-message-converter-demo-exchange";
}

1.5. Configuring the Exchange and Queue

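// QUEUE and EXCHANGE are the constants defined in SpringRabbitMessageConverterDemoConstants.

// Durable queue: its definition survives a broker restart.
@Bean
public Queue queue() {
    return QueueBuilder.durable(QUEUE).build();
}

// Durable fanout exchange: routes each message to every bound queue.
@Bean
public FanoutExchange exchange() {
    return ExchangeBuilder.fanoutExchange(EXCHANGE).durable(true).build();
}

// A fanout binding needs no routing key.
@Bean
public Binding binding() {
    return BindingBuilder.bind(queue()).to(exchange());
}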
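
The queue above is bound to a fanout exchange, which copies every message to all bound queues and ignores the routing key. The following toy in-memory model illustrates only that rule. It is not Spring AMQP or RabbitMQ API; the class FanoutSketch and its methods are hypothetical names invented for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of fanout routing (hypothetical, not a RabbitMQ client).
class FanoutSketch {

    // Queues bound to the exchange; each queue is modelled as a list of messages.
    private final List<List<String>> boundQueues = new ArrayList<>();

    // Creates a new queue, binds it to the exchange, and returns it.
    List<String> bindNewQueue() {
        List<String> queue = new ArrayList<>();
        boundQueues.add(queue);
        return queue;
    }

    // Fanout semantics: the routing key is accepted but never inspected.
    void publish(String routingKey, String message) {
        for (List<String> queue : boundQueues) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        List<String> first = exchange.bindNewQueue();
        List<String> second = exchange.bindNewQueue();
        exchange.publish("", "hello");
        exchange.publish("any.key", "world");
        System.out.println(first);
        System.out.println(second);
    }
}
```

Both queues end up with the same two messages, which is why the binding in this section carries no routing key.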

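1.6. Configuring the ObjectMapper

Register serializers and deserializers for the Java 8 date/time types LocalDateTime, LocalDate, and LocalTime, and serialize long integers (Long and long) as strings. The NORM_* formatters are presumably static imports from Hutool's DatePattern, for example yyyy-MM-dd HH:mm:ss for NORM_DATETIME_FORMATTER.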

@Bean
public Jackson2ObjectMapperBuilderCustomizer jackson2ObjectMapperBuilderCustomizer() {
    return jacksonObjectMapperBuilder -> {
        // Java 8 date/time types are written and read with the NORM_* formatters.
        JavaTimeModule javaTimeModule = new JavaTimeModule();
        javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(NORM_DATETIME_FORMATTER));
        javaTimeModule.addSerializer(LocalDate.class, new LocalDateSerializer(NORM_DATE_FORMATTER));
        javaTimeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(NORM_TIME_FORMATTER));
        javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(NORM_DATETIME_FORMATTER));
        javaTimeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(NORM_DATE_FORMATTER));
        javaTimeModule.addDeserializer(LocalTime.class, new LocalTimeDeserializer(NORM_TIME_FORMATTER));

        // Long and long values are written as JSON strings.
        SimpleModule longModule = new SimpleModule();
        longModule.addSerializer(Long.class, ToStringSerializer.instance);
        longModule.addSerializer(Long.TYPE, ToStringSerializer.instance);

        // Register both modules on the auto-configured ObjectMapper builder.
        jacksonObjectMapperBuilder.modulesToInstall(javaTimeModule, longModule);
    };
}
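
The post does not say why long values are serialized as strings. A common motivation, assumed here, is that some consumers (JavaScript in particular) store every JSON number as an IEEE 754 double, which cannot represent all integers above 2^53. A minimal stdlib-only sketch of that effect follows; the class name LongPrecisionSketch and the sample ID are hypothetical.

```java
// Illustration only: why a long may be safer on the wire as a JSON string.
class LongPrecisionSketch {

    // Simulates a consumer that stores a JSON number as a double.
    static long throughDouble(long value) {
        return (long) (double) value;
    }

    // Simulates the string form: every digit is preserved.
    static long throughString(long value) {
        return Long.parseLong(Long.toString(value));
    }

    public static void main(String[] args) {
        long id = 9007199254740993L; // 2^53 + 1, a hypothetical ID
        System.out.println(throughDouble(id)); // 9007199254740992
        System.out.println(throughString(id)); // 9007199254740993
    }
}
```

The numeric path silently changes the last digit, while the string path returns the ID unchanged.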

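1.7. Configuring the MessageConverter

Use Jackson2JsonMessageConverter to convert messages, so that they travel as JSON. The converter is built from the ObjectMapper configured in the previous section.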

@Bean
public MessageConverter messageConverter(ObjectMapper objectMapper) {
    return new Jackson2JsonMessageConverter(objectMapper);
}

1.8. Testing

@Component
@Slf4j
public class SpringRabbitMessageConverterDemo implements ApplicationRunner {

    @Resource
    private RabbitTemplate rabbitTemplate;

    // Runs once at startup and publishes a single User message.
    @Override
    public void run(ApplicationArguments args) throws Exception {
        User user = new User(1L, "Jason", LocalDateTime.now());
        // The exchange is a fanout exchange, so the routing key is irrelevant.
        rabbitTemplate.convertAndSend(EXCHANGE, null, user);
        log.info("sent a message, payload: {}", user);
    }

    // Message here is org.springframework.messaging.Message; the payload is
    // the User object that the MessageConverter rebuilt from the JSON body.
    @RabbitListener(queues = {QUEUE})
    public void listen(Message<User> message) {
        log.info(
                "received a message, payload: {}",
                message.getPayload()
        );
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class User {
        private Long id;
        private String name;
        private LocalDateTime birthday;
    }
}

Start the application. The console prints the following log:

sent a message, payload: SpringRabbitMessageConverterDemo.User(id=1, name=Jason, birthday=2024-10-12T16:40:53.088051)
received a message, payload: SpringRabbitMessageConverterDemo.User(id=1, name=Jason, birthday=2024-10-12T16:40:53)
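
In the log above, the received birthday has lost its fractional seconds. A plausible explanation, assuming NORM_DATETIME_FORMATTER is Hutool's yyyy-MM-dd HH:mm:ss pattern, is that the value is formatted to whole seconds when serialized and parsed back from that text. The stdlib-only sketch below reproduces the effect without Jackson or RabbitMQ; the class name BirthdayRoundTripSketch is hypothetical.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Illustration only: a format/parse round trip with a seconds-precision pattern.
class BirthdayRoundTripSketch {

    // Assumption: the same pattern as Hutool's NORM_DATETIME_FORMATTER.
    static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Formats the value as it would appear in the JSON body, then parses it back.
    static LocalDateTime roundTrip(LocalDateTime value) {
        String wire = FORMATTER.format(value);
        return LocalDateTime.parse(wire, FORMATTER);
    }

    public static void main(String[] args) {
        LocalDateTime sent = LocalDateTime.of(2024, 10, 12, 16, 40, 53, 88_051_000);
        System.out.println(sent);            // 2024-10-12T16:40:53.088051
        System.out.println(roundTrip(sent)); // 2024-10-12T16:40:53
    }
}
```

If sub-second precision matters, a pattern that includes fractional seconds would be needed on both the serializer and the deserializer.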