SpringBoot 如何事务消息与最终一致性(荣耀典藏版)
大家好我是月夜枫。在微服务架构中分布式事务是一个常见挑战尤其是在使用 Spring Boot 构建的应用中。事务消息和最终一致性是实现分布式事务处理的一种策略。下面我们将详细探讨这两种概念以及如何在 Spring Boot 应用程序中实现它们。在分布式系统中数据一致性是一个永恒的话题。想象一下这样的场景用户下单后订单数据成功写入数据库但通知库存服务扣减库存的消息却发送失败...支付成功后账户余额已扣除但订单状态却还是待支付...这些问题会导致数据不一致给业务带来巨大风险今天就带大家深入理解SpringBoot 中的消息事务机制掌握事务消息与最终一致性的实现方案准备好了吗Lets Go一、什么是事务消息1.1 事务消息的概念事务消息是指在发送消息的同时确保消息发送与本地事务的原子性要么本地事务和消息发送都成功要么两者都失败不会出现半成功的状态生活中的例子就像你去超市购物结账时需要同时完成两个操作商品从货架上取下本地事务支付成功消息发送如果支付失败商品必须放回货架这就是事务的原子性。1.2 为什么需要事务消息问题场景后果解决方案先更新数据库消息发送失败数据不一致使用事务消息先发送消息数据库更新失败消息重复消费使用事务消息网络分区导致消息丢失业务中断使用事务消息 重试二、分布式事务的 CAP 理论在分布式系统中我们无法同时满足以下三个特性一致性Consistency所有节点的数据保持一致可用性Availability系统始终可用分区容错性Partition tolerance网络分区时系统仍能运行CAP 权衡CP 系统牺牲可用性保证一致性如银行转账AP 系统牺牲一致性保证可用性如电商秒杀2.2 最终一致性最终一致性是分布式系统中最常用的一致性模型允许数据在一段时间内不一致但最终会达到一致状态通过异步消息传递实现三、SpringBoot 事务消息实现方案3.1 方案一本地消息表模式核心思想在本地数据库中创建消息表记录待发送的消息通过定时任务重试发送。实现步骤第一步创建消息表CREATE TABLE message_record ( id BIGINT AUTO_INCREMENT PRIMARY KEY, business_key VARCHAR(64) NOT NULL COMMENT 业务ID, topic VARCHAR(128) NOT NULL COMMENT 消息主题, message TEXT NOT NULL COMMENT 消息内容, status TINYINT NOT NULL DEFAULT 0 COMMENT 状态0-待发送 1-已发送 2-失败, retry_count INT DEFAULT 0 COMMENT 重试次数, create_time DATETIME DEFAULT CURRENT_TIMESTAMP, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uk_business_key (business_key) ) ENGINEInnoDB DEFAULT CHARSETutf8mb4;第二步创建消息实体Data Entity Table(name message_record) public class MessageRecord { Id GeneratedValue(strategy GenerationType.IDENTITY) private Long id; Column(name business_key, nullable false, unique true) private String businessKey; Column(nullable false) private String topic; Column(columnDefinition TEXT, nullable false) private String message; Column(nullable false) private Integer status 0; Column(name retry_count) private Integer retryCount 0; Column(name create_time) private LocalDateTime createTime; Column(name update_time) private LocalDateTime updateTime; }第三步发送消息本地事务Service public class OrderService { private final OrderRepository orderRepository; private final MessageRecordRepository messageRecordRepository; private final KafkaTemplateString, String kafkaTemplate; public OrderService(OrderRepository orderRepository, MessageRecordRepository messageRecordRepository, KafkaTemplateString, String kafkaTemplate) { this.orderRepository orderRepository; this.messageRecordRepository messageRecordRepository; this.kafkaTemplate kafkaTemplate; } /** * 创建订单本地事务方式 */ Transactional public void createOrder(Order order) { try { // 1. 保存订单到数据库 orderRepository.save(order); // 2. 保存消息记录到消息表 MessageRecord record new MessageRecord(); record.setBusinessKey(order.getId()); record.setTopic(order-topic); record.setMessage(JSON.toJSONString(order)); record.setStatus(0); // 待发送 messageRecordRepository.save(record); // 3. 尝试发送消息 sendMessage(record); } catch (Exception e) { log.error(创建订单失败: {}, e.getMessage()); throw new RuntimeException(订单创建失败, e); } } private void sendMessage(MessageRecord record) { try { kafkaTemplate.send(record.getTopic(), record.getMessage()).get(); // 发送成功更新状态 record.setStatus(1); // 已发送 messageRecordRepository.save(record); } catch (Exception e) { log.warn(消息发送失败将由定时任务重试: {}, e.getMessage()); // 状态保持为 0等待定时任务重试 } } }第四步定时任务重试Component public class MessageRetryScheduler { private final MessageRecordRepository messageRecordRepository; private final KafkaTemplateString, String kafkaTemplate; private static final int MAX_RETRY_COUNT 3; public MessageRetryScheduler(MessageRecordRepository messageRecordRepository, KafkaTemplateString, String kafkaTemplate) { this.messageRecordRepository messageRecordRepository; this.kafkaTemplate kafkaTemplate; } /** * 每分钟执行一次重试失败的消息 */ Scheduled(fixedRate 60000) public void retryFailedMessages() { // 查询待发送且重试次数未超限的消息 ListMessageRecord records messageRecordRepository.findByStatusAndRetryCountLessThan(0, MAX_RETRY_COUNT); for (MessageRecord record : records) { try { kafkaTemplate.send(record.getTopic(), record.getMessage()).get(); // 发送成功 record.setStatus(1); log.info(消息重试成功: {}, record.getBusinessKey()); } catch (Exception e) { // 发送失败增加重试次数 record.setRetryCount(record.getRetryCount() 1); if (record.getRetryCount() MAX_RETRY_COUNT) { record.setStatus(2); // 标记为失败 log.error(消息重试{}次后失败: {}, MAX_RETRY_COUNT, record.getBusinessKey()); } } messageRecordRepository.save(record); } } }3.2 方案二使用 Kafka 事务消息核心思想利用 Kafka 自身的事务特性实现消息发送与本地事务的原子性。第一步配置 Kafka 事务spring: kafka: bootstrap-servers: localhost:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer acks: all retries: 3 transaction-id-prefix: tx- # 开启事务支持 consumer: group-id: order-consumer-group enable-auto-commit: false第二步配置事务管理器Configuration public class KafkaTransactionConfig { Bean public KafkaTransactionManagerString, OrderMessage kafkaTransactionManager( ProducerFactoryString, OrderMessage producerFactory) { return new KafkaTransactionManager(producerFactory); } }第三步使用事务消息Service public class OrderTransactionalService { private final OrderRepository orderRepository; private final KafkaTemplateString, OrderMessage kafkaTemplate; private static final String ORDER_TOPIC order-topic; public OrderTransactionalService(OrderRepository orderRepository, KafkaTemplateString, OrderMessage kafkaTemplate) { this.orderRepository orderRepository; this.kafkaTemplate kafkaTemplate; } /** * 事务性创建订单 * 使用 Kafka 事务保证数据库操作和消息发送的原子性 */ Transactional(transactionManager kafkaTransactionManager) public void createOrderTransactional(Order order) { try { // 1. 保存订单到数据库 orderRepository.save(order); // 2. 创建订单消息 OrderMessage message OrderMessage.builder() .orderId(order.getId()) .userId(order.getUserId()) .amount(order.getAmount()) .createTime(LocalDateTime.now()) .build(); // 3. 发送消息到 Kafka在事务中 kafkaTemplate.send(ORDER_TOPIC, message); log.info(订单创建成功消息已发送: {}, order.getId()); } catch (Exception e) { log.error(订单创建失败事务回滚: {}, e.getMessage()); throw new RuntimeException(订单创建失败, e); } } }3.3 方案三使用 RocketMQ 事务消息核心思想RocketMQ 提供了完善的分布式事务消息支持采用半消息机制。执行流程1. 发送半消息Half Message 2. 执行本地事务 3. 根据事务结果提交或回滚消息 4. 如果超时未收到确认RocketMQ 会主动回查事务状态代码实现第一步添加依赖dependency groupIdorg.apache.rocketmq/groupId artifactIdrocketmq-spring-boot-starter/artifactId version2.2.3/version /dependency第二步配置 RocketMQrocketmq: name-server: localhost:9876 producer: group: order-producer-group第三步实现事务监听器Component public class OrderTransactionListener implements RocketMQLocalTransactionListener { private final OrderRepository orderRepository; public OrderTransactionListener(OrderRepository orderRepository) { this.orderRepository orderRepository; } /** * 执行本地事务 */ Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) { try { // 解析消息内容 String body new String((byte[]) message.getBody()); Order order JSON.parseObject(body, Order.class); // 执行本地事务保存订单 orderRepository.save(order); // 返回提交状态 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error(本地事务执行失败: {}, e.getMessage()); return RocketMQLocalTransactionState.ROLLBACK; } } /** * 回查事务状态消息发送后超时未确认时触发 */ Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { try { String body new String((byte[]) message.getBody()); Order order JSON.parseObject(body, Order.class); // 查询订单是否存在 if (orderRepository.existsById(order.getId())) { return RocketMQLocalTransactionState.COMMIT; } else { return RocketMQLocalTransactionState.ROLLBACK; } } catch (Exception e) { log.error(事务回查失败: {}, e.getMessage()); return RocketMQLocalTransactionState.UNKNOWN; } } }第四步发送事务消息Service public class RocketMQTransactionService { Autowired private RocketMQTemplate rocketMQTemplate; private static final String ORDER_TOPIC order-topic; /** * 发送事务消息 */ public void sendTransactionMessage(Order order) { String messageBody JSON.toJSONString(order); rocketMQTemplate.sendMessageInTransaction( ORDER_TOPIC, MessageBuilder.withPayload(messageBody).build(), order // 传递给本地事务的参数 ); log.info(事务消息已发送: {}, order.getId()); } }四、最终一致性保障策略4.1 消息幂等性什么是幂等性多次执行同一个操作结果相同。实现方式Service public class InventoryService { private final InventoryRepository inventoryRepository; public InventoryService(InventoryRepository inventoryRepository) { this.inventoryRepository inventoryRepository; } /** * 扣减库存保证幂等性 */ Transactional public void deductStock(String orderId, String productId, int quantity) { // 1. 检查是否已经处理过 if (inventoryRepository.existsByOrderId(orderId)) { log.info(订单{}已处理过跳过, orderId); return; } // 2. 查询库存 Inventory inventory inventoryRepository.findByProductId(productId) .orElseThrow(() - new RuntimeException(库存不存在)); // 3. 检查库存是否充足 if (inventory.getQuantity() quantity) { throw new RuntimeException(库存不足); } // 4. 扣减库存 inventory.setQuantity(inventory.getQuantity() - quantity); inventoryRepository.save(inventory); // 5. 记录处理记录保证幂等 InventoryRecord record new InventoryRecord(); record.setOrderId(orderId); record.setProductId(productId); record.setQuantity(quantity); inventoryRepository.save(record); log.info(库存扣减成功: {} - {}, productId, quantity); } }4.2 消息补偿机制补偿流程1. 定期扫描业务表 2. 检查是否有未完成的业务操作 3. 重新发送消息或执行补偿逻辑代码示例Component public class CompensationScheduler { private final OrderRepository orderRepository; private final KafkaTemplateString, OrderMessage kafkaTemplate; public CompensationScheduler(OrderRepository orderRepository, KafkaTemplateString, OrderMessage kafkaTemplate) { this.orderRepository orderRepository; this.kafkaTemplate kafkaTemplate; } /** * 每5分钟检查一次未完成的订单 */ Scheduled(fixedRate 300000) public void compensateUnfinishedOrders() { // 查询已创建但状态未更新的订单10分钟前创建 LocalDateTime threshold LocalDateTime.now().minusMinutes(10); ListOrder orders orderRepository.findByStatusAndCreateTimeBefore( OrderStatus.CREATED, threshold); for (Order order : orders) { try { // 重新发送消息 OrderMessage message OrderMessage.builder() .orderId(order.getId()) .userId(order.getUserId()) .amount(order.getAmount()) .createTime(order.getCreateTime()) .build(); kafkaTemplate.send(order-topic, message).get(); log.info(补偿消息发送成功: {}, order.getId()); } catch (Exception e) { log.error(补偿消息发送失败: {}, order.getId()); } } } }五、实战案例订单创建完整流程5.1 流程图5.2 完整代码示例订单服务RestController RequestMapping(/api/orders) public class OrderController { private final OrderTransactionalService transactionalService; public OrderController(OrderTransactionalService transactionalService) { this.transactionalService transactionalService; } PostMapping public ResponseEntityString createOrder(RequestBody OrderDTO dto) { Order order Order.builder() .id(UUID.randomUUID().toString()) .userId(dto.getUserId()) .productId(dto.getProductId()) .amount(dto.getAmount()) .quantity(dto.getQuantity()) .status(OrderStatus.CREATED) .createTime(LocalDateTime.now()) .build(); transactionalService.createOrderTransactional(order); return ResponseEntity.ok(订单创建成功); } }库存服务消费者Component public class OrderConsumer { private final InventoryService inventoryService; private final OrderService orderService; public OrderConsumer(InventoryService inventoryService, OrderService orderService) { this.inventoryService inventoryService; this.orderService orderService; } KafkaListener(topics order-topic, groupId inventory-group) public void consumeOrderMessage(Payload OrderMessage message, Acknowledgment ack) { try { // 扣减库存 inventoryService.deductStock( message.getOrderId(), product-id, // 从消息中获取产品ID 1 // 购买数量 ); // 更新订单状态 orderService.updateStatus(message.getOrderId(), OrderStatus.PAID); // 确认消息 ack.acknowledge(); log.info(订单处理完成: {}, message.getOrderId()); } catch (Exception e) { log.error(订单处理失败: {}, message.getOrderId()); // 不确认让 Kafka 重新投递 } } } 结尾通过这篇文章我们一起学习了1. ✅ 事务消息的概念保证消息发送与本地事务的原子性2. ✅ CAP 理论与最终一致性分布式系统的一致性策略3. ✅ 三种事务消息方案 • 本地消息表模式 • Kafka 事务消息 • RocketMQ 事务消息4. ✅ 最终一致性保障消息幂等性与补偿机制5. ✅ 完整实战案例订单创建端到端流程互动时刻你在项目中是如何保证分布式事务一致性的遇到过什么有趣的问题欢迎在留言区分享你的经验如果这篇文章对你有帮助别忘了点赞、在看、转发三连支持每天分享更多硬核技术干货