跳到主要内容

高级特性

一、发布订阅(Pub/Sub)

1.1 基本概念

发布订阅模式:一种消息通信模式,发送者(Pub)发布消息,订阅者(Sub)接收消息。

应用场景:

  • 实时消息推送
  • 聊天室
  • 系统通知
  • 解耦服务

1.2 常用命令

# 订阅频道
SUBSCRIBE channel1 channel2

# 模式订阅
PSUBSCRIBE news.* # 订阅所有 news. 开头的频道

# 发布消息
PUBLISH channel1 "hello"

# 取消订阅
UNSUBSCRIBE channel1
PUNSUBSCRIBE news.*

# 查看活跃频道
PUBSUB CHANNELS

# 查看频道订阅数
PUBSUB NUMSUB channel1

# 查看模式订阅数
PUBSUB NUMPAT

1.3 使用示例

客户端 1(订阅者):

127.0.0.1:6379> SUBSCRIBE news
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news"
3) (integer) 1

客户端 2(发布者):

127.0.0.1:6379> PUBLISH news "Hello Redis"
(integer) 1 # 返回订阅者数量

客户端 1(收到消息):

1) "message"
2) "news"
3) "Hello Redis"

1.4 模式订阅示例

# 订阅所有 sports 开头的频道
PSUBSCRIBE sports.*

# 发布消息到 sports.basketball
PUBLISH sports.basketball "NBA game started"

# 订阅者会收到消息

1.5 原理分析

频道数据结构:

// 服务器状态
struct redisServer {
dict *pubsub_channels; // 频道字典,键为频道名,值为链表(订阅客户端)
list *pubsub_patterns; // 模式订阅链表
};

// 客户端状态
struct redisClient {
dict *pubsub_channels; // 客户端订阅的频道
list *pubsub_patterns; // 客户端订阅的模式
};

消息转发流程:

  1. 客户端发送 PUBLISH channel message
  2. 服务器在 pubsub_channels 中查找频道
  3. 遍历频道的订阅者链表,发送消息给每个订阅者
  4. 遍历 pubsub_patterns,匹配模式订阅者,发送消息

1.6 优缺点

优点:

  • 解耦发布者和订阅者
  • 支持多对多通信
  • 支持模式匹配(通配符)

缺点:

  • 消息不持久化:离线订阅者无法接收历史消息
  • 消息可能丢失:订阅者断线时无法接收消息
  • 无消息确认:发布者不知道消息是否被接收
  • 无消息堆积:如果订阅者处理慢,消息会丢失

面试题:Pub/Sub 的适用场景和局限性?

答案:

适用场景:

  1. 实时通知:系统通知、消息推送
  2. 即时通讯:聊天室、在线状态
  3. 配置更新:配置变更通知
  4. 日志收集:分布式日志收集

不适用场景:

  1. 可靠消息传递:需要使用消息队列(RabbitMQ、Kafka)
  2. 离线消息:需要使用 Stream 或消息队列
  3. 消息堆积:需要使用消息队列
  4. 事务消息:需要使用事务机制

替代方案:

  • Redis Stream(Redis 5.0+):支持消息持久化和消费者组
  • 消息队列:RabbitMQ、Kafka、RocketMQ

二、Lua 脚本

2.1 基本概念

Lua 脚本:在 Redis 中执行 Lua 脚本,可以保证多个命令的原子性执行。

优势:

  • 原子性:脚本执行期间不执行其他命令
  • 减少网络开销:多个命令一次性发送
  • 复用性:脚本可以被缓存和重复执行
  • 复杂逻辑:支持条件判断、循环等

2.2 常用命令

# 执行脚本
EVAL script numkeys key [key ...] arg [arg ...]

# 执行缓存的脚本
EVALSHA sha1 numkeys key [key ...] arg [arg ...]

# 脚本缓存(但不执行)
SCRIPT LOAD script

# 检查脚本是否存在
SCRIPT EXISTS sha1 [sha1 ...]

# 刷新脚本缓存
SCRIPT FLUSH

# 强制杀死脚本
SCRIPT KILL

2.3 使用示例

示例 1:原子性操作

-- 扣减库存并记录订单
local stock = redis.call('GET', KEYS[1])
if tonumber(stock) > 0 then
redis.call('DECR', KEYS[1])
redis.call('HSET', KEYS[2], 'userid', ARGV[1], 'time', ARGV[2])
return 1
else
return 0
end

执行:

EVAL "local stock = redis.call('GET', KEYS[1]) if tonumber(stock) > 0 then redis.call('DECR', KEYS[1]) redis.call('HSET', KEYS[2], 'userid', ARGV[1], 'time', ARGV[2]) return 1 else return 0 end" 2 stock:1001 order:1001 user123 1704067200

示例 2:批量操作

-- 批量设置 key,并设置过期时间
for i = 1, #KEYS do
redis.call('SET', KEYS[i], ARGV[i])
redis.call('EXPIRE', KEYS[i], 3600)
end
return #KEYS

执行:

EVAL "for i = 1, #KEYS do redis.call('SET', KEYS[i], ARGV[i]) redis.call('EXPIRE', KEYS[i], 3600) end return #KEYS" 3 key1 key2 key3 value1 value2 value3

示例 3:限流器

-- 滑动窗口限流
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

-- 删除窗口外的记录
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)

-- 获取当前窗口内的请求数
local count = redis.call('ZCARD', key)

if count < limit then
-- 添加当前请求
redis.call('ZADD', key, now, now)
-- 设置过期时间
redis.call('EXPIRE', key, window)
return 1 -- 允许请求
else
return 0 -- 拒绝请求
end

执行:

EVAL "local key = KEYS[1] local limit = tonumber(ARGV[1]) local window = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) redis.call('ZREMRANGEBYSCORE', key, 0, now - window) local count = redis.call('ZCARD', key) if count < limit then redis.call('ZADD', key, now, now) redis.call('EXPIRE', key, window) return 1 else return 0 end" 1 limit:user:123 100 60 1704067200

示例 4:分布式锁(更完善的实现)

-- 获取锁
local lockKey = KEYS[1]
local lockValue = ARGV[1]
local expireTime = tonumber(ARGV[2])

if redis.call('SET', lockKey, lockValue, 'NX', 'EX', expireTime) then
return 1
else
return 0
end
-- 释放锁
local lockKey = KEYS[1]
local lockValue = ARGV[1]

if redis.call('GET', lockKey) == lockValue then
return redis.call('DEL', lockKey)
else
return 0
end

2.4 脚本缓存

问题:每次发送完整脚本会消耗网络带宽

解决:使用 SCRIPT LOAD 缓存脚本,返回 SHA1 校验和

# 缓存脚本
SCRIPT LOAD "return redis.call('GET', KEYS[1])"
# 返回: "4c64034eb8f4256c5b202840bb5ed6b0d3925c53"

# 使用 SHA1 执行
EVALSHA 4c64034eb8f4256c5b202840bb5ed6b0d3925c53 1 mykey

Java 实现:

@Autowired
private RedisScript<Long> redisScript;

@Autowired
private StringRedisTemplate redisTemplate;

public Boolean seckill(Long userId, Long productId) {
String sha = redisScript.getSha1(); // 脚本的 SHA1

Long result = redisTemplate.execute(
RedisScript.of(sha, Long.class),
Arrays.asList("stock:" + productId, "order:" + productId),
userId.toString(), String.valueOf(System.currentTimeMillis())
);

return result == 1;
}

2.5 注意事项

