跳到主要内容

分布式锁滑动窗口计数器

2025年01月06日
柏拉文
越努力,越幸运

一、认识


在高并发场景下,基于 Redis 滑动窗口计数的实现可能出现并发更新的问题,例如多个客户端同时操作 Redis 的有序集合(Sorted Set),导致数据不一致。在这种情况下,可以引入分布式锁,确保对 Redis 的操作具有原子性和安全性, 确保多个客户端在并发访问时,通过分布式锁避免计数器的竞争冲突, 通过分布式锁确保只有一个客户端能够操作计数器。实现原理为: 在操作有序集合之前 首先获取分布式锁, 使用 RedisSET 命令设置锁, 设置锁的自动过期时间,避免死锁, NX 保证锁不存在时才能设置,PX 设置锁过期时间, 过期时间设置为请求可能最长处理时间的稍大值,防止因未释放锁导致死锁。然后计数器操作, 获取锁后, 利用 Redis 有序集合(ZSET)存储请求的时间戳,并动态调整统计范围,使得时间窗口可以 滑动具体步骤为: 为每个用户或操作通过 ZADD 创建一个有序集合键,值为请求的时间戳。当用户发起请求时, 添加当前时间戳到有序集合,通过 ZREMRANGEBYSCORE 删除集合中超出时间窗口范围的旧时间戳, 通过 ZCARD 获取集合中当前时间窗口内的元素数量, 并设置键的过期时间, 避免无用数据长期占用存储。如果获取不到锁,进入 while 循环,重新尝试获取锁,每次重启后等待 retryDelay 毫秒,其中, retryDelay 可以指定一个时间间隔,也可以采用指数退避算法(如 retryDelay = Math.pow(2, attempts) * 100),避免锁竞争。最后,释放锁,通过 Lua 脚本保证释放锁的原子性, 确保只有当前请求可以释放锁,避免误删其他客户端的锁。

二、Node.js


2.1 retryDelay 时间间隔

