Jorgen's blog Jorgen's blog
首页
  • 平台架构
  • 混合式开发记录
  • 推送服务
  • 数据分析
  • 实时调度
  • 架构思想

    • 分布式
  • 编程框架工具

    • 编程语言
    • 框架
    • 开发工具
  • 数据存储与处理

    • 数据库
    • 大数据
  • 消息、缓存与搜索

    • 消息队列
    • 搜索与日志分析
  • 前端与跨端开发

    • 前端技术
    • Android
  • 系统与运维

    • 操作系统
    • 容器化与 DevOps
  • 物联网与安全

    • 通信协议
    • 安全
    • 云平台
newland
  • 关于我
  • 终身学习
  • 关于时间的感悟
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

jorgen

Love it, make mistakes, learn, keep grinding.
首页
  • 平台架构
  • 混合式开发记录
  • 推送服务
  • 数据分析
  • 实时调度
  • 架构思想

    • 分布式
  • 编程框架工具

    • 编程语言
    • 框架
    • 开发工具
  • 数据存储与处理

    • 数据库
    • 大数据
  • 消息、缓存与搜索

    • 消息队列
    • 搜索与日志分析
  • 前端与跨端开发

    • 前端技术
    • Android
  • 系统与运维

    • 操作系统
    • 容器化与 DevOps
  • 物联网与安全

    • 通信协议
    • 安全
    • 云平台
newland
  • 关于我
  • 终身学习
  • 关于时间的感悟
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 消息队列
  • 主流消息队列产品对比与选型指南
  • 消息队列中的事务性消息:实现可靠业务流程的关键
  • 消息队列事务消息:分布式事务的可靠保障
  • 消息队列的事务处理:确保数据一致性的关键
  • 消息队列的事务性处理:从理论到实践
  • 消息队列的事务消息与可靠性保证
  • 消息队列的可靠性与持久化机制
  • 消息队列的可靠性保证:从理论到实践
  • 消息队列的可靠性保证:如何确保消息不丢失、不重复、不乱序
  • 消息队列的可靠性保证:如何确保消息不丢失、不重复
  • 消息队列的可靠性保证与事务消息
  • 消息队列的可靠性保障机制
  • 消息队列的可靠性机制:如何确保消息不丢失、不重复、不乱序
  • 消息队列的性能优化与扩展性-高并发场景下的关键考量
  • 消息队列的安全性防护-构建企业级可靠通信的关键
  • 消息队列的监控与运维-构建可观测性体系的关键
  • 消息队列的架构设计模式-构建高可用系统的关键选择
  • 消息队列的消息路由与过滤机制-构建灵活消息系统的关键
  • 消息队列的测试策略与方法论-构建可靠系统的质量保障
  • 消息队列的集群部署与高可用架构-构建企业级消息系统的基石
  • 消息队列的流处理能力-构建事件驱动架构的核心引擎
  • 消息队列的延迟与死信队列处理-构建健壮消息系统的关键机制
  • 消息队列的消息模式与通信模式-构建灵活系统的基石
    • 前言
    • 1. 点对点模式
      • 1.1 工作原理
      • 1.2 适用场景
      • 1.3 实现示例
    • 2. 发布/订阅模式
      • 2.1 工作原理
      • 2.2 适用场景
      • 2.3 实现示例
    • 3. 请求/响应模式
      • 3.1 工作原理
      • 3.2 适用场景
      • 3.3 实现示例
    • 4. 扇出模式
      • 4.1 工作原理
      • 4.2 适用场景
      • 4.3 实现示例
    • 5. 主题模式
      • 5.1 工作原理
      • 5.2 适用场景
      • 5.3 实现示例
    • 6. 工作队列模式
      • 6.1 工作原理
      • 6.2 适用场景
      • 6.3 实现示例
    • 7. 消息模式的选择与组合
      • 7.1 模式选择原则
      • 7.2 模式组合示例
      • 7.2.1 订单处理系统
      • 7.2.2 实时数据分析系统
      • 7.3 设计模式应用
    • 8. 最佳实践与注意事项
      • 8.1 消息设计
      • 8.2 消费者设计
      • 8.3 系统设计
    • 结语
  • 消息队列的消息去重与幂等性处理-构建健壮业务系统的关键保障
  • 消息队列的优先级调度机制-构建高效消息处理系统的核心策略
  • 消息队列的容错与故障恢复机制-构建高可用系统的最后一道防线
  • 消息队列的事件溯源与CQRS模式-构建可追溯系统的架构基石
  • 消息队列与微服务架构的集成-构建分布式系统的通信基石
  • 消息队列的消息序列化与数据格式选择-构建高效通信系统的关键决策
  • message_queue