超时处理:

  • 脚本执行时间默认限制为 5 秒
  • 超时后返回 BUSY 错误
  • 可通过 lua-time-limit 配置调整

脚本超时处理:

# 查看正在执行的脚本
SCRIPT KILL

# 如果脚本正在修改数据,无法 KILL,需要 SHUTDOWN NOSAVE

禁止操作:

  • 不建议在脚本中使用随机数(每次执行结果相同)
  • 不建议执行长时间运行的脚本
  • 避免访问外部系统(网络、文件)

面试题:Lua 脚本如何保证原子性?

答案:

  • Redis 使用单线程模型执行命令
  • Lua 脚本执行期间,不会执行其他客户端的命令
  • 整个脚本作为一个整体执行,不会被中断
  • 类似于事务,但比事务更灵活(支持逻辑判断)

面试题:Lua 脚本和事务的区别?

答案:

特性Lua 脚本事务(MULTI/EXEC)
原子性支持支持
条件判断支持不支持
循环支持不支持
错误处理支持复杂逻辑遇到错误继续执行
网络开销一次请求多次请求(Pipeline 可优化)
复杂度适合复杂逻辑适合简单批量操作

使用建议:

  • 简单批量操作:使用事务 + Pipeline
  • 复杂业务逻辑:使用 Lua 脚本
  • 需要条件判断:使用 Lua 脚本

面试题:如何实现一个限流器?

答案:

方案 1:固定窗口(有问题)

local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])

local count = redis.call('GET', key)
if count == false then
redis.call('SET', key, 1, 'EX', window)
return 1
elseif tonumber(count) < limit then
redis.call('INCR', key)
return 1
else
return 0
end

问题:窗口边界可能出现 2 倍流量

方案 2:滑动窗口(推荐)

local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

-- 删除窗口外的记录
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)

-- 获取当前窗口内的请求数
local count = redis.call('ZCARD', key)

if count < limit then
redis.call('ZADD', key, now, now)
redis.call('EXPIRE', key, window)
return 1
else
return 0
end

方案 3:令牌桶(适合平滑限流)

local key = KEYS[1]
local capacity = tonumber(ARGV[1]) -- 桶容量
local rate = tonumber(ARGV[2]) -- 令牌生成速率(个/秒)
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4]) -- 请求令牌数

-- 获取当前令牌数和上次填充时间
local info = redis.call('HMGET', key, 'tokens', 'last_time')
local tokens = tonumber(info[1]) or capacity
local last_time = tonumber(info[2]) or now

-- 计算需要补充的令牌数
local delta = math.max(0, (now - last_time) * rate / 1000)
tokens = math.min(capacity, tokens + delta)

if tokens >= requested then
-- 扣减令牌
redis.call('HMSET', key, 'tokens', tokens - requested, 'last_time', now)
redis.call('EXPIRE', key, math.ceil(capacity / rate * 2) + 1)
return 1
else
-- 令牌不足,更新但不扣减
redis.call('HMSET', key, 'tokens', tokens, 'last_time', now)
redis.call('EXPIRE', key, math.ceil(capacity / rate * 2) + 1)
return 0
end

三、Pipeline(管道)

3.1 基本概念

Pipeline:在一次网络往返中执行多个命令,减少网络开销。

原理:

  • 客户端打包多个命令,一次性发送给服务器
  • 服务器一次性执行多个命令,返回所有结果
  • 减少网络往返时间(RTT,Round Trip Time)

3.2 性能对比

不使用 Pipeline:

# 100 次 SET 操作
for i in {1..100}; do
redis-cli SET key$i value$i
done
# 耗时:约 1-2 秒(100 次 RTT)

使用 Pipeline:

# 100 次 SET 操作
echo -n "$(for i in {1..100}; do echo "SET key$i value$i"; done)" | redis-cli --pipe
# 耗时:约 0.01-0.05 秒(1 次 RTT)

**性能提升:**20-100 倍(取决于网络延迟)

3.3 使用示例

Java(Jedis)

Jedis jedis = new Jedis("localhost");

// 不使用 Pipeline
long start = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
jedis.set("key:" + i, "value:" + i);
}
System.out.println("Time: " + (System.currentTimeMillis() - start) + "ms");

// 使用 Pipeline
Pipeline pipeline = jedis.pipelined();
start = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
pipeline.set("key:" + i, "value:" + i);
}
pipeline.sync(); // 必须调用 sync() 才会发送命令
System.out.println("Pipeline Time: " + (System.currentTimeMillis() - start) + "ms");

Java(Lettuce)

RedisClient client = RedisClient.create("redis://localhost");
StatefulRedisConnection<String, String> connection = client.connect();

// 使用 Pipeline
long start = System.currentTimeMillis();
connection.setAutoFlushCommands(false); // 关闭自动刷新

for (int i = 0; i < 10000; i++) {
connection.sync().set("key:" + i, "value:" + i);
}

connection.flushCommands(); // 手动刷新(一次性发送)
System.out.println("Pipeline Time: " + (System.currentTimeMillis() - start) + "ms");

Python(redis-py)

import redis

r = redis.Redis(host='localhost', port=6379)

# 使用 Pipeline
pipe = r.pipeline()
for i in range(10000):
pipe.set(f'key:{i}', f'value:{i}')
pipe.execute() # 一次性执行

3.4 注意事项

命令数量限制:

  • Pipeline 不应该包含过多命令
  • 建议每批 100-1000 个命令
  • 避免内存占用过大

事务性:

  • Pipeline 不保证原子性
  • 只是为了减少网络开销
  • 需要原子性请使用 MULTI/EXEC 或 Lua 脚本

错误处理:

  • 如果某个命令失败,不影响其他命令执行
  • 需要检查每个命令的返回结果
Pipeline pipeline = jedis.pipelined();
pipeline.set("key1", "value1");
pipeline.incr("key1"); // 错误:key1 不是数字
pipeline.set("key2", "value2");

List<Object> results = pipeline.syncAndReturnAll();
// results[1] 会返回错误信息

面试题:Pipeline 和事务的区别?

答案:

特性Pipeline事务(MULTI/EXEC)
网络往返减少到 1 次减少到 1 次
原子性不保证保证
隔离性不保证保证
命令执行立即执行EXEC 时一起执行
错误处理单独处理遇到语法错误全部不执行
使用场景批量操作需要原子性的操作

使用建议:

  • 批量操作,不需要原子性:使用 Pipeline
  • 需要原子性:使用事务
  • 需要复杂逻辑:使用 Lua 脚本

面试题:Pipeline 如何提升性能?

答案:

  • 减少网络往返:从 N 次 RTT 减少到 1 次 RTT
  • 减少 TCP 开销:减少 TCP 包的封装和解封装
  • 减少系统调用:减少 read/write 系统调用次数
  • 适用场景:批量操作、网络延迟高的环境

性能测试示例:

网络延迟 1ms,执行 1000 个命令:
- 不使用 Pipeline:1000ms(1000 × 1ms)
- 使用 Pipeline:5ms(1 × 1ms + 4ms 执行时间)

网络延迟 100ms,执行 1000 个命令:
- 不使用 Pipeline:100000ms(1000 × 100ms)
- 使用 Pipeline:105ms(1 × 100ms + 5ms 执行时间)

四、慢查询分析

4.1 基本概念

慢查询:执行时间超过指定阈值的命令。

作用:

  • 发现性能瓶颈
  • 优化慢命令
  • 监控系统健康状态

