跳到主要内容

Topic

2025年02月23日
柏拉文
越努力,越幸运

一、创建 Topic


const Kafka = require("node-rdkafka");

const adminClient = Kafka.AdminClient.create({
"client.id": "kafka-admin",
"metadata.broker.list": "localhost:29092",
});

adminClient.createTopic(
{
topic: "kafka-topic-test",
num_partitions: 1,
replication_factor: 1,
},
(error) => {
if (error) {
console.error(`创建 Topic 失败: ${err}`);
} else {
console.log(`Topic kafka-topic-test 创建成功`);
}
}
);

console.log("adminClient", adminClient);
  • topic: 指定要创建的 Topic 的名称。TopicKafka 中用于组织消息的基本单位,所有消息都会被发布到特定的 Topic 中。

  • num_partitions: 指定 Topic 的分区数量。分区数量直接影响 Kafka 的并发处理能力和数据的分布情况。设置为 1 表示这个 Topic 只有一个分区。如果需要更高的并发或更好的负载均衡,可以设置更多的分区。

  • replication_factor: 指定每个分区的副本数(即数据复制因子)。副本数决定了数据的冗余备份和容错能力。副本数越高,在某个 Broker 发生故障时数据丢失的风险越低。设置为 1 表示没有数据副本备份(即单副本),通常适用于测试环境。在生产环境中一般建议至少设置为 23

二、获取 Topic


AdminClient 实例暂时没有实现获取 Topic 的方法。需要在终端中使用 Kafka Topic 命令 来获取。

kafka-topics --list --bootstrap-server localhost:29092

三、删除 Topic


const Kafka = require("node-rdkafka");

const adminClient = Kafka.AdminClient.create({
"client.id": "kafka-admin",
"metadata.broker.list": "localhost:29092",
});

adminClient.deleteTopic("test-topic", 3000, (error) => {
if (error) {
console.error(`删除 Topic 失败: ${err}`);
} else {
console.log(`Topic test-topic 删除成功`);
}
});

四、增加 Topic 分区


Kafka Topic 只能增加分区, 不能减少。

const Kafka = require("node-rdkafka");

const adminClient = Kafka.AdminClient.create({
"client.id": "kafka-admin",
"metadata.broker.list": "localhost:29092",
});

adminClient.createPartitions("kafka-topic-test", 3, 3000, (error) => {
if (error) {
console.error(`创建 Topic 分区失败: ${error}`);
} else {
console.log(`Topic kafka-topic-test 创建分区成功`);
}
});

五、查看 Topic 描述信息


node-rdkafka 没有提供 adminClient.getMetadata 方法,但可以通过 Kafka 自带的命令行工具来查询元数据。

kafka-topics --describe --topic kafka-topic-test --bootstrap-server localhost:29092

// 输出信息如下:

Topic: your-topic-name PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: your-topic-name Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: your-topic-name Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: your-topic-name Partition: 2 Leader: 1 Replicas: 1 Isr: 1
  • Isr: 1 表示当前存活的副本。

  • Leader: 1 表示该分区的主分区所在的 Kafka Broker

  • Replicas: 1 表示该分区的副本数。

  • PartitionCount: 3 表示有 3 个分区