高级特性
一、发布订阅(Pub/Sub)
1.1 基本概念
发布订阅模式:一种消息通信模式,发送者(Pub)发布消息,订阅者(Sub)接收消息。
应用场景:
- 实时消息推送
- 聊天室
- 系统通知
- 解耦服务
1.2 常用命令
# 订阅频道
SUBSCRIBE channel1 channel2
# 模式订阅
PSUBSCRIBE news.* # 订阅所有 news. 开头的频道
# 发布消息
PUBLISH channel1 "hello"
# 取消订阅
UNSUBSCRIBE channel1
PUNSUBSCRIBE news.*
# 查看活跃频道
PUBSUB CHANNELS
# 查看频道订阅数
PUBSUB NUMSUB channel1
# 查看模式订阅数
PUBSUB NUMPAT
1.3 使用示例
客户端 1(订阅者):
127.0.0.1:6379> SUBSCRIBE news
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news"
3) (integer) 1
客户端 2(发布者):
127.0.0.1:6379> PUBLISH news "Hello Redis"
(integer) 1 # 返回订阅者数量
客户端 1(收到消息):
1) "message"
2) "news"
3) "Hello Redis"
1.4 模式订阅示例
# 订阅所有 sports 开头的频道
PSUBSCRIBE sports.*
# 发布消息到 sports.basketball
PUBLISH sports.basketball "NBA game started"
# 订阅者会收到消息
1.5 原理分析
频道数据结构:
// 服务器状态
struct redisServer {
dict *pubsub_channels; // 频道字典,键为频道名,值为链表(订阅客户端)
list *pubsub_patterns; // 模式订阅链表
};
// 客户端状态
struct redisClient {
dict *pubsub_channels; // 客户端订阅的频道
list *pubsub_patterns; // 客户端订阅的模式
};
消息转发流程:
- 客户端发送
PUBLISH channel message - 服务器在
pubsub_channels中查找频道 - 遍历频道的订阅者链表,发送消息给每个订阅者
- 遍历
pubsub_patterns,匹配模式订阅者,发送消息
1.6 优缺点
优点:
- 解耦发布者和订阅者
- 支持多对多通信
- 支持模式匹配(通配符)
缺点:
- 消息不持久化:离线订阅者无法接收历史消息
- 消息可能丢失:订阅者断线时无法接收消息
- 无消息确认:发布者不知道消息是否被接收
- 无消息堆积:如果订阅者处理慢,消息会丢失
面试题:Pub/Sub 的适用场景和局限性?
答案:
适用场景:
- 实时通知:系统通知、消息推送
- 即时通讯:聊天室、在线状态
- 配置更新:配置变更通知
- 日志收集:分布式日志收集
不适用场景:
- 可靠消息传递:需要使用消息队列(RabbitMQ、Kafka)
- 离线消息:需要使用 Stream 或消息队列
- 消息堆积:需要使用消息队列
- 事务消息:需要使用事务机制
替代方案:
- Redis Stream(Redis 5.0+):支持消息持久化和消费者组
- 消息队列:RabbitMQ、Kafka、RocketMQ
二、Lua 脚本
2.1 基本概念
Lua 脚本:在 Redis 中执行 Lua 脚本,可以保证多个命令的原子性执行。
优势:
- 原子性:脚本执行期间不执行其他命令
- 减少网络开销:多个命令一次性发送
- 复用性:脚本可以被缓存和重复执行
- 复杂逻辑:支持条件判断、循环等
2.2 常用命令
# 执行脚本
EVAL script numkeys key [key ...] arg [arg ...]
# 执行缓存的脚本
EVALSHA sha1 numkeys key [key ...] arg [arg ...]
# 脚本缓存(但不执行)
SCRIPT LOAD script
# 检查脚本是否存在
SCRIPT EXISTS sha1 [sha1 ...]
# 刷新脚本缓存
SCRIPT FLUSH
# 强制杀死脚本
SCRIPT KILL
2.3 使用示例
示例 1:原子性操作
-- 扣减库存并记录订单
local stock = redis.call('GET', KEYS[1])
if tonumber(stock) > 0 then
redis.call('DECR', KEYS[1])
redis.call('HSET', KEYS[2], 'userid', ARGV[1], 'time', ARGV[2])
return 1
else
return 0
end
执行:
EVAL "local stock = redis.call('GET', KEYS[1]) if tonumber(stock) > 0 then redis.call('DECR', KEYS[1]) redis.call('HSET', KEYS[2], 'userid', ARGV[1], 'time', ARGV[2]) return 1 else return 0 end" 2 stock:1001 order:1001 user123 1704067200
示例 2:批量操作
-- 批量设置 key,并设置过期时间
for i = 1, #KEYS do
redis.call('SET', KEYS[i], ARGV[i])
redis.call('EXPIRE', KEYS[i], 3600)
end
return #KEYS
执行:
EVAL "for i = 1, #KEYS do redis.call('SET', KEYS[i], ARGV[i]) redis.call('EXPIRE', KEYS[i], 3600) end return #KEYS" 3 key1 key2 key3 value1 value2 value3
示例 3:限流器
-- 滑动窗口限流
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
-- 删除窗口外的记录
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
-- 获取当前窗口内的请求数
local count = redis.call('ZCARD', key)
if count < limit then
-- 添加当前请求
redis.call('ZADD', key, now, now)
-- 设置过期时间
redis.call('EXPIRE', key, window)
return 1 -- 允许请求
else
return 0 -- 拒绝请求
end
执行:
EVAL "local key = KEYS[1] local limit = tonumber(ARGV[1]) local window = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) redis.call('ZREMRANGEBYSCORE', key, 0, now - window) local count = redis.call('ZCARD', key) if count < limit then redis.call('ZADD', key, now, now) redis.call('EXPIRE', key, window) return 1 else return 0 end" 1 limit:user:123 100 60 1704067200
示例 4:分布式锁(更完善的实现)
-- 获取锁
local lockKey = KEYS[1]
local lockValue = ARGV[1]
local expireTime = tonumber(ARGV[2])
if redis.call('SET', lockKey, lockValue, 'NX', 'EX', expireTime) then
return 1
else
return 0
end
-- 释放锁
local lockKey = KEYS[1]
local lockValue = ARGV[1]
if redis.call('GET', lockKey) == lockValue then
return redis.call('DEL', lockKey)
else
return 0
end
2.4 脚本缓存
问题:每次发送完整脚本会消耗网络带宽
解决:使用 SCRIPT LOAD 缓存脚本,返回 SHA1 校验和
# 缓存脚本
SCRIPT LOAD "return redis.call('GET', KEYS[1])"
# 返回: "4c64034eb8f4256c5b202840bb5ed6b0d3925c53"
# 使用 SHA1 执行
EVALSHA 4c64034eb8f4256c5b202840bb5ed6b0d3925c53 1 mykey
Java 实现:
@Autowired
private RedisScript<Long> redisScript;
@Autowired
private StringRedisTemplate redisTemplate;
public Boolean seckill(Long userId, Long productId) {
String sha = redisScript.getSha1(); // 脚本的 SHA1
Long result = redisTemplate.execute(
RedisScript.of(sha, Long.class),
Arrays.asList("stock:" + productId, "order:" + productId),
userId.toString(), String.valueOf(System.currentTimeMillis())
);
return result == 1;
}
2.5 注意事项
超时处理:
- 脚本执行时间默认限制为 5 秒
- 超时后返回
BUSY错误 - 可通过
lua-time-limit配置调整
脚本超时处理:
# 查看正在执行的脚本
SCRIPT KILL
# 如果脚本正在修改数据,无法 KILL,需要 SHUTDOWN NOSAVE
禁止操作:
- 不建议在脚本中使用随机数(每次执行结果相同)
- 不建议执行长时间运行的脚本
- 避免访问外部系统(网络、文件)
面试题:Lua 脚本如何保证原子性?
答案:
- Redis 使用单线程模型执行命令
- Lua 脚本执行期间,不会执行其他客户端的命令
- 整个脚本作为一个整体执行,不会被中断
- 类似于事务,但比事务更灵活(支持逻辑判断)
面试题:Lua 脚本和事务的区别?
答案:
| 特性 | Lua 脚本 | 事务(MULTI/EXEC) |
|---|---|---|
| 原子性 | 支持 | 支持 |
| 条件判断 | 支持 | 不支持 |
| 循环 | 支持 | 不支持 |
| 错误处理 | 支持复杂逻辑 | 遇到错误继续执行 |
| 网络开销 | 一次请求 | 多次请求(Pipeline 可优化) |
| 复杂度 | 适合复杂逻辑 | 适合简单批量操作 |
使用建议:
- 简单批量操作:使用事务 + Pipeline
- 复杂业务逻辑:使用 Lua 脚本
- 需要条件判断:使用 Lua 脚本
面试题:如何实现一个限流器?
答案:
方案 1:固定窗口(有问题)
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local count = redis.call('GET', key)
if count == false then
redis.call('SET', key, 1, 'EX', window)
return 1
elseif tonumber(count) < limit then
redis.call('INCR', key)
return 1
else
return 0
end
问题:窗口边界可能出现 2 倍流量
方案 2:滑动窗口(推荐)
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
-- 删除窗口外的记录
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
-- 获取当前窗口内的请求数
local count = redis.call('ZCARD', key)
if count < limit then
redis.call('ZADD', key, now, now)
redis.call('EXPIRE', key, window)
return 1
else
return 0
end
方案 3:令牌桶(适合平滑限流)
local key = KEYS[1]
local capacity = tonumber(ARGV[1]) -- 桶容量
local rate = tonumber(ARGV[2]) -- 令牌生成速率(个/秒)
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4]) -- 请求令牌数
-- 获取当前令牌数和上次填充时间
local info = redis.call('HMGET', key, 'tokens', 'last_time')
local tokens = tonumber(info[1]) or capacity
local last_time = tonumber(info[2]) or now
-- 计算需要补充的令牌数
local delta = math.max(0, (now - last_time) * rate / 1000)
tokens = math.min(capacity, tokens + delta)
if tokens >= requested then
-- 扣减令牌
redis.call('HMSET', key, 'tokens', tokens - requested, 'last_time', now)
redis.call('EXPIRE', key, math.ceil(capacity / rate * 2) + 1)
return 1
else
-- 令牌不足,更新但不扣减
redis.call('HMSET', key, 'tokens', tokens, 'last_time', now)
redis.call('EXPIRE', key, math.ceil(capacity / rate * 2) + 1)
return 0
end
三、Pipeline(管道)
3.1 基本概念
Pipeline:在一次网络往返中执行多个命令,减少网络开销。
原理:
- 客户端打包多个命令,一次性发送给服务器
- 服务器一次性执行多个命令,返回所有结果
- 减少网络往返时间(RTT,Round Trip Time)
3.2 性能对比
不使用 Pipeline:
# 100 次 SET 操作
for i in {1..100}; do
redis-cli SET key$i value$i
done
# 耗时:约 1-2 秒(100 次 RTT)
使用 Pipeline:
# 100 次 SET 操作
echo -n "$(for i in {1..100}; do echo "SET key$i value$i"; done)" | redis-cli --pipe
# 耗时:约 0.01-0.05 秒(1 次 RTT)
**性能提升:**20-100 倍(取决于网络延迟)
3.3 使用示例
Java(Jedis)
Jedis jedis = new Jedis("localhost");
// 不使用 Pipeline
long start = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
jedis.set("key:" + i, "value:" + i);
}
System.out.println("Time: " + (System.currentTimeMillis() - start) + "ms");
// 使用 Pipeline
Pipeline pipeline = jedis.pipelined();
start = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
pipeline.set("key:" + i, "value:" + i);
}
pipeline.sync(); // 必须调用 sync() 才会发送命令
System.out.println("Pipeline Time: " + (System.currentTimeMillis() - start) + "ms");
Java(Lettuce)
RedisClient client = RedisClient.create("redis://localhost");
StatefulRedisConnection<String, String> connection = client.connect();
// 使用 Pipeline
long start = System.currentTimeMillis();
connection.setAutoFlushCommands(false); // 关闭自动刷新
for (int i = 0; i < 10000; i++) {
connection.sync().set("key:" + i, "value:" + i);
}
connection.flushCommands(); // 手动刷新(一次性发送)
System.out.println("Pipeline Time: " + (System.currentTimeMillis() - start) + "ms");
Python(redis-py)
import redis
r = redis.Redis(host='localhost', port=6379)
# 使用 Pipeline
pipe = r.pipeline()
for i in range(10000):
pipe.set(f'key:{i}', f'value:{i}')
pipe.execute() # 一次性执行
3.4 注意事项
命令数量限制:
- Pipeline 不应该包含过多命令
- 建议每批 100-1000 个命令
- 避免内存占用过大
事务性:
- Pipeline 不保证原子性
- 只是为了减少网络开销
- 需要原子性请使用
MULTI/EXEC或 Lua 脚本
错误处理:
- 如果某个命令失败,不影响其他命令执行
- 需要检查每个命令的返回结果
Pipeline pipeline = jedis.pipelined();
pipeline.set("key1", "value1");
pipeline.incr("key1"); // 错误:key1 不是数字
pipeline.set("key2", "value2");
List<Object> results = pipeline.syncAndReturnAll();
// results[1] 会返回错误信息
面试题:Pipeline 和事务的区别?
答案:
| 特性 | Pipeline | 事务(MULTI/EXEC) |
|---|---|---|
| 网络往返 | 减少到 1 次 | 减少到 1 次 |
| 原子性 | 不保证 | 保证 |
| 隔离性 | 不保证 | 保证 |
| 命令执行 | 立即执行 | EXEC 时一起执行 |
| 错误处理 | 单独处理 | 遇到语法错误全部不执行 |
| 使用场景 | 批量操作 | 需要原子性的操作 |
使用建议:
- 批量操作,不需要原子性:使用 Pipeline
- 需要原子性:使用事务
- 需要复杂逻辑:使用 Lua 脚本
面试题:Pipeline 如何提升性能?
答案:
- 减少网络往返:从 N 次 RTT 减少到 1 次 RTT
- 减少 TCP 开销:减少 TCP 包的封装和解封装
- 减少系统调用:减少 read/write 系统调用次数
- 适用场景:批量操作、网络延迟高的环境
性能测试示例:
网络延迟 1ms,执行 1000 个命令:
- 不使用 Pipeline:1000ms(1000 × 1ms)
- 使用 Pipeline:5ms(1 × 1ms + 4ms 执行时间)
网络延迟 100ms,执行 1000 个命令:
- 不使用 Pipeline:100000ms(1000 × 100ms)
- 使用 Pipeline:105ms(1 × 100ms + 5ms 执行时间)
四、慢查询分析
4.1 基本概念
慢查询:执行时间超过指定阈值的命令。
作用:
- 发现性能瓶颈
- 优化慢命令
- 监控系统健康状态
4.2 相关配置
# 慢查询阈值(微秒),默认 10000(10ms)
slowlog-log-slower-than 10000
# 慢查询日志最大长度,默认 128
slowlog-max-len 128
注意事项:
slowlog-log-slower-than = 0:记录所有命令slowlog-log-slower-than < 0:不记录任何命令- 循环队列,新日志覆盖最老的日志
4.3 常用命令
# 获取慢查询日志
SLOWLOG GET [n]
# 获取慢查询日志长度
SLOWLOG LEN
# 重置慢查询日志
SLOWLOG RESET
4.4 慢查询日志格式
127.0.0.1:6379> SLOWLOG GET
1) 1) (integer) 14 # 日志唯一标识
2) (integer) 1704067200 # 执行时间戳
3) (integer) 50000 # 执行耗时(微秒)
4) "KEYS" "user:*" # 执行的命令及参数
5) "127.0.0.1:54321" # 客户端 IP 和端口
6) "" # 客户端名称
4.5 常见慢命令
O(N) 复杂度命令:
# 查找所有匹配的 key(生产环境禁止使用)
KEYS pattern
# 获取所有 Hash 字段
HGETALL key
# 获取所有 Set 成员
SMEMBERS key
# 获取所有 ZSet 成员
ZRANGE key 0 -1 WITHSCORES
大数据量操作:
# 删除大 Hash(阻塞主线程)
DEL bigkey
# 大 List 操作
LRANGE large_list 0 -1
4.6 慢查询优化
优化建议:
- 避免 KEYS,使用 SCAN
# 不推荐
KEYS user:*
# 推荐
SCAN cursor MATCH user:* COUNT 100
- 使用 HSCAN/SSCAN/ZSCAN
# 不推荐
HGETALL user:1001
# 推荐
HSCAN user:1001 0 COUNT 100
- 使用 UNLINK 替代 DEL
# DEL 同步删除(阻塞)
DEL bigkey
# UNLINK 异步删除(不阻塞)
UNLINK bigkey
- 分批操作
// 分批删除大 Hash
String cursor = "0";
do {
ScanResult<Map.Entry<String, String>> scanResult =
jedis.hscan("bigkey", cursor);
cursor = scanResult.getCursor();
List<String> fields = scanResult.getResult().stream()
.map(Map.Entry::getKey)
.collect(Collectors.toList());
if (!fields.isEmpty()) {
jedis.hdel("bigkey", fields.toArray(new String[0]));
}
} while (!cursor.equals("0"));
4.7 慢查询监控
定期分析脚本:
#!/bin/bash
# 慢查询分析脚本
redis-cli SLOWLOG GET 100 > slowlog.txt
# 分析最慢的 10 个命令
cat slowlog.txt | grep -A 6 "integer) [0-9]" | \
awk '/4)/ {cmd=$0; next} /3)/ {time=$2; print time, cmd}' | \
sort -rn | head -10
Prometheus 监控:
# 使用 redis_exporter 导出慢查询指标
- job_name: 'redis'
static_configs:
- targets: ['localhost:9121']
面试题:如何分析和优化慢查询?
答案:
分析步骤:
- 查看慢查询日志
redis-cli SLOWLOG GET 10
- 分析慢命令类型
- O(N) 复杂度命令(KEYS、HGETALL)
- 大 key 操作
- 网络延迟
- 分析执行频率
# 查看 INFO commandstats
redis-cli INFO commandstats | grep -E "cmdstat_(keys|hgetall)"
- 优化方案
- KEYS → SCAN
- HGETALL → HSCAN
- DEL → UNLINK
- 分批操作
预防措施:
- 监控慢查询日志
- 设置告警阈值
- 代码审查,禁止危险命令
- 使用 Pipeline 批量操作
五、Bitmap(位图)
5.1 基本概念
Bitmap:通过操作二进制位来实现数据存储,底层使用 String 类型。
特点:
- 节省内存(每个用户只占 1 bit)
- 支持位运算
- 适合存储二值状态(签到、活跃用户等)
5.2 常用命令
# 设置位
SETBIT key offset value
# 获取位
GETBIT key offset
# 统计位数为 1 的个数
BITCOUNT key [start end]
# 位运算(AND、OR、XOR、NOT)
BITOP operation destkey key [key ...]
# 查找第一个为 1/0 的位
BITPOS key bit [start end]
# 获取位图范围内的字节值
GETRANGE key start end
5.3 使用示例
示例 1:用户签到
# 用户在 2024-01-01 签到(offset 为日期)
SETBIT user:1001:signin:2024 0 1
# 检查用户是否签到
GETBIT user:1001:signin:2024 0
# 统计用户签到天数
BITCOUNT user:1001:signin:2024
Java 实现:
// 用户签到
public void signIn(Long userId, LocalDate date) {
int offset = date.getDayOfYear() - 1;
redisTemplate.opsForValue().setBit(
"user:" + userId + ":signin:" + date.getYear(),
offset,
true
);
}
// 检查签到
public boolean checkSignIn(Long userId, LocalDate date) {
int offset = date.getDayOfYear() - 1;
Boolean signed = redisTemplate.opsForValue().getBit(
"user:" + userId + ":signin:" + date.getYear(),
offset
);
return Boolean.TRUE.equals(signed);
}
// 统计签到天数
public long getSignInDays(Long userId, int year) {
return redisTemplate.execute((RedisCallback<Long>) connection -> {
return connection.bitCount(
("user:" + userId + ":signin:" + year).getBytes()
);
});
}
示例 2:连续签到统计
# 用户签到位图
# offset: 0-364(一年 365 天)
# value: 0 未签到,1 已签到
# 用户 2024 年签到记录(假设前 10 天都签到了)
for i in {0..9}; do
redis-cli SETBIT user:1001:signin:2024 $i 1
done
# 查看用户是否连续签到 7 天
# 需要 BITPOS 和 BITCOUNT 配合使用
Java 实现(计算连续签到天数):
public int getConsecutiveSignInDays(Long userId, int year) {
byte[] key = ("user:" + userId + ":signin:" + year).getBytes();
return redisTemplate.execute((RedisCallback<Integer>) connection -> {
// 从今天开始往前查找
int consecutiveDays = 0;
int today = LocalDate.now().getDayOfYear() - 1;
for (int i = today; i >= 0; i--) {
Boolean bit = connection.getBit(key, i);
if (Boolean.TRUE.equals(bit)) {
consecutiveDays++;
} else {
break;
}
}
return consecutiveDays;
});
}
示例 3:活跃用户统计
# 用户每天活跃记录
SETBIT active:2024-01-01 1001 1
SETBIT active:2024-01-01 1002 1
SETBIT active:2024-01-01 1003 1
# 统计某天活跃用户数
BITCOUNT active:2024-01-01
# 统计 7 天活跃用户(BITOR 运算)
BITOP OR active:week:2024-01-01 \
active:2024-01-01 \
active:2024-01-02 \
active:2024-01-03 \
active:2024-01-04 \
active:2024-01-05 \
active:2024-01-06 \
active:2024-01-07
BITCOUNT active:week:2024-01-01
示例 4:用户特征标签
# 用户标签位图
# bit 0: 性别(0 女,1 男)
# bit 1: 是否 VIP
# bit 2: 是否认证
# bit 3: 是否签约作者
SETBIT user:1001:tags 0 1 # 男
SETBIT user:1001:tags 1 1 # VIP
SETBIT user:1001:tags 2 1 # 已认证
SETBIT user:1001:tags 3 0 # 未签约
# 查询是否 VIP
GETBIT user:1001:tags 1
# 批量查询标签
GETRANGE user:1001:tags 0 0 # 获取第一个字节
5.4 内存优化
内存占用计算:
1 个用户 = 1 bit
1 万用户 = 1.25 KB
10 万用户 = 12.5 KB
100 万用户 = 125 KB
1 亿用户 = 12.5 MB
面试题:Bitmap 的应用场景?
答案:
-
用户签到系统
- 每天一个 bit
- 统计签到天数、连续签到
-
活跃用户统计
- DAU/MAU 统计
- 留存率计算
-
用户标签系统
- 多维度标签
- 快速过滤
-
在线用户统计
- 实时在线用户
- 历史在线记录
-
权限管理
- 角色权限位运算
- 快速权限判断
面试题:Bitmap 和 Set 的区别?
答案:
| 特性 | Bitmap | Set |
|---|---|---|
| 内存占用 | 极小(1 bit/用户) | 较大(根据元素大小) |
| 元素类型 | 数字 ID | 任意类型 |
| 操作 | 位运算 | 集合运算 |
| 查询 | GETBIT | SISMEMBER |
| 排序 | 不支持 | Sorted Set 支持 |
| 适用场景 | 二值状态 | 存储唯一值 |
选择建议:
- 用户 ID 连续、二值状态:使用 Bitmap
- 元素类型多样、需要存储额外信息:使用 Set
六、HyperLogLog(基数统计)
6.1 基本概念
HyperLogLog:用于基数统计的概率算法,可以统计集合中不重复元素的个数。
特点:
- 内存占用极小(12 KB)
- 误差率约 0.81%
- 不存储实际元素
应用场景:
- 统计独立访客数(UV)
- 统计独立用户数
- 大数据去重统计
6.2 常用命令
# 添加元素
PFADD key element [element ...]
# 统计基数
PFCOUNT key [key ...]
# 合并多个 HyperLogLog
PFMERGE destkey sourcekey [sourcekey ...]
6.3 使用示例
示例 1:统计日活跃用户(DAU)
# 用户 1001 访问
PFADD dau:2024-01-01 user:1001
# 用户 1002 访问
PFADD dau:2024-01-01 user:1002
# 用户 1001 再次访问(不会重复计数)
PFADD dau:2024-01-01 user:1001
# 统计 DAU
PFCOUNT dau:2024-01-01
# 返回:2
示例 2:统计周活跃用户(WAU)
# 假设已有 7 天的数据
# dau:2024-01-01 到 dau:2024-01-07
# 合并 7 天的数据
PFMERGE wau:2024-01-01 \
dau:2024-01-01 \
dau:2024-01-02 \
dau:2024-01-03 \
dau:2024-01-04 \
dau:2024-01-05 \
dau:2024-01-06 \
dau:2024-01-07
# 统计 WAU
PFCOUNT wau:2024-01-01
Java 实现:
// 记录用户访问
public void recordVisit(Long userId, LocalDate date) {
redisTemplate.opsForHyperLogLog().add(
"dau:" + date.toString(),
"user:" + userId
);
}
// 统计 DAU
public long getDAU(LocalDate date) {
return redisTemplate.opsForHyperLogLog().size("dau:" + date);
}
// 统计 WAU
public long getWAU(LocalDate startDate) {
List<LocalDate> dates = new ArrayList<>();
for (int i = 0; i < 7; i++) {
dates.add(startDate.plusDays(i));
}
String[] keys = dates.stream()
.map(date -> "dau:" + date)
.toArray(String[]::new);
// PFMERGE + PFCOUNT
return redisTemplate.opsForHyperLogLog().size(keys);
}
6.4 原理简介
核心思想:
- 使用哈希函数将元素映射到 [0, 1) 区间
- 记录哈希值的前导零个数
- 前导零越多,说明哈希值越小,概率越低
- 通过调和平均数估计基数
示例:
元素 "user:1001" → hash("user:1001") → 0.000011... (前导零 4 个)
元素 "user:1002" → hash("user:1002") → 0.00101... (前导零 2 个)
元素 "user:1003" → hash("user:1003") → 0.1... (前导零 0 个)
取最大的前导零个数(4),估计基数为 2^4 = 16
6.5 注意事项
误差率:
- 标准误差:0.81%
- 元素个数 < 2^14 时,误差较小
- 元素个数极大时,误差增大
不适用场景:
- 需要精确统计(使用 Set)
- 需要获取具体元素(HyperLogLog 不存储元素)
- 数据量很小(< 10000)
面试题:HyperLogLog 的优缺点?
答案:
优点:
- 内存占用极小:固定 12 KB,而 Set 需要根据元素数量
- 性能优秀:添加和查询都是 O(1)
- 支持合并:可以合并多个 HyperLogLog
缺点:
- 有误差:0.81% 的误差率
- 不存储元素:无法获取具体元素列表
- 不适合精确统计:只适合估算场景
面试题:Set 和 HyperLogLog 如何选择?
答案:
| 特性 | Set | HyperLogLog |
|---|---|---|
| 内存占用 | 大 | 极小(12 KB) |
| 精度 | 精确 | 有误差(0.81%) |
| 查询元素 | 支持 | 不支持 |
| 合并操作 | SUNION | PFMERGE |
| 使用场景 | 精确去重、元素查询 | 大数据去重统计 |
选择建议:
- 需要精确统计:使用 Set
- 大数据量、只需要估算:使用 HyperLogLog
- DAU/MAU 统计:使用 HyperLogLog
- 用户标签、白名单:使用 Set
七、GEO(地理位置)
7.1 基本概念
GEO:Redis 3.2+ 提供的地理位置功能,底层使用 Sorted Set(ZSet)实现。
特点:
- 存储地理位置(经纬度)
- 计算距离
- 范围查询(附近的人/地点)
- 底层使用 Geohash 算法
7.2 常用命令
# 添加地理位置
GEOADD key longitude latitude member [longitude latitude member ...]
# 获取地理位置
GEOPOS key member [member ...]
# 计算两个位置的距离
GEODIST key member1 member2 [unit]
# 获取指定范围内的位置(以中心点为圆心)
GEORADIUS key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] [STORE key] [STOREDIST key]
# 获取指定范围内的位置(以成员为圆心)
GEORADIUSBYMEMBER key member radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] [STORE key] [STOREDIST key]
# 获取 Geohash
GEOHASH key member [member ...]
7.3 使用示例
示例 1:添加位置
# 添加城市坐标
GEOADD cities:coords 116.4074 39.9042 "Beijing"
GEOADD cities:coords 121.4737 31.2304 "Shanghai"
GEOADD cities:coords 113.2644 23.1291 "Guangzhou"
GEOADD cities:coords 114.1694 22.3193 "Shenzhen"
示例 2:计算距离
# 计算北京到上海的距离
GEODIST cities:coords Beijing Shanghai km
# 返回:"1067.6941"
# 计算北京到广州的距离
GEODIST cities:coords Beijing Guangzhou km
# 返回:"1888.1234"
示例 3:附近的人
# 添加用户位置
GEOADD users:location 116.4074 39.9042 "user:1001"
GEOADD users:location 116.4174 39.9142 "user:1002"
GEOADD users:location 116.3874 39.8942 "user:1003"
# 查找北京 10 km 内的用户
GEORADIUS users:location 116.4074 39.9042 10 km WITHDIST WITHCOORD
返回结果:
1) 1) "user:1001"
2) "0.0000" # 距离(km)
3) 1) "116.40739902853965" # 经度
2) "39.90420085479628" # 纬度
2) 1) "user:1002"
2) "1.5678"
3) 1) "116.41739902853965"
2) "39.91420085479628"
示例 4:附近的人(按成员)
# 查找 user:1001 附近 5 km 内的用户
GEORADIUSBYMEMBER users:location "user:1001" 5 km WITHDIST
示例 5:按距离排序
# 查找北京 100 km 内的城市,按距离升序
GEORADIUS cities:coords 116.4074 39.9042 100 km ASC WITHDIST
Java 实现:
// 添加用户位置
public void addUserLocation(Long userId, Double longitude, Double latitude) {
redisTemplate.opsForGeo().add(
"users:location",
new Point(longitude, latitude),
userId.toString()
);
}
// 查找附近的人
public List<NearUser> findNearUsers(Double longitude, Double latitude,
double radius, int limit) {
Distance distance = new Distance(radius, RedisGeoCommands.DistanceUnit.KILOMETERS);
Circle circle = new Circle(new Point(longitude, latitude), distance);
RedisGeoCommands.GeoRadiusCommandArgs args =
RedisGeoCommands.GeoRadiusCommandArgs
.newGeoRadiusArgs()
.includeDistance() # 包含距离
.includeCoordinates() # 包含坐标
.sortAscending() # 按距离升序
.limit(limit); # 限制数量
GeoResults<RedisGeoCommands.GeoLocation<String>> results =
redisTemplate.opsForGeo().radius("users:location", circle, args);
return results.getContent().stream()
.map(result -> {
RedisGeoCommands.GeoLocation<String> location = result.getContent();
Point point = location.getPoint();
return new NearUser(
Long.parseLong(location.getName()),
result.getDistance().getValue(),
point.getX(),
point.getY()
);
})
.collect(Collectors.toList());
}
7.4 底层原理
Geohash 算法:
- 将经纬度编码为字符串
- 相近的位置有相似的 Geohash 值
- 使用 ZSet 存储,score 为 Geohash
编码过程:
- 经纬度转换为二进制
- 经纬度二进制位交错合并
- 使用 Base32 编码
示例:
北京:116.4074, 39.9042
→ 经度二进制:1100100...
→ 纬度二进制:1011000...
→ 交错合并:111001000...
→ Base32 编码:wx4g0e
7.5 性能优化
避免全量查询:
- 使用 COUNT 限制结果数量
- 使用 STORE 缓存结果
# 查询并缓存结果
GEORADIUS users:location 116.4074 39.9042 10 km COUNT 100 STORE nearby:users
数据分片:
- 按城市/区域分片
- 减少单个 key 的数据量
面试题:GEO 的底层实现原理?
答案:
底层使用 ZSet:
# GEO 内部使用 ZSet 存储
ZADD users:location geohash member
# 查看 ZSet
ZRANGE users:location 0 -1 WITHSCORES
Geohash 算法:
- 将二维坐标(经纬度)编码为一维字符串
- 相近的位置有相似的 Geohash 前缀
- 通过 ZSet 的范围查询实现附近查询
精度:
- Geohash 长度越长,精度越高
- 默认使用 52 位双精度浮点数
- 误差范围在 0.5 m 左右
面试题:附近的人如何实现?
答案:
方案 1:Redis GEO
GEORADIUS users:location 116.4074 39.9042 10 km WITHDIST
- 优点:简单易用
- 缺点:数据量大时性能下降
方案 2:Geohash + 数据库
- 将用户位置按 Geohash 分桶
- 查询时查找相邻的 8 个桶
- 过滤距离并排序
方案 3:MongoDB Geospatial Query
db.users.find({
location: {
$near: {
$geometry: { type: "Point", coordinates: [116.4074, 39.9042] },
$maxDistance: 10000 # 10 km
}
}
})
性能优化:
- Redis GEO:适合数据量 < 100 万
- MongoDB:适合数据量 > 100 万
- Elasticsearch:适合海量数据
八、Stream(数据流)
8.1 基本概念
Stream:Redis 5.0+ 提供的日志数据结构,类似 Kafka。
特点:
- 支持消息持久化
- 支持消费者组
- 支持 ACK 确认机制
- 支持消息回溯
应用场景:
- 消息队列
- 事件溯源
- 日志收集
8.2 常用命令
# 添加消息
XADD key [NOMKSTREAM] ["*" | ID] field value [field value ...]
# 读取消息
XRANGE key start end [COUNT count]
XREVRANGE key end start [COUNT count]
# 读取流(阻塞)
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
# 创建消费者组
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id]
[DESTROY key groupname]
[DELGROUP key groupname]
# 读取消费者组消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]
[NOACK] STREAMS key [key ...] ID [ID ...]
# ACK 确认
XACK key group id [id ...]
# 查看消费者组信息
XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
8.3 使用示例
示例 1:生产者
# 添加消息(自动生成 ID)
XADD stream:orders * name "order1" amount 100
# 返回:"1704067200000-0"
# 添加消息(指定 ID)
XADD stream:orders 1704067200000-1 name "order2" amount 200
示例 2:消费者
# 读取所有消息
XRANGE stream:orders - +
# 读取指定范围消息
XRANGE stream:orders 1704067200000-0 1704067200000-1
# 阻塞读取新消息
XREAD BLOCK 0 STREAMS stream:orders $
示例 3:消费者组
# 创建消费者组(从开始消费)
XGROUP CREATE stream:orders group1 0
# 创建消费者组(从新消息开始消费)
XGROUP CREATE stream:orders group2 $
# 消费者读取消息
XREADGROUP GROUP group1 consumer1 COUNT 1 STREAMS stream:orders >
# 返回:当前组未处理的消息
# ACK 确认
XACK stream:orders group1 1704067200000-0
示例 4:消息队列
# 生产者
XADD stream:tasks * task "send_email" user 1001
# 消费者(消费者组)
XGROUP CREATE stream:tasks workers 0
# Worker 1
XREADGROUP GROUP workers worker1 COUNT 1 STREAMS stream:tasks >
# 处理完成后 ACK
XACK stream:tasks workers <message-id>
# Worker 2
XREADGROUP GROUP workers worker2 COUNT 1 STREAMS stream:tasks >
Java 实现:
// 发送消息
public String sendMessage(String stream, Map<String, String> message) {
return redisTemplate.opsForStream().add(
stream,
message
).getValue();
}
// 消费消息
public void consumeMessages(String stream, String group, String consumer) {
while (true) {
List<MapRecord<String, Object, Object>> messages =
redisTemplate.opsForStream().read(
Consumer.from(group, consumer),
StreamReadOptions.empty().count(1),
StreamOffset.create(stream, ReadOffset.lastConsumed())
);
if (messages.isEmpty()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
continue;
}
for (MapRecord<String, Object, Object> message : messages) {
try {
// 处理消息
handleMessage(message.getValue());
// ACK 确认
redisTemplate.opsForStream().acknowledge(stream, group, message.getId());
} catch (Exception e) {
// 处理失败,消息会保留,后续可以重试
log.error("Handle message failed: {}", message, e);
}
}
}
}
8.4 消息可靠性
Pending 消息:
- 已发送给消费者但未 ACK 的消息
- 使用
XPENDING查看
# 查看 Pending 消息
XPENDING stream:orders group1
# 查看 Pending 详情
XPENDING stream:orders group1 - + 10 consumer1
消息回溯:
# 将消息重新发送到消费者组
XCLAIM stream:orders group1 consumer2 0 1704067200000-0
8.5 Stream vs List vs Pub/Sub
| 特性 | Stream | List | Pub/Sub |
|---|---|---|---|
| 消息持久化 | 支持 | 支持 | 不支持 |
| 消费者组 | 支持 | 不支持 | 不支持 |
| ACK 机制 | 支持 | 不支持 | 不支持 |
| 消息回溯 | 支持 | 不支持 | 不支持 |
| 多消费 | 支持 | 不支持 | 支持 |
| 性能 | 中等 | 高 | 高 |
| 复杂度 | 高 | 低 | 低 |
面试题:Stream、List、Pub/Sub 如何选择?
答案:
Stream:
- 需要消息持久化
- 需要消费者组
- 需要确认机制
- 类似 Kafka
List:
- 简单队列
- 不需要 ACK
- 性能要求高
- 单消费者
Pub/Sub:
- 实时通知
- 不持久化
- 多订阅者
- 解耦服务
使用建议:
- 复杂消息队列:Stream
- 简单队列:List
- 实时通知:Pub/Sub
- 生产环境 MQ:Kafka、RabbitMQ
九、客户端管理
9.1 常用命令
# 查看客户端连接
CLIENT LIST
# 查看客户端名称
CLIENT GETNAME
# 设置客户端名称
CLIENT SETNAME connection-name
# 杀掉客户端连接
CLIENT KILL ip:port
CLIENT KILL ID client-id
CLIENT KILL TYPE master|normal|slave|pubsub
# 暂停客户端
CLIENT PAUSE timeout (ms)
# 查看客户端信息
CLIENT INFO
9.2 连接管理
查看连接信息:
127.0.0.1:6379> CLIENT LIST
addr=127.0.0.1:54321
fd=6
name=redis-cli
age=123
idle=0
db=0
sub=0
psub=0
multi=-1
qbuf=0
qbuf-free=0
obl=0
oll=0
omem=0
events=r
cmd=client
字段说明:
addr:客户端地址name:客户端名称age:连接时长(秒)idle:空闲时长(秒)db:当前数据库cmd:最后执行的命令
限制最大连接数:
# 最大客户端连接数,默认 10000
maxclients 10000
9.3 连接池配置
Java(Jedis):
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(100); # 最大连接数
config.setMaxIdle(50); # 最大空闲连接数
config.setMinIdle(10); # 最小空闲连接数
config.setMaxWaitMillis(10000); # 获取连接的最大等待时间
config.setTestOnBorrow(true); # 获取连接时测试
config.setTestOnReturn(false); # 归还连接时测试
JedisPool pool = new JedisPool(config, "localhost", 6379);
Java(Lettuce):
RedisURI uri = RedisURI.builder()
.withHost("localhost")
.withPort(6379)
.build();
LettuceClientConfiguration config = LettuceClientConfiguration.builder()
.commandTimeout(Duration.ofSeconds(10))
.shutdownTimeout(Duration.ZERO)
.build();
RedisClient client = RedisClient.create(uri);
StatefulRedisConnection<String, String> connection = client.connect();
十、服务器管理
10.1 常用命令
# 查看 Redis 信息
INFO [section]
# 查看 CONFIG
CONFIG GET parameter
# 修改 CONFIG
CONFIG SET parameter value
# 重写配置文件
CONFIG REWRITE
# 获取指定数据库的键个数
DBSIZE
# 异步刷新数据库
BGREWRITEAOF
BGSAVE
# 同步保存数据
SAVE
# 最后一次保存的时间戳
LASTSAVE
# 清空数据库
FLUSHDB [ASYNC|SYNC]
FLUSHALL [ASYNC|SYNC]
# 监控命令
MONITOR
# 同步从服务器
SYNC
10.2 INFO 命令详解
# 查看 Server 信息
INFO server
# 查看 Clients 信息
INFO clients
# 查看 Memory 信息
INFO memory
# 查看 Persistence 信息
INFO persistence
# 查看 Stats 信息
INFO stats
# 查看 Replication 信息
INFO replication
# 查看 CPU 信息
INFO cpu
# 查看 Cluster 信息
INFO cluster
# 查看 Keyspace 信息
INFO keyspace
# 查看所有信息
INFO all
Memory 信息示例:
127.0.0.1:6379> INFO memory
used_memory:1048576 # 当前内存使用量(字节)
used_memory_human:1.00M # 可读格式
used_memory_rss:2097152 # 系统分配的内存
used_memory_peak:2097152 # 历史峰值
maxmemory:0 # 最大内存限制(0 表示无限制)
maxmemory_policy:noeviction # 内存淘汰策略
10.3 性能监控
监控脚本:
#!/bin/bash
# Redis 性能监控脚本
while true; do
clear
echo "=== Redis Performance Monitor ==="
echo "Time: $(date)"
echo ""
# 连接数
clients=$(redis-cli --csv INFO clients | grep -o 'connected_clients:[0-9]*' | cut -d: -f2)
echo "Connected Clients: $clients"
# 内存使用
memory=$(redis-cli --csv INFO memory | grep -o 'used_memory_human:[^,]*' | cut -d: -f2)
echo "Memory Used: $memory"
# 命令执行次数
cmds=$(redis-cli --csv INFO stats | grep -o 'total_commands_processed:[0-9]*' | cut -d: -f2)
echo "Total Commands: $cmds"
# 每秒命令数(OPS)
ops=$(redis-cli --csv INFO stats | grep -o 'instantaneous_ops_per_sec:[0-9]*' | cut -d: -f2)
echo "OPS: $ops"
# 慢查询数
slowlog=$(redis-cli SLOWLOG LEN)
echo "Slow Queries: $slowlog"
echo ""
sleep 1
done
Prometheus 监控:
# docker-compose.yml
version: '3'
services:
redis:
image: redis:7
ports:
- "6379:6379"
redis-exporter:
image: oliver006/redis_exporter
ports:
- "9121:9121"
environment:
- REDIS_ADDR=redis:6379
十一、高级面试题汇总
11.1 如何实现全局唯一 ID?
答案:
方案 1:Redis INCR
# 简单递增
INCR id:generator
# 返回:1, 2, 3, ...
缺点:
- ID 连续,容易被猜测
- 单点问题
方案 2:时间戳 + 递增
-- 生成唯一 ID(雪花算法)
local key = KEYS[1]
local timestamp = ARGV[1]
-- 刷新时间窗口
if redis.call('GET', key .. ':timestamp') ~= timestamp then
redis.call('SET', key .. ':timestamp', timestamp)
redis.call('SET', key .. ':seq', 0)
end
-- 获取序列号
local seq = redis.call('INCR', key .. ':seq')
-- 组合 ID:时间戳(41 位)+ 机器 ID(10 位)+ 序列号(12 位)
return timestamp .. seq
方案 3:UUID + Redis 去重
public String generateId() {
String uuid = UUID.randomUUID().toString();
String key = "id:generated:" + uuid;
// 使用 SETNX 保证唯一性
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(key, "1", Duration.ofDays(1));
if (Boolean.TRUE.equals(success)) {
return uuid;
} else {
return generateId(); # 递归重试
}
}
方案 4:雪花算法
public class SnowflakeIdGenerator {
private final long epoch = 1609459200000L; # 2021-01-01 00:00:00
private final long machineId; # 机器 ID(0-1023)
private long sequence = 0; # 序列号
private long lastTimestamp = -1L;
public synchronized long nextId() {
long timestamp = System.currentTimeMillis() - epoch;
if (timestamp < lastTimestamp) {
throw new RuntimeException("Clock moved backwards");
}
if (timestamp == lastTimestamp) {
sequence = (sequence + 1) & 4095; # 12 位
if (sequence == 0) {
timestamp = tilNextMillis(lastTimestamp);
}
} else {
sequence = 0;
}
lastTimestamp = timestamp;
# 41 位时间戳 + 10 位机器 ID + 12 位序列号
return (timestamp << 22) | (machineId << 12) | sequence;
}
private long tilNextMillis(long lastTimestamp) {
long timestamp = System.currentTimeMillis() - epoch;
while (timestamp <= lastTimestamp) {
timestamp = System.currentTimeMillis() - epoch;
}
return timestamp;
}
}
方案 5:Redis + 雪花算法
-- 分布式雪花算法
local key = KEYS[1]
local timestamp = ARGV[1]
local machineId = ARGV[2]
-- 刷新时间窗口
if redis.call('GET', key .. ':timestamp') ~= timestamp then
redis.call('SET', key .. ':timestamp', timestamp)
redis.call('SET', key .. ':seq', 0)
end
-- 获取序列号(12 位,最大 4095)
local seq = redis.call('INCR', key .. ':seq')
if seq > 4095 then
return nil # 序列号溢出
end
-- 组合 ID
return (timestamp * 10000 + machineId) * 10000 + seq
11.2 如何实现分布式限流?
答案:
方案 1:固定窗口(有问题)
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local count = redis.call('GET', key)
if count == false then
redis.call('SET', key, 1, 'EX', window)
return 1
elseif tonumber(count) < limit then
redis.call('INCR', key)
return 1
else
return 0
end
**问题:**窗口边界可能出现 2 倍流量
方案 2:滑动窗口(推荐)
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
# 删除窗口外的记录
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
# 获取当前窗口内的请求数
local count = redis.call('ZCARD', key)
if count < limit then
redis.call('ZADD', key, now, now)
redis.call('EXPIRE', key, window)
return 1
else
return 0
end
方案 3:令牌桶(适合平滑限流)
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local info = redis.call('HMGET', key, 'tokens', 'last_time')
local tokens = tonumber(info[1]) or capacity
local last_time = tonumber(info[2]) or now
local delta = math.max(0, (now - last_time) * rate / 1000)
tokens = math.min(capacity, tokens + delta)
if tokens >= requested then
redis.call('HMSET', key, 'tokens', tokens - requested, 'last_time', now)
redis.call('EXPIRE', key, math.ceil(capacity / rate * 2) + 1)
return 1
else
redis.call('HMSET', key, 'tokens', tokens, 'last_time', now)
redis.call('EXPIRE', key, math.ceil(capacity / rate * 2) + 1)
return 0
end
方案 4:漏桶(适合恒定速率)
public class LeakyBucketRateLimiter {
private final RedisTemplate redisTemplate;
private final double capacity; # 桶容量
private final double rate; # 漏水速率(请求/秒)
public boolean allowRequest(String key) {
long now = System.currentTimeMillis();
# 获取当前水量和上次漏水时间
List<Object> info = redisTemplate.executePipelined(
(RedisCallback<Object>) connection -> {
connection.get((key + ":water").getBytes());
connection.get((key + ":last_leak").getBytes());
return null;
}
);
double water = info[0] != null ? Double.parseDouble(info[0].toString()) : 0;
long lastLeak = info[1] != null ? Long.parseLong(info[1].toString()) : now;
# 计算漏出的水量
long elapsed = now - lastLeak;
water = Math.max(0, water - elapsed * rate / 1000.0);
if (water < capacity) {
# 加水
redisTemplate.opsForValue().set(key + ":water", water + 1);
redisTemplate.opsForValue().set(key + ":last_leak", now);
return true;
} else {
return false;
}
}
}
11.3 如何实现延迟队列?
答案:
方案 1:Sorted Set
# 添加延迟任务(score 为执行时间戳)
ZADD delayed:tasks 1704067200000 "task:send_email:user:1001"
# 定时扫描(每秒执行一次)
while true; do
now=$(date +%s%3N)
tasks=$(redis-cli ZRANGEBYSCORE delayed:tasks 0 $now)
for task in $tasks; do
# 处理任务
process_task $task
# 删除任务
redis-cli ZREM delayed:tasks $task
done
sleep 1
done
Java 实现:
public class DelayedQueue {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private TaskExecutor taskExecutor;
// 添加延迟任务
public void addDelayedTask(String task, long delayMs) {
long executeTime = System.currentTimeMillis() + delayMs;
redisTemplate.opsForZSet().add("delayed:tasks", task, executeTime);
}
// 定时扫描
@Scheduled(fixedDelay = 1000)
public void processDelayedTasks() {
long now = System.currentTimeMillis();
// 获取到期的任务
Set<String> tasks = redisTemplate.opsForZSet()
.rangeByScore("delayed:tasks", 0, now);
for (String task : tasks) {
// 使用 Lua 脚本保证原子性
String luaScript =
"if redis.call('zrem', KEYS[1], ARGV[1]) == 1 then " +
" return 1 " +
"else " +
" return 0 " +
"end";
DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
Long result = redisTemplate.execute(script, Collections.singletonList("delayed:tasks"), task);
if (result == 1) {
// 异步处理任务
taskExecutor.execute(() -> processTask(task));
}
}
}
private void processTask(String task) {
// 解析任务并执行
// ...
}
}
方案 2:Redis Keyspace Notifications
# 启用 Keyspace Notifications
CONFIG SET notify-keyspace-events Ex
# 设置过期 key
SET task:send_email:user:1001 "data" EX 60
# 监听过期事件
redis-cli --subscribe __keyevent@0__:expired
Java 实现:
@Configuration
public class RedisConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListener listener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listener, new PatternTopic("__keyevent@*:expired"));
return container;
}
}
@Component
public class ExpiredKeyListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
String key = new String(message.getBody());
// key 格式:task:send_email:user:1001
// 解析并执行任务
processTask(key);
}
}
问题:
- 不保证消息可靠性(Redis 宕机可能丢失)
- 时间不精确(依赖过期检查时间)
方案 3:Redisson Delayed Queue
@Autowired
private RedissonClient redisson;
public void addDelayedTask(String task, long delay, TimeUnit unit) {
RDelayedQueue<String> queue = redisson.getDelayedQueue("delayed:tasks");
queue.offer(task, delay, unit);
}
public void startConsumer() {
RDelayedQueue<String> queue = redisson.getDelayedQueue("delayed:tasks");
queue.addListener(new MessageListener<String>() {
@Override
public void onMessage(String channel, String msg) {
processTask(msg);
}
});
}
11.4 如何实现点赞功能?
答案:
需求分析:
- 用户可以点赞/取消点赞
- 统计点赞数
- 查询用户是否点赞
- 查询点赞用户列表
方案 1:使用 Set
# 用户点赞
SADD post:1001:likes user:1001
# 取消点赞
SREM post:1001:likes user:1001
# 统计点赞数
SCARD post:1001:likes
# 检查是否点赞
SISMEMBER post:1001:likes user:1001
# 获取点赞用户列表(分页)
SRANDMEMBER post:1001:likes 10
方案 2:使用 Sorted Set(按时间排序)
# 用户点赞
ZADD post:1001:likes 1704067200 user:1001
# 取消点赞
ZREM post:1001:likes user:1001
# 统计点赞数
ZCARD post:1001:likes
# 检查是否点赞
ZSCORE post:1001:likes user:1001
# 获取点赞用户列表(按时间倒序)
ZREVRANGE post:1001:likes 0 9 WITHSCORES
Java 实现:
@Service
public class LikeService {
@Autowired
private StringRedisTemplate redisTemplate;
// 点赞
public void like(Long postId, Long userId) {
long score = System.currentTimeMillis();
redisTemplate.opsForZSet().add(
"post:" + postId + ":likes",
"user:" + userId,
score
);
# 更新帖子点赞数缓存
redisTemplate.opsForValue().increment("post:" + postId + ":like_count");
# 异步更新数据库
updateDatabase(postId, userId, true);
}
// 取消点赞
public void unlike(Long postId, Long userId) {
redisTemplate.opsForZSet().remove(
"post:" + postId + ":likes",
"user:" + userId
);
# 更新帖子点赞数缓存
redisTemplate.opsForValue().decrement("post:" + postId + ":like_count");
# 异步更新数据库
updateDatabase(postId, userId, false);
}
// 获取点赞数
public long getLikeCount(Long postId) {
# 先从缓存获取
String count = redisTemplate.opsForValue().get("post:" + postId + ":like_count");
if (count != null) {
return Long.parseLong(count);
}
# 缓存未命中,从 Sorted Set 获取
Long size = redisTemplate.opsForZSet().zCard("post:" + postId + ":likes");
if (size != null) {
# 写入缓存
redisTemplate.opsForValue().set("post:" + postId + ":like_count", size.toString(), 1, TimeUnit.HOURS);
return size;
}
return 0L;
}
// 检查是否点赞
public boolean isLiked(Long postId, Long userId) {
Double score = redisTemplate.opsForZSet().score(
"post:" + postId + ":likes",
"user:" + userId
);
return score != null;
}
// 获取点赞用户列表
public List<Long> getLikeUsers(Long postId, int offset, int limit) {
Set<String> users = redisTemplate.opsForZSet().reverseRange(
"post:" + postId + ":likes",
offset,
offset + limit - 1
);
return users.stream()
.map(user -> Long.parseLong(user.substring(5))) # 去掉 "user:" 前缀
.collect(Collectors.toList());
}
}
11.5 如何实现抽奖功能?
答案:
方案 1:使用 Set(简单抽奖)
# 添加参与用户
SADD lottery:2024 user:1001 user:1002 user:1003
# 抽取 1 个中奖用户
SRANDMEMBER lottery:2024 1
# 抽取 3 个中奖用户(不重复)
SPOP lottery:2024 3
方案 2:使用 Sorted Set(权重抽奖)
# 添加参与用户(score 为权重)
ZADD lottery:2024 1 user:1001 # user:1001 的权重为 1
ZADD lottery:2024 10 user:1002 # user:1002 的权重为 10
# 抽奖算法(Lua 实现)
Lua 抽奖脚本:
-- 权重抽奖
local key = KEYS[1]
local count = tonumber(ARGV[1])
local totalWeight = 0
local members = redis.call('ZRANGE', key, 0, -1, 'WITHSCORES')
# 计算总权重
for i = 1, #members, 2 do
totalWeight = totalWeight + tonumber(members[i + 1])
end
local winners = {}
local winnerCount = 0
while winnerCount < count and #members > 0 do
# 随机生成权重值
local randomWeight = math.random() * totalWeight
# 查找对应的用户
local currentWeight = 0
for i = 1, #members, 2 do
currentWeight = currentWeight + tonumber(members[i + 1])
if currentWeight >= randomWeight then
table.insert(winners, members[i])
# 删除已中奖用户
redis.call('ZREM', key, members[i])
# 更新总权重
totalWeight = totalWeight - tonumber(members[i + 1])
# 移除该用户
table.remove(members, i)
table.remove(members, i)
winnerCount = winnerCount + 1
break
end
end
end
return winners
Java 实现:
@Service
public class LotteryService {
@Autowired
private RedisTemplate redisTemplate;
// 参与抽奖
public void join(Long userId, Long activityId) {
redisTemplate.opsForSet().add("lottery:" + activityId, "user:" + userId);
}
// 抽奖
public List<Long> draw(Long activityId, int count) {
// 使用 Lua 脚本保证原子性
String luaScript =
"local winners = {} " +
"for i = 1, tonumber(ARGV[1]) do " +
" local member = redis.call('SPOP', KEYS[1]) " +
" if member == false then " +
" break " +
" end " +
" table.insert(winners, member) " +
"end " +
"return winners";
DefaultRedisScript<List> script = new DefaultRedisScript<>(luaScript, List.class);
List<String> winners = redisTemplate.execute(
script,
Collections.singletonList("lottery:" + activityId),
String.valueOf(count)
);
return winners.stream()
.map(user -> Long.parseLong(user.substring(5)))
.collect(Collectors.toList());
}
// 权重抽奖
public List<Long> weightedDraw(Long activityId, int count) {
String luaScript =
"local key = KEYS[1] " +
"local count = tonumber(ARGV[1]) " +
"local totalWeight = 0 " +
"local members = redis.call('ZRANGE', key, 0, -1, 'WITHSCORES') " +
"for i = 1, #members, 2 do " +
" totalWeight = totalWeight + tonumber(members[i + 1]) " +
"end " +
"local winners = {} " +
"local winnerCount = 0 " +
"while winnerCount < count and #members > 0 do " +
" local randomWeight = math.random() * totalWeight " +
" local currentWeight = 0 " +
" for i = 1, #members, 2 do " +
" currentWeight = currentWeight + tonumber(members[i + 1]) " +
" if currentWeight >= randomWeight then " +
" table.insert(winners, members[i]) " +
" redis.call('ZREM', key, members[i]) " +
" totalWeight = totalWeight - tonumber(members[i + 1]) " +
" table.remove(members, i) " +
" table.remove(members, i) " +
" winnerCount = winnerCount + 1 " +
" break " +
" end " +
" end " +
"end " +
"return winners";
DefaultRedisScript<List> script = new DefaultRedisScript<>(luaScript, List.class);
List<String> winners = redisTemplate.execute(
script,
Collections.singletonList("lottery:weighted:" + activityId),
String.valueOf(count)
);
return winners.stream()
.map(user -> Long.parseLong(user.substring(5)))
.collect(Collectors.toList());
}
}
十二、总结
12.1 Redis 高级特性回顾
- 发布订阅:实时消息推送,但不持久化
- Lua 脚本:原子性执行复杂逻辑
- Pipeline:减少网络开销,提升性能
- 慢查询分析:发现和优化性能瓶颈
- Bitmap:位图,节省内存,适合二值状态
- HyperLogLog:基数统计,内存占用极小
- GEO:地理位置,附近的人
- Stream:消息队列,支持持久化和消费者组
- 客户端管理:连接池、监控
- 服务器管理:配置、监控、运维
12.2 面试准备建议
- 深入理解原理:每个特性的底层实现
- 对比分析:不同方案的优缺点
- 场景设计:结合实际业务场景
- 性能优化:监控、调优经验
- 最佳实践:生产环境的注意事项
12.3 学习资源
- 官方文档:https://redis.io/documentation
- 源码分析:《Redis 设计与实现》
- 实战案例:《Redis 开发与运维》
- 在线教程:Redis 官方教程
参考文档:
- Redis 官方文档
- Redis 中文文档
- 《Redis 设计与实现》
- 《Redis 开发与运维》