4.2 相关配置

# 慢查询阈值(微秒),默认 10000(10ms)
slowlog-log-slower-than 10000

# 慢查询日志最大长度,默认 128
slowlog-max-len 128

注意事项:

  • slowlog-log-slower-than = 0:记录所有命令
  • slowlog-log-slower-than < 0:不记录任何命令
  • 循环队列,新日志覆盖最老的日志

4.3 常用命令

# 获取慢查询日志
SLOWLOG GET [n]

# 获取慢查询日志长度
SLOWLOG LEN

# 重置慢查询日志
SLOWLOG RESET

4.4 慢查询日志格式

127.0.0.1:6379> SLOWLOG GET
1) 1) (integer) 14 # 日志唯一标识
2) (integer) 1704067200 # 执行时间戳
3) (integer) 50000 # 执行耗时(微秒)
4) "KEYS" "user:*" # 执行的命令及参数
5) "127.0.0.1:54321" # 客户端 IP 和端口
6) "" # 客户端名称

4.5 常见慢命令

O(N) 复杂度命令:

# 查找所有匹配的 key(生产环境禁止使用)
KEYS pattern

# 获取所有 Hash 字段
HGETALL key

# 获取所有 Set 成员
SMEMBERS key

# 获取所有 ZSet 成员
ZRANGE key 0 -1 WITHSCORES

大数据量操作:

# 删除大 Hash(阻塞主线程)
DEL bigkey

# 大 List 操作
LRANGE large_list 0 -1

4.6 慢查询优化

优化建议:

  1. 避免 KEYS,使用 SCAN
# 不推荐
KEYS user:*

# 推荐
SCAN cursor MATCH user:* COUNT 100
  1. 使用 HSCAN/SSCAN/ZSCAN
# 不推荐
HGETALL user:1001

# 推荐
HSCAN user:1001 0 COUNT 100
  1. 使用 UNLINK 替代 DEL
# DEL 同步删除(阻塞)
DEL bigkey

# UNLINK 异步删除(不阻塞)
UNLINK bigkey
  1. 分批操作
// 分批删除大 Hash
String cursor = "0";
do {
ScanResult<Map.Entry<String, String>> scanResult =
jedis.hscan("bigkey", cursor);
cursor = scanResult.getCursor();

List<String> fields = scanResult.getResult().stream()
.map(Map.Entry::getKey)
.collect(Collectors.toList());

if (!fields.isEmpty()) {
jedis.hdel("bigkey", fields.toArray(new String[0]));
}
} while (!cursor.equals("0"));

4.7 慢查询监控

定期分析脚本:

#!/bin/bash
# 慢查询分析脚本

redis-cli SLOWLOG GET 100 > slowlog.txt

# 分析最慢的 10 个命令
cat slowlog.txt | grep -A 6 "integer) [0-9]" | \
awk '/4)/ {cmd=$0; next} /3)/ {time=$2; print time, cmd}' | \
sort -rn | head -10

Prometheus 监控:

# 使用 redis_exporter 导出慢查询指标
- job_name: 'redis'
static_configs:
- targets: ['localhost:9121']

面试题:如何分析和优化慢查询?

答案:

分析步骤:

  1. 查看慢查询日志
redis-cli SLOWLOG GET 10
  1. 分析慢命令类型
  • O(N) 复杂度命令(KEYS、HGETALL)
  • 大 key 操作
  • 网络延迟
  1. 分析执行频率
# 查看 INFO commandstats
redis-cli INFO commandstats | grep -E "cmdstat_(keys|hgetall)"
  1. 优化方案
  • KEYS → SCAN
  • HGETALL → HSCAN
  • DEL → UNLINK
  • 分批操作

预防措施:

  • 监控慢查询日志
  • 设置告警阈值
  • 代码审查,禁止危险命令
  • 使用 Pipeline 批量操作

五、Bitmap(位图)

5.1 基本概念

Bitmap:通过操作二进制位来实现数据存储,底层使用 String 类型。

特点:

  • 节省内存(每个用户只占 1 bit)
  • 支持位运算
  • 适合存储二值状态(签到、活跃用户等)

5.2 常用命令

# 设置位
SETBIT key offset value

# 获取位
GETBIT key offset

# 统计位数为 1 的个数
BITCOUNT key [start end]

# 位运算(AND、OR、XOR、NOT)
BITOP operation destkey key [key ...]

# 查找第一个为 1/0 的位
BITPOS key bit [start end]

# 获取位图范围内的字节值
GETRANGE key start end

5.3 使用示例

示例 1:用户签到

# 用户在 2024-01-01 签到(offset 为日期)
SETBIT user:1001:signin:2024 0 1

# 检查用户是否签到
GETBIT user:1001:signin:2024 0

# 统计用户签到天数
BITCOUNT user:1001:signin:2024

Java 实现:

// 用户签到
public void signIn(Long userId, LocalDate date) {
int offset = date.getDayOfYear() - 1;
redisTemplate.opsForValue().setBit(
"user:" + userId + ":signin:" + date.getYear(),
offset,
true
);
}

// 检查签到
public boolean checkSignIn(Long userId, LocalDate date) {
int offset = date.getDayOfYear() - 1;
Boolean signed = redisTemplate.opsForValue().getBit(
"user:" + userId + ":signin:" + date.getYear(),
offset
);
return Boolean.TRUE.equals(signed);
}

// 统计签到天数
public long getSignInDays(Long userId, int year) {
return redisTemplate.execute((RedisCallback<Long>) connection -> {
return connection.bitCount(
("user:" + userId + ":signin:" + year).getBytes()
);
});
}

示例 2:连续签到统计

# 用户签到位图
# offset: 0-364(一年 365 天)
# value: 0 未签到,1 已签到

# 用户 2024 年签到记录(假设前 10 天都签到了)
for i in {0..9}; do
redis-cli SETBIT user:1001:signin:2024 $i 1
done

# 查看用户是否连续签到 7 天
# 需要 BITPOS 和 BITCOUNT 配合使用

Java 实现(计算连续签到天数):

public int getConsecutiveSignInDays(Long userId, int year) {
byte[] key = ("user:" + userId + ":signin:" + year).getBytes();

return redisTemplate.execute((RedisCallback<Integer>) connection -> {
// 从今天开始往前查找
int consecutiveDays = 0;
int today = LocalDate.now().getDayOfYear() - 1;

for (int i = today; i >= 0; i--) {
Boolean bit = connection.getBit(key, i);
if (Boolean.TRUE.equals(bit)) {
consecutiveDays++;
} else {
break;
}
}

return consecutiveDays;
});
}

示例 3:活跃用户统计

# 用户每天活跃记录
SETBIT active:2024-01-01 1001 1
SETBIT active:2024-01-01 1002 1
SETBIT active:2024-01-01 1003 1

# 统计某天活跃用户数
BITCOUNT active:2024-01-01

# 统计 7 天活跃用户(BITOR 运算)
BITOP OR active:week:2024-01-01 \
active:2024-01-01 \
active:2024-01-02 \
active:2024-01-03 \
active:2024-01-04 \
active:2024-01-05 \
active:2024-01-06 \
active:2024-01-07

BITCOUNT active:week:2024-01-01

示例 4:用户特征标签

# 用户标签位图
# bit 0: 性别(0 女,1 男)
# bit 1: 是否 VIP
# bit 2: 是否认证
# bit 3: 是否签约作者

