Jorgen's blog Jorgen's blog
首页
  • 平台架构
  • 混合式开发记录
  • 推送服务
  • 数据分析
  • 实时调度
  • 架构思想

    • 分布式
  • 编程框架工具

    • 编程语言
    • 框架
    • 开发工具
  • 数据存储与处理

    • 数据库
    • 大数据
  • 消息、缓存与搜索

    • 消息队列
    • 搜索与日志分析
  • 前端与跨端开发

    • 前端技术
    • Android
  • 系统与运维

    • 操作系统
    • 容器化与 DevOps
  • 物联网与安全

    • 通信协议
    • 安全
    • 云平台
newland
  • 关于我
  • 终身学习
  • 关于时间的感悟
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

jorgen

Love it, make mistakes, learn, keep grinding.
首页
  • 平台架构
  • 混合式开发记录
  • 推送服务
  • 数据分析
  • 实时调度
  • 架构思想

    • 分布式
  • 编程框架工具

    • 编程语言
    • 框架
    • 开发工具
  • 数据存储与处理

    • 数据库
    • 大数据
  • 消息、缓存与搜索

    • 消息队列
    • 搜索与日志分析
  • 前端与跨端开发

    • 前端技术
    • Android
  • 系统与运维

    • 操作系统
    • 容器化与 DevOps
  • 物联网与安全

    • 通信协议
    • 安全
    • 云平台
newland
  • 关于我
  • 终身学习
  • 关于时间的感悟
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 消息队列
  • 主流消息队列产品对比与选型指南
  • 消息队列中的事务性消息:实现可靠业务流程的关键
  • 消息队列事务消息:分布式事务的可靠保障
    • 前言
    • 什么是事务消息?
      • 传统方案 vs 事务消息
    • RocketMQ事务消息实战
      • 核心流程
      • 实现步骤
      • 1. 发送半消息
      • 2. 实现本地事务监听器
      • 3. 消费端处理
    • 关键注意事项
      • 1. 回查机制
      • 2. 死信队列处理
      • 3. 性能优化点
    • 适用场景分析
      • ✅ 强烈推荐场景
      • ❌ 不适用场景
    • 结语
  • 消息队列的事务处理:确保数据一致性的关键
  • 消息队列的事务性处理:从理论到实践
  • 消息队列的事务消息与可靠性保证
  • 消息队列的可靠性与持久化机制
  • 消息队列的可靠性保证:从理论到实践
  • 消息队列的可靠性保证:如何确保消息不丢失、不重复、不乱序
  • 消息队列的可靠性保证:如何确保消息不丢失、不重复
  • 消息队列的可靠性保证与事务消息
  • 消息队列的可靠性保障机制
  • 消息队列的可靠性机制:如何确保消息不丢失、不重复、不乱序
  • 消息队列的性能优化与扩展性-高并发场景下的关键考量
  • 消息队列的安全性防护-构建企业级可靠通信的关键
  • 消息队列的监控与运维-构建可观测性体系的关键
  • 消息队列的架构设计模式-构建高可用系统的关键选择
  • 消息队列的消息路由与过滤机制-构建灵活消息系统的关键
  • 消息队列的测试策略与方法论-构建可靠系统的质量保障
  • 消息队列的集群部署与高可用架构-构建企业级消息系统的基石
  • 消息队列的流处理能力-构建事件驱动架构的核心引擎
  • 消息队列的延迟与死信队列处理-构建健壮消息系统的关键机制
  • 消息队列的消息模式与通信模式-构建灵活系统的基石
  • 消息队列的消息去重与幂等性处理-构建健壮业务系统的关键保障
  • 消息队列的优先级调度机制-构建高效消息处理系统的核心策略
  • 消息队列的容错与故障恢复机制-构建高可用系统的最后一道防线
  • 消息队列的事件溯源与CQRS模式-构建可追溯系统的架构基石
  • 消息队列与微服务架构的集成-构建分布式系统的通信基石
  • 消息队列的消息序列化与数据格式选择-构建高效通信系统的关键决策
  • message_queue
Jorgen
2023-11-15
目录

消息队列事务消息:分布式事务的可靠保障

# 前言

在分布式系统中,我们经常遇到需要跨服务操作的场景:比如创建订单时需要同时扣减库存、支付时需要更新订单状态。传统的两阶段提交协议在分布式环境下简直就是性能杀手 🐢。消息队列的事务消息机制提供了一种优雅的解决方案,既保证了数据一致性,又避免了分布式事务的复杂性。

