Jorgen's blog Jorgen's blog
首页
  • 平台架构
  • 混合式开发记录
  • 推送服务
  • 数据分析
  • 实时调度
  • 架构思想

    • 分布式
  • 编程框架工具

    • 编程语言
    • 框架
    • 开发工具
  • 数据存储与处理

    • 数据库
    • 大数据
  • 消息、缓存与搜索

    • 消息队列
    • 搜索与日志分析
  • 前端与跨端开发

    • 前端技术
    • Android
  • 系统与运维

    • 操作系统
    • 容器化与 DevOps
  • 物联网与安全

    • 通信协议
    • 安全
    • 云平台
newland
  • 关于我
  • 终身学习
  • 关于时间的感悟
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

jorgen

Love it, make mistakes, learn, keep grinding.
首页
  • 平台架构
  • 混合式开发记录
  • 推送服务
  • 数据分析
  • 实时调度
  • 架构思想

    • 分布式
  • 编程框架工具

    • 编程语言
    • 框架
    • 开发工具
  • 数据存储与处理

    • 数据库
    • 大数据
  • 消息、缓存与搜索

    • 消息队列
    • 搜索与日志分析
  • 前端与跨端开发

    • 前端技术
    • Android
  • 系统与运维

    • 操作系统
    • 容器化与 DevOps
  • 物联网与安全

    • 通信协议
    • 安全
    • 云平台
newland
  • 关于我
  • 终身学习
  • 关于时间的感悟
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 消息队列
  • 主流消息队列产品对比与选型指南
  • 消息队列中的事务性消息:实现可靠业务流程的关键
  • 消息队列事务消息:分布式事务的可靠保障
  • 消息队列的事务处理:确保数据一致性的关键
    • 前言
    • 消息队列与事务的挑战
    • 解决方案
      • 方案一:本地消息表(本地事务表)
      • 方案二:事务消息(RocketMQ方案)
      • 方案三:可靠消息最终一致性(基于TCC)
    • 最佳实践
    • 结语
  • 消息队列的事务性处理:从理论到实践
  • 消息队列的事务消息与可靠性保证
  • 消息队列的可靠性与持久化机制
  • 消息队列的可靠性保证:从理论到实践
  • 消息队列的可靠性保证:如何确保消息不丢失、不重复、不乱序
  • 消息队列的可靠性保证:如何确保消息不丢失、不重复
  • 消息队列的可靠性保证与事务消息
  • 消息队列的可靠性保障机制
  • 消息队列的可靠性机制:如何确保消息不丢失、不重复、不乱序
  • 消息队列的性能优化与扩展性-高并发场景下的关键考量
  • 消息队列的安全性防护-构建企业级可靠通信的关键
  • 消息队列的监控与运维-构建可观测性体系的关键
  • 消息队列的架构设计模式-构建高可用系统的关键选择
  • 消息队列的消息路由与过滤机制-构建灵活消息系统的关键
  • 消息队列的测试策略与方法论-构建可靠系统的质量保障
  • 消息队列的集群部署与高可用架构-构建企业级消息系统的基石
  • 消息队列的流处理能力-构建事件驱动架构的核心引擎
  • 消息队列的延迟与死信队列处理-构建健壮消息系统的关键机制
  • 消息队列的消息模式与通信模式-构建灵活系统的基石
  • 消息队列的消息去重与幂等性处理-构建健壮业务系统的关键保障
  • 消息队列的优先级调度机制-构建高效消息处理系统的核心策略
  • 消息队列的容错与故障恢复机制-构建高可用系统的最后一道防线
  • 消息队列的事件溯源与CQRS模式-构建可追溯系统的架构基石
  • 消息队列与微服务架构的集成-构建分布式系统的通信基石
  • 消息队列的消息序列化与数据格式选择-构建高效通信系统的关键决策
  • message_queue
Jorgen
2023-11-15
目录

消息队列的事务处理:确保数据一致性的关键

# 前言

在分布式系统中,消息队列扮演着至关重要的角色,它帮助我们实现系统解耦、异步处理和流量削峰。然而,当我们把消息队列引入事务处理场景时,事情就变得复杂起来了。🤔

我曾在一个电商项目中遇到过这样的难题:用户下单后需要同时扣减库存和发送订单通知,但这两个操作分布在不同的服务中。如何确保这两个操作的原子性?如果扣减库存成功但发送消息失败,或者相反,都会导致数据不一致。今天,我想和大家聊聊消息队列的事务处理,这个在分布式系统中既基础又关键的话题。

提示

消息队列的事务处理本质上是解决分布式环境下的数据一致性问题,它需要我们在保证最终一致性的同时,尽可能提高系统的可用性和性能。