SETBIT user:1001:tags 0 1 # 男
SETBIT user:1001:tags 1 1 # VIP
SETBIT user:1001:tags 2 1 # 已认证
SETBIT user:1001:tags 3 0 # 未签约

# 查询是否 VIP
GETBIT user:1001:tags 1

# 批量查询标签
GETRANGE user:1001:tags 0 0 # 获取第一个字节

5.4 内存优化

内存占用计算:

1 个用户 = 1 bit
1 万用户 = 1.25 KB
10 万用户 = 12.5 KB
100 万用户 = 125 KB
1 亿用户 = 12.5 MB

面试题:Bitmap 的应用场景?

答案:

  1. 用户签到系统

    • 每天一个 bit
    • 统计签到天数、连续签到
  2. 活跃用户统计

    • DAU/MAU 统计
    • 留存率计算
  3. 用户标签系统

    • 多维度标签
    • 快速过滤
  4. 在线用户统计

    • 实时在线用户
    • 历史在线记录
  5. 权限管理

    • 角色权限位运算
    • 快速权限判断

面试题:Bitmap 和 Set 的区别?

答案:

特性BitmapSet
内存占用极小(1 bit/用户)较大(根据元素大小)
元素类型数字 ID任意类型
操作位运算集合运算
查询GETBITSISMEMBER
排序不支持Sorted Set 支持
适用场景二值状态存储唯一值

选择建议:

  • 用户 ID 连续、二值状态:使用 Bitmap
  • 元素类型多样、需要存储额外信息:使用 Set

六、HyperLogLog(基数统计)

6.1 基本概念

HyperLogLog:用于基数统计的概率算法,可以统计集合中不重复元素的个数。

特点:

  • 内存占用极小(12 KB)
  • 误差率约 0.81%
  • 不存储实际元素

应用场景:

  • 统计独立访客数(UV)
  • 统计独立用户数
  • 大数据去重统计

6.2 常用命令

# 添加元素
PFADD key element [element ...]

# 统计基数
PFCOUNT key [key ...]

# 合并多个 HyperLogLog
PFMERGE destkey sourcekey [sourcekey ...]

6.3 使用示例

示例 1:统计日活跃用户(DAU)

# 用户 1001 访问
PFADD dau:2024-01-01 user:1001

# 用户 1002 访问
PFADD dau:2024-01-01 user:1002

# 用户 1001 再次访问(不会重复计数)
PFADD dau:2024-01-01 user:1001

# 统计 DAU
PFCOUNT dau:2024-01-01
# 返回:2

示例 2:统计周活跃用户(WAU)

# 假设已有 7 天的数据
# dau:2024-01-01 到 dau:2024-01-07

# 合并 7 天的数据
PFMERGE wau:2024-01-01 \
dau:2024-01-01 \
dau:2024-01-02 \
dau:2024-01-03 \
dau:2024-01-04 \
dau:2024-01-05 \
dau:2024-01-06 \
dau:2024-01-07

# 统计 WAU
PFCOUNT wau:2024-01-01

Java 实现:

// 记录用户访问
public void recordVisit(Long userId, LocalDate date) {
redisTemplate.opsForHyperLogLog().add(
"dau:" + date.toString(),
"user:" + userId
);
}

// 统计 DAU
public long getDAU(LocalDate date) {
return redisTemplate.opsForHyperLogLog().size("dau:" + date);
}

// 统计 WAU
public long getWAU(LocalDate startDate) {
List<LocalDate> dates = new ArrayList<>();
for (int i = 0; i < 7; i++) {
dates.add(startDate.plusDays(i));
}

String[] keys = dates.stream()
.map(date -> "dau:" + date)
.toArray(String[]::new);

// PFMERGE + PFCOUNT
return redisTemplate.opsForHyperLogLog().size(keys);
}

6.4 原理简介

核心思想:

  • 使用哈希函数将元素映射到 [0, 1) 区间
  • 记录哈希值的前导零个数
  • 前导零越多,说明哈希值越小,概率越低
  • 通过调和平均数估计基数

示例:

元素 "user:1001" → hash("user:1001") → 0.000011... (前导零 4 个)
元素 "user:1002" → hash("user:1002") → 0.00101... (前导零 2 个)
元素 "user:1003" → hash("user:1003") → 0.1... (前导零 0 个)

取最大的前导零个数(4),估计基数为 2^4 = 16

6.5 注意事项

误差率:

  • 标准误差:0.81%
  • 元素个数 < 2^14 时,误差较小
  • 元素个数极大时,误差增大

不适用场景:

  • 需要精确统计(使用 Set)
  • 需要获取具体元素(HyperLogLog 不存储元素)
  • 数据量很小(< 10000)

面试题:HyperLogLog 的优缺点?

答案:

优点:

  • 内存占用极小:固定 12 KB,而 Set 需要根据元素数量
  • 性能优秀:添加和查询都是 O(1)
  • 支持合并:可以合并多个 HyperLogLog

缺点:

  • 有误差:0.81% 的误差率
  • 不存储元素:无法获取具体元素列表
  • 不适合精确统计:只适合估算场景

面试题:Set 和 HyperLogLog 如何选择?

答案:

特性SetHyperLogLog
内存占用极小(12 KB)
精度精确有误差(0.81%)
查询元素支持不支持
合并操作SUNIONPFMERGE
使用场景精确去重、元素查询大数据去重统计

选择建议:

  • 需要精确统计:使用 Set
  • 大数据量、只需要估算:使用 HyperLogLog
  • DAU/MAU 统计:使用 HyperLogLog
  • 用户标签、白名单:使用 Set

七、GEO(地理位置)

7.1 基本概念

GEO:Redis 3.2+ 提供的地理位置功能,底层使用 Sorted Set(ZSet)实现。

特点:

  • 存储地理位置(经纬度)
  • 计算距离
  • 范围查询(附近的人/地点)
  • 底层使用 Geohash 算法

7.2 常用命令

# 添加地理位置
GEOADD key longitude latitude member [longitude latitude member ...]

# 获取地理位置
GEOPOS key member [member ...]

# 计算两个位置的距离
GEODIST key member1 member2 [unit]

# 获取指定范围内的位置(以中心点为圆心)
GEORADIUS key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] [STORE key] [STOREDIST key]

# 获取指定范围内的位置(以成员为圆心)
GEORADIUSBYMEMBER key member radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] [STORE key] [STOREDIST key]

# 获取 Geohash
GEOHASH key member [member ...]

7.3 使用示例

示例 1:添加位置

# 添加城市坐标
GEOADD cities:coords 116.4074 39.9042 "Beijing"
GEOADD cities:coords 121.4737 31.2304 "Shanghai"
GEOADD cities:coords 113.2644 23.1291 "Guangzhou"
GEOADD cities:coords 114.1694 22.3193 "Shenzhen"

示例 2:计算距离

# 计算北京到上海的距离
GEODIST cities:coords Beijing Shanghai km
# 返回:"1067.6941"

# 计算北京到广州的距离
GEODIST cities:coords Beijing Guangzhou km
# 返回:"1888.1234"

示例 3:附近的人

# 添加用户位置
GEOADD users:location 116.4074 39.9042 "user:1001"
GEOADD users:location 116.4174 39.9142 "user:1002"
GEOADD users:location 116.3874 39.8942 "user:1003"

# 查找北京 10 km 内的用户
GEORADIUS users:location 116.4074 39.9042 10 km WITHDIST WITHCOORD

返回结果:

1) 1) "user:1001"
2) "0.0000" # 距离(km)
3) 1) "116.40739902853965" # 经度
2) "39.90420085479628" # 纬度
2) 1) "user:1002"
2) "1.5678"
3) 1) "116.41739902853965"
2) "39.91420085479628"

示例 4:附近的人(按成员)

# 查找 user:1001 附近 5 km 内的用户
GEORADIUSBYMEMBER users:location "user:1001" 5 km WITHDIST

示例 5:按距离排序

# 查找北京 100 km 内的城市,按距离升序
GEORADIUS cities:coords 116.4074 39.9042 100 km ASC WITHDIST

Java 实现:

// 添加用户位置
public void addUserLocation(Long userId, Double longitude, Double latitude) {
redisTemplate.opsForGeo().add(
"users:location",
new Point(longitude, latitude),
userId.toString()
);
}

// 查找附近的人
public List<NearUser> findNearUsers(Double longitude, Double latitude,
double radius, int limit) {
Distance distance = new Distance(radius, RedisGeoCommands.DistanceUnit.KILOMETERS);
Circle circle = new Circle(new Point(longitude, latitude), distance);

RedisGeoCommands.GeoRadiusCommandArgs args =
RedisGeoCommands.GeoRadiusCommandArgs
.newGeoRadiusArgs()
.includeDistance() # 包含距离
.includeCoordinates() # 包含坐标
.sortAscending() # 按距离升序
.limit(limit); # 限制数量

GeoResults<RedisGeoCommands.GeoLocation<String>> results =
redisTemplate.opsForGeo().radius("users:location", circle, args);

return results.getContent().stream()
.map(result -> {
RedisGeoCommands.GeoLocation<String> location = result.getContent();
Point point = location.getPoint();
return new NearUser(
Long.parseLong(location.getName()),
result.getDistance().getValue(),
point.getX(),
point.getY()
);
})
.collect(Collectors.toList());
}

7.4 底层原理

Geohash 算法:

  • 将经纬度编码为字符串
  • 相近的位置有相似的 Geohash 值
  • 使用 ZSet 存储,score 为 Geohash

编码过程:

  1. 经纬度转换为二进制
  2. 经纬度二进制位交错合并
  3. 使用 Base32 编码

示例:

北京:116.4074, 39.9042
→ 经度二进制:1100100...
→ 纬度二进制:1011000...
→ 交错合并:111001000...
→ Base32 编码:wx4g0e

7.5 性能优化

避免全量查询:

  • 使用 COUNT 限制结果数量
  • 使用 STORE 缓存结果
# 查询并缓存结果
GEORADIUS users:location 116.4074 39.9042 10 km COUNT 100 STORE nearby:users

数据分片:

  • 按城市/区域分片
  • 减少单个 key 的数据量

面试题:GEO 的底层实现原理?

答案:

底层使用 ZSet:

# GEO 内部使用 ZSet 存储
ZADD users:location geohash member

# 查看 ZSet
ZRANGE users:location 0 -1 WITHSCORES

Geohash 算法:

  • 将二维坐标(经纬度)编码为一维字符串
  • 相近的位置有相似的 Geohash 前缀
  • 通过 ZSet 的范围查询实现附近查询

精度:

  • Geohash 长度越长,精度越高
  • 默认使用 52 位双精度浮点数
  • 误差范围在 0.5 m 左右

面试题:附近的人如何实现?

答案:

方案 1:Redis GEO

GEORADIUS users:location 116.4074 39.9042 10 km WITHDIST
  • 优点:简单易用
  • 缺点:数据量大时性能下降

方案 2:Geohash + 数据库

  • 将用户位置按 Geohash 分桶
  • 查询时查找相邻的 8 个桶
  • 过滤距离并排序

方案 3:MongoDB Geospatial Query

db.users.find({
location: {
$near: {
$geometry: { type: "Point", coordinates: [116.4074, 39.9042] },
$maxDistance: 10000 # 10 km
}
}
})

性能优化:

  • Redis GEO:适合数据量 < 100 万
  • MongoDB:适合数据量 > 100 万
  • Elasticsearch:适合海量数据

八、Stream(数据流)

8.1 基本概念

Stream:Redis 5.0+ 提供的日志数据结构,类似 Kafka。

特点:

  • 支持消息持久化
  • 支持消费者组
  • 支持 ACK 确认机制
  • 支持消息回溯

应用场景:

  • 消息队列
  • 事件溯源
  • 日志收集

8.2 常用命令

# 添加消息
XADD key [NOMKSTREAM] ["*" | ID] field value [field value ...]

# 读取消息
XRANGE key start end [COUNT count]
XREVRANGE key end start [COUNT count]

# 读取流(阻塞)
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

# 创建消费者组
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id]
[DESTROY key groupname]
[DELGROUP key groupname]

# 读取消费者组消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]
[NOACK] STREAMS key [key ...] ID [ID ...]

# ACK 确认
XACK key group id [id ...]

# 查看消费者组信息
XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

8.3 使用示例

示例 1:生产者

# 添加消息(自动生成 ID)
XADD stream:orders * name "order1" amount 100
# 返回:"1704067200000-0"

# 添加消息(指定 ID)
XADD stream:orders 1704067200000-1 name "order2" amount 200

示例 2:消费者

# 读取所有消息
XRANGE stream:orders - +

# 读取指定范围消息
XRANGE stream:orders 1704067200000-0 1704067200000-1

# 阻塞读取新消息
XREAD BLOCK 0 STREAMS stream:orders $

示例 3:消费者组

# 创建消费者组(从开始消费)
XGROUP CREATE stream:orders group1 0

# 创建消费者组(从新消息开始消费)
XGROUP CREATE stream:orders group2 $

# 消费者读取消息
XREADGROUP GROUP group1 consumer1 COUNT 1 STREAMS stream:orders >
# 返回:当前组未处理的消息

# ACK 确认
XACK stream:orders group1 1704067200000-0

示例 4:消息队列

# 生产者
XADD stream:tasks * task "send_email" user 1001

# 消费者(消费者组)
XGROUP CREATE stream:tasks workers 0

# Worker 1
XREADGROUP GROUP workers worker1 COUNT 1 STREAMS stream:tasks >
# 处理完成后 ACK
XACK stream:tasks workers <message-id>

# Worker 2
XREADGROUP GROUP workers worker2 COUNT 1 STREAMS stream:tasks >

Java 实现:

// 发送消息
public String sendMessage(String stream, Map<String, String> message) {
return redisTemplate.opsForStream().add(
stream,
message
).getValue();
}

// 消费消息
public void consumeMessages(String stream, String group, String consumer) {
while (true) {
List<MapRecord<String, Object, Object>> messages =
redisTemplate.opsForStream().read(
Consumer.from(group, consumer),
StreamReadOptions.empty().count(1),
StreamOffset.create(stream, ReadOffset.lastConsumed())
);

if (messages.isEmpty()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
continue;
}

for (MapRecord<String, Object, Object> message : messages) {
try {
// 处理消息
handleMessage(message.getValue());

// ACK 确认
redisTemplate.opsForStream().acknowledge(stream, group, message.getId());
} catch (Exception e) {
// 处理失败,消息会保留,后续可以重试
log.error("Handle message failed: {}", message, e);
}
}
}
}

8.4 消息可靠性

Pending 消息:

  • 已发送给消费者但未 ACK 的消息
  • 使用 XPENDING 查看
# 查看 Pending 消息
XPENDING stream:orders group1

# 查看 Pending 详情
XPENDING stream:orders group1 - + 10 consumer1

