调度器
2024年04月08日
一、认识
调度器: 采用并行任务调度,支持最大并发控制(parallelism
),具备任务队列管理、自动任务调度、暂停与恢复功能
-
pause
: 暂停调度任务执行 -
resume
: 继续任务调度执行 -
addTask
: 添加任务,进入任务队列, 执行scheduler
调度函数 -
scheduler
:while
循环检测parallelism
、暂停状态、队列长度, 进行调度执行任务
二、实现
class Scheduler {
constructor(parallelism) {
this.queue = [];
this.paused = false;
this.runningTask = 0;
this.parallelism = parallelism;
}
add(task, callback) {
return new Promise((resolve, reject) => {
const taskItem = {
reject,
resolve,
callback,
processor: () => Promise.resolve().then(() => task()),
};
this.queue.push(taskItem);
this.schedule();
});
}
pause() {
this.paused = true;
console.log("⚠️ 网络状态不佳,上传已暂停");
}
resume() {
if (this.paused) {
this.paused = false;
console.log("✅ 网络恢复,恢复上传");
this.schedule();
}
}
schedule() {
while (
!this.paused &&
this.runningTask < this.parallelism &&
this.queue.length
) {
this.runningTask++;
const taskItem = this.queue.shift();
const { processor, resolve, reject, callback } = taskItem;
processor()
.then((res) => {
resolve && resolve(res);
callback && callback(null, res);
})
.catch((error) => {
reject && reject(error);
callback && callback(error, null);
})
.finally(() => {
this.runningTask--;
this.schedule();
});
}
}
}
三、测试
const scheduler = new Scheduler(2);
function request(data) {
return new Promise((resolve) => {
setTimeout(() => {
console.log(`📤 上传 ${data.id}:`, data.chunk);
resolve();
}, 1000);
});
}
function requestWithRetry(data, retries = 3) {
return new Promise((resolve, reject) => {
function attempt(remaining) {
request(data)
.then(resolve)
.catch((err) => {
if (remaining > 0) {
console.log(
`⚠️ 任务 ${data.id} 失败,正在重试 (${3 - remaining + 1}/3)...`
);
attempt(remaining - 1);
} else {
reject(err);
}
});
}
attempt(retries);
});
}
function addTask(data) {
scheduler.add(
() => requestWithRetry(data),
(error, result) => {
console.log(`${data.id} 已上传完成`);
if (scheduler.queue.length === 0 && scheduler.runningTask === 0) {
console.log("✅ 上传队列已清空");
}
}
);
}
addTask({ id: 0, chunk: [0]});
addTask({ id: 1, chunk: [1]});
addTask({ id: 2, chunk: [2]});
addTask({ id: 3, chunk: [3]});