Jorgen's blog Jorgen's blog
首页
  • 平台架构
  • 混合式开发记录
  • 推送服务
  • 数据分析
  • 实时调度
  • 架构思想

    • 分布式
  • 编程框架工具

    • 编程语言
    • 框架
    • 开发工具
  • 数据存储与处理

    • 数据库
    • 大数据
  • 消息、缓存与搜索

    • 消息队列
    • 搜索与日志分析
  • 前端与跨端开发

    • 前端技术
    • Android
  • 系统与运维

    • 操作系统
    • 容器化与 DevOps
  • 物联网与安全

    • 通信协议
    • 安全
    • 云平台
newland
  • 关于我
  • 终身学习
  • 关于时间的感悟
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

jorgen

Love it, make mistakes, learn, keep grinding.
首页
  • 平台架构
  • 混合式开发记录
  • 推送服务
  • 数据分析
  • 实时调度
  • 架构思想

    • 分布式
  • 编程框架工具

    • 编程语言
    • 框架
    • 开发工具
  • 数据存储与处理

    • 数据库
    • 大数据
  • 消息、缓存与搜索

    • 消息队列
    • 搜索与日志分析
  • 前端与跨端开发

    • 前端技术
    • Android
  • 系统与运维

    • 操作系统
    • 容器化与 DevOps
  • 物联网与安全

    • 通信协议
    • 安全
    • 云平台
newland
  • 关于我
  • 终身学习
  • 关于时间的感悟
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 消息队列
  • 主流消息队列产品对比与选型指南
  • 消息队列中的事务性消息:实现可靠业务流程的关键
  • 消息队列事务消息:分布式事务的可靠保障
  • 消息队列的事务处理:确保数据一致性的关键
  • 消息队列的事务性处理:从理论到实践
  • 消息队列的事务消息与可靠性保证
  • 消息队列的可靠性与持久化机制
  • 消息队列的可靠性保证:从理论到实践
  • 消息队列的可靠性保证:如何确保消息不丢失、不重复、不乱序
    • 前言
    • 消息可靠性的三大挑战
      • 消息丢失 🚫
      • 消息重复 🔄
      • 消息乱序 🔄➡️🔄➡️🔄
    • 消息不丢失的解决方案
      • 生产端可靠性保证
      • 1. 使用同步发送+重试机制
      • 2. 本地消息表+定时任务
      • 消息队列端可靠性保证
      • 1. 开启消息持久化
      • 2. 配置合理的存储容量
      • 消费端可靠性保证
      • 1. 消息确认机制
      • 2. 消息处理幂等性
    • 消息不重复的解决方案
      • 唯一消息ID
      • 事务消息
    • 消息不乱序的解决方案
      • 单分区保证有序
      • 全局有序的实现
    • 实战案例:订单系统的消息可靠性保证
      • 场景描述
      • 可靠性设计
      • 1. 消息不丢失
      • 2. 消息不重复
      • 3. 消息不乱序
    • 结语
  • 消息队列的可靠性保证:如何确保消息不丢失、不重复
  • 消息队列的可靠性保证与事务消息
  • 消息队列的可靠性保障机制
  • 消息队列的可靠性机制:如何确保消息不丢失、不重复、不乱序
  • 消息队列的性能优化与扩展性-高并发场景下的关键考量
  • 消息队列的安全性防护-构建企业级可靠通信的关键
  • 消息队列的监控与运维-构建可观测性体系的关键
  • 消息队列的架构设计模式-构建高可用系统的关键选择
  • 消息队列的消息路由与过滤机制-构建灵活消息系统的关键
  • 消息队列的测试策略与方法论-构建可靠系统的质量保障
  • 消息队列的集群部署与高可用架构-构建企业级消息系统的基石
  • 消息队列的流处理能力-构建事件驱动架构的核心引擎
  • 消息队列的延迟与死信队列处理-构建健壮消息系统的关键机制
  • 消息队列的消息模式与通信模式-构建灵活系统的基石
  • 消息队列的消息去重与幂等性处理-构建健壮业务系统的关键保障
  • 消息队列的优先级调度机制-构建高效消息处理系统的核心策略
  • 消息队列的容错与故障恢复机制-构建高可用系统的最后一道防线
  • 消息队列的事件溯源与CQRS模式-构建可追溯系统的架构基石
  • 消息队列与微服务架构的集成-构建分布式系统的通信基石
  • 消息队列的消息序列化与数据格式选择-构建高效通信系统的关键决策
  • message_queue