# 消息队列与事务的挑战

在单机应用中,我们可以使用数据库事务来保证多个操作的原子性。例如:

@Transactional
public void placeOrder(Order order) {
    // 扣减库存
    inventoryService.reduceStock(order.getProductId(), order.getQuantity());
    // 保存订单
    orderRepository.save(order);
}
1
2
3
4
5
6
7

这段代码确保了"扣减库存"和"保存订单"要么都成功,要么都失败。🏗

然而,当我们引入消息队列后,情况就变得复杂了:

@Transactional
public void placeOrder(Order order) {
    // 扣减库存
    inventoryService.reduceStock(order.getProductId(), order.getQuantity());
    // 发送订单创建消息
    messageQueue.send("order.created", order);
}
1
2
3
4
5
6
7

这里存在几个潜在问题:

  1. 本地事务与消息发送的原子性问题:如果消息发送失败,但数据库事务已经提交,会导致数据不一致。
  2. 消息重复消费问题:如果消息已经发送成功,但消费者处理失败,重试时可能导致重复处理。
  3. 消息丢失问题:如果消息发送后,消费者处理前,系统崩溃可能导致消息丢失。

# 解决方案

# 方案一:本地消息表(本地事务表)

这是最经典的一种解决方案,核心思想是"将消息发送也纳入本地事务管理"。

实现步骤:

  1. 创建一个本地消息表,用于存储待发送的消息。
  2. 在同一个事务中,执行业务操作和插入消息记录。
  3. 事务提交后,通过定时任务或消息发送服务将消息发送到消息队列。
  4. 消息发送成功后,更新消息表的状态。
CREATE TABLE local_message (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    business_id VARCHAR(64) NOT NULL,
    message_content TEXT NOT NULL,
    status TINYINT NOT NULL DEFAULT 0, -- 0:待发送 1:已发送 2:发送失败
    create_time DATETIME NOT NULL,
    update_time DATETIME NOT NULL
);
1
2
3
4
5
6
7
8
@Transactional
public void placeOrder(Order order) {
    // 1. 执行业务操作
    inventoryService.reduceStock(order.getProductId(), order.getQuantity());
    orderRepository.save(order);
    
    // 2. 插入本地消息
    LocalMessage message = new LocalMessage();
    message.setBusinessId(order.getId());
    message.setMessageContent(JSON.toJSONString(order));
    message.setStatus(LocalMessageStatus.PENDING);
    localMessageRepository.save(message);
}