消息回溯:

# 将消息重新发送到消费者组
XCLAIM stream:orders group1 consumer2 0 1704067200000-0

8.5 Stream vs List vs Pub/Sub

特性StreamListPub/Sub
消息持久化支持支持不支持
消费者组支持不支持不支持
ACK 机制支持不支持不支持
消息回溯支持不支持不支持
多消费支持不支持支持
性能中等
复杂度

面试题:Stream、List、Pub/Sub 如何选择?

答案:

Stream:

  • 需要消息持久化
  • 需要消费者组
  • 需要确认机制
  • 类似 Kafka

List:

  • 简单队列
  • 不需要 ACK
  • 性能要求高
  • 单消费者

Pub/Sub:

  • 实时通知
  • 不持久化
  • 多订阅者
  • 解耦服务

使用建议:

  • 复杂消息队列:Stream
  • 简单队列:List
  • 实时通知:Pub/Sub
  • 生产环境 MQ:Kafka、RabbitMQ

九、客户端管理

9.1 常用命令

# 查看客户端连接
CLIENT LIST

# 查看客户端名称
CLIENT GETNAME

# 设置客户端名称
CLIENT SETNAME connection-name

# 杀掉客户端连接
CLIENT KILL ip:port
CLIENT KILL ID client-id
CLIENT KILL TYPE master|normal|slave|pubsub

# 暂停客户端
CLIENT PAUSE timeout (ms)

# 查看客户端信息
CLIENT INFO

9.2 连接管理

查看连接信息:

127.0.0.1:6379> CLIENT LIST
addr=127.0.0.1:54321
fd=6
name=redis-cli
age=123
idle=0
db=0
sub=0
psub=0
multi=-1
qbuf=0
qbuf-free=0
obl=0
oll=0
omem=0
events=r
cmd=client

字段说明:

  • addr:客户端地址
  • name:客户端名称
  • age:连接时长(秒)
  • idle:空闲时长(秒)
  • db:当前数据库
  • cmd:最后执行的命令

限制最大连接数:

# 最大客户端连接数,默认 10000
maxclients 10000

9.3 连接池配置

Java(Jedis):

JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(100); # 最大连接数
config.setMaxIdle(50); # 最大空闲连接数
config.setMinIdle(10); # 最小空闲连接数
config.setMaxWaitMillis(10000); # 获取连接的最大等待时间
config.setTestOnBorrow(true); # 获取连接时测试
config.setTestOnReturn(false); # 归还连接时测试

JedisPool pool = new JedisPool(config, "localhost", 6379);

Java(Lettuce):

RedisURI uri = RedisURI.builder()
.withHost("localhost")
.withPort(6379)
.build();

LettuceClientConfiguration config = LettuceClientConfiguration.builder()
.commandTimeout(Duration.ofSeconds(10))
.shutdownTimeout(Duration.ZERO)
.build();

RedisClient client = RedisClient.create(uri);
StatefulRedisConnection<String, String> connection = client.connect();

十、服务器管理

10.1 常用命令

# 查看 Redis 信息
INFO [section]

# 查看 CONFIG
CONFIG GET parameter

# 修改 CONFIG
CONFIG SET parameter value

# 重写配置文件
CONFIG REWRITE

# 获取指定数据库的键个数
DBSIZE

# 异步刷新数据库
BGREWRITEAOF
BGSAVE

# 同步保存数据
SAVE

# 最后一次保存的时间戳
LASTSAVE

# 清空数据库
FLUSHDB [ASYNC|SYNC]
FLUSHALL [ASYNC|SYNC]

# 监控命令
MONITOR

# 同步从服务器
SYNC

10.2 INFO 命令详解

# 查看 Server 信息
INFO server

# 查看 Clients 信息
INFO clients

# 查看 Memory 信息
INFO memory

# 查看 Persistence 信息
INFO persistence

# 查看 Stats 信息
INFO stats

# 查看 Replication 信息
INFO replication

# 查看 CPU 信息
INFO cpu

# 查看 Cluster 信息
INFO cluster

# 查看 Keyspace 信息
INFO keyspace

# 查看所有信息
INFO all

Memory 信息示例:

127.0.0.1:6379> INFO memory
used_memory:1048576 # 当前内存使用量(字节)
used_memory_human:1.00M # 可读格式
used_memory_rss:2097152 # 系统分配的内存
used_memory_peak:2097152 # 历史峰值
maxmemory:0 # 最大内存限制(0 表示无限制)
maxmemory_policy:noeviction # 内存淘汰策略

10.3 性能监控

监控脚本:

#!/bin/bash
# Redis 性能监控脚本

while true; do
clear
echo "=== Redis Performance Monitor ==="
echo "Time: $(date)"
echo ""

# 连接数
clients=$(redis-cli --csv INFO clients | grep -o 'connected_clients:[0-9]*' | cut -d: -f2)
echo "Connected Clients: $clients"

# 内存使用
memory=$(redis-cli --csv INFO memory | grep -o 'used_memory_human:[^,]*' | cut -d: -f2)
echo "Memory Used: $memory"

# 命令执行次数
cmds=$(redis-cli --csv INFO stats | grep -o 'total_commands_processed:[0-9]*' | cut -d: -f2)
echo "Total Commands: $cmds"

# 每秒命令数(OPS)
ops=$(redis-cli --csv INFO stats | grep -o 'instantaneous_ops_per_sec:[0-9]*' | cut -d: -f2)
echo "OPS: $ops"

# 慢查询数
slowlog=$(redis-cli SLOWLOG LEN)
echo "Slow Queries: $slowlog"

echo ""
sleep 1
done

Prometheus 监控:

# docker-compose.yml
version: '3'
services:
redis:
image: redis:7
ports:
- "6379:6379"

redis-exporter:
image: oliver006/redis_exporter
ports:
- "9121:9121"
environment:
- REDIS_ADDR=redis:6379

十一、高级面试题汇总

11.1 如何实现全局唯一 ID?

答案:

方案 1:Redis INCR

# 简单递增
INCR id:generator

# 返回:1, 2, 3, ...

缺点:

  • ID 连续,容易被猜测
  • 单点问题

方案 2:时间戳 + 递增

-- 生成唯一 ID(雪花算法)
local key = KEYS[1]
local timestamp = ARGV[1]

-- 刷新时间窗口
if redis.call('GET', key .. ':timestamp') ~= timestamp then
redis.call('SET', key .. ':timestamp', timestamp)
redis.call('SET', key .. ':seq', 0)
end

-- 获取序列号
local seq = redis.call('INCR', key .. ':seq')

-- 组合 ID:时间戳(41 位)+ 机器 ID(10 位)+ 序列号(12 位)
return timestamp .. seq

方案 3:UUID + Redis 去重

public String generateId() {
String uuid = UUID.randomUUID().toString();
String key = "id:generated:" + uuid;

// 使用 SETNX 保证唯一性
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(key, "1", Duration.ofDays(1));

if (Boolean.TRUE.equals(success)) {
return uuid;
} else {
return generateId(); # 递归重试
}
}

方案 4:雪花算法

