一、核心概念理解事务消息解决什么问题java复制下载// 分布式事务典型问题本地事务与消息发送的一致性 // 传统方式存在的问题 1. 先发消息后执行本地事务 → 消息发送成功但本地事务失败 2. 先执行本地事务后发消息 → 本地事务成功但消息发送失败RocketMQ事务消息的核心机制text复制下载Producer发送Half消息 → Broker存储Half消息 → 执行本地事务 ↓ Broker等待事务状态回查 ← Producer返回本地事务结果 ↓ 根据结果提交或回滚消息二、两阶段提交详细流程第一阶段发送Half消息java复制下载public class TransactionProducer { public static void main(String[] args) throws Exception { // 1. 创建事务消息生产者 TransactionMQProducer producer new TransactionMQProducer(TransactionProducerGroup); producer.setNamesrvAddr(127.0.0.1:9876); // 2. 设置事务监听器核心 producer.setTransactionListener(new TransactionListener() { /** * 执行本地事务 * param msg Half消息 * param arg 业务参数 * return 本地事务状态 */ Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { // 执行本地数据库事务 boolean success doLocalBusinessTransaction(msg, arg); if (success) { System.out.println(本地事务执行成功提交消息); return LocalTransactionState.COMMIT_MESSAGE; } else { System.out.println(本地事务执行失败回滚消息); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { System.out.println(本地事务执行异常回查); return LocalTransactionState.UNKNOW; } } /** * 事务回查Broker主动查询事务状态 * param msg Half消息 * return 事务状态 */ Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 根据业务ID查询本地事务状态 String transactionId msg.getTransactionId(); boolean status queryLocalTransactionStatus(transactionId); if (status) { System.out.println(事务回查本地事务已提交); return LocalTransactionState.COMMIT_MESSAGE; } else { System.out.println(事务回查本地事务已回滚); return LocalTransactionState.ROLLBACK_MESSAGE; } } }); // 3. 启动生产者 producer.start(); // 4. 发送事务消息 Message msg new Message(TransactionTopic, TagA, Order-001.getBytes(StandardCharsets.UTF_8)); // 设置事务ID关键 msg.setKeys(TXN- System.currentTimeMillis()); // 发送Half消息第一阶段 SendResult sendResult producer.sendMessageInTransaction(msg, // 业务参数会在executeLocalTransaction中传递 new BusinessParam(orderId, 123456, 100.00) ); System.out.println(Half消息发送结果: sendResult.getSendStatus()); } }篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc需要全套面试笔记及答案【点击此处即可/免费获取】三、完整执行时序图text复制下载┌─────────┐ ┌────────┐ ┌────────┐ │ Producer│ │ Broker │ │ 本地DB │ └────┬────┘ └───┬────┘ └────┬───┘ │ 1.发送Half消息 │ │ │───────────────│ │ │ │ │ │ │ 2.存储Half消息 │ │ │───────────────│ │ │ │ │ 3.返回Half成功 │ │ │───────────────│ │ │ │ │ │ 4.执行本地事务 │ │ │────────────────────────────────│ │ │ │ │ 5.返回事务状态 │ │ │───────────────│ │ │ │ │ │ │6.提交/回滚消息 │ │ │───────────────│ │ │ │ │(可能)7.事务回查 │ │ │───────────────│ │ │ │ │ │ 8.返回回查结果 │ │ │───────────────│ │ │ │ │ │ │9.最终提交/回滚 │ │ │───────────────│四、关键配置参数yaml复制下载# Broker端配置 broker.conf: transactionCheckMax: 15 # 最大回查次数 transactionCheckInterval: 60000 # 回查间隔(ms) transactionTimeOut: 6000 # 超时时间(ms) # Producer端配置 producer: checkThreadPoolMinSize: 1 # 回查线程池最小 checkThreadPoolMaxSize: 1 # 回查线程池最大 checkRequestHoldMax: 2000 # 回查请求队列大小五、代码实现最佳实践1. 完整的订单事务示例java复制下载Service public class OrderTransactionService { Resource private OrderMapper orderMapper; Resource private TransactionMQProducer transactionMQProducer; /** * 创建订单事务消息 */ public void createOrderWithTransaction(OrderDTO orderDTO) { // 构建消息 Message msg new Message(ORDER_TOPIC, CREATE, JSON.toJSONBytes(orderDTO)); // 设置业务标识 msg.setKeys(ORDER_ orderDTO.getOrderNo()); msg.putUserProperty(businessType, ORDER_CREATE); // 发送事务消息 SendResult sendResult transactionMQProducer.sendMessageInTransaction( msg, new OrderTransactionArg(orderDTO) ); if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) { throw new RuntimeException(Half消息发送失败); } } /** * 事务监听器实现 */ Component public class OrderTransactionListener implements TransactionListener { Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { OrderTransactionArg transactionArg (OrderTransactionArg) arg; OrderDTO orderDTO transactionArg.getOrderDTO(); try { // 1. 保存订单到数据库 Order order convertToOrder(orderDTO); orderMapper.insert(order); // 2. 扣减库存调用库存服务 boolean deductResult inventoryService.deductStock( orderDTO.getProductId(), orderDTO.getQuantity() ); if (!deductResult) { // 库存不足回滚本地事务 orderMapper.deleteById(order.getId()); return LocalTransactionState.ROLLBACK_MESSAGE; } // 3. 记录事务日志用于回查 transactionLogService.saveTransactionLog( msg.getTransactionId(), ORDER_CREATE, order.getId(), LocalTransactionState.COMMIT_MESSAGE.name() ); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error(订单本地事务执行异常, e); // 记录异常状态 transactionLogService.saveTransactionLog( msg.getTransactionId(), ORDER_CREATE, null, LocalTransactionState.UNKNOW.name() ); return LocalTransactionState.UNKNOW; } } Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 根据事务ID查询事务日志 String transactionId msg.getTransactionId(); TransactionLog log transactionLogService.getByTransactionId(transactionId); if (log null) { // 没有事务记录需要回滚 return LocalTransactionState.ROLLBACK_MESSAGE; } if (COMMIT_MESSAGE.equals(log.getStatus())) { // 事务已提交 return LocalTransactionState.COMMIT_MESSAGE; } else { // 事务需要回滚 return LocalTransactionState.ROLLBACK_MESSAGE; } } } /** * 事务参数封装 */ Data AllArgsConstructor public static class OrderTransactionArg { private OrderDTO orderDTO; } }2. 消费端幂等处理java复制下载Component RocketMQMessageListener( topic ORDER_TOPIC, consumerGroup ORDER_CONSUMER_GROUP ) public class OrderConsumer implements RocketMQListenerMessageExt { Override public void onMessage(MessageExt message) { // 1. 检查消息幂等性 String messageId message.getMsgId(); if (redisTemplate.hasKey(MSG_ messageId)) { log.info(消息已处理跳过: {}, messageId); return; } // 2. 解析消息 OrderDTO orderDTO JSON.parseObject(message.getBody(), OrderDTO.class); // 3. 业务处理 try { // 更新订单状态为已确认 orderService.confirmOrder(orderDTO.getOrderNo()); // 4. 记录已处理消息 redisTemplate.opsForValue().set( MSG_ messageId, PROCESSED, 1, TimeUnit.HOURS ); } catch (Exception e) { log.error(订单处理失败将重试, e); throw new RuntimeException(e); } } }六、面试问题回答要点问题RocketMQ事务消息如何实现二阶段提交回答结构概念解释RocketMQ事务消息通过二阶段提交保证分布式事务的最终一致性核心思想将本地事务和消息发送绑定通过Half消息和状态回查机制实现第一阶段Half消息阶段Producer发送Half消息到BrokerBroker存储但不对Consumer可见Half消息发送成功后执行本地事务本地事务执行结果返回给BrokerCOMMIT、ROLLBACK或UNKNOWN篇幅限制下面就只能给大家展示小册部分内容了。整理了一份核心面试笔记包括了Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafc需要全套面试笔记及答案【点击此处即可/免费获取】第二阶段状态确认阶段如果本地事务返回COMMIT/ROLLBACKBroker立即提交/回滚消息如果返回UNKNOWNBroker会发起事务状态回查Producer实现TransactionListener.checkLocalTransaction()进行状态查询关键机制事务状态回查解决网络超时或生产者宕机问题消息幂等性消费端需要处理重复消息超时机制超过配置时间未确认的消息会自动回滚代码示例java复制 下载// 简要展示核心代码结构 producer.setTransactionListener(new TransactionListener() { public LocalTransactionState executeLocalTransaction(...) { // 执行本地业务 } public LocalTransactionState checkLocalTransaction(...) { // 状态回查 } });适用场景订单创建通知库存支付成功发送通知任何需要保证本地事务和消息发送一致性的场景注意事项事务消息不支持定时和批量消息确保checkLocalTransaction方法的幂等性合理配置回查次数和间隔面试加分项提到最大努力通知型事务对比TCC、Saga等分布式事务方案强调消息幂等处理的重要性提及RocketMQ 4.3的事务消息优化这样的回答既展示了理论知识又体现了实际编码能力适合中高级Java岗位面试。