Jorgen's blog Jorgen's blog
首页
  • 平台架构
  • 混合式开发记录
  • 推送服务
  • 数据分析
  • 实时调度
  • 架构思想

    • 分布式
  • 编程框架工具

    • 编程语言
    • 框架
    • 开发工具
  • 数据存储与处理

    • 数据库
    • 大数据
  • 消息、缓存与搜索

    • 消息队列
    • 搜索与日志分析
  • 前端与跨端开发

    • 前端技术
    • Android
  • 系统与运维

    • 操作系统
    • 容器化与 DevOps
  • 物联网与安全

    • 通信协议
    • 安全
    • 云平台
newland
  • 关于我
  • 终身学习
  • 关于时间的感悟
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

jorgen

Love it, make mistakes, learn, keep grinding.
首页
  • 平台架构
  • 混合式开发记录
  • 推送服务
  • 数据分析
  • 实时调度
  • 架构思想

    • 分布式
  • 编程框架工具

    • 编程语言
    • 框架
    • 开发工具
  • 数据存储与处理

    • 数据库
    • 大数据
  • 消息、缓存与搜索

    • 消息队列
    • 搜索与日志分析
  • 前端与跨端开发

    • 前端技术
    • Android
  • 系统与运维

    • 操作系统
    • 容器化与 DevOps
  • 物联网与安全

    • 通信协议
    • 安全
    • 云平台
newland
  • 关于我
  • 终身学习
  • 关于时间的感悟
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 消息队列
  • 主流消息队列产品对比与选型指南
  • 消息队列中的事务性消息:实现可靠业务流程的关键
  • 消息队列事务消息:分布式事务的可靠保障
  • 消息队列的事务处理:确保数据一致性的关键
  • 消息队列的事务性处理:从理论到实践
  • 消息队列的事务消息与可靠性保证
  • 消息队列的可靠性与持久化机制
  • 消息队列的可靠性保证:从理论到实践
  • 消息队列的可靠性保证:如何确保消息不丢失、不重复、不乱序
  • 消息队列的可靠性保证:如何确保消息不丢失、不重复
  • 消息队列的可靠性保证与事务消息
    • 前言
    • 消息可靠性概述
    • 消息可靠性保证机制
      • 1. 持久化存储
      • 2. 确认机制
      • 3. 重试机制
      • 4. 消息去重
    • 事务消息
      • 1. 事务消息的基本原理
      • 2. RocketMQ的事务消息实现
      • 3. 基于本地消息表的实现方案
    • 可靠性最佳实践
      • 1. 合理配置持久化
      • 2. 实现幂等性设计
      • 3. 监控与告警
      • 4. 合理的重试策略
    • 结语
  • 消息队列的可靠性保障机制
  • 消息队列的可靠性机制:如何确保消息不丢失、不重复、不乱序
  • 消息队列的性能优化与扩展性-高并发场景下的关键考量
  • 消息队列的安全性防护-构建企业级可靠通信的关键
  • 消息队列的监控与运维-构建可观测性体系的关键
  • 消息队列的架构设计模式-构建高可用系统的关键选择
  • 消息队列的消息路由与过滤机制-构建灵活消息系统的关键
  • 消息队列的测试策略与方法论-构建可靠系统的质量保障
  • 消息队列的集群部署与高可用架构-构建企业级消息系统的基石
  • 消息队列的流处理能力-构建事件驱动架构的核心引擎
  • 消息队列的延迟与死信队列处理-构建健壮消息系统的关键机制
  • 消息队列的消息模式与通信模式-构建灵活系统的基石
  • 消息队列的消息去重与幂等性处理-构建健壮业务系统的关键保障
  • 消息队列的优先级调度机制-构建高效消息处理系统的核心策略
  • 消息队列的容错与故障恢复机制-构建高可用系统的最后一道防线
  • 消息队列的事件溯源与CQRS模式-构建可追溯系统的架构基石
  • 消息队列与微服务架构的集成-构建分布式系统的通信基石
  • 消息队列的消息序列化与数据格式选择-构建高效通信系统的关键决策
  • message_queue
Jorgen
2023-11-15
目录

消息队列的可靠性保证与事务消息

# 前言

在分布式系统中,消息队列扮演着至关重要的角色。它不仅能够实现系统间的解耦,还能提高系统的可伸缩性和容错能力。然而,随着业务复杂度的增加,我们对消息队列的要求也越来越高,尤其是在消息可靠性方面。想象一下,用户支付成功的消息丢失了,那该是多么糟糕的体验

本文将深入探讨消息队列的可靠性保证机制,以及事务消息这一高级特性,帮助大家构建更加健壮的分布式系统。

# 消息可靠性概述

消息可靠性是指消息在传递过程中能够保证"不丢失、不重复、不乱序"的特性。在实际应用中,消息的可靠性往往面临多种挑战:

  1. 生产者发送失败:网络问题导致消息无法发送到消息队列
  2. 消息队列内部故障:消息队列自身宕机或数据丢失
  3. 消费者处理失败:消费者处理消息时发生异常,导致消息未被正确处理
  4. 消费者处理成功但未确认:消费者已处理消息但未发送确认,消息队列认为消息未被处理

为了解决这些问题,现代消息队列系统通常提供多种可靠性保证机制。

# 消息可靠性保证机制

# 1. 持久化存储

持久化是保证消息不丢失的基础。大多数消息队列都支持将消息持久化到磁盘,即使系统崩溃,重启后也能恢复未处理的消息。

RabbitMQ:

// 在发送消息时设置持久化
channel.basicPublish("", "queueName", 
    MessageProperties.PERSISTENT_TEXT_PLAIN, 
    message.getBytes());
1
2
3
4

Kafka:

// Kafka默认就是持久化的,可以通过调整配置优化
properties.put("acks", "all"); // 等待所有副本确认
properties.put("retries", Integer.MAX_VALUE); // 无限重试
1
2
3

# 2. 确认机制

确认机制确保消息被成功传递到消费者。

生产者确认:

  • RabbitMQ: 通过 confirm 机制,生产者可以确认消息是否被成功接收
  • Kafka: 通过 acks 参数控制确认级别

消费者确认:

  • RabbitMQ: 通过手动确认 channel.basicAck()
  • Kafka: 通过自动提交或手动提交 consumer.commitSync()
// RabbitMQ消费者手动确认
channel.basicAck(deliveryTag, false);

// Kafka消费者手动提交
consumer.commitSync();
1
2
3
4
5

# 3. 重试机制

当消费者处理消息失败时,消息队列应该提供重试机制,而不是直接丢弃消息。

RabbitMQ死信队列:

// 设置队列的死信交换器
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
channel.queueDeclare("queueName", false, false, false, args);
1
2
3
4

Kafka重试主题:

// 配置消费者重试
properties.put("max.poll.records", 100);
properties.put("max.poll.interval.ms", 300000);
1
2
3

# 4. 消息去重

在分布式系统中,由于网络问题或消费者重试,同一条消息可能会被多次处理。为了避免重复处理,需要实现消息去重机制。

基于唯一ID去重:

// 消息中包含唯一ID
String uniqueId = UUID.randomUUID().toString();
String message = "{\"id\":\"" + uniqueId + "\",\"content\":\"test\"}";

// 消费者处理前检查ID是否已处理
if (processedIds.contains(uniqueId)) {
    return; // 已处理,直接返回
}
// 处理消息...
processedIds.add(uniqueId);
1
2
3
4
5
6
7
8
9
10

基于Redis去重:

// 使用Redis的SETNX操作
Boolean result = redisTemplate.opsForValue().setIfAbsent("msg:id:" + uniqueId, "1", 24, TimeUnit.HOURS);
if (!result) {
    return; // 已处理,直接返回
}
// 处理消息...
1
2
3
4
5
6

# 事务消息

事务消息是保证分布式系统数据一致性的重要手段。它允许将消息发送和业务操作放在同一个事务中,要么全部成功,要么全部失败。

# 1. 事务消息的基本原理

事务消息通常包含三个阶段:

  1. 发送阶段:发送半消息(暂不可见消息)
  2. 确认阶段:执行本地事务,根据结果确认或回滚半消息
  3. 消费阶段:消费者确认消息处理结果