public class SnowflakeIdGenerator {
private final long epoch = 1609459200000L; # 2021-01-01 00:00:00
private final long machineId; # 机器 ID0-1023
private long sequence = 0; # 序列号
private long lastTimestamp = -1L;

public synchronized long nextId() {
long timestamp = System.currentTimeMillis() - epoch;

if (timestamp < lastTimestamp) {
throw new RuntimeException("Clock moved backwards");
}

if (timestamp == lastTimestamp) {
sequence = (sequence + 1) & 4095; # 12
if (sequence == 0) {
timestamp = tilNextMillis(lastTimestamp);
}
} else {
sequence = 0;
}

lastTimestamp = timestamp;

# 41 位时间戳 + 10 位机器 ID + 12 位序列号
return (timestamp << 22) | (machineId << 12) | sequence;
}

private long tilNextMillis(long lastTimestamp) {
long timestamp = System.currentTimeMillis() - epoch;
while (timestamp <= lastTimestamp) {
timestamp = System.currentTimeMillis() - epoch;
}
return timestamp;
}
}

方案 5:Redis + 雪花算法

-- 分布式雪花算法
local key = KEYS[1]
local timestamp = ARGV[1]
local machineId = ARGV[2]

-- 刷新时间窗口
if redis.call('GET', key .. ':timestamp') ~= timestamp then
redis.call('SET', key .. ':timestamp', timestamp)
redis.call('SET', key .. ':seq', 0)
end

-- 获取序列号(12 位,最大 4095)
local seq = redis.call('INCR', key .. ':seq')
if seq > 4095 then
return nil # 序列号溢出
end

-- 组合 ID
return (timestamp * 10000 + machineId) * 10000 + seq

11.2 如何实现分布式限流?

答案:

方案 1:固定窗口(有问题)

local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])

local count = redis.call('GET', key)
if count == false then
redis.call('SET', key, 1, 'EX', window)
return 1
elseif tonumber(count) < limit then
redis.call('INCR', key)
return 1
else
return 0
end

**问题:**窗口边界可能出现 2 倍流量

方案 2:滑动窗口(推荐)

local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

# 删除窗口外的记录
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)

# 获取当前窗口内的请求数
local count = redis.call('ZCARD', key)

if count < limit then
redis.call('ZADD', key, now, now)
redis.call('EXPIRE', key, window)
return 1
else
return 0
end

方案 3:令牌桶(适合平滑限流)

local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local info = redis.call('HMGET', key, 'tokens', 'last_time')
local tokens = tonumber(info[1]) or capacity
local last_time = tonumber(info[2]) or now

local delta = math.max(0, (now - last_time) * rate / 1000)
tokens = math.min(capacity, tokens + delta)

if tokens >= requested then
redis.call('HMSET', key, 'tokens', tokens - requested, 'last_time', now)
redis.call('EXPIRE', key, math.ceil(capacity / rate * 2) + 1)
return 1
else
redis.call('HMSET', key, 'tokens', tokens, 'last_time', now)
redis.call('EXPIRE', key, math.ceil(capacity / rate * 2) + 1)
return 0
end

方案 4:漏桶(适合恒定速率)

public class LeakyBucketRateLimiter {
private final RedisTemplate redisTemplate;
private final double capacity; # 桶容量
private final double rate; # 漏水速率(请求/秒)

public boolean allowRequest(String key) {
long now = System.currentTimeMillis();

# 获取当前水量和上次漏水时间
List<Object> info = redisTemplate.executePipelined(
(RedisCallback<Object>) connection -> {
connection.get((key + ":water").getBytes());
connection.get((key + ":last_leak").getBytes());
return null;
}
);

double water = info[0] != null ? Double.parseDouble(info[0].toString()) : 0;
long lastLeak = info[1] != null ? Long.parseLong(info[1].toString()) : now;

# 计算漏出的水量
long elapsed = now - lastLeak;
water = Math.max(0, water - elapsed * rate / 1000.0);

if (water < capacity) {
# 加水
redisTemplate.opsForValue().set(key + ":water", water + 1);
redisTemplate.opsForValue().set(key + ":last_leak", now);
return true;
} else {
return false;
}
}
}

11.3 如何实现延迟队列?

答案:

方案 1:Sorted Set

# 添加延迟任务(score 为执行时间戳)
ZADD delayed:tasks 1704067200000 "task:send_email:user:1001"

# 定时扫描(每秒执行一次)
while true; do
now=$(date +%s%3N)
tasks=$(redis-cli ZRANGEBYSCORE delayed:tasks 0 $now)

for task in $tasks; do
# 处理任务
process_task $task

# 删除任务
redis-cli ZREM delayed:tasks $task
done

sleep 1
done

Java 实现:

public class DelayedQueue {
@Autowired
private RedisTemplate redisTemplate;

@Autowired
private TaskExecutor taskExecutor;

// 添加延迟任务
public void addDelayedTask(String task, long delayMs) {
long executeTime = System.currentTimeMillis() + delayMs;
redisTemplate.opsForZSet().add("delayed:tasks", task, executeTime);
}

// 定时扫描
@Scheduled(fixedDelay = 1000)
public void processDelayedTasks() {
long now = System.currentTimeMillis();

// 获取到期的任务
Set<String> tasks = redisTemplate.opsForZSet()
.rangeByScore("delayed:tasks", 0, now);

for (String task : tasks) {
// 使用 Lua 脚本保证原子性
String luaScript =
"if redis.call('zrem', KEYS[1], ARGV[1]) == 1 then " +
" return 1 " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> script = new DefaultRedisScript<>(luaScript, Long.class);
Long result = redisTemplate.execute(script, Collections.singletonList("delayed:tasks"), task);

if (result == 1) {
// 异步处理任务
taskExecutor.execute(() -> processTask(task));
}
}
}

private void processTask(String task) {
// 解析任务并执行
// ...
}
}

方案 2:Redis Keyspace Notifications

# 启用 Keyspace Notifications
CONFIG SET notify-keyspace-events Ex

# 设置过期 key
SET task:send_email:user:1001 "data" EX 60

# 监听过期事件
redis-cli --subscribe __keyevent@0__:expired

Java 实现:

@Configuration
public class RedisConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListener listener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listener, new PatternTopic("__keyevent@*:expired"));
return container;
}
}

@Component
public class ExpiredKeyListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
String key = new String(message.getBody());
// key 格式:task:send_email:user:1001
// 解析并执行任务
processTask(key);
}
}

问题:

  • 不保证消息可靠性(Redis 宕机可能丢失)
  • 时间不精确(依赖过期检查时间)

方案 3:Redisson Delayed Queue

@Autowired
private RedissonClient redisson;

public void addDelayedTask(String task, long delay, TimeUnit unit) {
RDelayedQueue<String> queue = redisson.getDelayedQueue("delayed:tasks");
queue.offer(task, delay, unit);
}

public void startConsumer() {
RDelayedQueue<String> queue = redisson.getDelayedQueue("delayed:tasks");
queue.addListener(new MessageListener<String>() {
@Override
public void onMessage(String channel, String msg) {
processTask(msg);
}
});
}

11.4 如何实现点赞功能?

答案:

需求分析:

  • 用户可以点赞/取消点赞
  • 统计点赞数
  • 查询用户是否点赞
  • 查询点赞用户列表

方案 1:使用 Set

# 用户点赞
SADD post:1001:likes user:1001

# 取消点赞
SREM post:1001:likes user:1001

# 统计点赞数
SCARD post:1001:likes

# 检查是否点赞
SISMEMBER post:1001:likes user:1001

# 获取点赞用户列表(分页)
SRANDMEMBER post:1001:likes 10

方案 2:使用 Sorted Set(按时间排序)

# 用户点赞
ZADD post:1001:likes 1704067200 user:1001

# 取消点赞
ZREM post:1001:likes user:1001

