跳到主要内容

重试机制

2025年01月20日
柏拉文
越努力,越幸运

一、指数退避重试


1.1 认识

指数退避算法 retryDelay: Math.min(baseDelay * Math.pow(2, attempts), maxDelay) + Math.random() * jitterRange

  • baseDelay * Math.pow(2, attempts): 指数递增延迟

  • maxDelay: 最大延迟限制为 maxDelay

  • Math.random() * jitterRange: 随机抖动, 使延迟不完全固定,从而避免并发请求重试时发生雪崩效应,防止多个任务在相同的延迟下同时重试。

因此, 未获取锁, 重试退避机制: 如果未能获取锁, 引入 指数退避算法(Exponential Backoff, 计算得出的延迟时间会随着重试次数指数增长, 并加入最大延迟限制。同时加入随机抖动(jitterRange)来防止多个任务在相同的延迟下同时重试。 设置最大重试次数,避免无限等待,避免无限重试。通过使用锁机制来确保在多个工作进程中只有一个进程能够执行特定任务,而其他进程则会根据指数退避算法进行重试

1.2 实现

const Redis = require("ioredis");

const redis = new Redis({
port: 6379,
host: "127.0.0.1",
});

redis.on("error", (error) => {
console.log("Redis Error:", error);
});

redis.on("close", () => {
console.log("Redis Close!!");
});

redis.on("connect", () => {
console.log("Redis Connect Success!!");
});

async function acquireLock(redis, key, value, expiration) {
const result = await redis.set(key, value, "NX", "EX", expiration);

return result === "OK";
}

async function releaseLock(redis, key, value) {
const script = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`;
return await redis.eval(script, 1, key, value);
}

async function lockWithRetry(
redis,
taskId,
callback,
others = {
baseDelay: 1,
maxDelay: 60,
retryCount: 0,
maxRetries: 5,
jitterRange: 0.5,
}
) {
const { maxDelay, baseDelay, maxRetries, jitterRange } = others;

let { retryCount } = others;
const lockKey = `lock:task:${taskId}`;
const uniqueValue = `worker-${Date.now()}`;

while (retryCount < maxRetries) {
const acquired = await acquireLock(redis, lockKey, uniqueValue, 60);

if (!acquired) {
const retryDelay =
Math.min(baseDelay * Math.pow(2, retryCount), maxDelay) +
Math.random() * jitterRange;
console.log(
`任务 ${taskId} 没有获取到锁, ${retryDelay.toFixed(
2
)} 秒后重试, 当前重试次数为 ${retryCount + 1}`
);
await new Promise((resolve) => setTimeout(resolve, retryDelay * 1000));
retryCount++;
} else {
try {
console.log(`开始执行任务 ${taskId}`);
await callback();
console.log(`任务 ${taskId} 执行完成`);
break;
} catch (error) {
console.log(`任务 ${taskId} 执行失败: ${error.message}`);
} finally {
await releaseLock(redis, lockKey, uniqueValue);
}
}
}

if (retryCount >= maxRetries) {
console.log(`任务 ${taskId} 达到最大重试次数 ${maxRetries},停止重试`);
}
}

async function run() {
const taskId = "1997";

const taskCallback = async () => {
await new Promise((resolve) => setTimeout(resolve, 5000));
};

for (let i = 0; i < 20; i++) {
lockWithRetry(redis, taskId, taskCallback);
}
}

run();