Jorgen
2026-01-28
目录

消息队列的消息模式与通信模式-构建灵活系统的基石

# 前言

在构建分布式系统和微服务架构时,消息队列扮演着至关重要的角色。它不仅实现了系统间的解耦,还提供了异步通信的能力。然而,要充分利用消息队列的潜力,理解不同的消息模式和通信模式是必不可少的。

提示

消息模式是消息队列设计的核心,它决定了消息如何在生产者和消费者之间流动,以及系统如何响应这些消息。

本文将深入探讨消息队列的几种核心消息模式及其应用场景,帮助您在设计系统时做出更合适的选择。

# 1. 点对点模式

点对点模式是最简单的消息模式,它确保每条消息只被一个消费者处理。

# 1.1 工作原理

在点对点模式中:

  • 每个消息只有一个消费者
  • 生产者发送消息到队列
  • 消费者从队列中获取消息
  • 消息一旦被消费,就从队列中删除
生产者 --> 队列 --> 消费者
1

# 1.2 适用场景

点对点模式适用于以下场景:

  • 任务分配:将任务分配给可用的工作节点
  • 请求处理:确保每个请求只被处理一次
  • 数据处理:批量数据处理任务

# 1.3 实现示例

以RabbitMQ为例,点对点模式通过使用队列实现:

// 生产者
Channel channel = connection.createChannel();
channel.queueDeclare("task_queue", true, false, false, null);
String message = "Hello, RabbitMQ!";
channel.basicPublish("", "task_queue", null, message.getBytes());

// 消费者
channel.basicConsume("task_queue", true, (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println("Received message: " + message);
}, consumerTag -> {});
1
2
3
4
5
6
7
8
9
10
11

# 2. 发布/订阅模式

发布/订阅模式允许一条消息被多个消费者接收,实现消息的广播。

# 2.1 工作原理

在发布/订阅模式中:

  • 一个消息可以被多个消费者接收
  • 生产者发布消息到主题
  • 消息被复制并传递给所有订阅该主题的消费者
生产者 --> 主题 --> 消费者1
                 --> 消费者2
                 --> 消费者3
1
2
3

# 2.2 适用场景

发布/订阅模式适用于以下场景:

  • 事件通知:将系统事件通知多个监听者
  • 数据分发:将相同数据分发到多个处理系统
  • 日志收集:将日志发送到多个存储和分析系统

# 2.3 实现示例

以Kafka为例,发布/订阅模式通过主题和消费者组实现:

// 生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("test-topic", "key", "value"));
producer.close();

// 消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 3. 请求/响应模式

请求/响应模式允许客户端等待服务器的响应,实现同步通信。

# 3.1 工作原理

在请求/响应模式中:

  • 客户端发送请求消息并等待响应
  • 服务器处理请求并发送响应
  • 客户端接收响应并继续处理
客户端 --> 请求队列 --> 服务器
响应队列 <-- 客户端ID <-- 服务器
1
2

# 3.2 适用场景

请求/响应模式适用于以下场景:

  • API调用:需要立即获得结果的API调用
  • 数据查询:需要返回查询结果的请求
  • 同步处理:需要确认操作结果的场景

# 3.3 实现示例

以RabbitMQ为例,请求/响应模式可以通过临时响应队列实现:

// 客户端
String correlationId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties
    .Builder()
    .correlationId(correlationId)
    .replyTo(replyQueueName)
    .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// 等待响应