# 统计点赞数
ZCARD post:1001:likes

# 检查是否点赞
ZSCORE post:1001:likes user:1001

# 获取点赞用户列表(按时间倒序)
ZREVRANGE post:1001:likes 0 9 WITHSCORES

Java 实现:

@Service
public class LikeService {
@Autowired
private StringRedisTemplate redisTemplate;

// 点赞
public void like(Long postId, Long userId) {
long score = System.currentTimeMillis();
redisTemplate.opsForZSet().add(
"post:" + postId + ":likes",
"user:" + userId,
score
);

# 更新帖子点赞数缓存
redisTemplate.opsForValue().increment("post:" + postId + ":like_count");

# 异步更新数据库
updateDatabase(postId, userId, true);
}

// 取消点赞
public void unlike(Long postId, Long userId) {
redisTemplate.opsForZSet().remove(
"post:" + postId + ":likes",
"user:" + userId
);

# 更新帖子点赞数缓存
redisTemplate.opsForValue().decrement("post:" + postId + ":like_count");

# 异步更新数据库
updateDatabase(postId, userId, false);
}

// 获取点赞数
public long getLikeCount(Long postId) {
# 先从缓存获取
String count = redisTemplate.opsForValue().get("post:" + postId + ":like_count");
if (count != null) {
return Long.parseLong(count);
}

# 缓存未命中,从 Sorted Set 获取
Long size = redisTemplate.opsForZSet().zCard("post:" + postId + ":likes");
if (size != null) {
# 写入缓存
redisTemplate.opsForValue().set("post:" + postId + ":like_count", size.toString(), 1, TimeUnit.HOURS);
return size;
}

return 0L;
}

// 检查是否点赞
public boolean isLiked(Long postId, Long userId) {
Double score = redisTemplate.opsForZSet().score(
"post:" + postId + ":likes",
"user:" + userId
);
return score != null;
}

// 获取点赞用户列表
public List<Long> getLikeUsers(Long postId, int offset, int limit) {
Set<String> users = redisTemplate.opsForZSet().reverseRange(
"post:" + postId + ":likes",
offset,
offset + limit - 1
);

return users.stream()
.map(user -> Long.parseLong(user.substring(5))) # 去掉 "user:" 前缀
.collect(Collectors.toList());
}
}

11.5 如何实现抽奖功能?

答案:

方案 1:使用 Set(简单抽奖)

# 添加参与用户
SADD lottery:2024 user:1001 user:1002 user:1003

# 抽取 1 个中奖用户
SRANDMEMBER lottery:2024 1

# 抽取 3 个中奖用户(不重复)
SPOP lottery:2024 3

方案 2:使用 Sorted Set(权重抽奖)

# 添加参与用户(score 为权重)
ZADD lottery:2024 1 user:1001 # user:1001 的权重为 1
ZADD lottery:2024 10 user:1002 # user:1002 的权重为 10

# 抽奖算法(Lua 实现)

Lua 抽奖脚本:

-- 权重抽奖
local key = KEYS[1]
local count = tonumber(ARGV[1])

local totalWeight = 0
local members = redis.call('ZRANGE', key, 0, -1, 'WITHSCORES')

# 计算总权重
for i = 1, #members, 2 do
totalWeight = totalWeight + tonumber(members[i + 1])
end

local winners = {}
local winnerCount = 0

while winnerCount < count and #members > 0 do
# 随机生成权重值
local randomWeight = math.random() * totalWeight

# 查找对应的用户
local currentWeight = 0
for i = 1, #members, 2 do
currentWeight = currentWeight + tonumber(members[i + 1])
if currentWeight >= randomWeight then
table.insert(winners, members[i])

# 删除已中奖用户
redis.call('ZREM', key, members[i])

# 更新总权重
totalWeight = totalWeight - tonumber(members[i + 1])

# 移除该用户
table.remove(members, i)
table.remove(members, i)

winnerCount = winnerCount + 1
break
end
end
end

return winners

Java 实现:

@Service
public class LotteryService {
@Autowired
private RedisTemplate redisTemplate;

// 参与抽奖
public void join(Long userId, Long activityId) {
redisTemplate.opsForSet().add("lottery:" + activityId, "user:" + userId);
}

// 抽奖
public List<Long> draw(Long activityId, int count) {
// 使用 Lua 脚本保证原子性
String luaScript =
"local winners = {} " +
"for i = 1, tonumber(ARGV[1]) do " +
" local member = redis.call('SPOP', KEYS[1]) " +
" if member == false then " +
" break " +
" end " +
" table.insert(winners, member) " +
"end " +
"return winners";

DefaultRedisScript<List> script = new DefaultRedisScript<>(luaScript, List.class);
List<String> winners = redisTemplate.execute(
script,
Collections.singletonList("lottery:" + activityId),
String.valueOf(count)
);

return winners.stream()
.map(user -> Long.parseLong(user.substring(5)))
.collect(Collectors.toList());
}

// 权重抽奖
public List<Long> weightedDraw(Long activityId, int count) {
String luaScript =
"local key = KEYS[1] " +
"local count = tonumber(ARGV[1]) " +
"local totalWeight = 0 " +
"local members = redis.call('ZRANGE', key, 0, -1, 'WITHSCORES') " +
"for i = 1, #members, 2 do " +
" totalWeight = totalWeight + tonumber(members[i + 1]) " +
"end " +
"local winners = {} " +
"local winnerCount = 0 " +
"while winnerCount < count and #members > 0 do " +
" local randomWeight = math.random() * totalWeight " +
" local currentWeight = 0 " +
" for i = 1, #members, 2 do " +
" currentWeight = currentWeight + tonumber(members[i + 1]) " +
" if currentWeight >= randomWeight then " +
" table.insert(winners, members[i]) " +
" redis.call('ZREM', key, members[i]) " +
" totalWeight = totalWeight - tonumber(members[i + 1]) " +
" table.remove(members, i) " +
" table.remove(members, i) " +
" winnerCount = winnerCount + 1 " +
" break " +
" end " +
" end " +
"end " +
"return winners";

DefaultRedisScript<List> script = new DefaultRedisScript<>(luaScript, List.class);
List<String> winners = redisTemplate.execute(
script,
Collections.singletonList("lottery:weighted:" + activityId),
String.valueOf(count)
);

return winners.stream()
.map(user -> Long.parseLong(user.substring(5)))
.collect(Collectors.toList());
}
}

十二、总结

12.1 Redis 高级特性回顾

  1. 发布订阅:实时消息推送,但不持久化
  2. Lua 脚本:原子性执行复杂逻辑
  3. Pipeline:减少网络开销,提升性能
  4. 慢查询分析:发现和优化性能瓶颈
  5. Bitmap:位图,节省内存,适合二值状态
  6. HyperLogLog:基数统计,内存占用极小
  7. GEO:地理位置,附近的人
  8. Stream:消息队列,支持持久化和消费者组
  9. 客户端管理:连接池、监控
  10. 服务器管理:配置、监控、运维

12.2 面试准备建议

  1. 深入理解原理:每个特性的底层实现
  2. 对比分析:不同方案的优缺点
  3. 场景设计:结合实际业务场景
  4. 性能优化:监控、调优经验
  5. 最佳实践:生产环境的注意事项

12.3 学习资源

  • 官方文档https://redis.io/documentation
  • 源码分析:《Redis 设计与实现》
  • 实战案例:《Redis 开发与运维》
  • 在线教程:Redis 官方教程

参考文档: