跳到主要内容

分布式锁计数器

2025年01月06日
柏拉文
越努力,越幸运

一、认识


Redis 分布式锁计数器结合了分布式锁和计数器功能,可以确保在高并发场景下对共享资源的安全访问。用于实现一个分布式计数器,用于统计操作次数。确保多个客户端在并发访问时,通过分布式锁避免计数器的竞争冲突, 通过分布式锁确保只有一个客户端能够操作计数器。具体实现为: 首先获取分布式锁, 使用 RedisSET 命令设置锁, 设置锁的自动过期时间,避免死锁, NX 保证锁不存在时才能设置,PX 设置锁过期时间, 过期时间设置为请求可能最长处理时间的稍大值,防止因未释放锁导致死锁。然后计数器操作, 获取锁后,安全地操作计数器, 通过分布式锁确保只有一个客户端能够操作计数器。如果获取不到锁,进入 while 循环,重新尝试获取锁,每次重启后等待 retryDelay 毫秒,其中, retryDelay 可以指定一个时间间隔,也可以采用指数退避算法(如 retryDelay = Math.pow(2, attempts) * 100),避免锁竞争。最后,释放锁,通过 Lua 脚本保证释放锁的原子性, 确保只有当前请求可以释放锁,避免误删其他客户端的锁。例如确保某个任务最多运行 N 次。使用 Redis 的计数器维护任务执行次数,通过 INCR 检查是否超过上限。

二、Node.js


2.1 retryDelay 时间间隔

固定时间间隔(如 retryDelay = 100ms

const redis = require('redis');
const { v4: uuidv4 } = require('uuid'); // 用于生成唯一请求 ID

// 创建 Redis 客户端
const redisClient = redis.createClient();

(async () => {
await redisClient.connect(); // 连接 Redis
console.log("Connected to Redis!");
})();

// 分布式锁默认配置
const LOCK_KEY = 'distributed:lock';
const LOCK_EXPIRATION = 5000; // 锁过期时间(毫秒)

/**
* 获取分布式锁
* @param {string} lockKey - 锁的 Redis 键名
* @param {string} requestId - 唯一请求标识
* @param {number} ttl - 锁的过期时间(毫秒)
* @returns {Promise<boolean>} - 是否获取到锁
*/
async function acquireLock(lockKey, requestId, ttl) {
const result = await redisClient.set(lockKey, requestId, {
NX: true, // 锁不存在时才设置
PX: ttl, // 设置过期时间
});
return result === 'OK';
}

/**
* 释放分布式锁
* @param {string} lockKey - 锁的 Redis 键名
* @param {string} requestId - 唯一请求标识
* @returns {Promise<boolean>} - 是否释放成功
*/
async function releaseLock(lockKey, requestId) {
const releaseScript = `
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
`;
const result = await redisClient.eval(releaseScript, {
keys: [lockKey],
arguments: [requestId],
});
return result === 1; // 1 表示成功释放锁
}

/**
* 分布式锁计数器
* @param {string} counterKey - 计数器的 Redis 键名
* @returns {Promise<number>} - 当前计数器值
*/
async function distributedCounter(counterKey, maxRetries = 3, retryDelay = 100) {
let attempts = 0;
const requestId = uuidv4(); // 唯一请求 ID


while(attempts <= maxRetries){
const lockAcquired = await acquireLock(LOCK_KEY, requestId, LOCK_EXPIRATION);

if (!lockAcquired) {
attempts++;
console.log(`Retrying to acquire lock... Attempt ${attempts}/${maxRetries}`);
await new Promise((resolve) => setTimeout(resolve, retryDelay)); // 等待一段时间再重试
}

try {
// 安全操作计数器
const currentCount = await redisClient.incr(counterKey);
console.log(`Counter incremented to: ${currentCount}`);
return currentCount;
} catch (err) {
console.error("Error updating counter:", err);
return null;
} finally {
// 释放锁
const lockReleased = await releaseLock(LOCK_KEY, requestId);
if (lockReleased) {
console.log("Lock released successfully.");
} else {
console.warn("Failed to release lock, potential issue!");
}
}
}

console.warn("Failed to acquire lock after maximum retries.");
return null; // 如果超过最大重试次数,返回 null
}

// 示例:模拟分布式计数器操作
(async () => {
const counterKey = 'distributed:counter';

// 模拟多次调用计数器
for (let i = 0; i < 5; i++) {
await distributedCounter(counterKey);
}
})();

2.2 retryDelay 指数退避算法

将重试机制的 固定间隔 优化为 指数退避策略 是一种更优的方案,尤其是在高并发环境下,可以有效地减少锁竞争,并避免集中重试导致的资源浪费。指数退避(Exponential Backoff)是指: 每次重试的间隔时间按照指数增长(如 2^n),逐步增加, 增加一个随机抖动(jitter),防止大量请求在相同时间再次争抢锁。计算公式为: Math.min(baseDelay * Math.pow(2, attempts), maxDelay) + Math.random() * jitterRangebaseDelay 为基础延迟时间, attempts 为当前重试的次数,使用 baseDelay * 2^attempts 动态调整重试间隔时间。maxDelay 为最大延迟时间,限制最大间隔时间为 maxDelay,防止过长的延迟。randomJitter 为随机抖动时间, 每次重试增加随机抖动 randomJitter,避免并发请求同时重试导致锁竞争。通过 引入指数退避策略,可以显著降低高并发场景下锁竞争的冲突概率,同时最大限度提高获取锁的成功率,增强系统的稳定性和可靠性。

const redis = require('redis');
const { v4: uuidv4 } = require('uuid'); // 用于生成唯一请求 ID

// 创建 Redis 客户端
const redisClient = redis.createClient();

(async () => {
await redisClient.connect(); // 连接 Redis
console.log("Connected to Redis!");
})();

// 分布式锁默认配置
const LOCK_KEY = 'distributed:lock';
const LOCK_EXPIRATION = 5000; // 锁过期时间(毫秒)

/**
* 获取分布式锁
* @param {string} lockKey - 锁的 Redis 键名
* @param {string} requestId - 唯一请求标识
* @param {number} ttl - 锁的过期时间(毫秒)
* @returns {Promise<boolean>} - 是否获取到锁
*/
async function acquireLock(lockKey, requestId, ttl) {
const result = await redisClient.set(lockKey, requestId, {
NX: true, // 锁不存在时才设置
PX: ttl, // 设置过期时间
});
return result === 'OK';
}

/**
* 释放分布式锁
* @param {string} lockKey - 锁的 Redis 键名
* @param {string} requestId - 唯一请求标识
* @returns {Promise<boolean>} - 是否释放成功
*/
async function releaseLock(lockKey, requestId) {
const releaseScript = `
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
`;
const result = await redisClient.eval(releaseScript, {
keys: [lockKey],
arguments: [requestId],
});
return result === 1; // 1 表示成功释放锁
}

/**
* 分布式锁计数器(带指数退避重试策略)
* @param {string} counterKey - 计数器的 Redis 键名
* @param {number} maxRetries - 最大重试次数
* @param {number} baseDelay - 初始延迟(毫秒)
* @returns {Promise<number>} - 当前计数器值
*/
async function distributedCounter(counterKey, maxRetries = 5, baseDelay = 100) {
let attempts = 0;
const requestId = uuidv4(); // 唯一请求 ID

while (attempts < maxRetries) {
const lockAcquired = await acquireLock(LOCK_KEY, requestId, LOCK_EXPIRATION);

if (lockAcquired) {
try {
// 安全操作计数器
const currentCount = await redisClient.incr(counterKey);
console.log(`Counter incremented to: ${currentCount}`);
return currentCount;
} catch (err) {
console.error("Error updating counter:", err);
return null;
} finally {
// 释放锁
const lockReleased = await releaseLock(LOCK_KEY, requestId);
if (lockReleased) {
console.log("Lock released successfully.");
} else {
console.warn("Failed to release lock, potential issue!");
}
}
}

// 未获取到锁,使用指数退避策略重试
attempts++;
const delay = Math.min(baseDelay * 2 ** attempts, 5000); // 最大退避时间限制为 5 秒
const jitter = Math.random() * 100; // 随机抖动(0-100 毫秒)
console.log(`Retrying to acquire lock... Attempt ${attempts}/${maxRetries}. Delay: ${delay + jitter}ms`);

await new Promise((resolve) => setTimeout(resolve, delay + jitter)); // 等待一段时间再重试
}

console.warn("Failed to acquire lock after maximum retries.");
return null; // 如果超过最大重试次数,返回 null
}

// 示例:模拟分布式计数器操作
(async () => {
const counterKey = 'distributed:counter';

// 模拟多次调用计数器
for (let i = 0; i < 5; i++) {
await distributedCounter(counterKey);
}

redisClient.quit(); // 关闭 Redis 连接
})();

三、Lua 脚本


四、Pipeline


如果并发量高,可以通过 Redis Pipeline 批量执行命令,减少网络开销。

五 Lua Node.js


Lua 脚本可以将多个 Redis 操作封装为一个原子性事务,减少网络延迟和数据竞争。减少网络往返,适合复杂业务逻辑。所有操作在 Redis 内部一次执行,具有原子性。如果业务逻辑复杂,涉及多次读写并需要保证原子性,比如滑动窗口计数器、分布式锁等,推荐使用 Lua 脚本。

const { createClient } = require('redis');

const redisClient = createClient();
redisClient.connect();

// Lua 脚本:带分布式锁的计数器
const distributedLockCounterLuaScript = `
local key = KEYS[1]
local lockKey = KEYS[2]
local lockValue = ARGV[1]
local lockTTL = tonumber(ARGV[2])
local increment = tonumber(ARGV[3])

-- 尝试获取锁
if redis.call("SETNX", lockKey, lockValue) == 1 then
redis.call("PEXPIRE", lockKey, lockTTL)

-- 增加计数器值
local count = redis.call("INCRBY", key, increment)

-- 设置计数器过期时间(可选)
if redis.call("TTL", key) == -1 then
redis.call("EXPIRE", key, 3600) -- 默认过期时间为 1 小时
end

-- 释放锁
redis.call("DEL", lockKey)

return count
else
return nil -- 表示未获取锁
end
`;

/**
* 分布式锁计数器(基于 Lua 脚本)
* @param {string} key - 计数器的键
* @param {string} lockKey - 分布式锁的键
* @param {number} increment - 增量值
* @param {number} maxRetries - 最大重试次数
* @param {number} baseDelay - 初始延迟(毫秒)
* @returns {Promise<number|null>} - 当前计数值或 null(未获取锁)
*/
async function distributedLockCounterWithLua(
key,
lockKey,
increment,
maxRetries,
baseDelay
) {
const lockValue = `${Date.now()}-${Math.random()}`; // 锁的唯一标识
const lockTTL = 1000; // 锁的过期时间(毫秒)

let attempt = 0;

while (attempt < maxRetries) {
const result = await redisClient.eval(
distributedLockCounterLuaScript,
2, // 键数量
key, // 计数器的键
lockKey, // 分布式锁的键
lockValue, // 锁的唯一标识
lockTTL, // 锁的过期时间
increment // 增量值
);

if (result !== null) {
return result; // 成功获取锁并执行计数操作
}

// 未获取到锁,使用指数退避策略重试
const delay = Math.min(baseDelay * 2 ** attempt, 5000); // 最大退避时间限制为 5 秒
await new Promise((resolve) => setTimeout(resolve, delay));
attempt++;
}

return null; // 超过重试次数未获取到锁
}

/**
* 模拟分布式锁计数器(Lua 实现)
*/
(async () => {
try {
const key = 'counter:lua';
const lockKey = `${key}:lock`;
const increment = 1;
const maxRetries = 5; // 最大重试次数
const baseDelay = 100; // 初始延迟(毫秒)

const result = await distributedLockCounterWithLua(key, lockKey, increment, maxRetries, baseDelay);
console.log(`Counter value: ${result}`);
} catch (error) {
console.error("Error in Lua-based distributed lock counter:", error);
} finally {
redisClient.quit();
}
})();

六、Pipeline Node.js


如果并发量高,可以通过 Redis Pipeline 批量执行命令,减少网络开销。操作之间没有原子性,适合需要简单批量操作的场景。减少 RTT,但不适合复杂逻辑。Pipeline 是一种批量发送 Redis 命令的方式,可以减少网络往返次数,但操作之间的顺序和原子性由客户端控制。如果仅需要批量操作(如删除键值、批量设置值等)且对原子性要求较低,可以选择 Pipeline,提高性能的同时降低开发复杂度。

/**
* 分布式锁计数器(基于 Pipeline)
* @param {string} key - 计数器的键
* @param {string} lockKey - 分布式锁的键
* @param {number} increment - 增量值
* @param {number} maxRetries - 最大重试次数
* @param {number} baseDelay - 初始延迟(毫秒)
* @returns {Promise<number|null>} - 当前计数值或 null(未获取锁)
*/
async function distributedLockCounterWithPipeline(
key,
lockKey,
increment,
maxRetries,
baseDelay
) {
const lockValue = `${Date.now()}-${Math.random()}`; // 锁的唯一标识
const lockTTL = 1000; // 锁的过期时间(毫秒)

let attempt = 0;

while (attempt < maxRetries) {
const isLockSet = await redisClient.set(lockKey, lockValue, {
NX: true,
PX: lockTTL,
});

if (isLockSet) {
try {
// 开始 Pipeline 操作
const pipeline = redisClient.multi();

pipeline.incrBy(key, increment); // 增加计数器值
pipeline.ttl(key); // 获取当前过期时间

const results = await pipeline.exec();

const count = results[0]; // 获取 INCRBY 的结果
const ttl = results[1]; // 获取 TTL 的结果

// 如果没有设置过期时间,则设置默认值
if (ttl === -1) {
await redisClient.expire(key, 3600); // 默认过期时间为 1 小时
}

return count;
} finally {
await redisClient.del(lockKey); // 释放锁
}
}

// 未获取到锁,使用指数退避策略重试
const delay = Math.min(baseDelay * 2 ** attempt, 5000); // 最大退避时间限制为 5 秒
await new Promise((resolve) => setTimeout(resolve, delay));
attempt++;
}

return null; // 超过重试次数未获取到锁
}

/**
* 模拟分布式锁计数器(Pipeline 实现)
*/
(async () => {
try {
const key = 'counter:pipeline';
const lockKey = `${key}:lock`;
const increment = 1;
const maxRetries = 5; // 最大重试次数
const baseDelay = 100; // 初始延迟(毫秒)

const result = await distributedLockCounterWithPipeline(key, lockKey, increment, maxRetries, baseDelay);
console.log(`Counter value: ${result}`);
} catch (error) {
console.error("Error in Pipeline-based distributed lock counter:", error);
} finally {
redisClient.quit();
}
})();