// 定时任务发送消息
@Scheduled(fixedRate = 5000)
public void sendPendingMessages() {
    List<LocalMessage> messages = localMessageRepository.findByStatus(LocalMessageStatus.PENDING);
    for (LocalMessage message : messages) {
        try {
            messageQueue.send("order.created", JSON.parseObject(message.getMessageContent(), Order.class));
            message.setStatus(LocalMessageStatus.SENT);
            localMessageRepository.save(message);
        } catch (Exception e) {
            message.setStatus(LocalMessageStatus.FAILED);
            localMessageRepository.save(message);
            log.error("发送消息失败", e);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

优点:

  • 实现简单,不需要额外组件
  • 保证了本地事务与消息发送的最终一致性

缺点:

  • 需要额外维护本地消息表
  • 消息发送有延迟,不适合实时性要求高的场景
  • 定时任务可能成为系统瓶颈

# 方案二:事务消息(RocketMQ方案)

RocketMQ提供了事务消息机制,可以很好地解决这个问题。

工作流程:

  1. 发送方发送一条半消息(消息暂存到MQ服务器,但消费者不可见)
  2. 发送方执行本地事务
  3. 根据本地事务执行结果,向MQ服务器确认提交或回滚消息
  4. 如果MQ服务器长时间未收到确认,会向发送方发起回查
// 发送事务消息
TransactionMQProducer producer = new TransactionMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            Order order = (Order) arg;
            // 执行本地事务
            inventoryService.reduceStock(order.getProductId(), order.getQuantity());
            orderRepository.save(order);
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            log.error("本地事务执行失败", e);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 回查本地事务状态
        String orderId = msg.getTags();
        Order order = orderRepository.findById(orderId);
        if (order != null) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
});
producer.start();

// 发送事务消息
public void sendOrderTransactionMessage(Order order) {
    Message message = new Message("order_topic", "order_created", 
        JSON.toJSONString(order).getBytes(RemotingHelper.DEFAULT_CHARSET));
    TransactionSendResult result = producer.sendMessageInTransaction(message, order);
    log.info("事务消息发送结果: {}", result);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

优点:

  • 保证了消息发送与本地事务的原子性
  • 不需要额外维护本地消息表
  • 消息发送及时,无延迟

缺点:

  • 仅支持RocketMQ
  • 实现相对复杂,需要处理事务回查逻辑

# 方案三:可靠消息最终一致性(基于TCC)

对于不支持事务消息的MQ(如RabbitMQ),可以采用基于TCC(Try-Confirm-Cancel)的可靠消息最终一致性方案。

工作流程:

  1. Try阶段:执行业务操作,但不提交事务,同时发送预消息
  2. Confirm阶段:确认消息发送成功,提交业务事务
  3. Cancel阶段:如果消息发送失败,回滚业务事务
public class OrderService {
    
    @Autowired
    private InventoryService inventoryService;
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private MessageQueue messageQueue;
    
    @Autowired
    private TransactionTemplate transactionTemplate;
    
    public void placeOrderWithTCC(Order order) {
        // Try阶段
        boolean tryResult = transactionTemplate.execute(status -> {
            try {
                // 执行业务操作但不提交
                inventoryService.reduceStockForTry(order.getProductId(), order.getQuantity());
                orderRepository.saveForTry(order);
                
                // 发送预消息
                messageQueue.sendPreparedMessage("order.created", order);
                
                return true;
            } catch (Exception e) {
                status.setRollbackOnly();
                return false;
            }
        });
        
        if (tryResult) {
            // Confirm阶段:确认消息发送成功,提交业务事务
            messageQueue.confirmPreparedMessage("order.created", order);
            transactionTemplate.execute(status -> {
                inventoryService.confirmReduceStock(order.getProductId(), order.getQuantity());
                orderRepository.confirmSave(order);
                return null;
            });
        } else {
            // Cancel阶段:回滚业务事务
            transactionTemplate.execute(status -> {
                inventoryService.cancelReduceStock(order.getProductId(), order.getQuantity());
                orderRepository.cancelSave(order);
                return null;
            });
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

优点:

  • 适用于任何MQ
  • 实现了最终一致性
  • 业务逻辑清晰

缺点:

  • 实现复杂,需要维护多个状态
  • 性能开销较大

# 最佳实践

在实际项目中,我总结了几条关于消息队列事务处理的最佳实践:

  1. 根据业务场景选择合适的方案:

    • 对于实时性要求不高的场景,可以使用本地消息表方案
    • 如果使用RocketMQ,优先选择事务消息方案
    • 对于复杂业务场景,可以考虑TCC方案
  2. 处理消息重复消费:

    • 使用消息的唯一标识(如业务ID)实现幂等处理
    • 在消费者端实现幂等逻辑
public void handleOrderCreated(String orderId) {
    // 检查是否已处理过
    if (processedOrderRepository.existsById(orderId)) {
        log.info("订单{}已处理过,跳过", orderId);
        return;
    }
    
    // 处理订单
    Order order = orderRepository.findById(orderId);
    // ... 处理逻辑
    
    // 记录已处理
    processedOrderRepository.save(new ProcessedOrder(orderId));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
  1. 设置合理的重试策略:
    • 对于可重试的异常,设置指数退避重试
    • 对于不可重试的异常,及时记录并告警
@Retryable(value = {Exception.class}, 
           maxAttempts = 3, 
           backoff = @Backoff(delay = 1000, multiplier = 2))
public void processMessage(Message message) {
    // 消息处理逻辑
}
1
2
3
4
5
6
  1. 监控与告警:
    • 监控消息积压情况
    • 监控消息处理失败率
    • 设置合理的告警阈值

# 结语

消息队列的事务处理是分布式系统中的一个经典问题,没有银弹,每种方案都有其适用场景。我曾经试图找到一种完美的解决方案,后来才明白,工程上的选择往往是在各种约束条件下的权衡。

在实际项目中,我们需要根据业务需求、技术栈和团队能力来选择合适的方案。对于大多数场景,RocketMQ的事务消息已经能够很好地满足需求。而对于其他MQ,可以考虑本地消息表或TCC方案。

最终一致性是分布式系统的基石,而消息队列则是实现最终一致性的重要工具。理解并掌握消息队列的事务处理,将帮助我们构建更加健壮的分布式系统。

希望这篇文章能对大家有所帮助。如果有任何问题或建议,欢迎在评论区交流!😊

#消息队列#事务处理#数据一致性
上次更新: 2026/01/28, 10:42:53
消息队列事务消息:分布式事务的可靠保障
消息队列的事务性处理:从理论到实践

← 消息队列事务消息:分布式事务的可靠保障 消息队列的事务性处理:从理论到实践→

最近更新
01
LLM
01-30
02
intro
01-30
03
intro
01-30
更多文章>
Theme by Vdoing | Copyright © 2019-2026 Jorgen | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式