Topic
2025年02月23日
一、创建 Topic
const Kafka = require("node-rdkafka");
const adminClient = Kafka.AdminClient.create({
"client.id": "kafka-admin",
"metadata.broker.list": "localhost:29092",
});
adminClient.createTopic(
{
topic: "kafka-topic-test",
num_partitions: 1,
replication_factor: 1,
},
(error) => {
if (error) {
console.error(`创建 Topic 失败: ${err}`);
} else {
console.log(`Topic kafka-topic-test 创建成功`);
}
}
);
console.log("adminClient", adminClient);
-
topic
: 指定要创建的Topic
的名称。Topic
是Kafka
中用于组织消息的基本单位,所有消息都会被发布到特定的Topic
中。 -
num_partitions
: 指定Topic
的分区数量。分区数量直接影响Kafka
的并发处理能力和数据的分布情况。设置为1
表示这个Topic
只有一个分区。如果需要更高的并发或更好的负载均衡,可以设置更多的分区。 -
replication_factor
: 指定每个分区的副本数(即数据复制因子)。副本数决定了数据的冗余备份和容错能力。副本数越高,在某个Broker
发生故障时数据丢失的风险越低。设置为1
表示没有数据副本备份(即单副本),通常适用于测试环境。在生产环境中一般建议至少设置为2
或3
。
二、获取 Topic
AdminClient
实例暂时没有实现获取 Topic
的方法。需要在终端中使用 Kafka Topic
命令 来获取。
kafka-topics --list --bootstrap-server localhost:29092
三、删除 Topic
const Kafka = require("node-rdkafka");
const adminClient = Kafka.AdminClient.create({
"client.id": "kafka-admin",
"metadata.broker.list": "localhost:29092",
});
adminClient.deleteTopic("test-topic", 3000, (error) => {
if (error) {
console.error(`删除 Topic 失败: ${err}`);
} else {
console.log(`Topic test-topic 删除成功`);
}
});
四、增加 Topic 分区
Kafka Topic
只能增加分区, 不能减少。
const Kafka = require("node-rdkafka");
const adminClient = Kafka.AdminClient.create({
"client.id": "kafka-admin",
"metadata.broker.list": "localhost:29092",
});
adminClient.createPartitions("kafka-topic-test", 3, 3000, (error) => {
if (error) {
console.error(`创建 Topic 分区失败: ${error}`);
} else {
console.log(`Topic kafka-topic-test 创建分区成功`);
}
});
五、查看 Topic 描述信息
node-rdkafka
没有提供 adminClient.getMetadata
方法,但可以通过 Kafka
自带的命令行工具来查询元数据。
kafka-topics --describe --topic kafka-topic-test --bootstrap-server localhost:29092
// 输出信息如下:
Topic: your-topic-name PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: your-topic-name Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: your-topic-name Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: your-topic-name Partition: 2 Leader: 1 Replicas: 1 Isr: 1
-
Isr
:1
表示当前存活的副本。 -
Leader
:1
表示该分区的主分区所在的Kafka Broker
-
Replicas
:1
表示该分区的副本数。 -
PartitionCount
:3
表示有3
个分区