Jorgen
2023-11-15
目录

消息队列的可靠性保证:如何确保消息不丢失、不重复、不乱序

# 前言

大家好!在上一篇文章中,我们了解了消息队列的基本概念和使用方法。📨 但是,在实际生产环境中,仅仅使用消息队列是远远不够的。今天我想和大家聊聊一个非常重要的话题——消息队列的可靠性保证。

想象一下,如果你的订单系统因为消息丢失导致用户支付成功但订单未创建,或者因为消息重复导致用户被扣款两次,那后果简直不堪设想!😱 这就是为什么我们要深入探讨消息队列的可靠性问题。

提示

消息队列的可靠性是衡量消息系统质量的关键指标,也是构建高可用分布式系统的基础。

在本文中,我将和大家一起探讨如何确保消息在传输过程中不丢失、不重复、不乱序,以及在实际项目中如何实现这些可靠性保证。

# 消息可靠性的三大挑战

# 消息丢失 🚫

消息丢失是消息队列中最常见也是最严重的问题。消息可能在以下环节丢失:

  1. 生产者发送消息时丢失

    • 网络问题导致消息未到达消息队列
    • 生产者端异常导致消息未成功发送
  2. 消息队列内部存储丢失

    • 消息队列节点宕机
    • 存储设备故障
    • 配置不当导致消息未持久化
  3. 消费者处理消息时丢失

    • 消费者处理异常导致消息未确认
    • 消费者崩溃导致正在处理的消息丢失

# 消息重复 🔄

消息重复通常发生在以下场景:

  1. 生产者重试机制导致重复

    • 发送失败后,生产者未收到确认,重试发送
  2. 消息队列重投机制导致重复

    • 消费者处理超时,消息队列重新投递
  3. 网络分区导致重复

    • 网络不稳定,ACK确认未到达生产者

# 消息乱序 🔄➡️🔄➡️🔄

消息乱序主要发生在以下情况:

  1. 分区/队列内的乱序

    • 单个分区内的消息可能因为各种原因乱序
  2. 多分区/队列间的乱序

    • 不同分区/队列间的消息无法保证全局有序

# 消息不丢失的解决方案

# 生产端可靠性保证

在生产端,我们可以通过以下方式确保消息不丢失:

# 1. 使用同步发送+重试机制

