Spring Boot 集成 RabbitMQ 消息事务(生产者)

Jason's Blog / 2024-10-10 / 原文

  • 1. Spring Boot 集成 RabbitMQ 消息事务(生产者)
    • 1.1. 版本说明
    • 1.2. 概览
      • 1.2.1. 最大努力单阶段提交模式
      • 1.2.2. 成功的业务流程
      • 1.2.3. 失败的业务流程
    • 1.3. 新建数据库表
    • 1.4. Spring 配置
    • 1.5. 定义常量
    • 1.6. 配置交换机和队列
    • 1.7. 定义 RabbitMQ 消息事务管理器
    • 1.8. 定义 RabbitTemplate
    • 1.9. 定义数据库事务管理器
    • 1.10. 测试
    • 1.11. 参考资料

1. Spring Boot 集成 RabbitMQ 消息事务(生产者)

1.1. 版本说明

构件 版本
spring-boot 2.7.18
spring-boot-starter-amqp 2.7.18
spring-boot-starter-jdbc 2.7.18
spring-boot-starter-web 2.7.18

1.2. 概览

这里模拟一个常见的业务流程,提供一个 Http 接口,这个接口会向 RabbitMQ 发送一条消息,同时更新数据库。这里涉及到分布式事务,本案例采用最大努力单阶段提交模式来实现事务管理。

1.2.1. 最大努力单阶段提交模式

最大努力单阶段提交模式是相当普遍的,但在开发人员必须注意的某些情况下可能会失败。这是一种非 XA 模式,涉及了许多资源的同步单阶段提交。因为没有使用二阶段提交,它绝不会像 XA 事务那样安全,但是如果参与者意识到妥协,通常就足够了。许多高容量,高吞吐量的事务处理系统通过设置这种方式以达到提高性能的目的。

1.2.2. 成功的业务流程

flowchart TB 1["1. 开始消息事务"] --> 2["2. 发送消息"] --> 3["3. 开始数据库事务"] --> 4["4. 更新数据库"] --> 5["5. 提交数据库事务"] --> 6["6. 提交消息事务"]

1.2.3. 失败的业务流程

flowchart TB 1["1. 开始消息事务"] --> 2["2. 发送消息"] --> 3["3. 开始数据库事务"] --> 4["4. 更新数据库失败"] --> 5["5. 回滚数据库事务"] --> 6["6. 回滚消息事务"]

1.3. 新建数据库表

create table t_user
(
    id   int auto_increment primary key,
    name varchar(20) not null
);

1.4. Spring 配置

spring:
  application:
    name: spring-rabbit-transaction-producer-demo
  rabbitmq:
    addresses: 127.0.0.1:5672
    username: admin
    password: admin
    virtual-host: /
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/demo
    username: root
    password: root

1.5. 定义常量

public class RabbitTransactionProducerDemoConstants {
    public static final String EXCHANGE = "spring-rabbit-transaction-producer-demo-exchange";
    public static final String QUEUE = "spring-rabbit-transaction-producer-demo-queue";
}

1.6. 配置交换机和队列

@Bean
public FanoutExchange exchange() {
    return ExchangeBuilder.fanoutExchange(EXCHANGE).durable(true).build();
}

@Bean
public Queue queue() {
    return QueueBuilder.durable(QUEUE).build();
}

@Bean
public Binding binding() {
    return BindingBuilder.bind(queue()).to(exchange());
}

1.7. 定义 RabbitMQ 消息事务管理器

@Bean(name = "rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}

1.8. 定义 RabbitTemplate

@Bean
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate();
    configurer.configure(template, connectionFactory);
    //Channel 启用事务
    template.setChannelTransacted(true);
    return template;
}

1.9. 定义数据库事务管理器

@Bean(name = "dataSourceTransactionManager")
@Primary
DataSourceTransactionManager dataSourceTransactionManager(DataSource dataSource, ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
    DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(dataSource);
    transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(transactionManager));
    return transactionManager;
}

1.10. 测试

这里使用 Spring 事务来管理消息事务,由于不是 XA 事务,消息事务回滚时,数据库事务并不会回滚,因此需要手动管理数据库事务。

@RestController
@Slf4j
public class SpringRabbitTransactionProducerDemo {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Resource
    private JdbcTemplate jdbcTemplate;

    @Resource
    private TransactionDefinition transactionDefinition;

    @Resource
    private DataSourceTransactionManager dataSourceTransactionManager;

    //Spring 消息事务,指定事务管理器为 RabbitTransactionManager
    @Transactional(rollbackFor = Throwable.class, transactionManager = "rabbitTransactionManager")
    @GetMapping("/transaction")
    public void transaction() {
        String name = "Jason";
        //发送一条消息,接下来因数据库插入数据异常,消息事务回滚,并不会真正把消息发送出去
        rabbitTemplate.convertAndSend(EXCHANGE, null, name);
        //手动开启数据库事务
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        try {
            //往数据库表插入两条主键 id 一样的数据,引起主键 id 重复异常
            jdbcTemplate.update("INSERT INTO t_user (id, name) VALUES (1, ?)", ps -> ps.setString(1, name));
            jdbcTemplate.update("INSERT INTO t_user (id, name) VALUES (1, ?)", ps -> ps.setString(1, name));
            //手动提交数据库事务,因上面插入数据异常,并不会执行到这里
            dataSourceTransactionManager.commit(transactionStatus);
        } catch (Throwable throwable) {
            //捕获异常,手动回滚数据库事务
            dataSourceTransactionManager.rollback(transactionStatus);
            //抛出异常,让 Spring 回滚 RabbitMQ 消息事务
            throw throwable;
        }
    }
}

启动程序,使用 Postman 等工具调用该 HTTP 接口,将抛出主键重复异常,数据库事务回滚,并不会真正在数据库表中插入数据;消息事务回滚,RabbitMQ 队列也不会有消息。

1.11. 参考资料

  • Spring 官方文档
  • Distributed transactions in Spring, with and without XA