固定时间间隔(如 retryDelay = 100ms

const redis = require('redis');
const { v4: uuidv4 } = require('uuid'); // 用于生成唯一请求 ID

// 创建 Redis 客户端
const redisClient = redis.createClient();

(async () => {
await redisClient.connect();
console.log("Connected to Redis!");
})();

// 分布式锁默认配置
const LOCK_KEY = 'sliding_window:lock';
const LOCK_EXPIRATION = 5000; // 锁过期时间(毫秒)

/**
* 获取分布式锁
* @param {string} lockKey - 锁的 Redis 键名
* @param {string} requestId - 唯一请求标识
* @param {number} ttl - 锁的过期时间(毫秒)
* @returns {Promise<boolean>} - 是否获取到锁
*/
async function acquireLock(lockKey, requestId, ttl) {
const result = await redisClient.set(lockKey, requestId, {
NX: true, // 锁不存在时才设置
PX: ttl, // 设置过期时间
});
return result === 'OK';
}

/**
* 释放分布式锁
* @param {string} lockKey - 锁的 Redis 键名
* @param {string} requestId - 唯一请求标识
* @returns {Promise<boolean>} - 是否释放成功
*/
async function releaseLock(lockKey, requestId) {
const releaseScript = `
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
`;
const result = await redisClient.eval(releaseScript, {
keys: [lockKey],
arguments: [requestId],
});
return result === 1; // 1 表示成功释放锁
}

/**
* 滑动窗口计数器(带分布式锁和重试机制)
* @param {string} key - Redis 键名,用于存储请求记录
* @param {number} windowSizeInSeconds - 滑动窗口大小(秒)
* @param {number} maxRetries - 最大重试次数
* @param {number} retryDelay - 每次重试的间隔时间(毫秒)
* @returns {Promise<number | null>} - 当前时间窗口内的请求数量,或 null 表示失败
*/
async function slidingWindowCounterWithRetry(key, windowSizeInSeconds, maxRetries = 3, retryDelay = 100) {
const requestId = uuidv4(); // 唯一请求 ID
let attempts = 0;

while (attempts <= maxRetries) {
const lockAcquired = await acquireLock(LOCK_KEY, requestId, LOCK_EXPIRATION);

if (lockAcquired) {
try {
const currentTime = Date.now(); // 当前时间戳(毫秒)
const windowStart = currentTime - windowSizeInSeconds * 1000; // 窗口开始时间

// 添加当前请求时间戳到有序集合
await redisClient.zAdd(key, { score: currentTime, value: currentTime.toString() });

// 删除窗口外的请求记录
await redisClient.zRemRangeByScore(key, 0, windowStart);

// 获取窗口内的请求数量
const requestCount = await redisClient.zCard(key);

// 设置键的过期时间,避免无用数据长期占用存储
await redisClient.expire(key, windowSizeInSeconds + 1);

return requestCount; // 成功返回计数结果
} catch (err) {
console.error("Error in sliding window counter:", err);
return null; // 发生错误时返回 null
} finally {
// 释放锁
const lockReleased = await releaseLock(LOCK_KEY, requestId);
if (lockReleased) {
console.log("Lock released successfully.");
} else {
console.warn("Failed to release lock, potential issue!");
}
}
} else {
// 锁未获取到,增加重试逻辑
attempts++;
console.log(`Retrying to acquire lock... Attempt ${attempts}/${maxRetries}`);
await new Promise((resolve) => setTimeout(resolve, retryDelay)); // 等待一段时间再重试
}
}

console.warn("Failed to acquire lock after maximum retries.");
return null; // 如果超过最大重试次数,返回 null
}

// 示例:模拟 3 次获取锁失败后成功的场景:
(async () => {
const key = 'api:sliding_window_with_retry'; // Redis 键名
const windowSize = 10; // 窗口大小(秒)

// 调用带重试的滑动窗口计数器
const requestCount = await slidingWindowCounterWithRetry(key, windowSize, 5, 200);

if (requestCount !== null) {
console.log(`Requests in the last ${windowSize} seconds: ${requestCount}`);
} else {
console.error("Failed to update the sliding window counter.");
}
})();

2.2 retryDelay 指数退避算法

将重试机制的 固定间隔 优化为 指数退避策略 是一种更优的方案,尤其是在高并发环境下,可以有效地减少锁竞争,并避免集中重试导致的资源浪费。指数退避(Exponential Backoff)是指: 每次重试的间隔时间按照指数增长(如 2^n),逐步增加, 增加一个随机抖动(jitter),防止大量请求在相同时间再次争抢锁。计算公式为: Math.min(baseDelay * Math.pow(2, attempts), maxDelay) + Math.random() * jitterRangebaseDelay 为基础延迟时间, attempts 为当前重试的次数,使用 baseDelay * 2^attempts 动态调整重试间隔时间。maxDelay 为最大延迟时间,限制最大间隔时间为 maxDelay,防止过长的延迟。randomJitter 为随机抖动时间, 每次重试增加随机抖动 randomJitter,避免并发请求同时重试导致锁竞争。通过 引入指数退避策略,可以显著降低高并发场景下锁竞争的冲突概率,同时最大限度提高获取锁的成功率,增强系统的稳定性和可靠性。

const redis = require('redis');
const { v4: uuidv4 } = require('uuid'); // 用于生成唯一请求 ID

// 创建 Redis 客户端
const redisClient = redis.createClient();

(async () => {
await redisClient.connect();
console.log("Connected to Redis!");
})();

// 分布式锁默认配置
const LOCK_KEY = 'sliding_window:lock';
const LOCK_EXPIRATION = 5000; // 锁过期时间(毫秒)

/**
* 获取分布式锁
* @param {string} lockKey - 锁的 Redis 键名
* @param {string} requestId - 唯一请求标识
* @param {number} ttl - 锁的过期时间(毫秒)
* @returns {Promise<boolean>} - 是否获取到锁
*/
async function acquireLock(lockKey, requestId, ttl) {
const result = await redisClient.set(lockKey, requestId, {
NX: true, // 锁不存在时才设置
PX: ttl, // 设置过期时间
});
return result === 'OK';
}

/**
* 释放分布式锁
* @param {string} lockKey - 锁的 Redis 键名
* @param {string} requestId - 唯一请求标识
* @returns {Promise<boolean>} - 是否释放成功
*/
async function releaseLock(lockKey, requestId) {
const releaseScript = `
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
`;
const result = await redisClient.eval(releaseScript, {
keys: [lockKey],
arguments: [requestId],
});
return result === 1; // 1 表示成功释放锁
}

/**
* 滑动窗口计数器(带分布式锁和指数退避重试机制)
* @param {string} key - Redis 键名,用于存储请求记录
* @param {number} windowSizeInSeconds - 滑动窗口大小(秒)
* @param {number} maxRetries - 最大重试次数
* @param {number} baseDelay - 基础延迟时间(毫秒)
* @param {number} maxDelay - 最大延迟时间(毫秒)
* @param {number} jitterRange - 随机抖动范围(毫秒)
* @returns {Promise<number | null>} - 当前时间窗口内的请求数量,或 null 表示失败
*/
async function slidingWindowCounterWithExponentialBackoff(
key,
windowSizeInSeconds,
maxRetries = 5,
baseDelay = 100,
maxDelay = 2000,
jitterRange = 200
) {
const requestId = uuidv4(); // 唯一请求 ID
let attempts = 0;

while (attempts <= maxRetries) {
const lockAcquired = await acquireLock(LOCK_KEY, requestId, LOCK_EXPIRATION);

if (lockAcquired) {
try {
const currentTime = Date.now(); // 当前时间戳(毫秒)
const windowStart = currentTime - windowSizeInSeconds * 1000; // 窗口开始时间

// 添加当前请求时间戳到有序集合
await redisClient.zAdd(key, { score: currentTime, value: currentTime.toString() });

// 删除窗口外的请求记录
await redisClient.zRemRangeByScore(key, 0, windowStart);

// 获取窗口内的请求数量
const requestCount = await redisClient.zCard(key);

// 设置键的过期时间,避免无用数据长期占用存储
await redisClient.expire(key, windowSizeInSeconds + 1);

return requestCount; // 成功返回计数结果
} catch (err) {
console.error("Error in sliding window counter:", err);
return null; // 发生错误时返回 null
} finally {
// 释放锁
const lockReleased = await releaseLock(LOCK_KEY, requestId);
if (lockReleased) {
console.log("Lock released successfully.");
} else {
console.warn("Failed to release lock, potential issue!");
}
}
} else {
// 未获取到锁,执行指数退避策略
attempts++;
const retryDelay = Math.min(baseDelay * Math.pow(2, attempts), maxDelay) +
Math.random() * jitterRange;
console.log(
`Retrying to acquire lock... Attempt ${attempts}/${maxRetries}. Delay: ${retryDelay.toFixed(
2
)}ms`
);
await new Promise((resolve) => setTimeout(resolve, retryDelay));
}
}

console.warn("Failed to acquire lock after maximum retries.");
return null; // 如果超过最大重试次数,返回 null
}

/**
* 模拟滑动窗口计数器的使用
*/
(async () => {
try {
const key = 'api:sliding_window_with_backoff'; // Redis 键名
const windowSize = 10; // 滑动窗口大小(秒)

// 模拟 5 个并发请求
const promises = Array.from({ length: 5 }, async (_, i) => {
const requestCount = await slidingWindowCounterWithExponentialBackoff(
key,
windowSize,
5, // 最大重试次数
100, // 基础延迟时间(毫秒)
2000, // 最大延迟时间(毫秒)
300 // 随机抖动范围(毫秒)
);

console.log(`Request ${i + 1}:`, requestCount !== null
? `Requests in the last ${windowSize} seconds: ${requestCount}`
: "Failed to process request.");
});

await Promise.all(promises);
} catch (error) {
console.error("Simulation error:", error);
} finally {
redisClient.quit(); // 关闭 Redis 连接
}
})();

三、Lua 脚本


四、Pipeline


如果并发量高,可以通过 Redis Pipeline 批量执行命令,减少网络开销。

五、Lua Node.js


Lua 脚本可以将多个 Redis 操作封装为一个原子性事务,减少网络延迟和数据竞争。减少网络往返,适合复杂业务逻辑。所有操作在 Redis 内部一次执行,具有原子性。如果业务逻辑复杂,涉及多次读写并需要保证原子性,比如滑动窗口计数器、分布式锁等,推荐使用 Lua 脚本。

const { createClient } = require('redis');

const redisClient = createClient();
redisClient.connect();

// Lua 脚本:带分布式锁的滑动窗口计数器
const slidingWindowLuaScript = `
local key = KEYS[1]
local currentTime = tonumber(ARGV[1])
local windowSize = tonumber(ARGV[2])
local lockKey = KEYS[2]
local lockValue = ARGV[3]
local lockTTL = tonumber(ARGV[4])

-- 尝试获取锁
if redis.call("SETNX", lockKey, lockValue) == 1 then
redis.call("PEXPIRE", lockKey, lockTTL)

-- 删除窗口外的请求记录
redis.call("ZREMRANGEBYSCORE", key, 0, currentTime - windowSize)

-- 添加当前请求时间
redis.call("ZADD", key, currentTime, currentTime)

-- 获取当前窗口内请求数量
local count = redis.call("ZCARD", key)

-- 设置键的过期时间
redis.call("EXPIRE", key, windowSize + 1)

-- 释放锁
redis.call("DEL", lockKey)

return count
else
return nil -- 表示未获取锁
end
`;

/**
* 滑动窗口计数器(基于 Lua 脚本,带分布式锁)
* @param {string} key - Redis 键名
* @param {string} lockKey - 分布式锁键名
* @param {number} windowSizeInSeconds - 滑动窗口大小(秒)
* @param {number} maxRetries - 最大重试次数
* @param {number} baseDelay - 基础重试延迟(毫秒)
* @returns {Promise<number|null>} - 当前窗口内的请求数量或 null(未获取锁)
*/
async function slidingWindowWithLuaAndLock(
key,
lockKey,
windowSizeInSeconds,
maxRetries,
baseDelay
) {
const lockValue = `${Date.now()}-${Math.random()}`; // 锁的唯一标识
const lockTTL = 1000; // 锁的过期时间(毫秒)
const currentTime = Date.now();

let attempt = 0;

while (attempt < maxRetries) {
const result = await redisClient.eval(
slidingWindowLuaScript,
2, // 键数量
key, // 滑动窗口的键
lockKey, // 分布式锁的键
currentTime, // 当前时间戳
windowSizeInSeconds * 1000, // 滑动窗口大小(毫秒)
lockValue, // 锁的唯一标识
lockTTL // 锁的过期时间
);

if (result !== null) {
return result; // 成功获取锁并执行滑动窗口逻辑
}

// 未获取到锁,使用指数退避策略重试
const delay = Math.min(baseDelay * 2 ** attempt, 5000); // 最大退避时间限制为 5 秒
await new Promise((resolve) => setTimeout(resolve, delay));
attempt++;
}

return null; // 超过重试次数未获取到锁
}

/**
* 模拟滑动窗口计数器(Lua + 分布式锁)
*/
(async () => {
try {
const key = 'api:sliding_window_lua_with_lock';
const lockKey = `${key}:lock`;
const windowSize = 10; // 窗口大小(秒)
const maxRetries = 5; // 最大重试次数
const baseDelay = 100; // 初始延迟(毫秒)

const result = await slidingWindowWithLuaAndLock(key, lockKey, windowSize, maxRetries, baseDelay);
console.log(`Requests in the last ${windowSize} seconds: ${result}`);
} catch (error) {
console.error("Error in Lua-based sliding window with lock:", error);
} finally {
redisClient.quit();
}
})();

六、Pipeline Node.js


如果并发量高,可以通过 Redis Pipeline 批量执行命令,减少网络开销。操作之间没有原子性,适合需要简单批量操作的场景。减少 RTT,但不适合复杂逻辑。Pipeline 是一种批量发送 Redis 命令的方式,可以减少网络往返次数,但操作之间的顺序和原子性由客户端控制。如果仅需要批量操作(如删除键值、批量设置值等)且对原子性要求较低,可以选择 Pipeline,提高性能的同时降低开发复杂度。

/**
* 滑动窗口计数器(基于 Pipeline,带分布式锁)
* @param {string} key - Redis 键名
* @param {string} lockKey - 分布式锁键名
* @param {number} windowSizeInSeconds - 滑动窗口大小(秒)
* @param {number} maxRetries - 最大重试次数
* @param {number} baseDelay - 基础重试延迟(毫秒)
* @returns {Promise<number|null>} - 当前窗口内的请求数量或 null(未获取锁)
*/
async function slidingWindowWithPipelineAndLock(
key,
lockKey,
windowSizeInSeconds,
maxRetries,
baseDelay
) {
const lockValue = `${Date.now()}-${Math.random()}`; // 锁的唯一标识
const lockTTL = 1000; // 锁的过期时间(毫秒)
const currentTime = Date.now();
const windowStart = currentTime - windowSizeInSeconds * 1000;

let attempt = 0;

while (attempt < maxRetries) {
const isLockSet = await redisClient.set(lockKey, lockValue, {
NX: true,
PX: lockTTL,
});

if (isLockSet) {
try {
// 开始 Pipeline 操作
const pipeline = redisClient.multi();

pipeline.zRemRangeByScore(key, 0, windowStart); // 删除窗口外的请求记录
pipeline.zAdd(key, { score: currentTime, value: currentTime.toString() }); // 添加当前请求时间
pipeline.zCard(key); // 获取当前窗口内的请求数量
pipeline.expire(key, windowSizeInSeconds + 1); // 设置键的过期时间

const results = await pipeline.exec();

// 获取 ZCARD 的结果(第三个命令的结果)
return results[2];
} finally {
await redisClient.del(lockKey); // 释放锁
}
}

// 未获取到锁,使用指数退避策略重试
const delay = Math.min(baseDelay * 2 ** attempt, 5000); // 最大退避时间限制为 5 秒
await new Promise((resolve) => setTimeout(resolve, delay));
attempt++;
}

return null; // 超过重试次数未获取到锁
}

/**
* 模拟滑动窗口计数器(Pipeline + 分布式锁)
*/
(async () => {
try {
const key = 'api:sliding_window_pipeline_with_lock';
const lockKey = `${key}:lock`;
const windowSize = 10; // 窗口大小(秒)
const maxRetries = 5; // 最大重试次数
const baseDelay = 100; // 初始延迟(毫秒)

const result = await slidingWindowWithPipelineAndLock(key, lockKey, windowSize, maxRetries, baseDelay);
console.log(`Requests in the last ${windowSize} seconds: ${result}`);
} catch (error) {
console.error("Error in Pipeline-based sliding window with lock:", error);
} finally {
redisClient.quit();
}
})();