// 伪代码示例
for (int i = 0; i < maxRetryTimes; i++) {
    try {
        // 同步发送消息,等待结果
        SendResult result = producer.send(message, syncSendCallback);
        if (result.getStatus() == SendStatus.SEND_OK) {
            break; // 发送成功,退出重试
        }
    } catch (Exception e) {
        // 记录日志
        log.error("发送消息失败,第{}次重试", i + 1, e);
        if (i == maxRetryTimes - 1) {
            // 达到最大重试次数,将消息存入本地数据库或文件系统
            persistToLocalStorage(message);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 2. 本地消息表+定时任务

对于特别重要的消息,可以采用本地消息表+定时任务的方式:

  1. 在业务数据库中创建消息表
  2. 业务操作和消息写入在同一个事务中
  3. 使用定时任务扫描未发送的消息并重试
-- 本地消息表示例
CREATE TABLE local_message (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    message_id VARCHAR(64) NOT NULL,
    topic VARCHAR(64) NOT NULL,
    tags VARCHAR(64),
    keys VARCHAR(64),
    message_body TEXT NOT NULL,
    status TINYINT NOT NULL DEFAULT 0, -- 0:待发送 1:发送成功 2:发送失败
    create_time DATETIME NOT NULL,
    update_time DATETIME NOT NULL,
    retry_count INT NOT NULL DEFAULT 0
);
1
2
3
4
5
6
7
8
9
10
11
12
13

# 消息队列端可靠性保证

消息队列端是保证消息不丢失的关键环节:

# 1. 开启消息持久化

  • 确保消息队列将消息持久化到磁盘
  • 配置合适的刷盘策略(同步刷盘/异步刷盘)
  • 设置合理的副本数,确保数据冗余

# 2. 配置合理的存储容量

  • 根据业务量预估,配置足够的存储空间
  • 设置合适的消息过期时间,避免无限堆积

# 消费端可靠性保证

消费端同样需要采取可靠性措施:

# 1. 消息确认机制

  • 消费者处理完消息后,手动确认
  • 只有确认后,消息队列才会删除消息
// 伪代码示例
consumer.subscribe(topic, (MessageExt message) -> {
    try {
        // 处理消息
        processMessage(message);
        
        // 手动确认
        consumer.ack(message);
        return ConsumeStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        // 处理失败,不确认,消息将重新投递
        log.error("处理消息失败", e);
        return ConsumeStatus.CONSUME_RETRY;
    }
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 2. 消息处理幂等性

确保消息处理具有幂等性,即使重复处理也不会产生副作用:

// 伪代码示例
public void processMessage(MessageExt message) {
    String messageId = message.getMsgId();
    
    // 检查是否已处理过
    if (processedMessageRepository.existsById(messageId)) {
        log.info("消息已处理过,跳过处理: {}", messageId);
        return;
    }
    
    // 处理消息
    doProcess(message);
    
    // 记录已处理
    processedMessageRepository.save(new ProcessedMessage(messageId));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 消息不重复的解决方案

# 唯一消息ID

为每条消息生成唯一ID,消费者在处理前检查是否已处理过该ID:

// 伪代码示例
public void processMessage(MessageExt message) {
    String messageId = message.getMsgId();
    
    // 使用Redis检查是否已处理
    if (redisTemplate.hasKey("processed_message:" + messageId)) {
        log.info("消息已处理过,跳过处理: {}", messageId);
        return;
    }
    
    // 处理消息
    doProcess(message);
    
    // 记录已处理,设置过期时间
    redisTemplate.opsForValue().set(
        "processed_message:" + messageId, 
        "1", 
        24, TimeUnit.HOURS
    );
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 事务消息

一些消息队列(如RocketMQ)支持事务消息:

  1. 生产者发送半消息
  2. 执行本地事务
  3. 根据本地事务结果提交或回滚消息
// 伪代码示例
TransactionSendResult result = rocketMQProducer.sendMessageInTransaction(
    message, 
    new LocalTransactionExecuter() {
        @Override
        public LocalTransactionState execute(Message msg, Object arg) {
            try {
                // 执行本地事务
                executeLocalTransaction();
                return LocalTransactionState.COMMIT_MESSAGE;
            } catch (Exception e) {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
    }
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 消息不乱序的解决方案

# 单分区保证有序

  • 将需要保证顺序的消息发送到同一个分区
  • 根据业务键(如订单ID)确定分区
// 伪代码示例
MessageSelector selector = MessageSelector.byKey(orderId);
Message message = new Message(topic, tags, keys, body);
SendResult result = producer.send(message, selector, hashKey);
1
2
3
4

# 全局有序的实现

如果需要全局有序,可以:

  1. 只使用一个分区(但会降低吞吐量)
  2. 在消费端进行二次排序
// 伪码示例:消费端排序
// 1. 将消息暂存到内存队列
// 2. 按业务顺序排序
// 3. 按顺序处理消息
ConcurrentLinkedQueue<MessageExt> messageQueue = new ConcurrentLinkedQueue<>();

// 消费消息
consumer.subscribe(topic, (MessageExt message) -> {
    messageQueue.add(message);
    return ConsumeStatus.CONSUME_SUCCESS;
});

// 排序处理线程
new Thread(() -> {
    while (true) {
        if (!messageQueue.isEmpty()) {
            // 按业务顺序排序
            List<MessageExt> sortedMessages = sortMessages(messageQueue);
            
            // 按顺序处理
            for (MessageExt message : sortedMessages) {
                processMessage(message);
            }
        }
        Thread.sleep(100);
    }
}).start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 实战案例:订单系统的消息可靠性保证

让我们来看一个电商订单系统的实际案例:

# 场景描述

用户下单后,系统需要:

  1. 创建订单
  2. 扣减库存
  3. 增加积分
  4. 发送通知

# 可靠性设计

# 1. 消息不丢失

@Service
public class OrderService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Transactional
    public Order createOrder(OrderDTO orderDTO) {
        // 1. 创建订单
        Order order = new Order();
        // ... 设置订单属性
        
        // 2. 保存订单到数据库
        orderMapper.insert(order);
        
        // 3. 发送订单创建成功消息
        Message<OrderEvent> message = MessageBuilder.withPayload(new OrderEvent(order))
            .setHeader(RocketMQHeaders.KEYS, order.getOrderNo())
            .build();
            
        // 使用同步发送+重试机制
        SendResult result = null;
        int retryCount = 0;
        while (retryCount < 3) {
            try {
                result = rocketMQTemplate.syncSend("order:create", message, 3000);
                if (result.getSendStatus() == SendStatus.SEND_OK) {
                    break;
                }
            } catch (Exception e) {
                retryCount++;
                if (retryCount >= 3) {
                    // 达到最大重试次数,记录到本地消息表
                    saveToLocalMessageTable(message);
                    throw new RuntimeException("发送订单创建消息失败", e);
                }
            }
        }
        
        return order;
    }
    
    // ... 其他方法
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

# 2. 消息不重复

@Service
public class StockService {
    
    @Autowired
    private RedisTemplate redisTemplate;
    
    @RocketMQMessageListener(topic = "order:create", consumerGroup = "stock-service")
    public void handleOrderCreated(Message<OrderEvent> message) {
        OrderEvent event = message.getPayload();
        String orderNo = event.getOrder().getOrderNo();
        
        // 检查是否已处理过该订单
        String processedKey = "processed_order:" + orderNo;
        if (redisTemplate.hasKey(processedKey)) {
            log.info("订单{}已处理过,跳过处理", orderNo);
            return;
        }
        
        try {
            // 扣减库存
            deductStock(event.getOrder());
            
            // 标记为已处理
            redisTemplate.opsForValue().set(processedKey, "1", 24, TimeUnit.HOURS);
            
        } catch (Exception e) {
            log.error("处理订单{}失败", orderNo, e);
            throw new RuntimeException("处理订单失败");
        }
    }
    
    // ... 其他方法
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 3. 消息不乱序

@Service
public class PointService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @RocketMQMessageListener(topic = "order:create", consumerGroup = "point-service")
    public void handleOrderCreated(Message<OrderEvent> message) {
        OrderEvent event = message.getPayload();
        Order order = event.getOrder();
        
        // 使用订单号作为消息key,确保同一订单的消息发送到同一队列
        SendResult result = rocketMQTemplate.syncSend(
            "point:add", 
            new PointEvent(order), 
            3000,
            order.getOrderNo() // 设置消息key
        );
        
        if (result.getSendStatus() != SendStatus.SEND_OK) {
            throw new RuntimeException("发送积分消息失败");
        }
    }
    
    // ... 其他方法
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 结语

通过本文的介绍,我们了解了消息队列可靠性的三大挑战(消息丢失、重复、乱序)以及相应的解决方案。在实际项目中,我们需要根据业务场景和系统特点,选择合适的可靠性保证策略。

记住,没有银弹,每种可靠性方案都有其优缺点和适用场景。我们需要在可靠性、性能和复杂度之间找到平衡点。

消息队列的可靠性不是一蹴而就的,它需要从设计、实现到运维的全链路保障。只有深入理解每个环节的潜在风险,才能构建真正可靠的消息系统。

希望本文能对大家在构建高可靠消息系统时有所帮助。如果还有其他问题或建议,欢迎在评论区留言交流!😊

— Jorgen,致力于构建高可用分布式系统

#消息队列#可靠性#系统设计
上次更新: 2026/01/28, 10:42:53
消息队列的可靠性保证:从理论到实践
消息队列的可靠性保证:如何确保消息不丢失、不重复

← 消息队列的可靠性保证:从理论到实践 消息队列的可靠性保证:如何确保消息不丢失、不重复→

最近更新
01
LLM
01-30
02
intro
01-30
03
intro
01-30
更多文章>
Theme by Vdoing | Copyright © 2019-2026 Jorgen | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式