Spring Transactions Across Threads: Why Rollback Fails and How to Fix It
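@Transactional works fine on a single thread and falls apart as soon as threads get involved: some rows land in the database, some are never rolled back, and nothing is reported. The first time I hit this I spent half a day on it before realizing that Spring transactions are built on ThreadLocal, so a child thread can never see the main thread's Connection.

This article walks through the whole problem: how transactions work, why they break under multithreading, and three approaches that hold up in production.

1. How a transaction gets running

Start with the most ordinary code there is:

    @Service
    public class OrderService {

        @Transactional(rollbackFor = Exception.class)
        public void createOrder(Order order) {
            orderMapper.insert(order);
            accountMapper.deduct(order.getUserId(), order.getAmount());
        }
    }

With @Transactional in place, insert and deduct run in the same transaction. But who manages that transaction? Not your OrderService. At startup Spring wraps it in a proxy, which looks like this:

    Client call
        ↓
    Proxy
      ├── TransactionInterceptor
      │     ├── resolve the @Transactional attributes
      │     ├── begin the transaction: setAutoCommit(false)
      │     ├── invoke createOrder() reflectively
      │     └── commit() or rollback()
      └── the real OrderService

How the proxy is created

@EnableTransactionManagement triggers TransactionManagementConfigurationSelector, which registers two things in the container:

- AutoProxyRegistrar: registers InfrastructureAdvisorAutoProxyCreator, which creates proxies for beans.
- ProxyTransactionManagementConfiguration: registers the beans that make up the transaction aspect.

TransactionAttributeSourcePointcut then scans for every method annotated with @Transactional:

    // Spring source, simplified: TransactionAttributeSourcePointcut.
    // (In the actual source the public-only check lives in
    // AbstractFallbackTransactionAttributeSource.)
    public boolean matches(Method method, Class<?> targetClass) {
        if (this.publicMethodsOnly && !Modifier.isPublic(method.getModifiers())) {
            return false; // non-public methods are skipped
        }
        TransactionAttributeSource tas = this.transactionAttributeSource;
        return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
    }

Finally, Spring creates a proxy for each bean that has transactional methods and weaves in TransactionInterceptor. The proxy is CGLIB by default in Spring Boot; plain Spring uses a JDK dynamic proxy when the bean implements an interface and proxyTargetClass is false.

The runtime call chain

Stripped down, the core source is just this:

    protected Object invokeWithinTransaction(MethodInvocation invocation) throws Throwable {
        TransactionAttribute txAttr = tas.getTransactionAttribute(method, targetClass);
        PlatformTransactionManager tm = getTransactionManager(txAttr);
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, method);
        Object retVal;
        try {
            retVal = invocation.proceed();                    // invoke the target method
        } catch (Throwable ex) {
            completeTransactionAfterThrowing(txInfo, ex);     // exception → roll back or commit
            throw ex;
        }
        commitTransactionAfterReturning(txInfo);              // normal return → commit
        return retVal;
    }

The piece that matters most: ThreadLocal

The whole mechanism rests on TransactionSynchronizationManager, which uses a ThreadLocal to bind the Connection to the current thread:

    public abstract class TransactionSynchronizationManager {

        private static final ThreadLocal<Map<Object, Object>> resources =
                new NamedThreadLocal<>("Transactional resources");

        public static void bindResource(Object key, Object value) {
            Map<Object, Object> map = resources.get();
            if (map == null) {
                map = new HashMap<>();
                resources.set(map);
            }
            map.put(key, value);
        }

        public static Object getResource(Object key) {
            Map<Object, Object> map = resources.get();
            if (map == null) {
                return null;
            }
            return map.get(key);
        }
    }

Every thread has its own Connection, so every thread has its own transaction. Keep this in mind; everything that follows comes back to it.

2. Key attributes of @Transactional

Propagation

The first three are the ones you will use most.

| Propagation | What it does | When to use it |
|---|---|---|
| REQUIRED (default) | Joins the existing transaction, or creates one if none exists | 90% of cases |
| REQUIRES_NEW | Always opens an independent transaction | Audit logs and message records that must not be affected by the outer transaction |
| NESTED | Nested transaction implemented with a savepoint | A sub-operation whose rollback must not affect the outer transaction |
| SUPPORTS | Joins a transaction if one exists, otherwise runs without one | Query methods |
| NOT_SUPPORTED | Runs without a transaction, suspending any existing one | Operations that need no transaction |
| MANDATORY | Must run inside a transaction, otherwise throws | Enforcing that a transaction exists |
| NEVER | Must not run inside a transaction, otherwise throws | Forbidding transactions |

How REQUIRED, REQUIRES_NEW and NESTED differ:

    REQUIRED:
    ┌── outer transaction ─────────────────┐
    │ methodA() → methodB()                │  same transaction: any rollback rolls back everything
    │ (shared Connection)                  │
    └──────────────────────────────────────┘

    REQUIRES_NEW:
    ┌── outer transaction ─────────────────┐
    │ methodA()                            │
    │ ┌── new transaction ───────────────┐ │
    │ │ methodB()                        │ │  independent transactions, committed separately
    │ │ (separate Connection)            │ │
    │ └──────────────────────────────────┘ │
    └──────────────────────────────────────┘

    NESTED:
    ┌── outer transaction ─────────────────┐
    │ methodA()                            │
    │ ┌── savepoint ─────────────────────┐ │
    │ │ methodB()                        │ │  rolling back B does not affect A
    │ │ (shared Connection + savepoint)  │ │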
    │ └──────────────────────────────────┘ │
    └──────────────────────────────────────┘

Rollback exceptions

By default Spring rolls back only on RuntimeException and Error. Checked exceptions are ignored:

    // DefaultTransactionAttribute
    public boolean rollbackOn(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error);
    }

So a @Transactional without rollbackFor does not roll back when an IOException is thrown. rollbackFor = Exception.class should be your default.

[Figure: full execution path]

3. Why transactions break across threads

Go straight to the code:

    @Service
    public class OrderBatchService {

        @Autowired
        private OrderMapper orderMapper;
        @Autowired
        private ThreadPoolTaskExecutor executor;

        @Transactional(rollbackFor = Exception.class)
        public void batchCreateOrders(List<Order> orders) {
            for (Order order : orders) {
                executor.submit(() -> {
                    orderMapper.insert(order); // runs on a worker thread
                });
            }
            if (orders.size() > 10) {
                throw new RuntimeException("Batch processing failed");
            }
        }
    }

The main thread throws and rolls back, but the rows written by the worker threads are already in the database. A trace makes it obvious:

    Main thread (Thread-main)          Worker thread (Thread-pool-1)
    ─────────────────────────          ─────────────────────────────
    ThreadLocal: {ds: Conn-A}          ThreadLocal: {}  ← empty
    Conn-A.setAutoCommit(false)        takes Connection-B from the pool
    executor.submit(...)               Connection-B.autoCommit = true
                                       INSERT → committed immediately
    throw RuntimeException
    rollback Conn-A                    Conn-B committed long ago; nothing to undo

The worker thread looks up the Connection in its own ThreadLocal and finds nothing, so it is handed a fresh one from the pool. A fresh Connection has autoCommit set to true, so each statement is committed as soon as it runs, and the main thread's transaction has no say over it.

4. How to fix it

Option 1: programmatic transactions + CountDownLatch

The simplest and most common approach. Each worker thread manages its own transaction, a CountDownLatch waits for all of them to finish, and an AtomicBoolean records whether any of them failed.

    @Service
    @Slf4j
    public class BatchOrderService {

        @Autowired
        private PlatformTransactionManager transactionManager;
        @Autowired
        private OrderMapper orderMapper;
        @Autowired
        @Qualifier("batchExecutor")
        private ThreadPoolTaskExecutor executor;

        public BatchResult batchCreateOrders(List<Order> orders) {
            if (CollectionUtils.isEmpty(orders)) {
                return BatchResult.success();
            }
            CountDownLatch latch = new CountDownLatch(orders.size());
            AtomicBoolean hasError = new AtomicBoolean(false);
            List<BatchResultItem> results = Collections.synchronizedList(new ArrayList<>());

            for (Order order : orders) {
                executor.submit(() -> {
                    DefaultTransactionDefinition def = new DefaultTransactionDefinition();
                    def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
                    TransactionStatus status = transactionManager.getTransaction(def);
                    try {
                        orderMapper.insert(order);
                        orderMapper.insertLog(new OrderLog(order.getId(), "created"));
                        transactionManager.commit(status);
                        results.add(BatchResultItem.success(String.valueOf(order.getId())));
                    } catch (Exception e) {
                        hasError.set(true);
                        if (!status.isCompleted()) { // a failed commit() has already completed the transaction
                            transactionManager.rollback(status);
                        }
                        results.add(BatchResultItem.fail(String.valueOf(order.getId()), e.getMessage()));
                        log.error("Order [{}] creation failed", order.getId(), e);
                    } finally {
                        latch.countDown();
                    }
                });
            }

            try {
                boolean completed = latch.await(30, TimeUnit.SECONDS);
                if (!completed) {
                    throw new BusinessException("Batch timed out; some orders did not finish");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new BusinessException("Batch processing was interrupted");
            }

            if (hasError.get()) {
                List<String> failedIds = results.stream()
                        .filter(r -> !r.isSuccess())
                        .map(BatchResultItem::getBizId)
                        .collect(Collectors.toList());
                throw new BusinessException("Some orders failed: " + failedIds);
            }
            return BatchResult.success(results);
        }
    }

The result wrappers:

    @Data
    public class BatchResult {
        private boolean success;
        private List<BatchResultItem> items;
        private String message;

        public static BatchResult success() {
            BatchResult r = new BatchResult();
            r.setSuccess(true);
            r.setItems(Collections.emptyList());
            return r;
        }

        public static BatchResult success(List<BatchResultItem> items) {
            BatchResult r = new BatchResult();
            r.setSuccess(true);
            r.setItems(items);
            return r;
        }
    }

    @Data
    @AllArgsConstructor
    public class BatchResultItem {
        private String bizId;
        private boolean success;
        private String message;

        public static BatchResultItem success(String bizId) {
            return new BatchResultItem(bizId, true, "success");
        }

        public static BatchResultItem fail(String bizId, String message) {
            return new BatchResultItem(bizId, false, message);
        }
    }

This option cannot roll everything back: a thread that has already committed cannot be undone. It fits logging, notifications and data synchronization, where losing a few records is tolerable.

Option 2: manage connections by hand, then commit or roll back together

The idea: let every worker thread do its work without committing, then have the main thread decide for all of them at once.

    @Service
    @Slf4j
    public class ManualConnectionService {

        @Autowired
        private DataSource dataSource;
        @Autowired
        private OrderMapper orderMapper;
        @Autowired
        @Qualifier("batchExecutor")
        private ThreadPoolTaskExecutor executor;

        public void batchWithUnifiedCommit(List<Order> orders) {
            CountDownLatch latch = new CountDownLatch(orders.size());
            AtomicBoolean hasError = new AtomicBoolean(false);
            List<PendingConnection> pending = Collections.synchronizedList(new ArrayList<>());

            for (Order order : orders) {
                executor.submit(() -> {
                    try {
                        Connection conn = DataSourceUtils.getConnection(dataSource);
                        conn.setAutoCommit(false);
                        pending.add(new PendingConnection(conn, String.valueOf(order.getId())));
                        // Bind Spring's own ConnectionHolder so the mapper calls below reuse this connection.
                        TransactionSynchronizationManager.bindResource(
                                dataSource, new org.springframework.jdbc.datasource.ConnectionHolder(conn));
                        orderMapper.insert(order);
                        orderMapper.insertLog(new OrderLog(order.getId(), "processing"));
                    } catch (Exception e) {
                        hasError.set(true);
                        log.error("Order [{}] failed", order.getId(), e);
                    } finally {
                        // Only unbind here. Releasing the connection now would return it to the pool
                        // before the main thread has decided whether to commit or roll back.
                        TransactionSynchronizationManager.unbindResourceIfPossible(dataSource);
                        latch.countDown();
                    }
                });
            }

            try {
                if (!latch.await(60, TimeUnit.SECONDS)) {
                    hasError.set(true); // a timeout must never lead to a commit
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                hasError.set(true);     // fall through so the held connections are rolled back and closed
            }

            for (PendingConnection p : pending) {
                try (Connection conn = p.getConnection()) {
                    if (hasError.get()) {
                        conn.rollback();
                    } else {
                        conn.commit();
                    }
                } catch (SQLException e) {
                    log.error("Connection operation failed: {}", p.getBizId(), e);
                }
            }

            if (hasError.get()) {
                throw new RuntimeException("Batch had failures; everything was rolled back");
            }
        }

        // Named PendingConnection to avoid clashing with Spring's ConnectionHolder.
        @Data
        @AllArgsConstructor
        private static class PendingConnection {
            private Connection connection;
            private String bizId;
        }
    }

Honestly, I would not dare run this in production. Every Connection stays checked out while latch.await() is waiting, so a batch of any real size exhausts the connection pool. The final loop is also not atomic: if one commit fails after others have succeeded, the earlier ones stay committed. Consider it only when the batch is small (under a hundred rows, and no larger than the pool can serve) and consistency really matters.

Option 3: a local message table

This is the one I recommend most for production. Split the batch into individual messages. The main transaction writes the business data and the message records together; workers consume the messages asynchronously and retry automatically on failure.

Create the table first:

    CREATE TABLE batch_task_message (
        id          BIGINT PRIMARY KEY AUTO_INCREMENT,
        batch_id    VARCHAR(64)  NOT NULL COMMENT 'batch ID',
        biz_id      VARCHAR(64)  NOT NULL COMMENT 'business ID',
        status      VARCHAR(16)  NOT NULL DEFAULT 'PENDING' COMMENT 'PENDING/PROCESSING/SUCCESS/FAILED',
        content     TEXT         NOT NULL COMMENT 'task payload (JSON)',
        retry_count INT          NOT NULL DEFAULT 0 COMMENT 'retries so far',
        max_retry   INT          NOT NULL DEFAULT 3 COMMENT 'maximum retries',
        error_msg   VARCHAR(512) COMMENT 'error message',
        create_time DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP,
        update_time DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
        INDEX idx_status (status),
        INDEX idx_batch_id (batch_id),
        INDEX idx_biz_id (biz_id)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='batch task message table';

The main transaction writes both the business data and the message rows, so they succeed together or roll back together:

    @Service
    public class BatchTaskSubmitService {

        @Autowired
        private OrderMapper orderMapper;
        @Autowired
        private BatchTaskMessageMapper messageMapper;

        @Transactional(rollbackFor = Exception.class)
        public String submitBatchTask(List<Order> orders) {
            String batchId = IdUtil.simpleUUID();
            for (Order order : orders) {
                order.setStatus("PENDING");
                orderMapper.insert(order);

                BatchTaskMessage message = new BatchTaskMessage();
                message.setBatchId(batchId);
                message.setBizId(order.getId().toString());
                message.setContent(JSON.toJSONString(order));
                message.setStatus("PENDING");
                messageMapper.insert(message);
            }
            return batchId;
        }
    }

Asynchronous processing, driven by a scheduled poll:

    @Service
    @Slf4j
    public class BatchTaskProcessService {

        @Autowired
        private BatchTaskMessageMapper messageMapper;
        @Autowired
        private OrderMapper orderMapper;
        @Autowired
        private PlatformTransactionManager transactionManager;
        @Autowired
        @Qualifier("batchExecutor")
        private ThreadPoolTaskExecutor executor;

        @Scheduled(fixedDelay = 3000)
        public void processPendingTasks() {
            List<BatchTaskMessage> tasks = messageMapper.selectPendingTasks(100);
            if (tasks.isEmpty()) {
                return;
            }
            CountDownLatch latch = new CountDownLatch(tasks.size());

            for (BatchTaskMessage task : tasks) {
                executor.submit(() -> {
                    DefaultTransactionDefinition def = new DefaultTransactionDefinition();
                    TransactionStatus status = transactionManager.getTransaction(def);
                    try {
                        messageMapper.updateStatus(task.getId(), "PROCESSING", null);
                        Order order = JSON.parseObject(task.getContent(), Order.class);
                        order.setStatus("COMPLETED");
                        orderMapper.updateById(order);
                        messageMapper.updateStatus(task.getId(), "SUCCESS", null);
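                        transactionManager.commit(status);
                    } catch (Exception e) {
                        if (!status.isCompleted()) {
                            transactionManager.rollback(status);
                        }
                        int retryCount = task.getRetryCount() + 1;
                        if (retryCount >= task.getMaxRetry()) {
                            String msg = String.valueOf(e.getMessage()); // getMessage() may be null
                            messageMapper.updateStatus(task.getId(), "FAILED",
                                    msg.substring(0, Math.min(msg.length(), 512)));
                        } else {
                            messageMapper.updateRetryCount(task.getId(), retryCount);
                        }
                    } finally {
                        latch.countDown();
                    }
                });
            }

            try {
                latch.await(60, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public BatchTaskProgress getProgress(String batchId) {
            int total = messageMapper.countByBatchId(batchId);
            int success = messageMapper.countByBatchIdAndStatus(batchId, "SUCCESS");
            int failed = messageMapper.countByBatchIdAndStatus(batchId, "FAILED");
            int pending = total - success - failed;
            return new BatchTaskProgress(batchId, total, success, failed, pending);
        }
    }

Progress query:

    @Data
    @AllArgsConstructor
    public class BatchTaskProgress {
        private String batchId;
        private int total;
        private int success;
        private int failed;
        private int pending;

        public boolean isCompleted() {
            return pending == 0;
        }

        public boolean isAllSuccess() {
            return total == success;
        }
    }

What you get from this option: retries come for free, progress can be queried, and no connection is held while waiting. The price is one extra message table and a scheduled job, and what you buy with it is reliability.

5. Which option to choose

| Dimension | Programmatic transactions | Manual connections | Local message table |
|---|---|---|---|
| Consistency | Eventual | Unified commit/rollback | Eventual |
| Rollback granularity | Failed thread only | All connections | Single task |
| Complexity | Low | Medium | Medium |
| Performance | High | Medium (connections stay held) | High |
| Retry on failure | No | No | Yes |
| Progress tracking | Build it yourself | Build it yourself | Built in |

My recommendations:

- Logging, notifications, data synchronization → option 1 is enough.
- Consistency matters and the data volume is small → option 2, with caution.
- Business data that must not be lost → option 3, the production default.
- Cross-service or cross-database work → use Seata, though honestly most projects never need it.

6. Thread pool configuration

One thing that is easy to miss with a batch thread pool: the number of threads must not exceed the size of the database connection pool. HikariCP's default pool size is only 10, so if you run a batch on 20 threads, half of them sit waiting for a connection.

    @Configuration
    public class BatchThreadPoolConfig {

        @Bean("batchExecutor")
        public ThreadPoolTaskExecutor batchExecutor() {
            int corePoolSize = Runtime.getRuntime().availableProcessors();
            int maxPoolSize = corePoolSize * 2; // keep this at or below the connection pool size

            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(corePoolSize);
            executor.setMaxPoolSize(maxPoolSize);
            executor.setQueueCapacity(200);
            executor.setKeepAliveSeconds(60);
            executor.setThreadNamePrefix("batch-");
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.setAwaitTerminationSeconds(60);
            executor.initialize();
            return executor;
        }
    }

CallerRunsPolicy makes the submitting thread run a rejected task itself, which acts as throttling. Avoid the default AbortPolicy: a rejected task makes submit throw, the tasks already submitted keep running unaffected, and unless you handle that exception carefully you do not know which tasks never ran.

For large data sets, remember to split into batches:

    public void processLargeBatch(List<Order> allOrders) {
        int batchSize = 50;
        Iterable<List<Order>> batches = Iterables.partition(allOrders, batchSize);
        int batchIndex = 0;
        for (List<Order> batch : batches) {
            batchIndex++;
            try {
                batchOrderService.batchCreateOrders(batch);
            } catch (Exception e) {
                log.error("Batch {} failed", batchIndex, e);
                throw e;
            }
        }
    }

7. Pitfalls that are easy to hit

Worker-thread exceptions get swallowed

    // Wrong: the main thread never learns that a worker failed
    executor.submit(() -> {
        try {
            orderMapper.insert(order);
        } catch (Exception e) {
            log.error("failed", e);
        }
        latch.countDown();
    });

    // Right: record the failure in an AtomicBoolean
    executor.submit(() -> {
        try {
            orderMapper.insert(order);
        } catch (Exception e) {
            hasError.set(true);
            log.error("failed", e);
        } finally {
            latch.countDown();
        }
    });

CountDownLatch waits forever

    // Wrong: may block permanently
    latch.await();

    // Right: set a timeout
    boolean completed = latch.await(30, TimeUnit.SECONDS);
    if (!completed) {
        throw new BusinessException("Processing timed out");
    }

The transaction annotation is in the wrong place

    // Wrong: @Transactional on the outer method has no effect on worker threads
    @Transactional(rollbackFor = Exception.class)
    public void batchCreateOrders(List<Order> orders) {
        for (Order order : orders) {
            executor.submit(() -> orderMapper.insert(order));
        }
    }

    // Right: drop the annotation and use a programmatic transaction inside each worker
    public void batchCreateOrders(List<Order> orders) {
        for (Order order : orders) {
            executor.submit(() -> {
                TransactionStatus status = transactionManager.getTransaction(
                        new DefaultTransactionDefinition());
                try {
                    orderMapper.insert(order);
                    transactionManager.commit(status);
                } catch (Exception e) {
                    if (!status.isCompleted()) {
                        transactionManager.rollback(status);
                    }
                    log.error("insert failed", e); // do not swallow it silently, see the first pitfall
                }
            });
        }
    }

I have hit every one of these myself. The first is the worst: with worker exceptions swallowed, the production logs looked perfectly healthy while the data simply did not add up, and it took half a day to track down.

At its core, a multithreaded transaction is a distributed consistency problem. On a single machine a local message table is enough; reach for Seata only when you really do span services or databases. Do not chase strong consistency from the start, because most business cases do not need it.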
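To see the root cause from section 3 in isolation, the ThreadLocal behavior can be reproduced without Spring or a database. The sketch below is only an illustration under stated assumptions: the class `ThreadBoundResourceDemo`, its `bind`, `lookup` and `lookupFromWorker` helpers, and the `"ds"` / `"Conn-A"` strings are all hypothetical stand-ins for TransactionSynchronizationManager and a real Connection, not Spring's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for TransactionSynchronizationManager: one resource map per thread.
class ThreadBoundResourceDemo {

    private static final ThreadLocal<Map<String, String>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    /** Binds a (fake) connection to the calling thread only. */
    static void bind(String key, String connection) {
        RESOURCES.get().put(key, connection);
    }

    /** Returns the connection bound to the calling thread, or null if none is bound. */
    static String lookup(String key) {
        return RESOURCES.get().get(key);
    }

    /** Runs the same lookup on a pool thread and returns what that thread sees. */
    static String lookupFromWorker(String key) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> lookup(key);
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException("worker lookup failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        bind("ds", "Conn-A"); // what the transaction interceptor does on the main thread
        System.out.println("main thread sees:   " + lookup("ds"));
        System.out.println("worker thread sees: " + lookupFromWorker("ds"));
    }
}
```

Running it prints `Conn-A` for the main thread and `null` for the worker. That `null` is the moment at which, in a real application, a fresh auto-committing Connection is taken from the pool, which is why all three options above give each worker an explicit transaction of its own instead of relying on the caller's.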