Jorgen's blog Jorgen's blog
首页
  • 平台架构
  • 混合式开发记录
  • 推送服务
  • 数据分析
  • 实时调度
  • 架构思想

    • 分布式
  • 编程框架工具

    • 编程语言
    • 框架
    • 开发工具
  • 数据存储与处理

    • 数据库
    • 大数据
  • 消息、缓存与搜索

    • 消息队列
    • 搜索与日志分析
  • 前端与跨端开发

    • 前端技术
    • Android
  • 系统与运维

    • 操作系统
    • 容器化与 DevOps
  • 物联网与安全

    • 通信协议
    • 安全
    • 云平台
newland
  • 关于我
  • 终身学习
  • 关于时间的感悟
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

jorgen

Love it, make mistakes, learn, keep grinding.
首页
  • 平台架构
  • 混合式开发记录
  • 推送服务
  • 数据分析
  • 实时调度
  • 架构思想

    • 分布式
  • 编程框架工具

    • 编程语言
    • 框架
    • 开发工具
  • 数据存储与处理

    • 数据库
    • 大数据
  • 消息、缓存与搜索

    • 消息队列
    • 搜索与日志分析
  • 前端与跨端开发

    • 前端技术
    • Android
  • 系统与运维

    • 操作系统
    • 容器化与 DevOps
  • 物联网与安全

    • 通信协议
    • 安全
    • 云平台
newland
  • 关于我
  • 终身学习
  • 关于时间的感悟
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 消息队列
  • 主流消息队列产品对比与选型指南
  • 消息队列中的事务性消息:实现可靠业务流程的关键
  • 消息队列事务消息:分布式事务的可靠保障
  • 消息队列的事务处理:确保数据一致性的关键
  • 消息队列的事务性处理:从理论到实践
  • 消息队列的事务消息与可靠性保证
  • 消息队列的可靠性与持久化机制
  • 消息队列的可靠性保证:从理论到实践
  • 消息队列的可靠性保证:如何确保消息不丢失、不重复、不乱序
  • 消息队列的可靠性保证:如何确保消息不丢失、不重复
    • 前言
    • 消息传递的可靠性挑战
    • 消息不丢失的保证策略
      • 1. 生产端可靠性保证
      • 持久化发送
      • 发送确认机制
      • 重试机制
      • 2. 队列端可靠性保证
      • 队列持久化
      • 消息持久化
      • 副本机制
      • 定期备份
      • 3. 消费端可靠性保证
      • 手动确认机制
      • 消息处理幂等性
    • 消息不重复的保证策略
      • 1. 唯一消息ID
      • 2. 事务消息
      • 3. 去重表
    • 主流消息队列的可靠性特性对比
    • 实践建议
    • 结语
  • 消息队列的可靠性保证与事务消息
  • 消息队列的可靠性保障机制
  • 消息队列的可靠性机制:如何确保消息不丢失、不重复、不乱序
  • 消息队列的性能优化与扩展性-高并发场景下的关键考量
  • 消息队列的安全性防护-构建企业级可靠通信的关键
  • 消息队列的监控与运维-构建可观测性体系的关键
  • 消息队列的架构设计模式-构建高可用系统的关键选择
  • 消息队列的消息路由与过滤机制-构建灵活消息系统的关键
  • 消息队列的测试策略与方法论-构建可靠系统的质量保障
  • 消息队列的集群部署与高可用架构-构建企业级消息系统的基石
  • 消息队列的流处理能力-构建事件驱动架构的核心引擎
  • 消息队列的延迟与死信队列处理-构建健壮消息系统的关键机制
  • 消息队列的消息模式与通信模式-构建灵活系统的基石
  • 消息队列的消息去重与幂等性处理-构建健壮业务系统的关键保障
  • 消息队列的优先级调度机制-构建高效消息处理系统的核心策略
  • 消息队列的容错与故障恢复机制-构建高可用系统的最后一道防线
  • 消息队列的事件溯源与CQRS模式-构建可追溯系统的架构基石
  • 消息队列与微服务架构的集成-构建分布式系统的通信基石
  • 消息队列的消息序列化与数据格式选择-构建高效通信系统的关键决策
  • message_queue