while (true) {
  GetResponse response = channel.basicGet(replyQueueName, true);
  if (response != null) {
    if (response.getProps().getCorrelationId().equals(correlationId)) {
      String replyMessage = new String(response.getBody(), "UTF-8");
      System.out.println("Received reply: " + replyMessage);
      break;
    }
  }
}

// 服务器
channel.basicConsume("rpc_queue", false, (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");
  String replyMessage = "Processed: " + message;
  
  AMQP.BasicProperties props = new AMQP.BasicProperties
      .Builder()
      .correlationId(delivery.getProps().getCorrelationId())
      .build();
  
  channel.basicPublish("", delivery.getProps().getReplyTo(), props, replyMessage.getBytes());
  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# 4. 扇出模式

扇出模式是一种特殊的发布/订阅模式,消息被广播到所有消费者,而不考虑消费者组。

# 4.1 工作原理

在扇出模式中:

  • 生产者发送消息到交换机
  • 交换机将消息复制并转发到所有绑定队列
  • 每个队列有一个独立消费者
生产者 --> 交换机 --> 队列1 --> 消费者1
                     --> 队列2 --> 消费者2
                     --> 队列3 --> 消费者3
1
2
3

# 4.2 适用场景

扇出模式适用于以下场景:

  • 日志处理:将日志发送到多个存储系统
  • 通知系统:将通知发送到多个渠道
  • 数据备份:将数据复制到多个存储位置

# 4.3 实现示例

以RabbitMQ为例,扇出模式通过使用扇出交换机实现:

// 声明扇出交换机
channel.exchangeDeclare("logs", "fanout");

// 生产者
String message = "Log message";
channel.basicPublish("logs", "", null, message.getBytes());

// 消费者1
String queueName1 = channel.queueDeclare().getQueue();
channel.queueBind(queueName1, "logs", "");

// 消费者2
String queueName2 = channel.queueDeclare().getQueue();
channel.queueBind(queueName2, "logs", "");

// 消费者3
String queueName3 = channel.queueDeclare().getQueue();
channel.queueBind(queueName3, "logs", "");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 5. 主题模式

主题模式是一种更灵活的发布/订阅模式,允许消费者基于主题筛选消息。

# 5.1 工作原理

在主题模式中:

  • 消息带有主题标签
  • 消费者订阅特定主题模式
  • 交换机根据主题模式将消息路由到匹配的队列
生产者 --> 主题交换机 --> 队列1(匹配主题1) --> 消费者1
                             --> 队列2(匹配主题2) --> 消费者2
                             --> 队列3(匹配主题3) --> 消费者3
1
2
3

# 5.2 适用场景

主题模式适用于以下场景:

  • 内容分发:基于内容类型分发给不同处理系统
  • 事件过滤:根据事件类型分发给不同处理器
  • 数据路由:根据数据属性分发给不同系统

# 5.3 实现示例

以RabbitMQ为例,主题模式通过使用主题交换机实现:

// 声明主题交换机
channel.exchangeDeclare("topic_logs", "topic");

// 生产者
String routingKey = "quick.orange.rabbit";
channel.basicPublish("topic_logs", routingKey, null, message.getBytes());

// 消费者1 - 匹配所有以"quick.orange."开头的消息
channel.queueDeclare("queue1", false, false, false, null);
channel.queueBind("queue1", "topic_logs", "quick.orange.*");

// 消费者2 - 匹配所有包含".rabbit."的消息
channel.queueDeclare("queue2", false, false, false, null);
channel.queueBind("queue2", "topic_logs", "*.rabbit.*");

// 消费者3 - 匹配所有以"lazy.#"结尾的消息
channel.queueDeclare("queue3", false, false, false, null);
channel.queueBind("queue3", "topic_logs", "lazy.#");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 6. 工作队列模式

工作队列模式允许多个消费者从同一个队列中获取消息,实现负载均衡。

# 6.1 工作原理

在工作队列模式中:

  • 多个消费者连接到同一个队列
  • 消息自动分配给空闲的消费者
  • 可以设置预取计数来控制消费者一次获取的消息数量
生产者 --> 队列 --> 消费者1
                 --> 消费者2
                 --> 消费者3
1
2
3

# 6.2 适用场景

工作队列模式适用于以下场景:

  • 任务处理:将耗时的任务分配给多个工作节点
  • 数据处理:并行处理大量数据
  • 批量操作:执行批量计算或转换

# 6.3 实现示例

以RabbitMQ为例,工作队列模式通过多个消费者连接到同一个队列实现:

// 生产者
for (int i = 0; i < 10; i++) {
    String message = "Task " + i;
    channel.basicPublish("", "work_queue", null, message.getBytes();
}

// 消费者1
channel.basicQos(1); // 一次只处理一条消息
channel.basicConsume("work_queue", false, (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println("Worker 1 received: " + message);
    try {
        // 模拟处理耗时
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {});

// 消费者2
channel.basicQos(1);
channel.basicConsume("work_queue", false, (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println("Worker 2 received: " + message);
    try {
        // 模拟处理耗时
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 7. 消息模式的选择与组合

在实际应用中,往往需要组合多种消息模式来解决复杂问题。

# 7.1 模式选择原则

选择消息模式时,应考虑以下因素:

  • 消息消费者数量:一个还是多个
  • 消息处理方式:同步还是异步
  • 消息路由需求:简单路由还是复杂路由
  • 性能要求:高吞吐量还是低延迟
  • 可靠性要求:是否需要保证消息不丢失

# 7.2 模式组合示例

# 7.2.1 订单处理系统

在一个电商系统中,可以组合多种消息模式:

订单服务 --> 点对点队列 --> 订单处理服务
            |
            发布/订阅 --> 库存服务
            |
            发布/订阅 --> 通知服务
            |
            发布/订阅 --> 分析服务
1
2
3
4
5
6
7

# 7.2.2 实时数据分析系统

数据采集 --> 主题交换机 --> 实时处理队列 --> 实时分析服务
                            |
                            --> 批量处理队列 --> 批量分析服务
                            |
                            --> 告警队列 --> 告警服务
1
2
3
4
5

# 7.3 设计模式应用

消息队列可以与其他设计模式结合使用:

  • 中介者模式:使用消息队列作为中介者,减少组件间的直接依赖
  • 观察者模式:使用发布/订阅模式实现观察者模式
  • 策略模式:通过消息路由实现不同的处理策略
  • 管道模式:通过消息队列连接多个处理阶段

# 8. 最佳实践与注意事项

# 8.1 消息设计

  • 保持消息简单和原子性
  • 包含足够的上下文信息
  • 使用版本控制来处理消息格式变化
  • 考虑消息大小和传输效率

# 8.2 消费者设计

  • 实现幂等性处理
  • 合理处理异常和重试
  • 实现背压机制防止系统过载
  • 考虑消息处理顺序性需求

# 8.3 系统设计

  • 合理设置队列大小和持久化策略
  • 实现监控和告警机制
  • 考虑分区和分片以提高吞吐量
  • 实现消息追踪和调试工具

# 结语

消息模式是消息队列设计的核心,不同的消息模式适用于不同的场景。理解这些模式并正确选择和组合它们,对于构建高效、可扩展的分布式系统至关重要。

在实际应用中,往往需要根据业务需求灵活组合多种消息模式,同时遵循最佳实践来确保系统的稳定性和可靠性。

选择正确的消息模式不仅影响系统的架构设计,还决定了系统的可扩展性、可靠性和性能。在设计阶段就考虑这些因素,可以避免后期的重大重构。

希望本文能够帮助您更好地理解消息队列的消息模式,并在实际项目中做出更合适的设计决策。

#消息队列#通信模式#事件驱动
上次更新: 2026/01/28, 17:04:46
消息队列的延迟与死信队列处理-构建健壮消息系统的关键机制
消息队列的消息去重与幂等性处理-构建健壮业务系统的关键保障

← 消息队列的延迟与死信队列处理-构建健壮消息系统的关键机制 消息队列的消息去重与幂等性处理-构建健壮业务系统的关键保障→

最近更新
01
LLM
01-30
02
intro
01-30
03
intro
01-30
更多文章>
Theme by Vdoing | Copyright © 2019-2026 Jorgen | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式