跳到主要内容

任务队列

2025年01月20日
柏拉文
越努力,越幸运

一、Redis List 任务队列


1.1 认识

基于 RedisFIFO 先进先出任务队列: 使用 RedisLPUSH/RPOP 命令实现 先进先出(FIFO)队列实现逻辑为: 当任务未能获取锁时,将其放入队列 LPUSH。定期从队列 RPOP 取出任务并执行。任务按 FIFO 顺序依次执行,确保没有遗漏。这样的话,确保任务按照顺序依次执行,任务不可丢失,适合事务性处理。

1.2 实现

const Redis = require("ioredis");

const redis = new Redis({
port: 6379,
host: "127.0.0.1",
});

redis.on("error", (error) => {
console.log("Redis Error:", error);
});

redis.on("close", () => {
console.log("Redis Close!!");
});

redis.on("connect", () => {
console.log("Redis Connect Success!!");
});

async function acquireLock(redis, key, value, expiration) {
const result = await redis.set(key, value, "NX", "EX", expiration);

return result === "OK";
}

async function releaseLock(redis, key, value) {
const script = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`;
return await redis.eval(script, 1, key, value);
}

async function enqueueTask(redis, taskId, index) {
const queueKey = `queue:${taskId}`;
await redis.lpush(queueKey, index);
console.log(`${taskId} ${index} 已加入队列`);
}

async function processQueue(redis, taskId, callback) {
const queueKey = `queue:${taskId}`;
const index = await redis.rpop(queueKey);

if (index) {
console.log(`从队列取出任务: ${index}`);
await lockWithQueue(redis, taskId, index, callback);
} else {
console.log("队列为空,等待新任务...");
}
}

async function lockWithQueue(redis, taskId, index, callback) {
const lockKey = `lock:key:${taskId}`;
const uniqueValue = `lock:value:${Date.now()}`;
const acquired = await acquireLock(redis, lockKey, uniqueValue, 60);

if (!acquired) {
console.log(`任务 ${taskId} ${index} 已被锁定,稍后重试`);
await enqueueTask(redis, taskId, index);
return;
}

try {
console.log(`开始处理 ${taskId} ${index}`);
await callback();
console.log(`${taskId} ${index} 处理完成`);
} catch (error) {
console.log(`处理 ${taskId} ${index} 出错,原因为: ${error}`);
} finally {
await releaseLock(redis, lockKey, uniqueValue);
processQueue(redis, taskId, callback);
}
}

async function handleTask() {
return new Promise((resolve) => {
setTimeout(() => {
resolve(true);
}, 3000);
});
}

async function run() {
const taskId = "学习 Redis";

for (let i = 0; i < 20; i++) {
lockWithQueue(redis, taskId, i, handleTask);
}
}

run();