实战与面试
一、秒杀系统设计
1.1 需求分析
核心问题:
- 高并发:瞬间大量请求(百万级 QPS)
- 超卖:库存扣减准确性
- 性能:响应时间要求(< 100ms)
- 可靠性:防止数据丢失
技术挑战:
- 数据库扛不住高并发
- 库存扣减的原子性
- 防刷和重复购买
- 订单处理异步化
1.2 整体架构
┌─────────────┐
│ 用户请求 │
└──────┬──────┘
↓
┌─────────────┐
│ CDN + WAF │ ← 静态资源、防刷
└──────┬──────┘
↓
┌─────────────┐
│ 负载均衡 │ ← Nginx/LVS
└──────┬──────┘
↓
┌─────────────┐
│ Redis 缓存 │ ← 库存扣减、去重
└──────┬──────┘
↓
┌─────────────┐
│ 消息队列 MQ │ ← 削峰填谷
└──────┬──────┘
↓
┌─────────────┐
│ 订单服务 │ ← 异步处理
└──────┬──────┘
↓
┌─────────────┐
│ 数据库 │ ← 持久化存储
└─────────────┘
1.3 核心代码实现
1.3.1 库存预热
@Service
public class SeckillPreheatService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private ProductService productService;
/**
* 预热库存到 Redis
*/
public void preloadStock(Long productId, int stock) {
String stockKey = "seckill:stock:" + productId;
String userSetKey = "seckill:users:" + productId;
// 设置库存
redisTemplate.opsForValue().set(stockKey, String.valueOf(stock));
// 初始化已购买用户集合(用于去重)
redisTemplate.delete(userSetKey);
log.info("Preloaded stock for product {}: {}", productId, stock);
}
/**
* 批量预热
*/
@PostConstruct
public void preloadAllStocks() {
List<Product> products = productService.getSeckillProducts();
for (Product product : products) {
preloadStock(product.getId(), product.getStock());
}
}
}
1.3.2 秒杀核心逻辑
@Service
public class SeckillService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* Lua 脚本:原子性扣减库存 + 记录用户
*/
private static final String SECKILL_SCRIPT =
"local stockKey = KEYS[1] " +
"local userSetKey = KEYS[2] " +
"local userId = ARGV[1] " +
"local orderId = ARGV[2] " +
// 1. 检查用户是否已购买
"if redis.call('SISMEMBER', userSetKey, userId) == 1 then " +
" return -1 " + // 重复购买
"end " +
// 2. 检查并扣减库存
"local stock = tonumber(redis.call('GET', stockKey)) " +
"if stock == nil or stock <= 0 then " +
" return 0 " + // 库存不足
"end " +
"redis.call('DECR', stockKey) " +
"redis.call('SADD', userSetKey, userId) " +
// 3. 记录订单 ID
"local orderKey = 'seckill:orders:' .. userId " +
"redis.call('LPUSH', orderKey, orderId) " +
"redis.call('EXPIRE', orderKey, 3600) " +
"return 1 " + // 成功
"end";
/**
* 执行秒杀
*/
public SeckillResult doSeckill(Long userId, Long productId) {
String stockKey = "seckill:stock:" + productId;
String userSetKey = "seckill:users:" + productId;
String orderId = generateOrderId(userId, productId);
// 执行 Lua 脚本
DefaultRedisScript<Long> script = new DefaultRedisScript<>(SECKILL_SCRIPT, Long.class);
Long result = redisTemplate.execute(
script,
Arrays.asList(stockKey, userSetKey),
userId.toString(),
orderId
);
SeckillResult seckillResult = new SeckillResult();
if (result == 1) {
// 秒杀成功,发送消息到 MQ
SeckillMessage message = new SeckillMessage(userId, productId, orderId);
rabbitTemplate.convertAndSend("seckill.queue", message);
seckillResult.setSuccess(true);
seckillResult.setMessage("秒杀成功!");
seckillResult.setOrderId(orderId);
} else if (result == 0) {
seckillResult.setSuccess(false);
seckillResult.setMessage("抱歉,库存不足!");
} else if (result == -1) {
seckillResult.setSuccess(false);
seckillResult.setMessage("您已购买过该商品!");
}
return seckillResult;
}
/**
* 生成订单 ID
*/
private String generateOrderId(Long userId, Long productId) {
return String.format("%d%d%d",
System.currentTimeMillis(),
productId,
userId);
}
}
1.3.3 订单消息消费者
@Component
public class SeckillOrderConsumer {
@Autowired
private OrderService orderService;
@Autowired
private ProductService productService;
@RabbitListener(queues = "seckill.queue")
public void handleSeckillOrder(SeckillMessage message) {
try {
log.info("Processing seckill order: {}", message);
// 1. 创建订单
Order order = new Order();
order.setId(message.getOrderId());
order.setUserId(message.getUserId());
order.setProductId(message.getProductId());
order.setStatus(OrderStatus.CREATED);
order.setCreateTime(new Date());
orderService.createOrder(order);
// 2. 扣减数据库库存
productService.decreaseStock(message.getProductId(), 1);
// 3. 更新订单状态
order.setStatus(OrderStatus.PAID);
orderService.updateOrder(order);
log.info("Seckill order processed successfully: {}", message.getOrderId());
} catch (Exception e) {
log.error("Failed to process seckill order: {}", message, e);
// 失败处理:补偿库存
redisTemplate.opsForValue().increment("seckill:stock:" + message.getProductId());
redisTemplate.opsForSet().remove("seckill:users:" + message.getProductId(), message.getUserId().toString());
}
}
}
1.3.4 接口限流
@Component
public class RateLimiter {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 限流检查
* @param userId 用户 ID
* @param limit 限流数量
* @param window 时间窗口(秒)
* @return true=允许访问,false=被限流
*/
public boolean allowRequest(Long userId, int limit, int window) {
String key = "ratelimit:user:" + userId;
long now = System.currentTimeMillis();
// Lua 脚本:滑动窗口限流
String luaScript =
"redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, ARGV[1] - " + (window * 1000) + ") " +
"local count = redis.call('ZCARD', KEYS[1]) " +
"if count < tonumber(ARGV[2]) then " +
" redis.call('ZADD', KEYS[1], ARGV[3], ARGV[3]) " +
" redis.call('EXPIRE', KEYS[1], " + window + ") " +
" return 1 " +
"else " +
" return 0 " +
"end";
DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
Long result = redisTemplate.execute(
script,
Collections.singletonList(key),
String.valueOf(now),
String.valueOf(limit),
String.valueOf(now)
);
return result == 1;
}
}
// 使用
@RestController
@RequestMapping("/seckill")
public class SeckillController {
@Autowired
private RateLimiter rateLimiter;
@Autowired
private SeckillService seckillService;
@PostMapping("/{productId}")
public ResponseEntity<?> seckill(
@PathVariable Long productId,
@RequestHeader("X-User-Id") Long userId) {
// 限流:每个用户每秒最多请求 5 次
if (!rateLimiter.allowRequest(userId, 5, 1)) {
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS)
.body("请求过于频繁,请稍后再试");
}
SeckillResult result = seckillService.doSeckill(userId, productId);
return ResponseEntity.ok(result);
}
}
1.4 优化方案
1.4.1 本地缓存
@Component
public class LocalCache {
private final Cache<Long, Boolean> cache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(1, TimeUnit.MINUTES)
.build();
public boolean isSeckilled(Long userId, Long productId) {
String key = userId + ":" + productId;
return cache.getIfPresent(key) != null;
}
public void markSeckilled(Long userId, Long productId) {
String key = userId + ":" + productId;
cache.put(key, true);
}
}
1.4.2 库存分段
/**
* 将库存分散到多个 key,减少竞争
*/
public void segmentedStockPreheat(Long productId, int totalStock) {
int segmentCount = 10; // 分成 10 段
int stockPerSegment = totalStock / segmentCount;
for (int i = 0; i < segmentCount; i++) {
String key = "seckill:stock:" + productId + ":" + i;
redisTemplate.opsForValue().set(key, String.valueOf(stockPerSegment));
}
}
/**
* 随机选择一段扣减
*/
public boolean decreaseSegmentedStock(Long productId) {
int segmentCount = 10;
int randomSegment = ThreadLocalRandom.current().nextInt(segmentCount);
String key = "seckill:stock:" + productId + ":" + randomSegment;
Long result = redisTemplate.opsForValue().decrement(key);
if (result != null && result >= 0) {
return true;
} else {
// 扣减失败,回滚
redisTemplate.opsForValue().increment(key);
return false;
}
}
1.5 面试题汇总
Q1:秒杀系统如何解决超卖问题?
答案:
-
Redis 原子操作
- 使用 Lua 脚本保证原子性
- DECR 命令是原子的
-
数据库乐观锁
UPDATE stock
SET count = count - 1
WHERE product_id = ? AND count > 0
- 分布式锁
- Redis SET NX
- ZooKeeper 临时节点
Q2:秒杀系统如何应对高并发?
答案:
-
多级缓存
- 浏览器缓存
- CDN 缓存
- 本地缓存
- Redis 缓存
-
异步处理
- 消息队列削峰
- 订单异步创建
-
限流降级
- 接口限流
- 用户限流
- IP 限流
-
水平扩展
- Redis 集群
- 应用集群
- 数据库读写分离
二、分布式锁实现
2.1 基础实现
@Service
public class RedisLockService {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String LOCK_PREFIX = "lock:";
private static final long LOCK_EXPIRE = 30; // 锁过期时间(秒)
/**
* 获取锁
* @param lockKey 锁的 key
* @param requestId 唯一标识(用于释放锁时验证)
* @return 是否获取成功
*/
public boolean tryLock(String lockKey, String requestId) {
String key = LOCK_PREFIX + lockKey;
Boolean result = redisTemplate.opsForValue().setIfAbsent(
key,
requestId,
LOCK_EXPIRE,
TimeUnit.SECONDS
);
return Boolean.TRUE.equals(result);
}
/**
* 释放锁(Lua 脚本保证原子性)
*/
public boolean unlock(String lockKey, String requestId) {
String key = LOCK_PREFIX + lockKey;
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
Long result = redisTemplate.execute(script, Collections.singletonList(key), requestId);
return result == 1;
}
}
2.2 可重入锁
@Service
public class RedisReentrantLock {
@Autowired
private StringRedisTemplate redisTemplate;
private final ThreadLocal<Map<String, Integer>> lockers =
ThreadLocal.withInitial(HashMap::new);
/**
* 可重入锁
*/
public boolean lock(String lockKey, long expireTime) {
String key = "lock:" + lockKey;
String value = String.valueOf(Thread.currentThread().getId());
// 检查是否已持有锁
Map<String, Integer> locks = lockers.get();
if (locks.containsKey(key)) {
locks.put(key, locks.get(key) + 1);
return true;
}
// 尝试获取锁
Boolean acquired = redisTemplate.opsForValue().setIfAbsent(
key,
value,
expireTime,
TimeUnit.SECONDS
);
if (Boolean.TRUE.equals(acquired)) {
locks.put(key, 1);
return true;
}
return false;
}
/**
* 释放可重入锁
*/
public boolean unlock(String lockKey) {
String key = "lock:" + lockKey;
Map<String, Integer> locks = lockers.get();
Integer count = locks.get(key);
if (count == null) {
return false;
}
count--;
if (count > 0) {
locks.put(key, count);
return true;
}
locks.remove(key);
String value = String.valueOf(Thread.currentThread().getId());
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
Long result = redisTemplate.execute(script, Collections.singletonList(key), value);
return result == 1;
}
}
2.3 Redlock 算法(Redis 官方推荐)
@Service
public class RedlockService {
private List<RedisTemplate<String, String>> redisTemplates;
private static final long LOCK_EXPIRE = 30000; // 30 秒
/**
* 获取锁(需要在大多数节点上成功)
*/
public boolean lock(String lockKey, String requestId) {
int successCount = 0;
int requiredCount = redisTemplates.size() / 2 + 1; // 多数节点
long startTime = System.currentTimeMillis();
for (RedisTemplate<String, String> template : redisTemplates) {
try {
Boolean result = template.opsForValue().setIfAbsent(
lockKey,
requestId,
LOCK_EXPIRE,
TimeUnit.MILLISECONDS
);
if (Boolean.TRUE.equals(result)) {
successCount++;
}
} catch (Exception e) {
log.error("Failed to acquire lock on node", e);
}
}
// 计算获取锁消耗的时间
long elapsed = System.currentTimeMillis() - startTime;
if (elapsed >= LOCK_EXPIRE) {
// 获取锁耗时超过锁的过期时间,释放所有已获取的锁
unlock(lockKey, requestId);
return false;
}
// 检查是否在大多数节点上成功
return successCount >= requiredCount;
}
/**
* 释放锁
*/
public void unlock(String lockKey, String requestId) {
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
for (RedisTemplate<String, String> template : redisTemplates) {
try {
DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
template.execute(script, Collections.singletonList(lockKey), requestId);
} catch (Exception e) {
log.error("Failed to release lock on node", e);
}
}
}
}
2.4 看门狗机制(自动续期)
@Service
public class WatchDogLock {
@Autowired
private StringRedisTemplate redisTemplate;
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
private final Map<String, Future<?>> watchDogs = new ConcurrentHashMap<>();
/**
* 获取锁并启动看门狗
*/
public boolean lockWithWatchDog(String lockKey, String requestId) {
// 1. 尝试获取锁
Boolean acquired = redisTemplate.opsForValue().setIfAbsent(
"lock:" + lockKey,
requestId,
30,
TimeUnit.SECONDS
);
if (!Boolean.TRUE.equals(acquired)) {
return false;
}
// 2. 启动看门狗线程,定期续期
Future<?> future = scheduler.scheduleAtFixedRate(() -> {
// 检查锁是否还存在
String value = redisTemplate.opsForValue().get("lock:" + lockKey);
if (requestId.equals(value)) {
// 延长锁的过期时间
redisTemplate.expire("lock:" + lockKey, 30, TimeUnit.SECONDS);
log.debug("Watch dog extended lock: {}", lockKey);
} else {
// 锁已经不存在了,停止看门狗
Future<?> f = watchDogs.get(lockKey);
if (f != null) {
f.cancel(false);
watchDogs.remove(lockKey);
}
}
}, 10, 10, TimeUnit.SECONDS); // 每 10 秒续期一次
watchDogs.put(lockKey, future);
return true;
}
/**
* 释放锁并停止看门狗
*/
public void unlock(String lockKey, String requestId) {
// 1. 停止看门狗
Future<?> future = watchDogs.remove(lockKey);
if (future != null) {
future.cancel(false);
}
// 2. 释放锁
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
redisTemplate.execute(script, Collections.singletonList("lock:" + lockKey), requestId);
}
}
2.5 分布式锁对比
| 特性 | Redis SET NX | ZooKeeper | 数据库 | etcd |
|---|---|---|---|---|
| 性能 | 高 | 中 | 低 | 中 |
| 可靠性 | 中 | 高 | 高 | 高 |
| 实现复杂度 | 低 | 中 | 低 | 中 |
| 持久化 | 可配置 | 持久化 | 持久化 | 持久化 |
| 适用场景 | 高并发、高性能 | 高可靠性 | 简单场景 | 分布式协调 |
2.6 面试题
Q1:分布式锁如何保证原子性?
答案:
- SET NX + EX:Redis 原子命令
- Lua 脚本:保证多个命令原子执行
- WATCH + MULTI/EXEC:乐观锁机制
Q2:分布式锁失效怎么办?
答案:
- 看门狗机制:自动续期
- 唯一标识:释放时验证
- 超时重试:获取失败自动重试
- Redlock 算法:多节点冗余
三、排行榜系统设计
3.1 需求分析
核心功能:
- 实时排名
- 分数更新
- 范围查询(Top N)
- 用户排名查询
- 周围排名查询
**技术选型:**Redis Sorted Set(ZSet)
3.2 核心实现
@Service
public class LeaderboardService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String LEADERBOARD_KEY = "leaderboard:";
/**
* 增加分数
*/
public Double addScore(String leaderboard, String member, double score) {
String key = LEADERBOARD_KEY + leaderboard;
return redisTemplate.opsForZSet().incrementScore(key, member, score);
}
/**
* 获取用户排名
*/
public Long getRank(String leaderboard, String member) {
String key = LEADERBOARD_KEY + leaderboard;
Long rank = redisTemplate.opsForZSet().reverseRank(key, member);
return rank != null ? rank + 1 : null; // 排名从 1 开始
}
/**
* 获取用户分数
*/
public Double getScore(String leaderboard, String member) {
String key = LEADERBOARD_KEY + leaderboard;
return redisTemplate.opsForZSet().score(key, member);
}
/**
* 获取 Top N
*/
public List<RankingEntry> getTopN(String leaderboard, int n) {
String key = LEADERBOARD_KEY + leaderboard;
Set<ZSetOperations.TypedTuple<Object>> set =
redisTemplate.opsForZSet().reverseRangeWithScores(key, 0, n - 1);
List<RankingEntry> result = new ArrayList<>();
int rank = 1;
for (ZSetOperations.TypedTuple<Object> tuple : set) {
RankingEntry entry = new RankingEntry();
entry.setRank(rank++);
entry.setMember(tuple.getValue().toString());
entry.setScore(tuple.getScore());
result.add(entry);
}
return result;
}
/**
* 获取用户周围排名(前后各 n 名)
*/
public List<RankingEntry> getAroundRank(String leaderboard, String member, int n) {
String key = LEADERBOARD_KEY + leaderboard;
// 获取用户排名
Long rank = redisTemplate.opsForZSet().reverseRank(key, member);
if (rank == null) {
return Collections.emptyList();
}
// 计算范围
long start = Math.max(0, rank - n);
long end = Math.min(redisTemplate.opsForZSet().size(key) - 1, rank + n);
// 获取范围内的用户
Set<Object> members = redisTemplate.opsForZSet().reverseRange(key, start, end);
if (members == null || members.isEmpty()) {
return Collections.emptyList();
}
// 查询分数
List<RankingEntry> result = new ArrayList<>();
for (Object m : members) {
String mem = m.toString();
Double score = redisTemplate.opsForZSet().score(key, mem);
Long r = redisTemplate.opsForZSet().reverseRank(key, mem) + 1;
RankingEntry entry = new RankingEntry();
entry.setRank(r);
entry.setMember(mem);
entry.setScore(score);
result.add(entry);
}
return result;
}
/**
* 批量获取用户排名
*/
public Map<String, Long> batchGetRank(String leaderboard, List<String> members) {
String key = LEADERBOARD_KEY + leaderboard;
Map<String, Long> result = new HashMap<>();
for (String member : members) {
Long rank = redisTemplate.opsForZSet().reverseRank(key, member);
if (rank != null) {
result.put(member, rank + 1);
}
}
return result;
}
@Data
public static class RankingEntry {
private Long rank;
private String member;
private Double score;
}
}
3.3 实时排行榜优化
@Service
public class RealTimeLeaderboardService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private LeaderboardRepository leaderboardRepository;
/**
* 定时同步 Redis 排行榜到数据库(每 5 分钟)
*/
@Scheduled(fixedDelay = 300000)
public void syncToDatabase() {
String pattern = LEADERBOARD_KEY + "*";
Set<String> keys = redisTemplate.keys(pattern);
for (String key : keys) {
String leaderboard = key.substring(LEADERBOARD_KEY.length());
// 获取 Top 1000
Set<ZSetOperations.TypedTuple<Object>> top1000 =
redisTemplate.opsForZSet().reverseRangeWithScores(key, 0, 999);
// 转换并保存到数据库
List<LeaderboardEntry> entries = top1000.stream()
.map(tuple -> {
LeaderboardEntry entry = new LeaderboardEntry();
entry.setLeaderboard(leaderboard);
entry.setMember(tuple.getValue().toString());
entry.setScore(tuple.getScore());
return entry;
})
.collect(Collectors.toList());
leaderboardRepository.saveAll(entries);
}
}
/**
* 分页查询(使用缓存)
*/
public List<RankingEntry> getLeaderboardPage(String leaderboard, int page, int pageSize) {
String cacheKey = String.format("leaderboard:page:%s:%d:%d", leaderboard, page, pageSize);
// 先从缓存获取
List<RankingEntry> cached = (List<RankingEntry>) redisTemplate.opsForValue().get(cacheKey);
if (cached != null) {
return cached;
}
// 缓存未命中,从 Redis ZSet 获取
String key = LEADERBOARD_KEY + leaderboard;
long start = (long) (page - 1) * pageSize;
long end = start + pageSize - 1;
Set<ZSetOperations.TypedTuple<Object>> set =
redisTemplate.opsForZSet().reverseRangeWithScores(key, start, end);
List<RankingEntry> result = set.stream()
.map(tuple -> {
RankingEntry entry = new RankingEntry();
entry.setRank(start + 1); // 简化处理,实际需要计算准确排名
entry.setMember(tuple.getValue().toString());
entry.setScore(tuple.getScore());
return entry;
})
.collect(Collectors.toList());
// 写入缓存(5 分钟)
redisTemplate.opsForValue().set(cacheKey, result, 5, TimeUnit.MINUTES);
return result;
}
}
3.4 多维度排行榜
/**
* 支持多个维度的排行榜
*/
@Service
public class MultiDimensionLeaderboard {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 更新多个维度分数
*/
public void updateScores(String member, Map<String, Double> dimensions) {
for (Map.Entry<String, Double> entry : dimensions.entrySet()) {
String dimension = entry.getKey();
Double score = entry.getValue();
String key = "leaderboard:" + dimension;
redisTemplate.opsForZSet().incrementScore(key, member, score);
}
}
/**
* 获取用户在各个维度的排名
*/
public Map<String, Long> getDimensionRanks(String member, List<String> dimensions) {
Map<String, Long> result = new HashMap<>();
for (String dimension : dimensions) {
String key = "leaderboard:" + dimension;
Long rank = redisTemplate.opsForZSet().reverseRank(key, member);
if (rank != null) {
result.put(dimension, rank + 1);
}
}
return result;
}
/**
* 综合排名(加权)
*/
public List<RankingEntry> getCompositeLeaderboard(
Map<String, Double> weights, int n) {
// 计算综合分数:score = w1*score1 + w2*score2 + ...
Map<String, Double> compositeScores = new HashMap<>();
for (Map.Entry<String, Double> weight : weights.entrySet()) {
String dimension = weight.getKey();
Double weightValue = weight.getValue();
String key = "leaderboard:" + dimension;
Set<ZSetOperations.TypedTuple<Object>> scores =
redisTemplate.opsForZSet().reverseRangeWithScores(key, 0, -1);
for (ZSetOperations.TypedTuple<Object> tuple : scores) {
String member = tuple.getValue().toString();
Double score = tuple.getScore();
compositeScores.merge(member, score * weightValue, Double::sum);
}
}
// 排序并返回 Top N
return compositeScores.entrySet().stream()
.sorted(Map.Entry.<String, Double>comparingByValue().reversed())
.limit(n)
.map(entry -> {
RankingEntry rankingEntry = new RankingEntry();
rankingEntry.setMember(entry.getKey());
rankingEntry.setScore(entry.getValue());
return rankingEntry;
})
.collect(Collectors.toList());
}
}
四、高并发缓存架构
4.1 多级缓存架构
┌─────────────────┐
│ L1: 浏览器缓存 │ → 强缓存、协商缓存
└────────┬────────┘
↓ Miss
┌─────────────────┐
│ L2: CDN 缓存 │ → 静态资源、图片
└────────┬────────┘
↓ Miss
┌─────────────────┐
│ L3: Nginx 缓存 │ → 反向代理缓存
└────────┬────────┘
↓ Miss
┌─────────────────┐
│ L4: 本地缓存 │ → Caffeine/Guava
└────────┬────────┘
↓ Miss
┌─────────────────┐
│ L5: Redis 缓存 │ → 分布式缓存
└────────┬────────┘
↓ Miss
┌─────────────────┐
│ 数据库 │ → MySQL/PostgreSQL
└─────────────────┘
4.2 本地缓存 + Redis 实现
@Service
public class MultiLevelCacheService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 本地缓存
private final Cache<String, Object> localCache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build();
/**
* 多级缓存查询
*/
public Object get(String key) {
// L1: 本地缓存
Object value = localCache.getIfPresent(key);
if (value != null) {
log.debug("Cache hit in L1 (local): {}", key);
return value;
}
// L2: Redis 缓存
value = redisTemplate.opsForValue().get(key);
if (value != null) {
log.debug("Cache hit in L2 (Redis): {}", key);
// 写入本地缓存
localCache.put(key, value);
return value;
}
// L3: 数据库
log.debug("Cache miss, fetching from database: {}", key);
value = loadFromDatabase(key);
if (value != null) {
// 写入 Redis
redisTemplate.opsForValue().set(key, value, 30, TimeUnit.MINUTES);
// 写入本地缓存
localCache.put(key, value);
}
return value;
}
/**
* 更新缓存(删除所有层级的缓存)
*/
public void update(String key, Object value) {
// 1. 更新数据库
updateDatabase(key, value);
// 2. 删除本地缓存
localCache.invalidate(key);
// 3. 删除 Redis 缓存
redisTemplate.delete(key);
// 4. 通知其他节点删除本地缓存
publishCacheInvalidation(key);
}
/**
* 发布缓存失效消息
*/
private void publishCacheInvalidation(String key) {
String channel = "cache:invalidate";
redisTemplate.convertAndSend(channel, key);
}
/**
* 监听缓存失效消息
*/
@RedisMessageListener(topic = "cache:invalidate")
public void handleCacheInvalidation(String message) {
log.info("Received cache invalidation message: {}", message);
localCache.invalidate(message);
}
}
4.3 缓存预热
@Component
public class CacheWarmer {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ProductService productService;
/**
* 应用启动时预热缓存
*/
@PostConstruct
public void warmUpCache() {
log.info("Starting cache warm-up...");
// 1. 预热热门商品
List<Product> hotProducts = productService.getHotProducts();
for (Product product : hotProducts) {
String key = "product:" + product.getId();
redisTemplate.opsForValue().set(key, product, 1, TimeUnit.HOURS);
}
// 2. 预热商品分类
List<Category> categories = productService.getAllCategories();
for (Category category : categories) {
String key = "category:" + category.getId();
redisTemplate.opsForValue().set(key, category, 1, TimeUnit.HOURS);
}
log.info("Cache warm-up completed. Total {} items loaded.",
hotProducts.size() + categories.size());
}
/**
* 定时刷新缓存(每小时)
*/
@Scheduled(fixedDelay = 3600000)
public void refreshCache() {
log.info("Refreshing cache...");
warmUpCache();
}
}
4.4 缓存一致性方案
方案 1:Cache Aside Pattern
// 读:先读缓存,没有则读数据库,再写入缓存
public Product get(Long id) {
Product product = (Product) redisTemplate.opsForValue().get("product:" + id);
if (product == null) {
product = productRepository.findById(id);
if (product != null) {
redisTemplate.opsForValue().set("product:" + id, product, 1, TimeUnit.HOURS);
}
}
return product;
}
// 写:先更新数据库,再删除缓存
public void update(Product product) {
productRepository.save(product);
redisTemplate.delete("product:" + product.getId());
}
问题:并发时可能出现不一致
方案 2:延迟双删
public void updateWithDoubleDelete(Product product) {
// 1. 删除缓存
redisTemplate.delete("product:" + product.getId());
// 2. 更新数据库
productRepository.save(product);
// 3. 延迟再删除缓存
Thread.sleep(1000); // 延迟 1 秒
redisTemplate.delete("product:" + product.getId());
}
方案 3:订阅 Binlog(Canal)
@Component
public class CanalConsumer {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@KafkaListener(topics = "canal-topic")
public void handleCanalMessage(CanalMessage message) {
if ("UPDATE".equals(message.getType()) || "DELETE".equals(message.getType())) {
String tableName = message.getTable();
String id = message.getId();
// 删除相关缓存
String cacheKey = tableName + ":" + id;
redisTemplate.delete(cacheKey);
log.info("Invalidated cache: {}", cacheKey);
}
}
}
五、常见面试题汇总
5.1 基础题
Q1:Redis 为什么快?
答案:
- 纯内存操作:内存访问速度快(纳秒级)
- 单线程模型:避免线程切换和锁竞争
- IO 多路复用:epoll/kqueue 实现高并发
- 高效数据结构:跳表、压缩列表等
Q2:Redis 是单线程的吗?
答案:
- 网络请求处理:单线程(Redis 6.0 之前)
- 持久化:fork 子进程处理
- Redis 6.0+:引入多线程处理网络 IO(命令执行仍是单线程)
Q3:Redis 的数据结构及应用场景?
答案:
| 数据结构 | 应用场景 |
|---|---|
| String | 缓存、计数器、分布式锁 |
| Hash | 对象存储、购物车 |
| List | 消息队列、最新列表 |
| Set | 唯一性、标签、交集/并集 |
| Sorted Set | 排行榜、范围查询、优先级队列 |
Q4:Redis 持久化方式?
答案:
- RDB:快照持久化,适合备份,恢复快
- AOF:日志持久化,数据安全性高
- 混合持久化:RDB + AOF,兼顾性能和安全
5.2 进阶题
Q5:缓存穿透、击穿、雪崩的区别及解决方案?
答案:
| 问题 | 定义 | 解决方案 |
|---|---|---|
| 缓存穿透 | 查询不存在的数据 | 布隆过滤器、缓存空值 |
| 缓存击穿 | 热点 key 过期 | 互斥锁、热点数据永不过期 |
| 缓存雪崩 | 大量 key 同时过期 | 过期时间加随机值、多级缓存 |
Q6:如何保证缓存和数据库一致性?
答案:
-
Cache Aside Pattern
- 读:先读缓存,未命中读数据库,写入缓存
- 写:先更新数据库,再删除缓存
-
延迟双删
- 删除缓存 → 更新数据库 → 延迟 → 删除缓存
-
订阅 Binlog
- 监听 MySQL binlog
- 解析变更后删除缓存
- 最终一致性
Q7:Redis 的过期策略?
答案:
- 惰性删除:访问时检查是否过期
- 定期删除:每秒 10 次随机抽查
- 主动删除:内存不足时 + 淘汰策略
Q8:Redis 的内存淘汰策略?
答案:
- noeviction:不淘汰,写入返回错误
- allkeys-lru:淘汰最少使用的 key
- allkeys-lfu:淘汰最不经常使用的 key
- volatile-lru:淘汰设置了过期时间的 key(LRU)
- volatile-ttl:淘汰即将过期的 key
5.3 架构题
Q9:Redis 主从复制的原理?
答案:
- 建立连接:从节点发送 SYNC 命令
- 全量同步:主节点生成 RDB 发送给从节点
- 增量同步:主节点记录写命令到复制缓冲区
- 命令传播:主节点执行写命令后发送给从节点
Q10:哨兵模式的工作原理?
答案:
- 主观下线(SDOWN):单个哨兵认为主节点下线
- 客观下线(ODOWN):多个哨兵(quorum)认为主节点下线
- 故障转移:
- 选举领头哨兵
- 选择新主节点(优先级、偏移量、run ID)
- 从节点升级为主节点
- 通知其他从节点和新主节点建立复制
Q11:Redis Cluster 的原理?
答案:
- 分片:16384 个槽位
- Key 分布:
CRC16(key) % 16384 - 节点分配:每个节点负责部分槽位
- 高可用:每个主节点配置从节点
- 自动故障转移:主节点故障时从节点升级
Q12:如何设计分布式锁?
答案:
- 基础实现:
SET key value NX EX 30 - 可重入锁:记录线程 + 计数
- 看门狗:自动续期
- Redlock:多节点实现
- 释放锁:Lua 脚本保证原子性
5.4 场景题
Q13:如何设计一个秒杀系统?
答案:
架构设计:
用户 → CDN → 负载均衡 → Redis(库存扣减) → MQ → 订单服务 → 数据库
核心要点:
- 库存预热:提前加载到 Redis
- Lua 脚本:保证原子性
- 用户去重:Set 记录已购买用户
- MQ 异步:削峰填谷
- 限流降级:保护系统
Q14:如何实现排行榜?
答案:
使用 Redis Sorted Set:
// 增加分数
zadd leaderboard score member
// 获取排名
zrevrank leaderboard member
// 获取 Top N
zrevrange leaderboard 0 n-1 withscores
// 获取用户周围排名
zrevrange leaderboard start end withscores
优化:
- 分页缓存
- 定时同步到数据库
- 多维度排行榜
Q15:如何设计限流器?
答案:
- 固定窗口:简单但边界问题
- 滑动窗口:推荐方案
- 令牌桶:平滑限流
- 漏桶:恒定速率
实现(滑动窗口):
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local count = redis.call('ZCARD', key)
if count < limit then
redis.call('ZADD', key, now, now)
return 1
else
return 0
end
Q16:如何实现点赞功能?
答案:
使用 Sorted Set:
// 点赞
zadd post:likes timestamp user:1001
// 取消点赞
zrem post:likes user:1001
// 统计点赞数
zcard post:likes
// 检查是否点赞
zscore post:likes user:1001
// 获取点赞用户列表
zrevrange post:likes 0 9 withscores
优化:
- 缓存点赞数
- 异步更新数据库
- BitMap 存储点赞状态
5.5 性能优化题
Q17:如何监控 Redis 性能?
答案:
监控工具:
-
redis-cli
- INFO 命令
- SLOWLOG 慢查询
- MONITOR 实时监控
-
Prometheus + Grafana
- redis_exporter 导出指标
- 可视化监控
关键指标:
- 内存使用率:< 80%
- QPS:监控命令执行频率
- 慢查询:记录并优化
- 命中率:缓存命中率 > 90%
- 连接数:避免连接数过多
Q18:如何优化 Redis 性能?
答案:
-
避免 bigkey
- 使用 UNLINK 替代 DEL
- 分批删除
-
批量操作
- Pipeline 减少网络开销
- Lua 脚本保证原子性
-
选择合适数据结构
- Hash 替代多个 String
- ZSet 替代 List 排序
-
控制 key 生命周期
- 设置合理过期时间
- 避免内存泄漏
-
主从读写分离
- 主节点写
- 从节点读
-
集群化部署
- Redis Cluster 分片
- 减少单节点压力
Q19:Redis 大 key 如何处理?
答案:
危害:
- 删除时阻塞主线程
- 网络传输慢
- 内存占用大
解决方案:
- 拆分:将大 key 拆分成多个小 key
- 压缩:使用序列化压缩
- 分批删除:使用 SCAN/HSCAN
- UNLINK:异步删除
- 监控:使用
redis-cli --bigkeys发现
5.6 高级题
Q20:Redis 集群方案对比?
答案:
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 主从复制 | 简单 | 主节点单点 | 读写分离 |
| 哨兵 | 自动故障转移 | 主节点写压力大 | 中小规模 |
| Cluster | 自动分片 | 运维复杂 | 大规模 |
| Twemproxy | 代理分片 | 单点 | 已淘汰 |
| Codis | 功能完善 | 需要额外组件 | 中大规模 |
Q21:如何实现 Session 共享?
答案:
使用 Redis 存储 Session:
@Configuration
@EnableRedisHttpSession(maxInactiveIntervalInSeconds = 1800)
public class SessionConfig {
// Spring Session 自动配置
}
优点:
- 集中式存储
- 自动过期
- 跨域共享
注意:
- Session 序列化
- 内存占用
- 过期策略
Q22:Redis 实现 IM 消息存储?
答案:
方案设计:
- 最近消息:List
lpush messages:1001 "{...}"
ltrim messages:1001 0 99 # 保留最近 100 条
- 未读消息:Hash
hset unread:1001 "msg:123" "{...}"
hdel unread:1001 "msg:123"
- 消息已读状态:Bitmap
setbit user:1001:read 123 1
六、实战技巧总结
6.1 设计原则
-
Key 命名规范
- 使用冒号分隔:
namespace:type:id - 示例:
user:1001:profile
- 使用冒号分隔:
-
合理设置过期时间
- 热点数据:较长过期时间
- 临时数据:较短过期时间
- 加随机值避免同时过期
-
避免 bigkey
- 单个 key 不超过 10 MB
- 集合元素不超过 10000 个
-
使用 Pipeline
- 批量操作减少网络开销
- 控制每批 100-1000 个命令
-
Lua 脚本
- 保证原子性
- 减少网络往返
- 复用脚本(SCRIPT LOAD)
6.2 常见陷阱
-
KEYS 命令
- 生产环境禁止使用
- 使用 SCAN 代替
-
FLUSHALL
- 清空所有数据库
- 误操作风险高
-
缓存雪崩
- 过期时间加随机值
- 多级缓存
-
Monitor 命令
- 严重影响性能
- 仅用于调试
-
SAVE 命令
- 阻塞主线程
- 使用 BGSAVE
6.3 最佳实践
- 连接池配置
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(100);
config.setMaxIdle(50);
config.setMinIdle(10);
-
序列化选择
- JSON:可读性好,空间占用大
- Protobuf:性能好,空间占用小
- Kryo:性能优秀
-
监控告警
- 内存使用率 > 80%
- 慢查询数量
- 连接数异常
-
备份恢复
- 定时 RDB 快照
- AOF 持久化
- 异地备份
七、面试准备建议
7.1 复习重点
-
基础概念(30%)
- 数据结构及应用场景
- 持久化方式
- 过期策略
-
核心原理(40%)
- 单线程模型
- IO 多路复用
- 主从复制
- 哨兵、集群
-
实战场景(30%)
- 缓存设计
- 分布式锁
- 秒杀系统
- 排行榜
7.2 实战经验
-
项目经验
- 分享实际项目中的使用场景
- 遇到的问题及解决方案
- 性能优化经验
-
故障排查
- 慢查询分析
- 内存泄漏处理
- 连接数异常排查
-
性能调优
- 监控指标
- 优化方案
- 效果对比
7.3 进阶学习
-
源码阅读
- Redis 核心代码
- 数据结构实现
- 网络模型
-
架构设计
- 分布式缓存架构
- 高可用方案
- 容量规划
-
新技术
- Redis 7.0 新特性
- Redis Modules
- Redis Cluster 演进
八、模拟面试
面试官:请介绍一下 Redis 的基本数据结构及使用场景?
参考答案:
Redis 有 5 种基本数据结构:
-
String(字符串)
- 应用:缓存、计数器、分布式锁、Session
- 示例:
SET user:1001 "Alice"
-
Hash(哈希)
- 应用:对象存储、购物车
- 示例:
HSET user:1001 name "Alice" age 20
-
List(列表)
- 应用:消息队列、最新列表
- 示例:
LPUSH queue:tasks "task1"
-
Set(集合)
- 应用:唯一性、标签、共同好友
- 示例:
SADD tags:1001 "java" "redis"
-
Sorted Set(有序集合)
- 应用:排行榜、范围查询
- 示例:
ZADD leaderboard 100 "user1001"
此外,Redis 还有 3 种高级数据结构:
- Bitmap:用户签到、在线统计
- HyperLogLog:UV 统计
- GEO:地理位置
面试官:如何保证缓存和数据库的一致性?
参考答案:
常用方案:
-
Cache Aside Pattern(旁路缓存)
- 读:先读缓存,未命中读数据库,写入缓存
- 写:先更新数据库,再删除缓存
-
延迟双删
// 1. 删除缓存
redis.delete(key);
// 2. 更新数据库
db.update(data);
// 3. 延迟再删除
Thread.sleep(1000);
redis.delete(key); -
订阅 Binlog(Canal)
- 监听 MySQL binlog
- 解析变更后删除缓存
- 最终一致性
对比:
- Cache Aside:简单,但可能短暂不一致
- 延迟双删:更可靠,但性能稍差
- Canal:解耦,但需要额外组件
参考文档:
- Redis 官方文档
- Redis 中文文档
- 《Redis 设计与实现》
- 《Redis 开发与运维》
- 《Redis 实战》