2026-02-06 03:34:50 +08:00
|
|
|
|
# 消息队列设计:从系统解耦到高可用架构
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
> 💡 **学习指南**:消息队列是现代分布式系统的"缓冲器"和"解耦器"。本章节从一个核心问题出发:**当系统耦合严重、流量突增时,如何保证核心链路稳定?** 我们将通过真实案例(淘宝订单、秒杀系统)深入理解消息队列的设计哲学和工程实践。
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
在开始之前,建议你先了解以下基础概念:
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
- **同步 vs 异步**:可以先阅读 [Web 基础概念](./web-basics.md) 的「HTTP 请求/响应」部分。
|
|
|
|
|
|
- **分布式系统基础**:如果你还不了解 CAP 定理、一致性等概念,可以先看 [分布式系统入门](./distributed-systems.md)。
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
---
|
2026-02-01 23:42:12 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
## 0. 引言:为什么系统耦合严重时会"一崩全崩"?
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
很多团队在业务快速增长时都会遇到类似的困境:
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
- 一个下游服务挂掉,导致整个订单流程卡住;
|
|
|
|
|
|
- 大促期间流量突增,数据库被瞬间打崩;
|
|
|
|
|
|
- 新增一个业务模块,需要修改十几个上游服务;
|
|
|
|
|
|
- 系统响应越来越慢,用户投诉"转圈圈"。
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
直觉上,我们会以为是:**"服务器不够强"**。
|
|
|
|
|
|
但大多数时候,问题并不在于硬件性能,而在于系统架构的设计——**服务之间耦合太紧,缺乏有效的缓冲和解耦机制**。
|
2026-02-01 23:42:12 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
面对这些挑战,单纯依靠"加机器"已经捉襟见肘。我们需要一种更优雅的架构设计,来在服务之间建立"缓冲带",让系统能够从容应对流量波动和服务故障。这正是**消息队列**试图解决的问题。
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
---
|
2026-02-01 23:42:12 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
## 1. 什么是消息队列?(定义 + 核心三要素)
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
### 1.1 从一个真实案例说起:淘宝订单系统的演进
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
2012 年,淘宝订单系统遭遇了一次严重故障。双 11 零点,流量瞬间涌入,订单服务直接调用库存服务、支付服务、物流服务... 整个链路像多米诺骨牌一样接连倒下。
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**当时的架构(紧耦合):**
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
|
|
|
|
|
```
|
2026-02-06 03:34:50 +08:00
|
|
|
|
用户下单 → 订单服务 → 同步调用库存服务 → 同步调用支付服务 → 同步调用物流服务
|
|
|
|
|
|
↓ ↓ ↓
|
|
|
|
|
|
响应 200ms 响应 500ms 响应 300ms
|
2026-01-19 11:25:10 +08:00
|
|
|
|
```
|
|
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**问题:**
|
|
|
|
|
|
- 总响应时间 = 200 + 500 + 300 = 1000ms(用户等 1 秒)
|
|
|
|
|
|
- 库存服务挂了,订单服务也挂
|
|
|
|
|
|
- 支付服务慢了,整个链路被拖慢
|
|
|
|
|
|
- 无法水平扩展,只能垂直加机器
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**改进后的架构(引入消息队列):**
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
```
|
|
|
|
|
|
用户下单 → 订单服务 → 发送"订单创建"消息 → 立即返回(50ms)
|
|
|
|
|
|
↓
|
|
|
|
|
|
消息队列(Kafka)
|
|
|
|
|
|
↓
|
|
|
|
|
|
┌─────────────┬─────────────┬─────────────┐
|
|
|
|
|
|
▼ ▼ ▼ ▼
|
|
|
|
|
|
库存服务 支付服务 物流服务 通知服务
|
|
|
|
|
|
(异步扣减) (异步处理) (异步创建) (异步发送)
|
|
|
|
|
|
```
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**改进后:**
|
|
|
|
|
|
- 用户响应时间 = 50ms(体验提升 20 倍)
|
|
|
|
|
|
- 库存服务挂了,消息暂存队列,恢复后继续处理
|
|
|
|
|
|
- 支付服务慢了,不影响订单创建
|
|
|
|
|
|
- 可以水平扩展消费者,提升处理能力
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
<MQArchitectureDemo />
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
### 1.2 消息队列的核心三要素
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
从上面的案例可以看出,消息队列的本质是**在生产者和消费者之间建立一个"邮局"**。
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
#### 要素一:生产者(Producer)
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**职责**:创建并发送消息到队列。
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**关键设计要点**:
|
|
|
|
|
|
- **发送方式**:同步发送(可靠但阻塞)vs 异步发送(高性能但需处理回调)
|
|
|
|
|
|
- **消息确认**:等待 Broker 确认(At Least Once)vs 发送即忘(At Most Once)
|
|
|
|
|
|
- **失败处理**:重试策略、本地日志备份、死信队列
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
|
|
|
|
|
```python
|
2026-02-06 03:34:50 +08:00
|
|
|
|
# 生产者代码示例(带可靠发送)
|
|
|
|
|
|
class OrderProducer:
|
|
|
|
|
|
def __init__(self, mq_connection):
|
|
|
|
|
|
self.mq = mq_connection
|
|
|
|
|
|
self.pending_messages = [] # 本地待确认消息
|
|
|
|
|
|
|
|
|
|
|
|
def send_order_created(self, order_data):
|
|
|
|
|
|
"""发送订单创建消息(带可靠投递)"""
|
|
|
|
|
|
message = {
|
|
|
|
|
|
"event_type": "order.created",
|
|
|
|
|
|
"order_id": order_data["id"],
|
|
|
|
|
|
"user_id": order_data["user_id"],
|
|
|
|
|
|
"product_id": order_data["product_id"],
|
|
|
|
|
|
"amount": order_data["amount"],
|
|
|
|
|
|
"timestamp": time.time()
|
|
|
|
|
|
}
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
# 1. 先写入本地日志(防止发送失败丢失)
|
|
|
|
|
|
msg_record = {
|
|
|
|
|
|
"id": generate_uuid(),
|
|
|
|
|
|
"status": "PENDING",
|
|
|
|
|
|
"message": message,
|
|
|
|
|
|
"retry_count": 0,
|
|
|
|
|
|
"created_at": time.time()
|
|
|
|
|
|
}
|
|
|
|
|
|
self.save_to_local_log(msg_record)
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 2. 发送消息到 MQ(异步发送 + 回调确认)
|
|
|
|
|
|
self.mq.send_async(
|
|
|
|
|
|
exchange="order.events",
|
|
|
|
|
|
routing_key="order.created",
|
|
|
|
|
|
body=message,
|
|
|
|
|
|
on_success=lambda: self.on_send_success(msg_record["id"]),
|
|
|
|
|
|
on_failure=lambda e: self.on_send_failure(msg_record["id"], e)
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
return {"success": True, "message_id": msg_record["id"]}
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
# 发送异常,等待定时任务重试
|
|
|
|
|
|
logger.error(f"消息发送异常:{e}")
|
|
|
|
|
|
return {"success": False, "error": str(e)}
|
|
|
|
|
|
|
|
|
|
|
|
def on_send_success(self, msg_id):
|
|
|
|
|
|
"""发送成功回调"""
|
|
|
|
|
|
self.update_local_log_status(msg_id, "SENT")
|
|
|
|
|
|
logger.info(f"消息 {msg_id} 发送成功")
|
|
|
|
|
|
|
|
|
|
|
|
def on_send_failure(self, msg_id, error):
|
|
|
|
|
|
"""发送失败回调"""
|
|
|
|
|
|
logger.error(f"消息 {msg_id} 发送失败:{error}")
|
|
|
|
|
|
# 状态保持 PENDING,等待定时任务重试
|
2026-01-19 11:25:10 +08:00
|
|
|
|
```
|
|
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
<ProducerConsumerDemo />
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
#### 要素二:消费者(Consumer)
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**职责**:从队列获取消息并处理。
|
2026-02-01 23:42:12 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**关键设计要点**:
|
|
|
|
|
|
- **消费模式**:推模式(Push,Broker 主动推送)vs 拉模式(Pull,消费者主动拉取)
|
|
|
|
|
|
- **消费确认**:自动 ACK(高效但可能丢消息)vs 手动 ACK(可靠但需处理超时)
|
|
|
|
|
|
- **并发控制**:单线程顺序消费 vs 多线程并行消费
|
|
|
|
|
|
- **失败处理**:重试策略、死信队列、补偿机制
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
```python
|
|
|
|
|
|
# 消费者代码示例(带可靠处理)
|
|
|
|
|
|
class OrderConsumer:
|
|
|
|
|
|
def __init__(self, mq_connection, db_connection, redis_client):
|
|
|
|
|
|
self.mq = mq_connection
|
|
|
|
|
|
self.db = db_connection
|
|
|
|
|
|
self.redis = redis_client
|
|
|
|
|
|
self.processing_messages = {} # 正在处理的消息(防重复)
|
|
|
|
|
|
|
|
|
|
|
|
def start(self):
|
|
|
|
|
|
"""启动消费者"""
|
|
|
|
|
|
self.mq.consume(
|
|
|
|
|
|
queue="order.created.queue",
|
|
|
|
|
|
callback=self.handle_message,
|
|
|
|
|
|
prefetch_count=10, # 预取 10 条,平衡吞吐量和内存
|
|
|
|
|
|
auto_ack=False # 手动 ACK,保证可靠性
|
|
|
|
|
|
)
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
def handle_message(self, message):
|
|
|
|
|
|
"""处理消息(带幂等性和异常处理)"""
|
|
|
|
|
|
msg_id = message.headers.get("message_id")
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 1. 幂等性检查:是否正在处理或已处理
|
|
|
|
|
|
if self.is_duplicate(msg_id):
|
|
|
|
|
|
logger.warning(f"消息 {msg_id} 重复,跳过处理")
|
|
|
|
|
|
message.ack() # 确认消息,避免重复投递
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
# 标记为正在处理
|
|
|
|
|
|
self.mark_processing(msg_id)
|
|
|
|
|
|
|
|
|
|
|
|
# 2. 解析消息
|
|
|
|
|
|
event_data = json.loads(message.body)
|
|
|
|
|
|
order_id = event_data["order_id"]
|
|
|
|
|
|
user_id = event_data["user_id"]
|
|
|
|
|
|
product_id = event_data["product_id"]
|
|
|
|
|
|
|
|
|
|
|
|
logger.info(f"开始处理订单 {order_id}")
|
|
|
|
|
|
|
|
|
|
|
|
# 3. 数据库事务处理
|
|
|
|
|
|
with self.db.transaction() as tx:
|
|
|
|
|
|
# 3.1 扣减库存(使用乐观锁防并发)
|
|
|
|
|
|
stock_result = tx.execute("""
|
|
|
|
|
|
UPDATE products
|
|
|
|
|
|
SET stock = stock - 1
|
|
|
|
|
|
WHERE id = ? AND stock > 0
|
|
|
|
|
|
""", product_id)
|
|
|
|
|
|
|
|
|
|
|
|
if stock_result.affected_rows == 0:
|
|
|
|
|
|
raise InsufficientStockException(f"商品 {product_id} 库存不足")
|
|
|
|
|
|
|
|
|
|
|
|
# 3.2 创建订单
|
|
|
|
|
|
tx.execute("""
|
|
|
|
|
|
INSERT INTO orders (id, user_id, product_id, status, created_at)
|
|
|
|
|
|
VALUES (?, ?, ?, 'CONFIRMED', NOW())
|
|
|
|
|
|
""", order_id, user_id, product_id)
|
|
|
|
|
|
|
|
|
|
|
|
# 3.3 记录库存变动日志
|
|
|
|
|
|
tx.execute("""
|
|
|
|
|
|
INSERT INTO inventory_logs (order_id, product_id, change, created_at)
|
|
|
|
|
|
VALUES (?, ?, -1, NOW())
|
|
|
|
|
|
""", order_id, product_id)
|
|
|
|
|
|
|
|
|
|
|
|
# 4. 更新 Redis 缓存
|
|
|
|
|
|
self.redis.hset(f"order:{order_id}", "status", "CONFIRMED")
|
|
|
|
|
|
|
|
|
|
|
|
# 5. 发送后续事件(如通知服务)
|
|
|
|
|
|
self.mq.publish(
|
|
|
|
|
|
exchange="order.events",
|
|
|
|
|
|
routing_key="order.confirmed",
|
|
|
|
|
|
body={"order_id": order_id, "user_id": user_id}
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
# 6. 标记为已处理并确认消息
|
|
|
|
|
|
self.mark_processed(msg_id)
|
|
|
|
|
|
message.ack()
|
|
|
|
|
|
|
|
|
|
|
|
logger.info(f"订单 {order_id} 处理成功")
|
|
|
|
|
|
|
|
|
|
|
|
except InsufficientStockException as e:
|
|
|
|
|
|
# 库存不足,业务逻辑失败,不重试
|
|
|
|
|
|
logger.warning(f"订单处理失败(库存不足):{e}")
|
|
|
|
|
|
self.mark_processed(msg_id)
|
|
|
|
|
|
message.ack() # 确认消息,避免重复
|
|
|
|
|
|
|
|
|
|
|
|
except RetryableException as e:
|
|
|
|
|
|
# 可重试异常(如网络超时)
|
|
|
|
|
|
logger.error(f"订单处理失败(可重试):{e}")
|
|
|
|
|
|
retry_count = message.headers.get("retry_count", 0)
|
|
|
|
|
|
|
|
|
|
|
|
if retry_count < 3:
|
|
|
|
|
|
# 重新入队,增加重试计数
|
|
|
|
|
|
message.headers["retry_count"] = retry_count + 1
|
|
|
|
|
|
message.nack(requeue=True)
|
|
|
|
|
|
else:
|
|
|
|
|
|
# 超过重试次数,进入死信队列
|
|
|
|
|
|
logger.error(f"消息 {msg_id} 超过最大重试次数,进入死信队列")
|
|
|
|
|
|
self.mark_processed(msg_id)
|
|
|
|
|
|
message.nack(requeue=False)
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
# 未知异常,记录日志并进入死信队列
|
|
|
|
|
|
logger.critical(f"订单处理发生未知异常:{e}", exc_info=True)
|
|
|
|
|
|
self.mark_processed(msg_id)
|
|
|
|
|
|
message.nack(requeue=False)
|
|
|
|
|
|
|
|
|
|
|
|
def is_duplicate(self, msg_id):
|
|
|
|
|
|
"""检查消息是否重复"""
|
|
|
|
|
|
# 1. 检查是否正在处理
|
|
|
|
|
|
if msg_id in self.processing_messages:
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
# 2. 检查 Redis 中是否已处理
|
|
|
|
|
|
return self.redis.sismember("processed_messages", msg_id)
|
|
|
|
|
|
|
|
|
|
|
|
def mark_processing(self, msg_id):
|
|
|
|
|
|
"""标记消息正在处理"""
|
|
|
|
|
|
self.processing_messages[msg_id] = time.time()
|
|
|
|
|
|
|
|
|
|
|
|
def mark_processed(self, msg_id):
|
|
|
|
|
|
"""标记消息已处理"""
|
|
|
|
|
|
self.processing_messages.pop(msg_id, None)
|
|
|
|
|
|
# 存入 Redis Set,设置过期时间(如 24 小时)
|
|
|
|
|
|
self.redis.sadd("processed_messages", msg_id)
|
|
|
|
|
|
self.redis.expire("processed_messages", 24 * 3600)
|
2026-01-19 11:25:10 +08:00
|
|
|
|
```
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
#### 要素三:Broker(消息代理)
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**职责**:接收、存储、转发消息。
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**关键设计要点**:
|
|
|
|
|
|
- **存储模型**:内存存储(低延迟)vs 磁盘存储(高可靠)
|
|
|
|
|
|
- **复制策略**:主从复制、多副本同步
|
|
|
|
|
|
- **高可用机制**:集群部署、自动故障转移
|
|
|
|
|
|
- **扩展性**:分区(Partition)、分片(Sharding)
|
2026-02-01 23:42:12 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
```python
|
|
|
|
|
|
# Broker 配置示例(Kafka 集群配置)
|
|
|
|
|
|
kafka_config = {
|
|
|
|
|
|
# 集群节点配置
|
|
|
|
|
|
"bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
|
|
|
|
|
|
|
|
|
|
|
|
# 生产者配置(可靠性优先)
|
|
|
|
|
|
"producer": {
|
|
|
|
|
|
"acks": "all", # 等待所有 ISR 副本确认
|
|
|
|
|
|
"retries": 3, # 发送失败重试 3 次
|
|
|
|
|
|
"retry.backoff.ms": 1000, # 重试间隔 1 秒
|
|
|
|
|
|
"enable.idempotence": True, # 启用幂等性,防止重复发送
|
|
|
|
|
|
"max.in.flight.requests.per.connection": 5, # 允许 5 个未确认请求
|
|
|
|
|
|
"compression.type": "lz4", # 消息压缩,提升吞吐
|
|
|
|
|
|
"batch.size": 16384, # 批量发送大小 16KB
|
|
|
|
|
|
"linger.ms": 10 # 最多等待 10ms 批量发送
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
|
|
# 消费者配置(平衡性能与可靠性)
|
|
|
|
|
|
"consumer": {
|
|
|
|
|
|
"group.id": "order-service-group", # 消费者组 ID
|
|
|
|
|
|
"auto.offset.reset": "earliest", # 无偏移量时从最早开始
|
|
|
|
|
|
"enable.auto.commit": False, # 关闭自动提交,手动 ACK
|
|
|
|
|
|
"max.poll.records": 500, # 单次拉取最大 500 条
|
|
|
|
|
|
"max.poll.interval.ms": 300000, # 消费超时 5 分钟
|
|
|
|
|
|
"session.timeout.ms": 10000, # 会话超时 10 秒
|
|
|
|
|
|
"heartbeat.interval.ms": 3000 # 心跳间隔 3 秒
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
|
|
# Topic 配置(分区与副本)
|
|
|
|
|
|
"topic": {
|
|
|
|
|
|
"order.events": {
|
|
|
|
|
|
"partitions": 12, # 12 个分区,提升并行度
|
|
|
|
|
|
"replication.factor": 3, # 3 个副本,保证高可用
|
|
|
|
|
|
"min.insync.replicas": 2, # 至少 2 个副本同步才算写入成功
|
|
|
|
|
|
"retention.ms": 604800000 # 消息保留 7 天
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
```
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
|
|
|
|
|
---
|
|
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
## 2. 核心问题一:如何解耦系统,避免"牵一发而动全身"?
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
### 2.1 紧耦合的悲剧:一个服务挂了,全盘皆输
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**场景还原**:某电商平台的早期架构
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
|
|
|
|
|
```
|
2026-02-06 03:34:50 +08:00
|
|
|
|
订单服务直接调用下游服务:
|
|
|
|
|
|
┌─────────────┐
|
|
|
|
|
|
│ 订单服务 │
|
|
|
|
|
|
└──────┬──────┘
|
|
|
|
|
|
│
|
|
|
|
|
|
├───────────┬───────────┬───────────┐
|
|
|
|
|
|
▼ ▼ ▼ ▼
|
|
|
|
|
|
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
|
|
|
|
|
|
│库存服务 │ │支付服务 │ │物流服务 │ │短信服务 │
|
|
|
|
|
|
│ 200ms │ │ 500ms │ │ 300ms │ │ 100ms │
|
|
|
|
|
|
└──────────┘ └──────────┘ └──────────┘ └──────────┘
|
2026-01-19 11:25:10 +08:00
|
|
|
|
```
|
|
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**痛点分析:**
|
2026-02-01 23:42:12 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
| 痛点 | 具体表现 | 后果 |
|
|
|
|
|
|
|------|----------|------|
|
|
|
|
|
|
| **级联故障** | 库存服务挂掉,订单服务同步调用超时 | 订单服务线程池耗尽,无法处理新请求 |
|
|
|
|
|
|
| **响应延迟** | 必须等待所有下游服务响应 | 用户等待 1 秒以上,体验极差 |
|
|
|
|
|
|
| **扩展困难** | 新增积分服务,需要修改订单服务代码 | 发布周期变长,风险增加 |
|
|
|
|
|
|
| **资源浪费** | 订单服务必须等待短信服务 | 数据库连接被长时间占用 |
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
### 2.2 解耦方案:引入消息队列作为"中间层"
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**解耦后的架构:**
|
2026-01-19 11:25:10 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
```
|
|
|
|
|
|
订单服务只负责发消息,不关心谁消费:
|
|
|
|
|
|
|
|
|
|
|
|
┌─────────────┐
|
|
|
|
|
|
│ 订单服务 │ ──发送"订单创建"消息──┐
|
|
|
|
|
|
└─────────────┘ │
|
|
|
|
|
|
▼
|
|
|
|
|
|
┌───────────────────┐
|
|
|
|
|
|
│ 消息队列 │
|
|
|
|
|
|
│ (Kafka/RabbitMQ) │
|
|
|
|
|
|
│ - 可靠存储 │
|
|
|
|
|
|
│ - 多副本 │
|
|
|
|
|
|
│ - 顺序保证 │
|
|
|
|
|
|
└─────────┬─────────┘
|
|
|
|
|
|
│
|
|
|
|
|
|
┌───────────────────────┼───────────────────────┐
|
|
|
|
|
|
│ │ │
|
|
|
|
|
|
▼ ▼ ▼
|
|
|
|
|
|
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
|
|
|
|
|
|
│ 库存服务 │ │ 支付服务 │ │ 物流服务 │
|
|
|
|
|
|
│ 订阅订单事件 │ │ 订阅订单事件 │ │ 订阅订单事件 │
|
|
|
|
|
|
└──────────────┘ └──────────────┘ └──────────────┘
|
|
|
|
|
|
```
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**解耦带来的好处:**
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
| 维度 | 解耦前 | 解耦后 |
|
|
|
|
|
|
|------|--------|--------|
|
|
|
|
|
|
| **故障隔离** | 库存挂 = 订单挂 | 库存挂,消息暂存队列,恢复后消费 |
|
|
|
|
|
|
| **响应时间** | 1000ms(同步等待) | 50ms(发完消息即返回) |
|
|
|
|
|
|
| **扩展性** | 新增服务需改订单代码 | 新增服务只需订阅主题 |
|
|
|
|
|
|
| **系统复杂度** | 订单服务强依赖下游 | 订单服务只依赖消息队列 |
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
<DecouplingDemo />
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
### 2.3 解耦的本质:从"直接调用"到"事件驱动"
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**思维模式的转变:**
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
```
|
|
|
|
|
|
传统思维(命令式):
|
|
|
|
|
|
"订单服务命令库存服务:给我扣库存!"
|
|
|
|
|
|
↓ 直接调用
|
|
|
|
|
|
↓ 耦合度高,被调用方必须在线
|
|
|
|
|
|
↓ 调用方需要知道被调用方的接口
|
|
|
|
|
|
|
|
|
|
|
|
事件驱动思维(声明式):
|
|
|
|
|
|
"订单服务声明:订单已创建,谁关心谁来处理。"
|
|
|
|
|
|
↓ 发送事件到消息队列
|
|
|
|
|
|
↓ 解耦,消费者可以离线
|
|
|
|
|
|
↓ 生产者不需要知道消费者的存在
|
|
|
|
|
|
```
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
|
|
|
|
|
---
|
|
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
## 3. 核心问题二:如何削峰填谷,应对流量突增?
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
### 3.1 秒杀场景:10 万 QPS 如何平稳处理?
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**场景还原**:某电商平台双 11 秒杀活动,预计峰值 10 万 QPS,但数据库只能承受 1000 QPS。
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**直接冲击的后果:**
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
```
|
|
|
|
|
|
用户请求 ──→ 应用服务器 ──→ 数据库
|
|
|
|
|
|
10万/s 10万/s 1000/s(极限)
|
|
|
|
|
|
↓
|
|
|
|
|
|
连接池耗尽
|
|
|
|
|
|
响应超时
|
|
|
|
|
|
数据库崩溃
|
|
|
|
|
|
↓
|
|
|
|
|
|
雪崩效应(所有依赖数据库的服务都挂)
|
2026-01-18 12:21:49 +08:00
|
|
|
|
```
|
|
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
### 3.2 削峰填谷方案:消息队列作为"蓄水池"
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**架构设计:**
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
|
|
|
|
|
```
|
2026-02-06 03:34:50 +08:00
|
|
|
|
┌───────────────────────────────────────────────────────────────┐
|
|
|
|
|
|
│ 秒杀系统架构 │
|
|
|
|
|
|
├───────────────────────────────────────────────────────────────┤
|
|
|
|
|
|
│ │
|
|
|
|
|
|
│ 第一层:网关层(硬限流) │
|
|
|
|
|
|
│ ┌───────────────────────────────────────────────────────┐ │
|
|
|
|
|
|
│ │ - 令牌桶限流:10万/s → 1万/s(丢弃 90% 请求) │ │
|
|
|
|
|
|
│ │ - CDN 缓存静态资源(商品详情页) │ │
|
|
|
|
|
|
│ │ - 验证码/排队页面(削峰第一层) │ │
|
|
|
|
|
|
│ └───────────────────────────────────────────────────────┘ │
|
|
|
|
|
|
│ │ │
|
|
|
|
|
|
│ ▼ │
|
|
|
|
|
|
│ 第二层:服务层(软限流) │
|
|
|
|
|
|
│ ┌───────────────────────────────────────────────────────┐ │
|
|
|
|
|
|
│ │ - Nginx 限流:1万/s → 5000/s │ │
|
|
|
|
|
|
│ │ - Redis 预扣库存(原子操作): │ │
|
|
|
|
|
|
│ │ * 使用 Lua 脚本保证原子性 │ │
|
|
|
|
|
|
│ │ * 库存不足直接返回"已售罄" │ │
|
|
|
|
|
|
│ │ - 生成订单令牌(排队凭证) │ │
|
|
|
|
|
|
│ └───────────────────────────────────────────────────────┘ │
|
|
|
|
|
|
│ │ │
|
|
|
|
|
|
│ ▼ │
|
|
|
|
|
|
│ 第三层:消息队列层(核心削峰) │
|
|
|
|
|
|
│ ┌───────────────────────────────────────────────────────┐ │
|
|
|
|
|
|
│ │ Kafka/RocketMQ: │ │
|
|
|
|
|
|
│ │ - 批量写入:5000/s → 1000/s(数据库承受能力) │ │
|
|
|
|
|
|
│ │ - 消息持久化:落盘保证不丢消息 │ │
|
|
|
|
|
|
│ │ - 多分区并行消费:提升吞吐量 │ │
|
|
|
|
|
|
│ │ - 消费位点管理:支持故障恢复 │ │
|
|
|
|
|
|
│ │ │ │
|
|
|
|
|
|
│ │ 关键指标监控: │ │
|
|
|
|
|
|
│ │ - 生产速率(Produce Rate) │ │
|
|
|
|
|
|
│ │ - 消费速率(Consume Rate) │ │
|
|
|
|
|
|
│ │ - 消息堆积(Lag) │ │
|
|
|
|
|
|
│ └───────────────────────────────────────────────────────┘ │
|
|
|
|
|
|
│ │ │
|
|
|
|
|
|
│ ▼ │
|
|
|
|
|
|
│ 第四层:消费层(异步处理) │
|
|
|
|
|
|
│ ┌───────────────────────────────────────────────────────┐ │
|
|
|
|
|
|
│ │ 订单处理消费者(多实例): │ │
|
|
|
|
|
|
│ │ - 从 Kafka 拉取消息(1000/s,匹配数据库能力) │ │
|
|
|
|
|
|
│ │ - 数据库事务:创建订单 + 扣减库存 │ │
|
|
|
|
|
|
│ │ - 更新订单状态为"已创建" │ │
|
|
|
|
|
|
│ │ - 发送订单创建成功通知(邮件/短信/推送) │ │
|
|
|
|
|
|
│ │ - 确认消息消费(ACK) │ │
|
|
|
|
|
|
│ │ │ │
|
|
|
|
|
|
│ │ 消费者扩容策略: │ │
|
|
|
|
|
|
│ │ - 当 Lag > 10000 时,自动增加消费者实例 │ │
|
|
|
|
|
|
│ │ - 当 Lag < 1000 时,减少消费者实例(节省成本) │ │
|
|
|
|
|
|
│ └───────────────────────────────────────────────────────┘ │
|
|
|
|
|
|
│ │
|
|
|
|
|
|
└───────────────────────────────────────────────────────────────┘
|
2026-01-18 12:21:49 +08:00
|
|
|
|
```
|
|
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
<PeakShavingDemo />
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
### 3.3 削峰填谷的数学原理
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**流量平滑效果:**
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
|
|
|
|
|
```
|
2026-02-06 03:34:50 +08:00
|
|
|
|
原始流量(尖峰): 平滑后流量:
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
10万/s │ ╱╲ 1000/s │████████████████
|
|
|
|
|
|
│ ╱ ╲ │
|
|
|
|
|
|
│ ╱ ╲ │
|
|
|
|
|
|
1000/s│╱ ╲ 0/s │
|
|
|
|
|
|
└─────────────── └────────────────
|
|
|
|
|
|
0s 1s 2s 0s 20s
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
原始:10万/s 峰值,持续 1 秒
|
|
|
|
|
|
平滑:1000/s 恒定速率,持续 100 秒
|
|
|
|
|
|
```
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**关键公式:**
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
|
|
|
|
|
```
|
2026-02-06 03:34:50 +08:00
|
|
|
|
队列长度 = 生产者速率 × 持续时间 - 消费者速率 × 持续时间
|
|
|
|
|
|
= 100,000 × 1 - 1,000 × 1
|
|
|
|
|
|
= 99,000 条消息(峰值时队列堆积)
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
消费完所有消息所需时间 = 队列长度 / 消费者速率
|
|
|
|
|
|
= 99,000 / 1,000
|
|
|
|
|
|
= 99 秒
|
|
|
|
|
|
```
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
|
|
|
|
|
---
|
|
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
## 4. 核心问题三:如何保证消息不丢失、不重复、有序?
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
### 4.1 消息可靠性:三道防线
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
消息可能在三个环节丢失:生产者发送时、Broker 存储时、消费者处理时。
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
#### 防线 1:生产者确认(Producer ACK)
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
|
|
|
|
|
```python
|
2026-02-06 03:34:50 +08:00
|
|
|
|
# 可靠的生产者实现
|
|
|
|
|
|
class ReliableProducer:
|
|
|
|
|
|
def __init__(self, mq_connection, db_connection):
|
|
|
|
|
|
self.mq = mq_connection
|
|
|
|
|
|
self.db = db_connection
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
def send_with_reliability(self, message):
|
|
|
|
|
|
"""
|
|
|
|
|
|
可靠消息发送:本地日志 + 异步发送 + 定时重试
|
|
|
|
|
|
"""
|
|
|
|
|
|
# 1. 生成消息 ID
|
|
|
|
|
|
msg_id = generate_uuid()
|
|
|
|
|
|
|
|
|
|
|
|
# 2. 先写本地数据库(本地事务)
|
|
|
|
|
|
with self.db.transaction() as tx:
|
|
|
|
|
|
tx.execute("""
|
|
|
|
|
|
INSERT INTO outgoing_messages
|
|
|
|
|
|
(id, topic, body, status, retry_count, created_at)
|
|
|
|
|
|
VALUES (?, ?, ?, 'PENDING', 0, NOW())
|
|
|
|
|
|
""", msg_id, message['topic'], json.dumps(message['body']))
|
|
|
|
|
|
|
|
|
|
|
|
# 3. 异步发送消息
|
|
|
|
|
|
try:
|
|
|
|
|
|
self.mq.send_async(
|
|
|
|
|
|
topic=message['topic'],
|
|
|
|
|
|
body=message['body'],
|
|
|
|
|
|
headers={'msg_id': msg_id},
|
|
|
|
|
|
on_success=lambda: self.on_send_success(msg_id),
|
|
|
|
|
|
on_failure=lambda e: self.on_send_failure(msg_id, e)
|
|
|
|
|
|
)
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
# 发送异常,等待定时任务重试
|
|
|
|
|
|
logger.error(f"消息 {msg_id} 发送异常:{e}")
|
2026-01-18 12:21:49 +08:00
|
|
|
|
```
|
|
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
#### 防线 2:Broker 持久化
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
|
|
|
|
|
```python
|
2026-02-06 03:34:50 +08:00
|
|
|
|
# Kafka 生产者配置(可靠性优先)
|
|
|
|
|
|
kafka_producer_config = {
|
|
|
|
|
|
# 1. ACK 配置(最关键)
|
|
|
|
|
|
"acks": "all", # 等待所有 ISR 副本确认
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
# 2. 重试配置
|
|
|
|
|
|
"retries": 3, # 发送失败重试 3 次
|
|
|
|
|
|
"retry.backoff.ms": 1000, # 重试间隔 1 秒
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
# 3. 幂等性配置(防止重复发送)
|
|
|
|
|
|
"enable.idempotence": True, # 启用幂等性生产者
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
# 4. 批量配置(吞吐 vs 延迟权衡)
|
|
|
|
|
|
"batch.size": 16384, # 批量发送大小 16KB
|
|
|
|
|
|
"linger.ms": 10, # 最多等待 10ms 批量发送
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
# 5. 压缩配置
|
|
|
|
|
|
"compression.type": "lz4", # 消息压缩,提升吞吐
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
# 6. 缓冲区配置
|
|
|
|
|
|
"buffer.memory": 33554432, # 生产者缓冲区 32MB
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
# 7. 事务配置(原子性发送多条消息)
|
|
|
|
|
|
"transactional.id": "seckill-producer-1",
|
|
|
|
|
|
}
|
2026-01-18 12:21:49 +08:00
|
|
|
|
```
|
|
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
#### 防线 3:消费者确认(Consumer ACK)
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
|
|
|
|
|
```python
|
2026-02-06 03:34:50 +08:00
|
|
|
|
# 消费者可靠处理代码
|
|
|
|
|
|
|
|
|
|
|
|
# ACK 策略选择:
|
|
|
|
|
|
ack_strategies = {
|
|
|
|
|
|
"auto_ack": {
|
|
|
|
|
|
"description": "自动确认,消息送达即确认",
|
|
|
|
|
|
"pros": ["性能最高", "代码最简单"],
|
|
|
|
|
|
"cons": ["消息可能在处理前丢失", "无法保证可靠性"],
|
|
|
|
|
|
"use_case": "日志收集等对可靠性要求不高的场景"
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
|
|
"manual_ack_sync": {
|
|
|
|
|
|
"description": "手动同步确认,处理成功后确认",
|
|
|
|
|
|
"pros": ["可靠性高", "可以控制确认时机"],
|
|
|
|
|
|
"cons": ["性能略低", "需要处理超时"],
|
|
|
|
|
|
"use_case": "订单处理、支付等对可靠性要求高的场景"
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
|
|
"manual_ack_async": {
|
|
|
|
|
|
"description": "手动异步确认,批量确认",
|
|
|
|
|
|
"pros": ["性能高", "可靠性平衡"],
|
|
|
|
|
|
"cons": ["实现复杂", "可能重复处理"],
|
|
|
|
|
|
"use_case": "日志处理、大数据等高吞吐场景"
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
# 推荐策略:手动同步确认 + 幂等性设计
|
|
|
|
|
|
consumer_config = {
|
|
|
|
|
|
"auto_ack": False, # 关闭自动确认
|
|
|
|
|
|
"prefetch_count": 10, # 预取 10 条,平衡吞吐和内存
|
|
|
|
|
|
"max_retries": 3, # 最大重试 3 次
|
|
|
|
|
|
"retry_backoff": [1, 5, 30], # 退避策略:1秒、5秒、30秒
|
|
|
|
|
|
"dead_letter_enabled": True, # 启用死信队列
|
|
|
|
|
|
"idempotency_enabled": True # 启用幂等性检查
|
|
|
|
|
|
}
|
2026-01-18 12:21:49 +08:00
|
|
|
|
```
|
|
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
<ReliabilityDemo />
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
### 4.2 如何处理消息重复消费?
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
消息重复可能在以下场景发生:
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
1. **生产者重试**:生产者发送消息后未收到 ACK,重试发送同一条消息
|
|
|
|
|
|
2. **消费者 ACK 超时**:消费者处理完成但 ACK 超时,Broker 重新投递
|
|
|
|
|
|
3. **网络抖动**:消费者 ACK 未到达 Broker,Broker 认为未消费
|
|
|
|
|
|
4. **消费者重启**:消费者重启后重新消费同一批消息
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**解决方案:幂等性设计**
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**核心思想**:同一操作执行多次和执行一次的效果相同。
|
|
|
|
|
|
|
|
|
|
|
|
<IdempotenceDemo />
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
**幂等性实现方案对比:**
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
|
|
|
|
|
```python
|
2026-02-06 03:34:50 +08:00
|
|
|
|
# 方案对比
|
|
|
|
|
|
idempotency_solutions = {
|
|
|
|
|
|
"database_unique_constraint": {
|
|
|
|
|
|
"name": "数据库唯一约束",
|
|
|
|
|
|
"implementation": "UNIQUE 索引 + INSERT IGNORE",
|
|
|
|
|
|
"pros": ["简单可靠", "数据库保证原子性"],
|
|
|
|
|
|
"cons": ["需要数据库支持", "写入性能有损耗"],
|
|
|
|
|
|
"use_case": "订单创建、用户注册等核心业务流程"
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
|
|
"redis_setnx": {
|
|
|
|
|
|
"name": "Redis SETNX",
|
|
|
|
|
|
"implementation": "SET key value NX EX",
|
|
|
|
|
|
"pros": ["高性能", "支持过期时间"],
|
|
|
|
|
|
"cons": ["需要额外 Redis 资源", "极端情况下可能不一致"],
|
|
|
|
|
|
"use_case": "高并发场景下的幂等性检查"
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
|
|
"token_bucket": {
|
|
|
|
|
|
"name": "业务令牌",
|
|
|
|
|
|
"implementation": "前端生成唯一 Token,服务端校验",
|
|
|
|
|
|
"pros": ["前端友好", "可防重放攻击"],
|
|
|
|
|
|
"cons": ["需要 Token 生成策略", "Token 可能被盗用"],
|
|
|
|
|
|
"use_case": "表单提交、支付确认等用户交互场景"
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
|
|
"state_machine": {
|
|
|
|
|
|
"name": "状态机",
|
|
|
|
|
|
"implementation": "定义状态流转规则,拒绝非法状态变更",
|
|
|
|
|
|
"pros": ["业务语义清晰", "天然支持幂等"],
|
|
|
|
|
|
"cons": ["设计复杂", "需要维护状态图"],
|
|
|
|
|
|
"use_case": "订单状态流转、工作流引擎等复杂业务"
|
|
|
|
|
|
}
|
2026-01-18 12:21:49 +08:00
|
|
|
|
}
|
|
|
|
|
|
```
|
|
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
---
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
## 5. 实战:完整的消息队列架构设计
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
### 5.1 四大主流消息队列选型对比
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
| 特性 | RabbitMQ | Kafka | RocketMQ | Redis Stream |
|
|
|
|
|
|
|------|----------|-------|----------|--------------|
|
|
|
|
|
|
| **定位** | 传统消息队列 | 分布式日志流 | 电商级消息队列 | 轻量级队列 |
|
|
|
|
|
|
| **吞吐量** | ~1 万/秒 | ~100 万/秒 | ~10 万/秒 | ~5 万/秒 |
|
|
|
|
|
|
| **延迟** | 微秒级 | 毫秒级 | 毫秒级 | 毫秒级 |
|
|
|
|
|
|
| **可靠性** | 高(持久化) | 高(多副本) | 高(同步刷盘) | 中(AOF) |
|
|
|
|
|
|
| **消息回溯** | 不支持 | 支持 | 支持 | 支持 |
|
|
|
|
|
|
| **事务消息** | 支持(弱) | 不支持 | 支持(强) | 不支持 |
|
|
|
|
|
|
| **延迟消息** | 支持 | 不支持 | 支持 | 不支持 |
|
|
|
|
|
|
| **适用场景** | 传统企业应用 | 日志、大数据 | 电商、金融 | 小规模应用 |
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
<MQComparisonDemo />
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
### 5.2 选型决策树
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
|
|
|
|
|
```
|
2026-02-06 03:34:50 +08:00
|
|
|
|
选择消息队列:
|
|
|
|
|
|
│
|
|
|
|
|
|
├─ 需要事务消息(分布式事务)?
|
|
|
|
|
|
│ ├─ 是 → RocketMQ(首选)或 RabbitMQ
|
|
|
|
|
|
│ └─ 否 → 继续
|
|
|
|
|
|
│
|
|
|
|
|
|
├─ 需要处理海量日志/实时流?
|
|
|
|
|
|
│ ├─ 是 → Kafka(首选)
|
|
|
|
|
|
│ └─ 否 → 继续
|
|
|
|
|
|
│
|
|
|
|
|
|
├─ QPS > 1万/秒?
|
|
|
|
|
|
│ ├─ 是 → RocketMQ 或 Kafka
|
|
|
|
|
|
│ └─ 否 → 继续
|
|
|
|
|
|
│
|
|
|
|
|
|
├─ 需要复杂路由(如 headers 匹配)?
|
|
|
|
|
|
│ ├─ 是 → RabbitMQ
|
|
|
|
|
|
│ └─ 否 → 继续
|
|
|
|
|
|
│
|
|
|
|
|
|
├─ 已有 Redis 基础设施?
|
|
|
|
|
|
│ ├─ 是 → Redis Stream(快速开始)
|
|
|
|
|
|
│ └─ 否 → RabbitMQ(功能全面,学习曲线适中)
|
|
|
|
|
|
│
|
2026-01-18 12:21:49 +08:00
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
---
|
|
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
## 6. 总结:消息队列设计心法
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
### 6.1 核心原则回顾
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
| 原则 | 含义 | 实践要点 |
|
|
|
|
|
|
|------|------|----------|
|
|
|
|
|
|
| **解耦** | 服务间不直接依赖 | 通过消息队列通信,消费者故障不影响生产者 |
|
|
|
|
|
|
| **削峰** | 平滑流量波动 | 消息队列作为蓄水池,消费者按恒定速率处理 |
|
|
|
|
|
|
| **可靠** | 消息不丢失 | 生产者确认 + Broker 持久化 + 消费者确认 |
|
|
|
|
|
|
| **幂等** | 重复消费无影响 | 业务层面保证幂等性(唯一键、状态机) |
|
|
|
|
|
|
| **有序** | 消息顺序保证 | 单分区有序或消费者端排序 |
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
### 6.2 设计检查清单
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
在引入消息队列前,问自己以下问题:
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
- [ ] 是否真的需要消息队列?(简单异步可以用线程池)
|
|
|
|
|
|
- [ ] 消息丢失是否可以接受?(决定可靠性级别)
|
|
|
|
|
|
- [ ] 消息重复是否会影响业务?(决定幂等性投入)
|
|
|
|
|
|
- [ ] 消息顺序是否重要?(决定分区策略)
|
|
|
|
|
|
- [ ] 消费者处理能力如何?(决定队列大小和告警阈值)
|
|
|
|
|
|
- [ ] 如何处理消费失败?(决定重试和死信策略)
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
### 6.3 学习路径推荐
|
2026-01-18 12:21:49 +08:00
|
|
|
|
|
|
|
|
|
|
```
|
2026-02-06 03:34:50 +08:00
|
|
|
|
入门阶段(1-2 周):
|
|
|
|
|
|
├── 理解消息队列核心概念(生产者、消费者、Broker)
|
|
|
|
|
|
├── 选择一种消息队列上手(推荐 RabbitMQ 或 Redis Stream)
|
|
|
|
|
|
└── 实现第一个"生产者-消费者" Demo
|
|
|
|
|
|
|
|
|
|
|
|
进阶阶段(2-4 周):
|
|
|
|
|
|
├── 深入学习一种消息队列的高级特性
|
|
|
|
|
|
│ ├── Kafka:分区、副本、消费者组
|
|
|
|
|
|
│ └── RocketMQ:事务消息、延迟消息
|
|
|
|
|
|
├── 实现可靠消息投递(ACK 机制、重试策略)
|
|
|
|
|
|
└── 实现幂等性消费
|
|
|
|
|
|
|
|
|
|
|
|
实战阶段(1-2 月):
|
|
|
|
|
|
├── 设计一个完整的秒杀系统(削峰填谷)
|
|
|
|
|
|
├── 设计一个订单系统(解耦、最终一致性)
|
|
|
|
|
|
└── 搭建消息队列监控告警体系
|
2026-01-18 12:21:49 +08:00
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
---
|
|
|
|
|
|
|
2026-02-06 03:34:50 +08:00
|
|
|
|
## 7. 名词速查表 (Glossary)
|
|
|
|
|
|
|
|
|
|
|
|
| 名词 | 全称 | 解释 |
|
|
|
|
|
|
|------|------|------|
|
|
|
|
|
|
| **MQ** | Message Queue | **消息队列**。用于异步通信的中间件,实现生产者和消费者的解耦。 |
|
|
|
|
|
|
| **Producer** | - | **生产者**。发送消息的一方。 |
|
|
|
|
|
|
| **Consumer** | - | **消费者**。接收并处理消息的一方。 |
|
|
|
|
|
|
| **Broker** | - | **消息代理**。存储和转发消息的服务端程序。 |
|
|
|
|
|
|
| **Topic** | - | **主题**。消息的逻辑分类(如 "orders")。 |
|
|
|
|
|
|
| **Queue** | - | **队列**。存储消息的物理容器。 |
|
|
|
|
|
|
| **Partition** | - | **分区**。Kafka 的概念,一个 Topic 可以分成多个 Partition,提升并发。 |
|
|
|
|
|
|
| **ACK** | Acknowledgment | **确认**。消费者处理完消息后,向 Broker 确认。 |
|
|
|
|
|
|
| **Pub/Sub** | Publish/Subscribe | **发布订阅**。一种消息模式,一条消息可被多个消费者接收。 |
|
|
|
|
|
|
| **P2P** | Point-to-Point | **点对点**。一种消息模式,一条消息只能被一个消费者接收。 |
|
|
|
|
|
|
| **DLQ** | Dead Letter Queue | **死信队列**。存放无法消费的消息。 |
|
|
|
|
|
|
| **Idempotence** | - | **幂等性**。多次执行结果相同。 |
|
|
|
|
|
|
| **Throughput** | - | **吞吐量**。单位时间内处理的消息数量。 |
|
|
|
|
|
|
| **Latency** | - | **延迟**。消息从发送到被接收的时间差。 |
|
|
|
|
|
|
| **Persistence** | - | **持久化**。消息写入磁盘,而非仅存内存。 |
|
|
|
|
|
|
| **Replication** | - | **副本**。为了高可用,消息被复制到多个节点。 |
|
|
|
|
|
|
| **Transaction Message** | - | **事务消息**。保证本地事务和消息发送的一致性。 |
|
|
|
|
|
|
| **Backpressure** | - | **背压**。消费者处理不过来时,通知生产者降速。 |
|
|
|
|
|
|
| **Offset** | - | **偏移量**。消费者在分区中的消费位置。 |
|
|
|
|
|
|
| **Rebalance** | - | **重平衡**。消费者组成员变化时,重新分配分区。 |
|