# 2. RocketMQ的事务消息实现

RocketMQ是目前支持事务消息较为成熟的系统之一。

发送事务消息:

// 创建事务消息监听器
TransactionListener transactionListener = new TransactionListenerImpl();
// 创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setTransactionListener(transactionListener);

// 发送事务消息
Message msg = new Message("TopicTest", "TagA", "KEY", "hello world".getBytes());
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
1
2
3
4
5
6
7
8
9

事务监听器实现:

public class TransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务
        try {
            // 业务逻辑处理...
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 回查本地事务状态
        // 根据业务状态返回COMMIT、ROLLBACK或UNKNOWN
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 3. 基于本地消息表的实现方案

对于不支持事务消息的消息队列(如RabbitMQ),可以通过本地消息表模式实现类似的事务消息功能。

流程:

  1. 开启本地事务
  2. 执行业务操作
  3. 写入消息表
  4. 提交本地事务
  5. 定时任务从消息表发送消息到MQ
  6. 消费者处理消息后更新消息状态

代码示例:

@Transactional
public void placeOrder(Order order) {
    // 1. 执行业务操作
    orderRepository.save(order);
    
    // 2. 写入消息表
    Message message = new Message();
    message.setContent("订单创建成功");
    message.setOrderId(order.getId());
    message.setStatus(MessageStatus.PENDING);
    messageRepository.save(message);
    
    // 3. 提交本地事务
    // 事务由@Transactional注解自动提交或回滚
}

// 定时任务发送消息
@Scheduled(fixedRate = 5000)
public void sendPendingMessages() {
    List<Message> pendingMessages = messageRepository.findByStatus(MessageStatus.PENDING);
    for (Message message : pendingMessages) {
        try {
            rabbitTemplate.convertAndSend("order.exchange", "order.create", message.getContent());
            message.setStatus(MessageStatus.SENT);
            messageRepository.save(message);
        } catch (Exception e) {
            // 处理发送失败
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 可靠性最佳实践

# 1. 合理配置持久化

根据业务需求,合理配置消息持久化级别:

  • 对于关键业务消息,必须启用持久化
  • 对于非关键或高吞吐量场景,可以考虑内存模式,但要做好备份

# 2. 实现幂等性设计

消费者端实现幂等性,确保重复消费不会导致问题:

public void processOrder(String orderId) {
    // 检查订单是否已处理
    if (orderRepository.existsById(orderId)) {
        return;
    }
    
    // 处理订单逻辑
    // ...
    
    // 标记订单已处理
    orderRepository.markAsProcessed(orderId);
}
1
2
3
4
5
6
7
8
9
10
11
12

# 3. 监控与告警

建立完善的监控体系,及时发现消息积压、处理失败等问题:

  • 消息积压监控
  • 消息处理延迟监控
  • 消息失败率监控
  • 消费者健康状态监控

# 4. 合理的重试策略

根据业务特点,制定合理的重试策略:

  • 对于可重试业务:设置最大重试次数和重试间隔
  • 对于不可重试业务:直接进入死信队列,人工介入

# 结语

消息的可靠性是构建健壮分布式系统的基石。通过合理使用持久化、确认机制、重试策略和事务消息等技术,我们可以有效提升消息系统的可靠性。在实际应用中,需要根据业务特点和系统架构选择合适的方案,并在实践中不断优化。

记住,没有银弹,每种方案都有其适用场景和局限性。选择最适合你业务场景的方案,并在实践中不断迭代优化,才是构建可靠系统的王道。

希望本文能帮助大家更好地理解和应用消息队列的可靠性保证机制,构建更加稳定可靠的分布式系统!

#消息队列#可靠性#事务消息#分布式系统
上次更新: 2026/01/28, 13:30:02
消息队列的可靠性保证:如何确保消息不丢失、不重复
消息队列的可靠性保障机制

← 消息队列的可靠性保证:如何确保消息不丢失、不重复 消息队列的可靠性保障机制→

最近更新
01
LLM
01-30
02
intro
01-30
03
intro
01-30
更多文章>
Theme by Vdoing | Copyright © 2019-2026 Jorgen | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式