Jorgen
2023-11-15
目录

消息队列的可靠性保证:如何确保消息不丢失、不重复

# 前言

在分布式系统中,消息队列扮演着至关重要的角色。它不仅能够解耦系统组件,还能提高系统的弹性和可扩展性。然而,消息队列最核心的价值在于它能够在系统组件之间可靠地传递消息。想象一下,如果关键业务消息因为系统故障而丢失,将会造成多么严重的后果。

本文将深入探讨如何保证消息队列的可靠性,确保消息在传递过程中不会丢失,也不会被重复处理。

# 消息传递的可靠性挑战

在使用消息队列时,我们主要面临以下几个可靠性挑战:

  1. 消息发送失败:生产者发送消息到队列时可能因为网络问题或系统故障导致发送失败。
  2. 消息丢失:消息已经发送到队列,但因为队列服务器宕机或其他原因导致消息丢失。
  3. 消息重复:由于网络问题或消费者处理失败,同一条消息可能被多次处理。
  4. 消息顺序混乱:在某些场景下,需要保证消息的严格顺序,但多消费者环境下可能导致顺序混乱。

# 消息不丢失的保证策略

# 1. 生产端可靠性保证

# 持久化发送

大多数消息队列都支持消息持久化机制。当消息被标记为持久化后,它会先被写入操作系统的缓存,然后定期刷入磁盘。

// RabbitMQ 示例:发送持久化消息
channel.basicPublish("", "queueName", 
    MessageProperties.PERSISTENT_TEXT_PLAIN, 
    message.getBytes());
1
2
3
4

# 发送确认机制

启用发送确认机制,当消息成功发送到队列后,队列会向生产者发送确认。

// RabbitMQ 示例:启用发送确认
channel.confirmSelect();
// 添加确认监听器
channel.addConfirmListener(...);
1
2
3
4

# 重试机制

对于发送失败的消息,可以实现重试机制,确保最终能够成功发送。

