跳到主要内容

语法

2025年02月22日
柏拉文
越努力,越幸运

一、主节点 - 单个从节点


基于 ioredis,通过分别创建 masterslave 连接实现读写分离。主节点连接(redisMaster, 通过指定主节点的 IP 和端口,确保写操作直接发送到主节点。从节点连接(redisSlave, 通过指定从节点的 IP 和端口,实现读操作,从而减轻主节点的压力。如果主节点发生故障,客户端不会自动切换,需要手动处理连接。

const Redis = require('ioredis');

// 直接连接到主节点,用于写操作
const redisMaster = new Redis({
host: '192.168.1.100',
port: 6379,
// password: 'your_master_password', // 如有密码,可添加
});

// 直接连接到从节点,用于读操作
const redisSlave = new Redis({
host: '192.168.1.101',
port: 6379,
// password: 'your_slave_password', // 如有密码,可添加
});

async function testReadWrite() {
try {
// 写入数据到主节点
await redisMaster.set('foo', 'bar');
console.log('写入成功');

// 从从节点读取数据(注意复制延迟问题)
const value = await redisSlave.get('foo');
console.log('从从节点读取到的值:', value);
} catch (err) {
console.error('操作出错:', err);
} finally {
redisMaster.disconnect();
redisSlave.disconnect();
}
}

testReadWrite();

二、主节点 - 多个从节点


如果有多个 从节点(Slave,我们可以在代码层面实现简单的负载均衡,在多个从节点之间随机或轮询选择一个来执行读操作。这样可以均衡多个从节点的查询压力,提高读性能。

2.1 随机分配从节点

const Redis = require('ioredis');

// 主节点(Master) - 只负责写入
const redisMaster = new Redis({
host: '192.168.1.100',
port: 6379,
});

// 从节点(Slaves) - 负责读取操作
const redisSlaves = [
new Redis({ host: '192.168.1.101', port: 6379 }),
new Redis({ host: '192.168.1.102', port: 6379 }),
new Redis({ host: '192.168.1.103', port: 6379 }),
];

// 随机选择一个从节点进行读操作
function getRandomSlave() {
return redisSlaves[Math.floor(Math.random() * redisSlaves.length)];
}

async function testReadWrite() {
try {
// 写入数据到主节点
await redisMaster.set('foo', 'bar');
console.log('写入成功');

// 随机选择一个从节点读取数据
const redisSlave = getRandomSlave();
const value = await redisSlave.get('foo');
console.log('从从节点读取到的值:', value);
} catch (err) {
console.error('操作出错:', err);
} finally {
redisMaster.disconnect();
redisSlaves.forEach(redis => redis.disconnect());
}
}

testReadWrite();

2.2 轮询分配从节点

const Redis = require('ioredis');

// 主节点(Master) - 只负责写入
const redisMaster = new Redis({
host: '192.168.1.100',
port: 6379,
});

// 从节点(Slaves) - 负责读取操作
const redisSlaves = [
new Redis({ host: '192.168.1.101', port: 6379 }),
new Redis({ host: '192.168.1.102', port: 6379 }),
new Redis({ host: '192.168.1.103', port: 6379 }),
];

let slaveIndex = 0;

// 轮询获取下一个从节点
function getNextSlave() {
const redisSlave = redisSlaves[slaveIndex];
slaveIndex = (slaveIndex + 1) % redisSlaves.length;
return redisSlave;
}

async function testReadWrite() {
try {
await redisMaster.set('foo', 'bar');
console.log('写入成功');

// 轮询选择一个从节点读取数据
const redisSlave = getNextSlave();
const value = await redisSlave.get('foo');
console.log(`从从节点 [${slaveIndex}] 读取到的值:`, value);
} catch (err) {
console.error('操作出错:', err);
}
}

setInterval(testReadWrite, 2000); // 每 2 秒执行一次,测试轮询效果

2.3 自定义连接池动态分配

一、连接池: 我们创建一个 SlavePool 类,管理所有从节点连接。每个从节点用一个对象表示,其中包含 ioredis 客户端和当前活跃的请求数。思路是: 为每个从节点维护一个活跃请求数,每次读操作时,选择当前活跃数最小的从节点来执行请求,从而实现简单的负载均衡。

const Redis = require('ioredis');

class SlavePool {
/**
* @param {Array<Object>} slaveConfigs - 每个从节点的配置,例如:{ host: '192.168.1.101', port: 6379 }
*/
constructor(slaveConfigs) {
// 为每个从节点创建一个包装对象,初始活跃数为 0
this.slaves = slaveConfigs.map(cfg => ({
client: new Redis(cfg),
active: 0,
}));
}

// 获取当前活跃请求数最小的从节点
getLeastLoadedSlave() {
return this.slaves.reduce((prev, curr) => (prev.active <= curr.active ? prev : curr));
}

/**
* 执行一个 Redis 命令到当前最空闲的从节点上
* @param {string} command - Redis 命令,例如 'get'
* @param {...any} args - 命令参数
* @returns {Promise<any>}
*/
async execute(command, ...args) {
const slaveWrapper = this.getLeastLoadedSlave();
// 请求开始前,将活跃数加 1
slaveWrapper.active++;
try {
// 利用 ioredis 的动态方法调用执行命令
const result = await slaveWrapper.client[command](...args);
return result;
} catch (error) {
throw error;
} finally {
// 请求结束后,将活跃数减 1
slaveWrapper.active--;
}
}

// 关闭所有从节点的连接
disconnect() {
this.slaves.forEach(slave => slave.client.disconnect());
}
}

二、读写分离: 主节点直接连接, 进行写入操作。多个节点则由我们刚刚实现的连接池来管理, 通过 slavePool.execute 方法自动选择当前负载最低的从节点进行读取。

// 主节点配置(写操作使用)
const master = new Redis({
host: '192.168.1.100',
port: 6379,
// 如果有密码请添加 password 属性
});

// 从节点配置数组(读操作使用)
const slaveConfigs = [
{ host: '192.168.1.101', port: 6379 },
{ host: '192.168.1.102', port: 6379 },
{ host: '192.168.1.103', port: 6379 },
];

// 实例化连接池
const slavePool = new SlavePool(slaveConfigs);

// 示例函数:先写入数据到主节点,然后从连接池中执行读操作
async function testReadWrite() {
try {
// 写操作:写入数据到主节点
await master.set('foo', 'bar');
console.log('写入成功');

// 读操作:通过连接池动态分配到负载较低的从节点
const value = await slavePool.execute('get', 'foo');
console.log('从从节点读取到的值:', value);

// 模拟并发多个读请求,展示动态负载均衡效果
const concurrentReads = [];
for (let i = 0; i < 10; i++) {
concurrentReads.push(slavePool.execute('get', 'foo'));
}
const results = await Promise.all(concurrentReads);
console.log('并发读取结果:', results);
} catch (err) {
console.error('操作出错:', err);
} finally {
master.disconnect();
slavePool.disconnect();
}
}

testReadWrite();

2.4 基于 generic-pool 连接池动态分配

结合 generic-poolioredis,实现多个从节点之间的动态负载均衡, 过程如下:

  1. 为从节点构建连接池, 每个连接池使用 generic-pool 管理,通过 factory.create 创建 Redis 连接,确保等待 ready 事件后再返回。配置了最大/最小连接数和空闲超时时间,可根据实际负载调整。

  2. 主节点直接用 ioredis 进行写操作。

  3. 从节点进行读取操作, 通过 MultiSlavePool 动态分配到负载较低的从节点, 从所有从节点的池中选择 等待请求数最少 的那个池,并从中获取连接执行 Redis 命令。命令执行完成后,将连接归还给对应的连接池

const Redis = require('ioredis');
const genericPool = require('generic-pool');

/**
* 为指定的从节点配置创建一个 Redis 连接池
* @param {Object} slaveConfig - 从节点配置,例如 { host: '192.168.1.101', port: 6379 }
* @returns {Pool} generic-pool 实例
*/
function createRedisPool(slaveConfig) {
// 定义连接工厂
const factory = {
create: () => {
return new Promise((resolve, reject) => {
const client = new Redis(slaveConfig);
// 等待连接建立
client.once('ready', () => resolve(client));
client.once('error', (err) => {
client.disconnect();
reject(err);
});
});
},
destroy: (client) => {
return client.quit();
}
};

// 连接池配置:可根据业务情况调整最大最小连接数
const opts = {
max: 5, // 池中最大连接数
min: 1, // 池中最小连接数
idleTimeoutMillis: 30000,
};

return genericPool.createPool(factory, opts);
}

/**
* MultiSlavePool 类:管理多个从节点连接池,实现动态负载均衡
*/
class MultiSlavePool {
/**
* @param {Array<Object>} slaveConfigs - 多个从节点配置
*/
constructor(slaveConfigs) {
// 为每个从节点创建一个连接池
this.pools = slaveConfigs.map(cfg => createRedisPool(cfg));
}

/**
* 根据各连接池当前待处理请求数(pending)选择最空闲的池
* @returns {Pool} 选择的连接池
*/
getLeastLoadedPool() {
// 默认选第一个池
let selectedPool = this.pools[0];
// 遍历所有池,选择 pending 数量最少的
for (const pool of this.pools) {
// generic-pool 提供 pending 属性,代表当前排队等待连接的请求数
if (pool.pending < selectedPool.pending) {
selectedPool = pool;
}
}
return selectedPool;
}

/**
* 执行一个 Redis 命令:从当前最空闲的连接池获取连接,执行后释放
* @param {string} command - Redis 命令,例如 'get'
* @param {...any} args - 命令参数
* @returns {Promise<any>} 命令执行结果
*/
async execute(command, ...args) {
const pool = this.getLeastLoadedPool();
// 从选定的池中借用一个连接
const client = await pool.acquire();
try {
// 利用 ioredis 的动态方法调用执行命令
const result = await client[command](...args);
return result;
} finally {
// 无论成功或失败,都要释放连接
pool.release(client);
}
}

/**
* 关闭所有连接池,释放资源
*/
async drain() {
for (const pool of this.pools) {
await pool.drain();
await pool.clear();
}
}
}

/* ===========================
示例:读写分离场景
- 主节点用于写入操作,直接使用 ioredis 连接
- 读取操作使用 MultiSlavePool 从多个从节点中动态选择
=========================== */

// 主节点配置(写操作)
const master = new Redis({
host: '192.168.1.100',
port: 6379,
// 如有密码,可增加 password 字段
});

// 从节点配置数组(读操作使用)
const slaveConfigs = [
{ host: '192.168.1.101', port: 6379 },
{ host: '192.168.1.102', port: 6379 },
{ host: '192.168.1.103', port: 6379 },
];

// 实例化 MultiSlavePool
const multiSlavePool = new MultiSlavePool(slaveConfigs);

async function testReadWrite() {
try {
// 写入操作:将数据写入主节点
await master.set('foo', 'bar');
console.log('主节点写入成功');

// 读操作:使用 MultiSlavePool 执行 'get' 命令
const value = await multiSlavePool.execute('get', 'foo');
console.log('从从节点读取到的值:', value);

// 模拟并发多个读请求,展示动态负载均衡效果
const concurrentReads = [];
for (let i = 0; i < 10; i++) {
concurrentReads.push(multiSlavePool.execute('get', 'foo'));
}
const results = await Promise.all(concurrentReads);
console.log('并发读取结果:', results);
} catch (err) {
console.error('操作出错:', err);
} finally {
master.disconnect();
await multiSlavePool.drain();
}
}

testReadWrite();