语法
一、主节点 - 单个从节点
基于 ioredis
,通过分别创建 master
和 slave
连接实现读写分离。主节点连接(redisMaster
), 通过指定主节点的 IP
和端口,确保写操作直接发送到主节点。从节点连接(redisSlave
), 通过指定从节点的 IP
和端口,实现读操作,从而减轻主节点的压力。如果主节点发生故障,客户端不会自动切换,需要手动处理连接。
const Redis = require('ioredis');
// 直接连接到主节点,用于写操作
const redisMaster = new Redis({
host: '192.168.1.100',
port: 6379,
// password: 'your_master_password', // 如有密码,可添加
});
// 直接连接到从节点,用于读操作
const redisSlave = new Redis({
host: '192.168.1.101',
port: 6379,
// password: 'your_slave_password', // 如有密码,可添加
});
async function testReadWrite() {
try {
// 写入数据到主节点
await redisMaster.set('foo', 'bar');
console.log('写入成功');
// 从从节点读取数据(注意复制延迟问题)
const value = await redisSlave.get('foo');
console.log('从从节点读取到的值:', value);
} catch (err) {
console.error('操作出错:', err);
} finally {
redisMaster.disconnect();
redisSlave.disconnect();
}
}
testReadWrite();
二、主节点 - 多个从节点
如果有多个 从节点(Slave
),我们可以在代码层面实现简单的负载均衡,在多个从节点之间随机或轮询选择一个来执行读操作。这样可以均衡多个从节点的查询压力,提高读性能。
2.1 随机分配从节点
const Redis = require('ioredis');
// 主节点(Master) - 只负责写入
const redisMaster = new Redis({
host: '192.168.1.100',
port: 6379,
});
// 从节点(Slaves) - 负责读取操作
const redisSlaves = [
new Redis({ host: '192.168.1.101', port: 6379 }),
new Redis({ host: '192.168.1.102', port: 6379 }),
new Redis({ host: '192.168.1.103', port: 6379 }),
];
// 随机选择一个从节点进行读操作
function getRandomSlave() {
return redisSlaves[Math.floor(Math.random() * redisSlaves.length)];
}
async function testReadWrite() {
try {
// 写入数据到主节点
await redisMaster.set('foo', 'bar');
console.log('写入成功');
// 随机选择一个从节点读取数据
const redisSlave = getRandomSlave();
const value = await redisSlave.get('foo');
console.log('从从节点读取到的值:', value);
} catch (err) {
console.error('操作出错:', err);
} finally {
redisMaster.disconnect();
redisSlaves.forEach(redis => redis.disconnect());
}
}
testReadWrite();
2.2 轮询分配从节点
const Redis = require('ioredis');
// 主节点(Master) - 只负责写入
const redisMaster = new Redis({
host: '192.168.1.100',
port: 6379,
});
// 从节点(Slaves) - 负责读取操作
const redisSlaves = [
new Redis({ host: '192.168.1.101', port: 6379 }),
new Redis({ host: '192.168.1.102', port: 6379 }),
new Redis({ host: '192.168.1.103', port: 6379 }),
];
let slaveIndex = 0;
// 轮询获取下一个从节点
function getNextSlave() {
const redisSlave = redisSlaves[slaveIndex];
slaveIndex = (slaveIndex + 1) % redisSlaves.length;
return redisSlave;
}
async function testReadWrite() {
try {
await redisMaster.set('foo', 'bar');
console.log('写入成功');
// 轮询选择一个从节点读取数据
const redisSlave = getNextSlave();
const value = await redisSlave.get('foo');
console.log(`从从节点 [${slaveIndex}] 读取到的值:`, value);
} catch (err) {
console.error('操作出错:', err);
}
}
setInterval(testReadWrite, 2000); // 每 2 秒执行一次,测试轮询效果
2.3 自定义连接池动态分配
一、连接池: 我们创建一个 SlavePool
类,管理所有从节点连接。每个从节点用一个对象表示,其中包含 ioredis
客户端和当前活跃的请求数。思路是: 为每个从节点维护一个活跃请求数,每次读操作时,选择当前活跃数最小的从节点来执行请求,从而实现简单的负载均衡。
const Redis = require('ioredis');
class SlavePool {
/**
* @param {Array<Object>} slaveConfigs - 每个从节点的配置,例如:{ host: '192.168.1.101', port: 6379 }
*/
constructor(slaveConfigs) {
// 为每个从节点创建一个包装对象,初始活跃数为 0
this.slaves = slaveConfigs.map(cfg => ({
client: new Redis(cfg),
active: 0,
}));
}
// 获取当前活跃请求数最小的从节点
getLeastLoadedSlave() {
return this.slaves.reduce((prev, curr) => (prev.active <= curr.active ? prev : curr));
}
/**
* 执行一个 Redis 命令到当前最空闲的从节点上
* @param {string} command - Redis 命令,例如 'get'
* @param {...any} args - 命令参数
* @returns {Promise<any>}
*/
async execute(command, ...args) {
const slaveWrapper = this.getLeastLoadedSlave();
// 请求开始前,将活跃数加 1
slaveWrapper.active++;
try {
// 利用 ioredis 的动态方法调用执行命令
const result = await slaveWrapper.client[command](...args);
return result;
} catch (error) {
throw error;
} finally {
// 请求结束后,将活跃数减 1
slaveWrapper.active--;
}
}
// 关闭所有从节点的连接
disconnect() {
this.slaves.forEach(slave => slave.client.disconnect());
}
}
二、读写分离: 主节点直接连接, 进行写入操作。多个节点则由我们刚刚实现的连接池来管理, 通过 slavePool.execute
方法自动选择当前负载最低的从节点进行读取。
// 主节点配置(写操作使用)
const master = new Redis({
host: '192.168.1.100',
port: 6379,
// 如果有密码请添加 password 属性
});
// 从节点配置数组(读操作使用)
const slaveConfigs = [
{ host: '192.168.1.101', port: 6379 },
{ host: '192.168.1.102', port: 6379 },
{ host: '192.168.1.103', port: 6379 },
];
// 实例化连接池
const slavePool = new SlavePool(slaveConfigs);
// 示例函数:先写入数据到主节点,然后从连接池中执行读操作
async function testReadWrite() {
try {
// 写操作:写入数据到主节点
await master.set('foo', 'bar');
console.log('写入成功');
// 读操作:通过连接池动态分配到负载较低的从节点
const value = await slavePool.execute('get', 'foo');
console.log('从从节点读取到的值:', value);
// 模拟并发多个读请求,展示动态负载均衡效果
const concurrentReads = [];
for (let i = 0; i < 10; i++) {
concurrentReads.push(slavePool.execute('get', 'foo'));
}
const results = await Promise.all(concurrentReads);
console.log('并发读取结果:', results);
} catch (err) {
console.error('操作出错:', err);
} finally {
master.disconnect();
slavePool.disconnect();
}
}
testReadWrite();
2.4 基于 generic-pool 连接池动态分配
结合 generic-pool
与 ioredis
,实现多个从节点之间的动态负载均衡, 过程如下:
-
为从节点构建连接池, 每个连接池使用
generic-pool
管理,通过factory.create
创建Redis
连接,确保等待ready
事件后再返回。配置了最大/最小连接数和空闲超时时间,可根据实际负载调整。 -
主节点直接用
ioredis
进行写操作。 -
从节点进行读取操作, 通过
MultiSlavePool
动态分配到负载较低的从节点, 从所有从节点的池中选择 等待请求数最少 的那个池,并从中获取连接执行Redis
命令。命令执行完成后,将连接归还给对应的连接池
const Redis = require('ioredis');
const genericPool = require('generic-pool');
/**
* 为指定的从节点配置创建一个 Redis 连接池
* @param {Object} slaveConfig - 从节点配置,例如 { host: '192.168.1.101', port: 6379 }
* @returns {Pool} generic-pool 实例
*/
function createRedisPool(slaveConfig) {
// 定义连接工厂
const factory = {
create: () => {
return new Promise((resolve, reject) => {
const client = new Redis(slaveConfig);
// 等待连接建立
client.once('ready', () => resolve(client));
client.once('error', (err) => {
client.disconnect();
reject(err);
});
});
},
destroy: (client) => {
return client.quit();
}
};
// 连接池配置:可根据业务情况调整最大最小连接数
const opts = {
max: 5, // 池中最大连接数
min: 1, // 池中最小连接数
idleTimeoutMillis: 30000,
};
return genericPool.createPool(factory, opts);
}
/**
* MultiSlavePool 类:管理多个从节点连接池,实现动态负载均衡
*/
class MultiSlavePool {
/**
* @param {Array<Object>} slaveConfigs - 多个从节点配置
*/
constructor(slaveConfigs) {
// 为每个从节点创建一个连接池
this.pools = slaveConfigs.map(cfg => createRedisPool(cfg));
}
/**
* 根据各连接池当前待处理请求数(pending)选择最空闲的池
* @returns {Pool} 选择的连接池
*/
getLeastLoadedPool() {
// 默认选第一个池
let selectedPool = this.pools[0];
// 遍历所有池,选择 pending 数量最少的
for (const pool of this.pools) {
// generic-pool 提供 pending 属性,代表当前排队等待连接的请求数
if (pool.pending < selectedPool.pending) {
selectedPool = pool;
}
}
return selectedPool;
}
/**
* 执行一个 Redis 命令:从当前最空闲的连接池获取连接,执行后释放
* @param {string} command - Redis 命令,例如 'get'
* @param {...any} args - 命令参数
* @returns {Promise<any>} 命令执行结果
*/
async execute(command, ...args) {
const pool = this.getLeastLoadedPool();
// 从选定的池中借用一个连接
const client = await pool.acquire();
try {
// 利用 ioredis 的动态方法调用执行命令
const result = await client[command](...args);
return result;
} finally {
// 无论成功或失败,都要释放连接
pool.release(client);
}
}
/**
* 关闭所有连接池,释放资源
*/
async drain() {
for (const pool of this.pools) {
await pool.drain();
await pool.clear();
}
}
}
/* ===========================
示例:读写分离场景
- 主节点用于写入操作,直接使用 ioredis 连接
- 读取操作使用 MultiSlavePool 从多个从节点中动态选择
=========================== */
// 主节点配置(写操作)
const master = new Redis({
host: '192.168.1.100',
port: 6379,
// 如有密码,可增加 password 字段
});
// 从节点配置数组(读操作使用)
const slaveConfigs = [
{ host: '192.168.1.101', port: 6379 },
{ host: '192.168.1.102', port: 6379 },
{ host: '192.168.1.103', port: 6379 },
];
// 实例化 MultiSlavePool
const multiSlavePool = new MultiSlavePool(slaveConfigs);
async function testReadWrite() {
try {
// 写入操作:将数据写入主节点
await master.set('foo', 'bar');
console.log('主节点写入成功');
// 读操作:使用 MultiSlavePool 执行 'get' 命令
const value = await multiSlavePool.execute('get', 'foo');
console.log('从从节点读取到的值:', value);
// 模拟并发多个读请求,展示动态负载均衡效果
const concurrentReads = [];
for (let i = 0; i < 10; i++) {
concurrentReads.push(multiSlavePool.execute('get', 'foo'));
}
const results = await Promise.all(concurrentReads);
console.log('并发读取结果:', results);
} catch (err) {
console.error('操作出错:', err);
} finally {
master.disconnect();
await multiSlavePool.drain();
}
}
testReadWrite();