// 伪代码:发送重试机制
public void sendMessageWithRetry(Message message, int maxRetries) {
    int retryCount = 0;
    while (retryCount < maxRetries) {
        try {
            producer.send(message);
            return;
        } catch (Exception e) {
            retryCount++;
            if (retryCount >= maxRetries) {
                // 记录日志或采取其他措施
                log.error("Failed to send message after {} retries", maxRetries);
                return;
            }
            // 指数退避策略
            Thread.sleep((long) (Math.pow(2, retryCount) * 100));
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 2. 队列端可靠性保证

# 队列持久化

确保队列本身是持久化的,这样即使队列服务器重启,队列和其中的消息也不会丢失。

// RabbitMQ 示例:声明持久化队列
channel.queueDeclare("queueName", true, false, false, null);
1
2

# 消息持久化

确保所有重要消息都标记为持久化,这样它们会被写入磁盘而不是仅保存在内存中。

# 副本机制

使用消息队列的副本机制,将消息复制到多个节点,防止单点故障。

# Kafka 示例:配置副本因子
broker.num.replica.fetchers=4
num.replica.fetchers=4
default.replication.factor=3
1
2
3
4

# 定期备份

对于关键消息队列,可以设置定期备份机制,将消息备份到其他存储系统。

# 3. 消费端可靠性保证

# 手动确认机制

使用手动确认机制,确保只有在消息被成功处理后才向队列发送确认。

// RabbitMQ 示例:手动确认消息
channel.basicConsume("queueName", false, new DeliverCallback() {
    @Override
    public void handle(String consumerTag, Delivery delivery) {
        try {
            // 处理消息
            processMessage(delivery.getBody());
            // 手动确认
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        } catch (Exception e) {
            // 处理失败,拒绝消息
            channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
        }
    }
}, consumerTag -> {});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 消息处理幂等性

设计消息处理逻辑时,确保处理过程是幂等的,即同一条消息被处理多次也不会产生不一致的结果。

// 伪代码:消息处理幂等性示例
public void processMessage(Message message) {
    // 检查消息是否已处理
    if (messageService.isProcessed(message.getId())) {
        return;
    }
    
    try {
        // 处理消息
        businessService.handle(message);
        // 标记为已处理
        messageService.markAsProcessed(message.getId());
    } catch (Exception e) {
        // 处理失败
        throw e;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 消息不重复的保证策略

# 1. 唯一消息ID

为每条消息生成唯一ID,消费者在处理前检查是否已处理过该ID的消息。

// 伪代码:基于唯一ID的重复检测
public void processMessageWithUniqueId(Message message) {
    String messageId = message.getId();
    
    // 检查是否已处理
    if (processedMessageCache.contains(messageId)) {
        return;
    }
    
    try {
        // 处理消息
        businessService.handle(message);
        // 记录已处理
        processedMessageCache.add(messageId);
    } catch (Exception e) {
        // 处理失败,不记录已处理
        throw e;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 2. 事务消息

使用支持事务的消息队列,确保消息发送和业务操作在同一个事务中完成。

// RocketMQ 示例:发送事务消息
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("name-server1:9876;name-server2:9876");
producer.start();

// 创建事务监听器
TransactionListener transactionListener = new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务
        boolean success = executeLocalBusiness(msg);
        // 返回事务状态
        return success ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 检查本地事务状态
        boolean success = checkLocalBusinessStatus(msg);
        return success ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
    }
};

producer.setTransactionListener(transactionListener);
// 发送事务消息
TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 3. 去重表

在数据库中创建去重表,记录已处理的消息ID。

-- 创建去重表示例
CREATE TABLE message_dedup (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    message_id VARCHAR(64) NOT NULL,
    topic VARCHAR(64) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY uk_message_id_topic (message_id, topic)
);
1
2
3
4
5
6
7
8

# 主流消息队列的可靠性特性对比

特性 RabbitMQ Kafka RocketMQ ActiveMQ
消息持久化 支持 支持 支持 支持
消息确认机制 支持 支持 支持 支持
事务消息 支持 不支持 支持 支持
顺序消息 单队列有序 分区内有序 支持全局顺序 单队列有序
去重机制 不支持 不支持 支持消息去重 不支持
高可用 镜像队列 副本机制 主从架构 主从架构

# 实践建议

  1. 根据业务场景选择合适的可靠性策略:不是所有场景都需要最高级别的可靠性保证,应根据业务需求和成本选择合适的策略。

  2. 实现监控和告警:建立完善的监控体系,及时发现问题并告警。

  3. 定期进行故障演练:定期模拟各种故障场景,验证系统的可靠性。

  4. 做好文档和应急预案:详细记录各种故障的处理流程,制定应急预案。

# 结语

消息队列的可靠性是分布式系统设计中不可忽视的重要环节。通过本文介绍的各种策略,我们可以构建出高可靠性的消息传递系统。然而,需要注意的是,高可靠性往往伴随着更高的复杂性和成本,在实际应用中需要根据业务需求和系统特点进行权衡。

希望本文能够帮助你在设计和实现消息队列系统时,更好地考虑可靠性问题,构建出更加健壮的系统。

"可靠性不是设计出来的,而是通过不断测试和优化实现的。"

#消息队列#可靠性#系统设计
上次更新: 2026/01/28, 13:30:02
消息队列的可靠性保证:如何确保消息不丢失、不重复、不乱序
消息队列的可靠性保证与事务消息

← 消息队列的可靠性保证:如何确保消息不丢失、不重复、不乱序 消息队列的可靠性保证与事务消息→

最近更新
01
LLM
01-30
02
intro
01-30
03
intro
01-30
更多文章>
Theme by Vdoing | Copyright © 2019-2026 Jorgen | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式