提示

事务消息的本质是"可靠消息+本地事务"的混合模式,通过消息中间件的状态控制来协调分布式操作。

# 什么是事务消息?

事务消息是消息队列提供的一种特殊消息类型,它将本地事务和消息发送绑定为一个原子操作。当本地事务执行成功时,消息才会被真正投递;如果事务失败,消息会被回滚,确保下游服务不会收到不一致的数据。

# 传统方案 vs 事务消息

方案 一致性 性能 实现复杂度
两阶段提交(2PC) 强一致 低 高
TCC 最终一致 中 极高
事务消息 最终一致 高 中

# RocketMQ事务消息实战

# 核心流程

sequenceDiagram
    participant Producer
    participant Broker
    participant Consumer
    
    Producer->>Broker: 发送半消息(Prepare)
    Broker-->>Producer: 确认接收
    Producer->>Producer: 执行本地事务
    alt 本地事务成功
        Producer->>Broker: 提交事务(Commit)
        Broker->>Consumer: 投递消息
    else 本地事务失败
        Producer->>Broker: 回滚事务(Rollback)
        Broker-->>Producer: 消息丢弃
    end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 实现步骤

# 1. 发送半消息

// 发送半消息
TransactionSendResult result = producer.sendMessageInTransaction(
    new Message("ORDER_TOPIC", "order-123".getBytes()), 
    new OrderTransactionListener(), 
    "order-data"
);
1
2
3
4
5
6

# 2. 实现本地事务监听器

public class OrderTransactionListener implements TransactionListener {
    
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 1. 执行本地业务逻辑
            Order order = parseOrderData(arg);
            orderService.createOrder(order);
            
            // 2. 返回事务状态
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // Broker回查消息状态
        String orderId = new String(msg.getBody());
        if (orderService.existsOrder(orderId)) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 3. 消费端处理

consumer.subscribe("ORDER_TOPIC", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        try {
            // 处理业务逻辑
            Order order = parseOrder(msg.getBody());
            paymentService.processPayment(order);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
});
1
2
3
4
5
6
7
8
9
10
11
12
13

# 关键注意事项

# 1. 回查机制

  • Broker会定期回查未确定状态的消息
  • 需要保证业务逻辑的幂等性
  • 回查超时时间默认15分钟(可通过transactionTimeout配置)

# 2. 死信队列处理

// 配置死信队列参数
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
    // 处理失败后发送到死信队列
    if (failCount > 3) {
        producer.send(new Message("DLQ_TOPIC", msg.getBody()));
        return ConsumeOrderlyStatus.COMMIT;
    }
    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
});
1
2
3
4
5
6
7
8
9
10

# 3. 性能优化点

  • 合理设置transactionTimeout避免长时间阻塞
  • 使用批量消息减少网络IO
  • 对业务关键消息开启retryTimesWhenSendFailed重试机制

# 适用场景分析

# ✅ 强烈推荐场景

  • 订单创建与库存扣减
  • 支付与订单状态变更
  • 跨系统数据同步

# ❌ 不适用场景

  • 对实时性要求极高的场景(如股票交易)
  • 需要强一致性的金融交易
  • 单体应用内的操作

# 结语

事务消息作为分布式事务的"轻量级解决方案",在保证业务最终一致性的同时,提供了优秀的性能表现。通过RocketMQ的事务消息机制,我们能够以较低的成本实现跨服务的可靠操作。

经验总结:在实施事务消息时,一定要做好幂等性设计,并合理配置回查参数。对于关键业务,建议结合本地事务表进行二次兜底。

"分布式系统的本质是管理不确定性,而事务消息是我们手中的一张确定性底牌"

#事务消息#RocketMQ#分布式事务#可靠性
上次更新: 2026/01/28, 14:21:05
消息队列中的事务性消息:实现可靠业务流程的关键
消息队列的事务处理:确保数据一致性的关键

← 消息队列中的事务性消息:实现可靠业务流程的关键 消息队列的事务处理:确保数据一致性的关键→

最近更新
01
LLM
01-30
02
intro
01-30
03
intro
01-30
更多文章>
Theme by Vdoing | Copyright © 2019-2026 Jorgen | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式