Spring Cloud 消息驱动模式完全指南
一、为什么需要消息驱动模式?
1.1 同步通信的痛点
在微服务架构中,服务之间最常见的通信方式是同步调用(如HTTP REST接口):
订单服务 → 调用 → 库存服务
↓
等待响应...
↓
返回结果
问题来了:
- 耦合度高:订单服务必须等待库存服务响应,如果库存服务挂了,订单服务也会受影响
- 性能差:每个请求都要等待,响应时间累加(假设调用5个服务,每个100ms,总时间至少500ms)
- 可靠性差:任何一个服务故障,整个流程失败
- 扩展性差:流量突增时,所有服务压力都很大
1.2 异步消息驱动的优势
消息驱动模式通过引入消息中间件,实现服务间的异步通信:
订单服务 → 发送消息 → 消息队列(RabbitMQ/Kafka)
↓
库存服务(异步消费)
↓
支付服务(异步消费)
↓
物流服务(异步消费)
核心价值:
- 解耦:订单服务不需要知道谁在消费消息,只管发送即可
- 异步:订单服务发送消息后立即返回,不需要等待
- 削峰填谷:消息队列作为缓冲,流量突增时保护下游服务
- 可靠性:消息持久化,即使服务宕机,消息也不会丢失
💡 面试重点:消息驱动模式的核心价值是什么?
- 解耦、异步、削峰是三大核心价值
- 适合的场景:耗时操作、流量削峰、事件通知、跨系统通信
1.3 实际应用场景
场景1:用户注册
用户注册 → 发送"注册成功"消息
↓
发送欢迎邮件(异步)
发送优惠券(异步)
初始化用户数据(异步)
大数据统计(异步)
场景2:订单支付
支付成功 → 发送"支付成功"消息
↓
扣减库存(异步)
更新订单状态(异步)
发送短信通知(异步)
积分增加(异步)
大数据统计(异步)
场景3:秒杀活动
用户抢购 → 发送"抢购请求"消息到队列
↓
订单服务按自己的能力处理(比如每秒处理100个)
↓
避免数据库被打爆
二、Spring Cloud Stream核心概念
2.1 什么是Spring Cloud Stream?
Spring Cloud Stream是一个构建消息驱动微服务的框架,它屏蔽了底层消息中间件的差异,提供统一的编程模型。
类比理解:
- Spring Cloud Stream就像"USB接口"
- RabbitMQ、Kafka就像不同的设备(鼠标、键盘、U盘)
- 无论插入什么设备,你的代码都不需要改动
核心优势:
- 屏蔽差异:一套代码,支持RabbitMQ、Kafka、RocketMQ等多种消息中间件
- 简单易用:极简的API,几个注解就能完成复杂功能
- 灵活切换:修改配置即可切换消息中间件,不需要改代码
- Spring生态集成:与Spring Boot、Spring Cloud无缝集成
2.2 核心概念图解
┌─────────────────────────────────────────────────┐
│ Spring Cloud Stream 应用 │
│ │
│ ┌────────────┐ ┌────────────┐ │
│ │ Source │───────▶│ Channel │ │
│ │(消息生产)│ │ (通道) │ │
│ └────────────┘ └──────┬─────┘ │
│ │ │
│ ┌────▼────┐ │
│ │ Binder │ │
│ │(绑定器)│ │
│ └────┬────┘ │
│ │ │
│ ┌──────────────────────┼──────────┐ │
│ │ │ │ │
│ ┌────▼────┐ ┌─────▼─────┐ ┌──▼───┐ │
│ │RabbitMQ │ │ Kafka │ │RocketMQ│ │
│ └─────────┘ └───────────┘ └───────┘ │
└─────────────────────────────────────────────────┘
2.3 三大核心组件
2.3.1 Binder(绑定器)
作用:连接Spring Cloud Stream应用与消息中间件的桥梁
类比:就像电脑的驱动程序,让你的应用能够"理解"不同消息中间件的语言
特点:
- 可插拔:通过配置选择使用哪个Binder
- 屏蔽差异:统一API,底层自动适配RabbitMQ、Kafka等
2.3.2 Binding(绑定)
作用:连接应用代码与消息中间件的物理连接
类比:就像网线连接电脑和路由器
分类:
- Input Binding:输入绑定,用于接收消息(消费者)
- Output Binding:输出绑定,用于发送消息(生产者)
2.3.3 Message(消息)
作用:在应用和消息中间件之间传递的数据
结构:
Message<String> message = MessageBuilder
.withPayload("消息内容") // payload:消息体
.setHeader("contentType", "text/plain") // header:消息头
.build();
💡 面试重点:Spring Cloud Stream的三大核心组件是什么?
- Binder:绑定器,屏蔽消息中间件差异
- Binding:绑定,连接应用与消息中间件
- Message:消息,传递的数据结构
三、快速入门
3.1 项目结构
message-driven-demo/
├── producer-service/ # 消息生产者服务
├── consumer-service/ # 消息消费者服务
└── common-module/ # 公共模块(可选)
3.2 添加依赖
生产者服务:
<dependencies>
<!-- Spring Cloud Stream -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!-- 如果使用Kafka,替换为 -->
<!--
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
-->
</dependencies>
消费者服务:依赖相同
3.3 配置文件
生产者服务配置:
server:
port: 8081
spring:
application:
name: producer-service
cloud:
stream:
# 绑定器配置
bindings:
# 定义输出通道
output:
destination: order-events # 消息队列/主题名称
content-type: application/json # 消息类型
# RabbitMQ特定配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
消费者服务配置:
server:
port: 8082
spring:
application:
name: consumer-service
cloud:
stream:
bindings:
# 定义输入通道
input:
destination: order-events # 与生产者保持一致
content-type: application/json
group: order-service-group # 消费者分组
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
3.4 消息生产者
方式一:使用Source接口(传统方式)
@EnableBinding(Source.class) // 启用绑定
@RestController
public class OrderController {
@Autowired
private Source source; // Spring Cloud Stream提供的输出通道
@PostMapping("/orders")
public String createOrder(@RequestBody OrderRequest request) {
log.info("创建订单: {}", request);
// 创建消息
OrderEvent event = OrderEvent.builder()
.orderId(UUID.randomUUID().toString())
.productId(request.getProductId())
.amount(request.getAmount())
.timestamp(System.currentTimeMillis())
.build();
// 发送消息
source.output().send(MessageBuilder.withPayload(event).build());
return "订单创建成功,消息已发送";
}
}
方式二:使用函数式编程(推荐,Spring Cloud Stream 3.x+)
@Configuration
public class OrderConfig {
// 定义消息发送函数
@Bean
public Function<OrderEvent, OrderEvent> orderEventProducer() {
return orderEvent -> {
log.info("发送订单事件: {}", orderEvent);
return orderEvent; // 返回的消息会自动发送到配置的destination
};
}
}
@RestController
public class OrderController {
@Autowired
private Function<OrderEvent, OrderEvent> orderEventProducer;
@PostMapping("/orders")
public String createOrder(@RequestBody OrderRequest request) {
OrderEvent event = buildOrderEvent(request);
orderEventProducer.apply(event);
return "订单创建成功";
}
}
配置文件对应修改:
spring:
cloud:
stream:
function:
definition: orderEventProducer # 指定函数名称
bindings:
orderEventProducer-out-0: # 函数名-out-索引
destination: order-events
content-type: application/json
3.5 消息消费者
方式一:使用Sink接口(传统方式)
@EnableBinding(Sink.class) // 启用绑定
public class OrderConsumer {
@Autowired
private InventoryService inventoryService;
@Autowired
private NotificationService notificationService;
// 监听消息
@StreamListener(Sink.INPUT) // 监听输入通道
public void handleOrderEvent(OrderEvent event) {
log.info("接收到订单事件: {}", event);
try {
// 1. 扣减库存
inventoryService.deductStock(event.getProductId(), 1);
// 2. 发送通知
notificationService.sendNotification(
event.getOrderId(),
"订单创建成功"
);
} catch (Exception e) {
log.error("处理订单事件失败: {}", event, e);
throw e; // 抛出异常,触发重试机制
}
}
}
方式二:使用函数式编程(推荐)
@Configuration
public class OrderConsumerConfig {
// 定义消息消费函数
@Bean
public Consumer<OrderEvent> orderEventConsumer() {
return event -> {
log.info("接收到订单事件: {}", event);
// 处理业务逻辑
processOrder(event);
};
}
private void processOrder(OrderEvent event) {
// 1. 扣减库存
inventoryService.deductStock(event.getProductId(), 1);
// 2. 发送通知
notificationService.sendNotification(event.getOrderId(), "订单创建成功");
}
}
配置文件:
spring:
cloud:
stream:
function:
definition: orderEventConsumer # 指定函数名称
bindings:
orderEventConsumer-in-0: # 函数名-in-索引
destination: order-events
content-type: application/json
group: order-service-group
四、编程模型详解
4.1 消息发送
4.1.1 发送简单消息
@Service
public class MessageProducer {
@Autowired
private Source source;
public void sendMessage(String message) {
source.output().send(
MessageBuilder.withPayload(message).build()
);
}
}
4.1.2 发送对象消息(JSON)
@Service
public class OrderProducer {
@Autowired
private Source source;
public void sendOrderEvent(OrderEvent event) {
source.output().send(
MessageBuilder
.withPayload(event)
.setHeader("contentType", "application/json")
.build()
);
}
}
4.1.3 添加自定义Header
public void sendMessageWithHeaders(OrderEvent event) {
source.output().send(
MessageBuilder
.withPayload(event)
.setHeader("event-type", "order-created")
.setHeader("timestamp", System.currentTimeMillis())
.setHeader("source", "order-service")
.build()
);
}
4.1.4 分区发送
public void sendMessageWithPartition(OrderEvent event) {
source.output().send(
MessageBuilder
.withPayload(event)
.setHeader("partitionKey", event.getProductId()) // 指定分区键
.build()
);
}
4.2 消息消费
4.2.1 基础消费
@StreamListener(target = Sink.INPUT)
public void handleMessage(String message) {
log.info("接收到消息: {}", message);
}
4.2.2 消费对象消息
@StreamListener(target = Sink.INPUT)
public void handleOrderEvent(OrderEvent event) {
log.info("接收到订单事件: {}", event);
}
4.2.3 条件消费(消息过滤)
@StreamListener(target = Sink.INPUT, condition = "headers['event-type']=='order-created'")
public void handleOrderCreatedEvent(OrderEvent event) {
log.info("处理订单创建事件: {}", event);
}
@StreamListener(target = Sink.INPUT, condition = "headers['event-type']=='order-cancelled'")
public void handleOrderCancelledEvent(OrderEvent event) {
log.info("处理订单取消事件: {}", event);
}
4.2.4 批量消费
# application.yml
spring:
cloud:
stream:
bindings:
input:
consumer:
batch-mode: true # 启用批量模式
max-attempts: 3
@StreamListener(target = Sink.INPUT)
public void handleBatchMessages(List<OrderEvent> events) {
log.info("批量处理 {} 个订单事件", events.size());
events.forEach(this::processEvent);
}
五、高级特性
5.1 消息分组(Consumer Group)
5.1.1 为什么需要消息分组?
问题场景:
订单服务发送一条"支付成功"消息
↓
库存服务实例1收到 → 扣减库存(重复扣减!)
库存服务实例2收到 → 扣减库存(重复扣减!)
解决方案:消息分组
订单服务发送一条"支付成功"消息
↓
库存服务组(只有一个实例能收到)
├→ 实例1(收到消息)
└→ 实例2(收不到消息)
5.1.2 配置消息分组
消费者配置:
spring:
cloud:
stream:
bindings:
input:
destination: order-events
group: inventory-service-group # 设置消费者组名
效果:
- 同一个消费者组内的多个实例,只有其中一个会收到消息
- 不同消费者组可以各自收到消息(广播模式)
5.1.3 实际应用
场景:订单支付成功后,需要通知多个服务
# 库存服务配置
spring:
cloud:
stream:
bindings:
input:
destination: payment-success
group: inventory-group # 库存服务组
# 积分服务配置
spring:
cloud:
stream:
bindings:
input:
destination: payment-success
group: points-group # 积分服务组
# 通知服务配置
spring:
cloud:
stream:
bindings:
input:
destination: payment-success
group: notification-group # 通知服务组
结果:一条消息,三个服务各自收到一次,互不影响
💡 面试重点:消息分组的作用是什么?
- 避免重复消费:同一个组内只有一个实例消费消息
- 实现负载均衡:同一个组内多个实例轮流消费消息
- 实现广播:不同组可以各自收到同一条消息
5.2 消息分区(Partitioning)
5.2.1 为什么需要消息分区?
问题场景:
用户A下单 → 消息发送到队列 → 实例1处理(扣减库存)
用户A再次下单 → 消息发送到队列 → 实例2处理(扣减库存)❌ 并发问题!
解决方案:消息分区
用户A(ID=100)的所有订单 → 分区0 → 实例1处理
用户B(ID=200)的所有订单 → 分区1 → 实例2处理
5.2.2 配置消息分区
生产者配置:
spring:
cloud:
stream:
bindings:
output:
destination: order-events
producer:
partitionKeyExpression: payload.userId # 分区键表达式
partitionCount: 3 # 分区数量
消费者配置:
spring:
cloud:
stream:
bindings:
input:
destination: order-events
group: order-service-group
consumer:
partitioned: true # 启用分区消费
instanceCount: 3 # 总实例数
instanceIndex: 0 # 当前实例索引(0, 1, 2)
不同的实例配置不同的instanceIndex:
# 实例1
java -jar order-service.jar --spring.cloud.stream.bindings.input.consumer.instanceIndex=0
# 实例2
java -jar order-service.jar --spring.cloud.stream.bindings.input.consumer.instanceIndex=1
# 实例3
java -jar order-service.jar --spring.cloud.stream.bindings.input.consumer.instanceIndex=2
5.2.3 自定义分区策略
@Configuration
public class PartitionConfig {
@Bean
public PartitionSelectorStrategy customPartitionSelector() {
return (partitionKey, partitionCount) -> {
// 自定义分区逻辑
// 例如:根据用户ID的哈希值分配分区
return Math.abs(partitionKey.hashCode()) % partitionCount;
};
}
}
💡 面试重点:消息分组和消息分区的区别?
- 消息分组:解决重复消费问题,同一组内只有一个实例消费
- 消息分区:保证同一类消息有序消费,同一分区的消息总是被同一个实例消费
5.3 消息持久化
5.3.1 为什么需要消息持久化?
场景:消息发送后,消费者宕机了怎么办?
持久化保证:
- 消息发送到消息队列后,会持久化到磁盘
- 即使消费者宕机,消息也不会丢失
- 消费者重启后,可以继续消费未处理的消息
5.3.2 配置消息持久化
RabbitMQ配置:
spring:
cloud:
stream:
rabbit:
bindings:
output:
producer:
autoBindDlq: true # 自动绑定死信队列
durable: true # 持久化队列
input:
consumer:
autoBindDlq: true
acknowledgeMode: MANUAL # 手动确认
手动确认消费:
@StreamListener(target = Sink.INPUT)
public void handleMessage(OrderEvent event,
@Header(AmqpHeaders.CHANNEL) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) {
try {
// 处理消息
processOrder(event);
// 手动确认
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("处理消息失败", e);
try {
// 拒绝消息,重新入队
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
log.error("拒绝消息失败", ex);
}
}
}
5.4 错误处理与重试
5.4.1 自动重试
配置重试策略:
spring:
cloud:
stream:
bindings:
input:
consumer:
max-attempts: 3 # 最大重试次数
back-off-initial-interval: 1000 # 初始重试间隔(毫秒)
back-off-multiplier: 2.0 # 重试间隔倍数
back-off-max-interval: 10000 # 最大重试间隔
重试机制:
第1次消费失败 → 等待1秒 → 第2次重试
→ 等待2秒 → 第3次重试
→ 等待4秒 → 第4次重试
→ 仍然失败 → 进入死信队列
5.4.2 死信队列(DLQ)
作用:存储多次重试失败的消息,避免消息丢失
配置死信队列:
spring:
cloud:
stream:
rabbit:
bindings:
input:
consumer:
autoBindDlq: true # 自动绑定死信队列
dlqTtl: 5000 # 死信队列消息过期时间(毫秒)
dlqDeadLetterExchange: order-events.dlq # 死信交换机
处理死信队列消息:
@Configuration
public class DlqConfig {
@Bean
public Consumer<OrderEvent> dlqConsumer() {
return event -> {
log.error("消息进入死信队列: {}", event);
// 发送告警通知
alertService.sendAlert("消息处理失败", event.toString());
// 或者记录到数据库,人工介入处理
deadLetterRepository.save(event);
};
}
}
配置:
spring:
cloud:
stream:
function:
definition: dlqConsumer
bindings:
dlqConsumer-in-0:
destination: order-events.dlq # 死信队列名称
5.4.3 自定义错误处理
方式一:实现ErrorHandler
@Configuration
public class ErrorHandlerConfig {
@Bean
public ErrorHandler errorHandler() {
return (error, message) -> {
log.error("消息处理失败: {}", message, error);
// 发送告警
alertService.sendAlert("消息处理失败",
error.getMessage());
// 记录失败日志
errorLogRepository.save(
ErrorLog.builder()
.message(message.toString())
.errorMessage(error.getMessage())
.timestamp(System.currentTimeMillis())
.build()
);
};
}
}
方式二:使用ServiceActivator
@Configuration
public class ErrorHandlingConfig {
@ServiceActivator(inputChannel = "order-events.errors")
public void handleErrorMessage(Message<?> message) {
log.error("处理错误消息: {}", message);
ErrorMessage errorMessage = (ErrorMessage) message;
Throwable throwable = errorMessage.getPayload();
// 处理错误逻辑
handleError(throwable);
}
}
5.5 消息转换
5.5.1 自动转换
Spring Cloud Stream会根据"content-type"自动转换消息格式:
// 发送对象,自动转JSON
source.output().send(
MessageBuilder
.withPayload(orderEvent)
.setHeader("contentType", "application/json")
.build()
);
// 接收JSON,自动转对象
@StreamListener(target = Sink.INPUT)
public void handleOrderEvent(OrderEvent event) {
// event已经自动从JSON转换为OrderEvent对象
}
5.5.2 自定义转换器
@Configuration
public class MessageConverterConfig {
@Bean
@StreamMessageConverter
public MessageConverter customMessageConverter() {
return new AbstractMessageConverter(MimeType.valueOf("application/x-custom")) {
@Override
protected boolean supports(Class<?> clazz) {
return OrderEvent.class.isAssignableFrom(clazz);
}
@Override
protected Object convertFromInternal(
Message<?> message,
Class<?> targetClass,
Object conversionHint
) {
// 自定义反序列化逻辑
String payload = (String) message.getPayload();
return JSON.parseObject(payload, OrderEvent.class);
}
@Override
protected Object convertToInternal(
Object payload,
MessageHeaders headers,
Object conversionHint
) {
// 自定义序列化逻辑
return JSON.toJSONString(payload);
}
};
}
}
六、消息驱动模式实战
6.1 场景:电商订单系统
6.1.1 系统架构
用户下单
↓
订单服务(发送消息)
↓
RabbitMQ/Kafka
↓
├── 库存服务(扣减库存)
├── 积分服务(增加积分)
├── 通知服务(发送短信)
└── 数据服务(大数据统计)
6.1.2 订单服务(生产者)
OrderController.java:
@RestController
@RequestMapping("/orders")
public class OrderController {
@Autowired
private OrderService orderService;
@PostMapping
public String createOrder(@RequestBody CreateOrderRequest request) {
// 1. 创建订单
Order order = orderService.createOrder(request);
// 2. 发送订单创建事件(异步)
orderService.sendOrderCreatedEvent(order);
// 3. 立即返回(不等待下游服务处理)
return "订单创建成功,订单号: " + order.getId();
}
}
OrderService.java:
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private Source source;
@Transactional
public Order createOrder(CreateOrderRequest request) {
// 保存订单到数据库
Order order = Order.builder()
.id(UUID.randomUUID().toString())
.userId(request.getUserId())
.productId(request.getProductId())
.amount(request.getAmount())
.status(OrderStatus.CREATED)
.createTime(LocalDateTime.now())
.build();
orderRepository.save(order);
return order;
}
public void sendOrderCreatedEvent(Order order) {
OrderEvent event = OrderEvent.builder()
.eventType("ORDER_CREATED")
.orderId(order.getId())
.userId(order.getUserId())
.productId(order.getProductId())
.amount(order.getAmount())
.timestamp(System.currentTimeMillis())
.build();
// 发送消息到MQ
source.output().send(
MessageBuilder
.withPayload(event)
.setHeader("event-type", "ORDER_CREATED")
.setHeader("timestamp", System.currentTimeMillis())
.build()
);
log.info("订单事件已发送: {}", event);
}
}
配置文件:
spring:
cloud:
stream:
bindings:
output:
destination: order-events
content-type: application/json
producer:
requiredGroups: inventory-service,points-service,notification-service
6.1.3 库存服务(消费者)
InventoryConsumer.java:
@Service
public class InventoryConsumer {
@Autowired
private InventoryService inventoryService;
@StreamListener(target = Sink.INPUT)
public void handleOrderCreatedEvent(OrderEvent event) {
log.info("库存服务接收到订单事件: {}", event);
try {
// 扣减库存
inventoryService.deductStock(
event.getProductId(),
1
);
log.info("库存扣减成功,订单: {}", event.getOrderId());
} catch (InsufficientStockException e) {
log.error("库存不足,订单: {}", event.getOrderId());
throw e; // 抛出异常,触发重试
} catch (Exception e) {
log.error("库存扣减失败,订单: {}", event.getOrderId(), e);
throw e;
}
}
}
InventoryService.java:
@Service
public class InventoryService {
@Autowired
private InventoryRepository inventoryRepository;
@Transactional
public void deductStock(Long productId, Integer quantity) {
Inventory inventory = inventoryRepository.findByProductId(productId);
if (inventory == null) {
throw new ProductNotFoundException("商品不存在: " + productId);
}
if (inventory.getStock() < quantity) {
throw new InsufficientStockException("库存不足,当前库存: " + inventory.getStock());
}
// 扣减库存
inventory.setStock(inventory.getStock() - quantity);
inventoryRepository.save(inventory);
log.info("库存扣减成功,商品: {}, 剩余库存: {}", productId, inventory.getStock());
}
}
配置文件:
spring:
application:
name: inventory-service
cloud:
stream:
bindings:
input:
destination: order-events
group: inventory-service-group # 消费者组
content-type: application/json
consumer:
max-attempts: 3 # 重试3次
back-off-initial-interval: 1000
back-off-multiplier: 2.0
6.1.4 积分服务(消费者)
PointsConsumer.java:
@Service
public class PointsConsumer {
@Autowired
private PointsService pointsService;
@StreamListener(target = Sink.INPUT)
public void handleOrderCreatedEvent(OrderEvent event) {
log.info("积分服务接收到订单事件: {}", event);
try {
// 增加积分(订单金额的10%)
Integer points = event.getAmount().multiply(new BigDecimal("0.1")).intValue();
pointsService.addPoints(event.getUserId(), points);
log.info("积分增加成功,用户: {}, 积分: {}", event.getUserId(), points);
} catch (Exception e) {
log.error("积分增加失败", e);
// 积分增加失败不影响主流程,不抛出异常
}
}
}
配置文件:
spring:
application:
name: points-service
cloud:
stream:
bindings:
input:
destination: order-events
group: points-service-group
content-type: application/json
6.1.5 通知服务(消费者)
NotificationConsumer.java:
@Service
public class NotificationConsumer {
@Autowired
private NotificationService notificationService;
@StreamListener(target = Sink.INPUT)
public void handleOrderCreatedEvent(OrderEvent event) {
log.info("通知服务接收到订单事件: {}", event);
try {
// 发送短信通知
notificationService.sendSms(
event.getUserId(),
"您的订单 " + event.getOrderId() + " 已创建成功"
);
log.info("短信发送成功,订单: {}", event.getOrderId());
} catch (Exception e) {
log.error("短信发送失败", e);
// 通知发送失败不影响主流程,记录日志即可
}
}
}
配置文件:
spring:
application:
name: notification-service
cloud:
stream:
bindings:
input:
destination: order-events
group: notification-service-group
content-type: application/json
6.2 测试流程
6.2.1 启动服务
# 1. 启动RabbitMQ
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3-management
# 2. 启动订单服务
java -jar order-service.jar
# 3. 启动库存服务
java -jar inventory-service.jar
# 4. 启动积分服务
java -jar points-service.jar
# 5. 启动通知服务
java -jar notification-service.jar
6.2.2 发送请求
curl -X POST http://localhost:8081/orders \
-H "Content-Type: application/json" \
-d '{
"userId": 1001,
"productId": 2001,
"amount": 99.99
}'
6.2.3 查看日志
订单服务日志:
2025-12-07 10:30:15 INFO OrderService - 订单创建成功,订单号: abc123
2025-12-07 10:30:15 INFO OrderService - 订单事件已发送: OrderEvent{orderId='abc123', userId=1001...}
库存服务日志:
2025-12-07 10:30:16 INFO InventoryConsumer - 库存服务接收到订单事件: OrderEvent{orderId='abc123'...}
2025-12-07 10:30:16 INFO InventoryService - 库存扣减成功,商品: 2001, 剩余库存: 99
积分服务日志:
2025-12-07 10:30:16 INFO PointsConsumer - 积分服务接收到订单事件: OrderEvent{orderId='abc123'...}
2025-12-07 10:30:16 INFO PointsService - 积分增加成功,用户: 1001, 积分: 9
通知服务日志:
2025-12-07 10:30:17 INFO NotificationConsumer - 通知服务接收到订单事件: OrderEvent{orderId='abc123'...}
2025-12-07 10:30:17 INFO NotificationService - 短信发送成功,订单: abc123
七、生产环境最佳实践
7.1 幂等性处理
7.1.1 为什么需要幂等性?
问题场景:
订单服务发送1条消息 → 网络抖动 → 消息队列确认失败
→ 订单服务重发 → 消费者收到2条相同消息
→ 库存被扣减2次(错误!)
解决方案:幂等性处理
幂等性定义:同一个操作,执行多次和执行一次,结果相同
7.1.2 实现幂等性
方式一:数据库唯一约束
@Service
public class InventoryService {
@Transactional
public void deductStock(Long productId, Integer quantity, String bizId) {
try {
// 使用业务唯一ID作为主键,重复插入会失败
InventoryLog log = InventoryLog.builder()
.bizId(bizId) // 业务唯一ID(如订单号)
.productId(productId)
.quantity(quantity)
.createTime(LocalDateTime.now())
.build();
inventoryLogRepository.save(log);
// 扣减库存
Inventory inventory = inventoryRepository.findByProductId(productId);
inventory.setStock(inventory.getStock() - quantity);
inventoryRepository.save(inventory);
} catch (DuplicateKeyException e) {
log.warn("重复消息,已忽略: {}", bizId);
// 重复消息,直接返回
}
}
}
方式二:Redis分布式锁
@Service
public class InventoryService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void deductStock(Long productId, Integer quantity, String bizId) {
String lockKey = "inventory:lock:" + bizId;
// 尝试获取锁
Boolean lockAcquired = redisTemplate.opsForValue()
.setIfAbsent(lockKey, "locked", 10, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(lockAcquired)) {
try {
// 扣减库存
doDeductStock(productId, quantity);
} finally {
// 释放锁
redisTemplate.delete(lockKey);
}
} else {
log.warn("重复消息,已忽略: {}", bizId);
}
}
}
方式三:状态机检查
@Service
public class OrderService {
public void processOrder(OrderEvent event) {
Order order = orderRepository.findById(event.getOrderId());
// 检查订单状态,只有"已创建"状态的订单才能处理
if (order.getStatus() != OrderStatus.CREATED) {
log.warn("订单状态异常,已忽略: {}", order.getId());
return;
}
// 更新状态为"处理中"
order.setStatus(OrderStatus.PROCESSING);
orderRepository.save(order);
// 处理订单逻辑
processOrder(order);
}
}
💡 面试重点:如何保证消息消费的幂等性?
- 数据库唯一约束:使用业务ID作为唯一键,重复插入会失败
- Redis分布式锁:用业务ID作为锁,防止重复执行
- 状态机检查:检查业务状态,避免重复处理
7.2 消息顺序性
7.2.1 为什么需要顺序性?
场景:订单状态变更
订单创建 → 支付成功 → 发货 → 完成
如果消息乱序:
发货 → 订单创建 → 支付成功 → 完成(逻辑错误!)
7.2.2 保证顺序性的方案
方案一:单分区+单消费者
# 生产者配置
spring:
cloud:
stream:
bindings:
output:
producer:
partitionCount: 1 # 只有一个分区
# 消费者配置
spring:
cloud:
stream:
bindings:
input:
consumer:
concurrency: 1 # 单线程消费
缺点:性能差,无法并行处理
方案二:消息分区+按业务ID路由
# 生产者配置
spring:
cloud:
stream:
bindings:
output:
producer:
partitionKeyExpression: payload.orderId # 按订单ID分区
partitionCount: 10 # 10个分区
# 消费者配置
spring:
cloud:
stream:
bindings:
input:
group: order-service-group
consumer:
partitioned: true # 启用分区
instanceCount: 10
instanceIndex: 0
效果:
- 同一个订单的消息总是发送到同一个分区
- 同一个分区的消息总是被同一个消费者处理
- 保证同一订单的消息顺序
方案三:业务层排序
@Service
public class OrderProcessor {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void processOrder(OrderEvent event) {
String orderId = event.getOrderId();
// 检查是否有待处理的前序消息
Long lastSeq = getLastSequenceNumber(orderId);
if (lastSeq != null && event.getSequenceNumber() < lastSeq) {
// 有前序消息未处理,延迟处理当前消息
delayProcess(event);
return;
}
// 处理消息
doProcess(event);
// 更新序号
saveSequenceNumber(orderId, event.getSequenceNumber());
}
private void delayProcess(OrderEvent event) {
// 将消息放入延迟队列
redisTemplate.opsForList().rightPush(
"delay:queue:" + event.getOrderId(),
JSON.toJSONString(event)
);
}
}
7.3 消息堆积处理
7.3.1 消息堆积的原因
- 消费者处理速度慢
- 消费者宕机
- 网络延迟
- 突发流量
7.3.2 监控消息堆积
RabbitMQ监控命令:
# 查看队列消息数量
rabbitmqctl list_queues name messages
# 查看队列详细信息
rabbitmqctl list_queues name messages consumers
Spring Boot Actuator监控:
management:
endpoints:
web:
exposure:
include: health,info,bindings
访问:http://localhost:8082/actuator/bindings
7.3.3 解决方案
方案一:增加消费者实例
# 启动多个消费者实例
java -jar consumer-service.jar --server.port=8082
java -jar consumer-service.jar --server.port=8083
java -jar consumer-service.jar --server.port=8084
方案二:增加消费并发度
spring:
cloud:
stream:
bindings:
input:
consumer:
concurrency: 10 # 增加并发消费线程数
方案三:批量消费
spring:
cloud:
stream:
bindings:
input:
consumer:
batch-mode: true # 启用批量模式
max-attempts: 3
@StreamListener(target = Sink.INPUT)
public void handleBatchMessages(List<OrderEvent> events) {
log.info("批量处理 {} 个消息", events.size());
// 批量插入数据库
batchInsert(events);
// 批量更新缓存
batchUpdateCache(events);
}
方案四:临时消费者
@Component
public class TemporaryConsumer {
@Autowired
private StreamBridge streamBridge;
// 专门用于处理积压消息的临时消费者
@Scheduled(fixedRate = 1000)
public void processBacklog() {
// 查询积压消息数量
long messageCount = getMessageCount();
if (messageCount > 10000) {
log.warn("消息积压严重,启动临时消费者");
// 启动临时消费逻辑(可以跳过某些耗时操作)
processMessagesFast();
}
}
}
7.4 安全性配置
7.4.1 连接认证
spring:
cloud:
stream:
rabbit:
host: localhost
port: 5672
username: prod_user # 生产环境专用账号
password: ${RABBITMQ_PASSWORD} # 从环境变量读取
virtual-host: prod_vh # 生产环境专用虚拟主机
7.4.2 SSL/TLS加密
spring:
cloud:
stream:
rabbit:
host: rabbitmq.example.com
port: 5671
username: ${RABBITMQ_USERNAME}
password: ${RABBITMQ_PASSWORD}
ssl:
enabled: true
algorithm: TLSv1.2
key-store: classpath:rabbitmq.jks
key-store-password: ${KEYSTORE_PASSWORD}
trust-store: classpath:rabbitmq-trust.jks
trust-store-password: ${TRUSTSTORE_PASSWORD}
7.4.3 消息加密
@Component
public class MessageEncryptor {
@Autowired
private AESUtil aesUtil;
public Message<byte[]> encryptMessage(Object payload) {
// 序列化为JSON
String json = JSON.toJSONString(payload);
// AES加密
String encrypted = aesUtil.encrypt(json);
// Base64编码
byte[] bytes = Base64.getEncoder().encode(encrypted.getBytes());
return MessageBuilder.withPayload(bytes).build();
}
public Object decryptMessage(byte[] payload) {
// Base64解码
byte[] decoded = Base64.getDecoder().decode(payload);
// AES解密
String decrypted = aesUtil.decrypt(new String(decoded));
// 反序列化
return JSON.parseObject(decrypted, OrderEvent.class);
}
}
八、常见面试题
8.1 基础概念题
Q1:什么是消息驱动模式?
A:消息驱动模式是一种异步通信模式,服务之间通过消息中间件进行通信,而不是直接同步调用。
核心流程:
- 生产者发送消息到消息队列
- 消息队列存储消息
- 消费者从队列中获取消息并处理
优势:
- 解耦:生产者和消费者不需要知道对方的存在
- 异步:生产者发送消息后立即返回,不需要等待
- 削峰:消息队列作为缓冲,保护下游服务
Q2:Spring Cloud Stream的核心概念是什么?
A:Spring Cloud Stream有三个核心概念:
-
Binder(绑定器)
- 连接应用与消息中间件的桥梁
- 屏蔽不同消息中间件的差异
- 提供统一的API
-
Binding(绑定)
- 连接应用代码与消息中间件的物理连接
- 分为Input Binding(输入)和Output Binding(输出)
-
Message(消息)
- 在应用和消息中间件之间传递的数据
- 包含payload(消息体)和headers(消息头)
Q3:RabbitMQ和Kafka有什么区别?
A:
| 对比项 | RabbitMQ | Kafka |
|---|---|---|
| 消息模型 | 队列模型 | 发布-订阅模型(日志流) |
| 消息消费 | 消费后删除 | 消息保留,基于offset消费 |
| 吞吐量 | 万级/秒 | 百万级/秒 |
| 延迟 | 毫秒级 | 毫秒级 |
| 消息确认 | 支持ACK | 支持ACK |
| 消息路由 | 支持复杂的路由规则 | 简单的topic分区 |
| 消息保留 | 可配置(默认不保留) | 可配置(默认保留7天) |
| 适用场景 | 业务消息、任务队列 | 日志收集、流式处理 |
选择建议:
- 业务系统(如订单、支付)→ RabbitMQ
- 日志系统、大数据 → Kafka
8.2 实战应用题
Q4:如何保证消息不丢失?
A:消息丢失可能发生在三个环节,需要分别处理:
1. 生产者端(消息发送到MQ失败)
方案一:确认机制
spring:
cloud:
stream:
rabbit:
bindings:
output:
producer:
confirm-ack: true # 启用发送确认
@Service
public class ReliableProducer {
public void sendMessageWithConfirm(OrderEvent event) {
try {
source.output().send(
MessageBuilder.withPayload(event).build()
);
log.info("消息发送成功");
} catch (Exception e) {
log.error("消息发送失败", e);
// 重试或记录到失败队列
retryOrLog(event);
}
}
}
方案二:事务机制
spring:
cloud:
stream:
rabbit:
bindings:
output:
producer:
transacted: true # 启用事务
方案三:本地消息表
@Service
public class OrderService {
@Transactional
public void createOrder(Order order) {
// 1. 保存订单
orderRepository.save(order);
// 2. 保存待发送消息到本地表
outboxRepository.save(
OutboxMessage.builder()
.aggregateId(order.getId())
.payload(JSON.toJSONString(order))
.status(MessageStatus.PENDING)
.build()
);
}
}
// 定时任务扫描并发送消息
@Scheduled(fixedRate = 1000)
public void sendPendingMessages() {
List<OutboxMessage> messages = outboxRepository.findByStatus(MessageStatus.PENDING);
messages.forEach(message -> {
try {
source.output().send(
MessageBuilder.withPayload(message.getPayload()).build()
);
message.setStatus(MessageStatus.SENT);
outboxRepository.save(message);
} catch (Exception e) {
log.error("消息发送失败", e);
}
});
}
2. 消息队列端(MQ宕机,消息丢失)
RabbitMQ持久化配置:
spring:
cloud:
stream:
rabbit:
bindings:
output:
producer:
durable: true # 持久化队列
auto-bind-dlq: true # 死信队列
Kafka配置:
spring:
cloud:
stream:
kafka:
binder:
brokers: kafka1:9092,kafka2:9092,kafka3:9092 # 集群
bindings:
output:
producer:
acks: all # 所有副本都确认
retries: 3
3. 消费者端(消费失败,消息丢失)
方案一:手动确认
spring:
cloud:
stream:
bindings:
input:
consumer:
acknowledge-mode: MANUAL # 手动确认
@StreamListener(target = Sink.INPUT)
public void handleMessage(OrderEvent event,
@Header(AmqpHeaders.CHANNEL) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) {
try {
processEvent(event);
channel.basicAck(deliveryTag, false); // 手动确认
} catch (Exception e) {
channel.basicNack(deliveryTag, false, true); // 拒绝并重新入队
}
}
方案二:自动重试
spring:
cloud:
stream:
bindings:
input:
consumer:
max-attempts: 3
back-off-initial-interval: 1000
💡 总结:保证消息不丢失的三步法
- 生产者:确认机制 + 本地消息表
- 消息队列:持久化 + 集群
- 消费者:手动确认 + 重试机制
Q5:如何处理消息重复消费?
A:消息重复消费是常见的分布式系统问题,需要通过幂等性来解决。
方案一:数据库唯一约束
@Service
public class InventoryService {
@Transactional
public void deductStock(OrderEvent event) {
try {
InventoryLog log = InventoryLog.builder()
.bizId(event.getOrderId()) // 唯一业务ID
.productId(event.getProductId())
.quantity(1)
.build();
inventoryLogRepository.save(log); // 重复插入会抛出DuplicateKeyException
// 扣减库存
doDeductStock(event.getProductId(), 1);
} catch (DuplicateKeyException e) {
log.warn("重复消息,已忽略: {}", event.getOrderId());
}
}
}
方案二:Redis分布式锁
@Service
public class InventoryService {
public void deductStock(OrderEvent event) {
String lockKey = "deduct:lock:" + event.getOrderId();
Boolean locked = redisTemplate.opsForValue()
.setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(locked)) {
try {
doDeductStock(event.getProductId(), 1);
} finally {
redisTemplate.delete(lockKey);
}
} else {
log.warn("重复消息,已忽略: {}", event.getOrderId());
}
}
}
方案三:状态机检查
@Service
public class OrderService {
public void processOrder(OrderEvent event) {
Order order = orderRepository.findById(event.getOrderId());
// 只有"已创建"状态的订单才能处理
if (order.getStatus() != OrderStatus.CREATED) {
log.warn("订单状态异常,已忽略: {}", event.getOrderId());
return;
}
// 更新状态
order.setStatus(OrderStatus.PROCESSING);
orderRepository.save(order);
// 处理订单
doProcess(order);
}
}
Q6:如何保证消息的顺序性?
A:消息顺序性需要从生产、传输、消费三个环节保证。
1. 生产者端:有序发送
@Service
public class OrderProducer {
// 使用同一个通道,确保有序发送
public void sendOrderEvents(List<OrderEvent> events) {
events.stream()
.sorted(Comparator.comparing(OrderEvent::getSequenceNumber))
.forEach(event -> {
source.output().send(
MessageBuilder.withPayload(event).build()
);
});
}
}
2. 消息队列端:使用分区
# 生产者配置
spring:
cloud:
stream:
bindings:
output:
producer:
partitionKeyExpression: payload.orderId # 同一订单ID发送到同一分区
partitionCount: 3
3. 消费者端:单线程消费或分区消费
# 方案一:单线程消费(性能差)
spring:
cloud:
stream:
bindings:
input:
consumer:
concurrency: 1
# 方案二:分区消费(推荐)
spring:
cloud:
stream:
bindings:
input:
consumer:
partitioned: true
instanceIndex: 0
instanceCount: 3
Q7:如何处理消息积压?
A:消息积压是生产环境的常见问题,需要快速处理。
1. 排查原因
# 查看消息积压数量
rabbitmqctl list_queues name messages
# 查看消费者状态
rabbitmqctl list_queues name consumers
2. 解决方案
方案一:增加消费者实例
# 快速扩容
java -jar consumer-service.jar --server.port=8082
java -jar consumer-service.jar --server.port=8083
java -jar consumer-service.jar --server.port=8084
方案二:增加并发度
spring:
cloud:
stream:
bindings:
input:
consumer:
concurrency: 20 # 增加到20个并发线程
方案三:批量消费
spring:
cloud:
stream:
bindings:
input:
consumer:
batch-mode: true
@StreamListener(target = Sink.INPUT)
public void handleBatchMessages(List<OrderEvent> events) {
// 批量插入数据库(性能更高)
batchInsert(events);
}
方案四:临时优化(跳过非关键操作)
@StreamListener(target = Sink.INTPUT)
public void handleOrderEvent(OrderEvent event) {
try {
// 关键业务必须执行
deductStock(event);
// 非关键业务可以跳过
if (!isBacklog()) {
sendNotification(event);
addPoints(event);
}
} catch (Exception e) {
log.error("处理失败", e);
}
}
8.3 高级应用题
Q8:Spring Cloud Stream 3.x 函数式编程模型相比注解方式有什么优势?
A:函数式编程模型是Spring Cloud Stream 3.x推荐的编程方式。
优势对比:
| 对比项 | 注解方式(2.x) | 函数式方式(3.x+) |
|---|---|---|
| 代码复杂度 | 需要定义接口 | 纯函数,更简洁 |
| 类型安全 | 弱类型 | 强类型 |
| 测试友好性 | 需要Spring上下文 | 纯函数,易测试 |
| 响应式支持 | 不支持 | 支持Reactor |
| 编程模型 | 命令式 | 函数式 + 响应式 |
代码对比:
注解方式:
@EnableBinding(Source.class)
public class OrderProducer {
@Autowired
private Source source;
public void sendOrder(OrderEvent event) {
source.output().send(
MessageBuilder.withPayload(event).build()
);
}
}
函数式方式:
@Configuration
public class OrderProducer {
@Bean
public Function<OrderEvent, OrderEvent> orderProducer() {
return event -> {
log.info("发送订单: {}", event);
return event;
};
}
}
响应式支持:
@Bean
public Function<Flux<OrderEvent>, Flux<OrderEvent>> orderProcessor() {
return flux -> flux
.map(event -> {
processEvent(event);
return event;
})
.onErrorContinue((error, event) -> {
log.error("处理失败", error);
});
}
Q9:如何实现消息的延迟消费?
A:延迟消费是指消息发送后,延迟一段时间才被消费。
方案一:RabbitMQ延迟插件
安装插件:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
配置:
spring:
cloud:
stream:
rabbit:
bindings:
output:
producer:
delayed-exchange: true # 启用延迟交换机
发送延迟消息:
public void sendDelayedMessage(OrderEvent event, long delayMillis) {
source.output().send(
MessageBuilder
.withPayload(event)
.setHeader("x-delay", delayMillis) // 延迟时间(毫秒)
.build()
);
}
方案二:Kafka时间轮
@Service
public class DelayedMessageService {
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
private final TimeWheel timeWheel = new TimeWheel(100, 20, System.currentTimeMillis());
public void sendDelayedMessage(OrderEvent event, long delayMillis) {
// 加入时间轮
timeWheel.addTask(() -> {
kafkaTemplate.send("order-events", event);
}, delayMillis);
}
}
方案三:应用层延迟
@Service
public class DelayedMessageConsumer {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@StreamListener(target = Sink.INPUT)
public void handleOrderEvent(OrderEvent event) {
Long executeTime = event.getExecuteTime();
if (executeTime != null && executeTime > System.currentTimeMillis()) {
// 延迟时间未到,放入延迟队列
redisTemplate.opsForZSet().add(
"delay:queue",
JSON.toJSONString(event),
executeTime
);
} else {
// 处理消息
processEvent(event);
}
}
@Scheduled(fixedRate = 1000)
public void processDelayedMessages() {
long currentTime = System.currentTimeMillis();
// 查询到期的消息
Set<String> events = redisTemplate.opsForZSet()
.rangeByScore("delay:queue", 0, currentTime);
events.forEach(json -> {
OrderEvent event = JSON.parseObject(json, OrderEvent.class);
processEvent(event);
// 从延迟队列删除
redisTemplate.opsForZSet().remove("delay:queue", json);
});
}
}
Q10:如何实现消息的事务(跨服务事务)?
A:分布式事务是微服务架构的难点,消息驱动的最终一致性是常用方案。
方案一:本地消息表(推荐)
流程:
- 在同一个本地事务中,同时写入业务数据和消息记录
- 定时任务扫描消息记录,发送到消息队列
- 消费者消费消息并执行业务逻辑
代码实现:
生产者:
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private OutboxMessageRepository outboxRepository;
@Transactional
public void createOrder(Order order) {
// 1. 保存订单
order.setStatus(OrderStatus.CREATED);
orderRepository.save(order);
// 2. 保存消息到本地表(在同一事务中)
OutboxMessage message = OutboxMessage.builder()
.aggregateId(order.getId())
.aggregateType("Order")
.payload(JSON.toJSONString(order))
.status(MessageStatus.PENDING)
.createTime(LocalDateTime.now())
.build();
outboxRepository.save(message);
}
}
// 定时任务发送消息
@Component
public class MessageSenderJob {
@Autowired
private OutboxMessageRepository outboxRepository;
@Autowired
private StreamBridge streamBridge;
@Scheduled(fixedRate = 1000)
public void sendPendingMessages() {
List<OutboxMessage> messages = outboxRepository
.findByStatus(MessageStatus.PENDING, PageRequest.of(0, 100));
messages.forEach(message -> {
try {
streamBridge.send("order-events", message.getPayload());
message.setStatus(MessageStatus.SENT);
message.setSentTime(LocalDateTime.now());
outboxRepository.save(message);
} catch (Exception e) {
log.error("消息发送失败: {}", message.getId(), e);
}
});
}
}
消费者:
@Service
public class InventoryConsumer {
@Transactional
@StreamListener(target = Sink.INPUT)
public void handleOrderCreated(OrderEvent event) {
// 1. 记录消息已接收
InboxMessage inbox = InboxMessage.builder()
.messageId(event.getMessageId())
.status(MessageStatus.PROCESSING)
.build();
inboxRepository.save(inbox);
try {
// 2. 执行业务逻辑
inventoryService.deductStock(event.getProductId(), 1);
// 3. 更新状态为已处理
inbox.setStatus(MessageStatus.PROCESSED);
inboxRepository.save(inbox);
} catch (Exception e) {
inbox.setStatus(MessageStatus.FAILED);
inboxRepository.save(inbox);
throw e;
}
}
}
方案二:Saga模式
流程:
- 订单服务创建订单,发送"订单创建"事件
- 库存服务扣减库存,发送"库存扣减成功"事件
- 支付服务处理支付,发送"支付成功"事件
- 如果任何步骤失败,执行补偿操作
代码实现:
@Service
public class OrderSagaOrchestrator {
public void executeOrderSaga(Order order) {
try {
// 步骤1:创建订单
createOrder(order);
// 步骤2:扣减库存
deductInventory(order);
// 步骤3:处理支付
processPayment(order);
// 所有步骤成功
log.info("订单流程完成: {}", order.getId());
} catch (Exception e) {
log.error("订单流程失败,执行补偿: {}", order.getId());
compensate(order);
}
}
private void compensate(Order order) {
try {
// 补偿支付
if (order.getPaymentStatus() == PaymentStatus.SUCCESS) {
refundPayment(order);
}
// 补偿库存
if (order.getInventoryStatus() == InventoryStatus.DEDUCTED) {
restoreInventory(order);
}
// 取消订单
cancelOrder(order);
} catch (Exception e) {
log.error("补偿失败", e);
// 人工介入
}
}
}
💡 面试重点:分布式事务的解决方案?
- 强一致性:2PC、3PC(性能差,少用)
- 最终一致性:本地消息表(推荐)、Saga模式、TCC
- 最佳实践:优先使用本地消息表 + 最终一致性
九、总结与建议
9.1 核心要点
-
消息驱动模式是微服务架构的重要组成部分
- 解耦、异步、削峰是三大核心价值
- Spring Cloud Stream提供了统一的编程模型
-
Spring Cloud Stream三大核心组件
- Binder:绑定器,屏蔽消息中间件差异
- Binding:绑定,连接应用与消息中间件
- Message:消息,传递的数据结构
-
生产环境关键问题
- 消息不丢失:确认机制 + 持久化 + 手动ACK
- 消息不重复:幂等性处理(唯一约束、分布式锁、状态机)
- 消息顺序性:分区 + 按业务ID路由
- 消息积压:增加实例 + 提高并发 + 批量消费
9.2 学习路径
初学者:
- 理解消息驱动模式的基本概念
- 搭建RabbitMQ环境
- 实现简单的生产者和消费者
- 理解消息分组和分区
进阶开发者:
- 掌握错误处理和重试机制
- 实现幂等性消费
- 理解消息持久化原理
- 掌握函数式编程模型
架构师:
- 设计分布式事务方案(本地消息表、Saga)
- 处理大规模消息积压
- 优化消息系统性能
- 监控和告警体系建设
9.3 参考资源
官方文档:
- Spring Cloud Stream: https://spring.io/projects/spring-cloud-stream
- Spring Cloud Stream中文文档: https://mccrosson.io/spring-cloud-stream-reference/
- RabbitMQ官方文档: https://www.rabbitmq.com/getstarted.html
- Kafka官方文档: https://kafka.apache.org/documentation/
推荐阅读:
- 《消息队列高手课》
- 《深入理解RabbitMQ》
- 《Kafka权威指南》
- 《微服务架构设计模式》
开源项目:
- Spring Cloud Stream Examples: https://github.com/spring-cloud/spring-cloud-stream-samples
- RabbitMQ Tutorials: https://github.com/rabbitmq/rabbitmq-tutorials
💡 最后建议:消息驱动模式是微服务架构的必备技能,但不要滥用。
- 适合使用场景:异步处理、流量削峰、事件通知、跨系统通信
- 不适合使用场景:简单的同步调用、实时性要求极高的场景
- 最佳实践:同步+异步结合,核心业务用同步,非核心业务用异步