认识
2024年04月19日
一、认识
node-rdkafka 是一个基于 librdkafka 的高性能 Kafka 客户端库,适用于生产环境。与 kafka-node 相比,它更稳定,并且支持 Node.js 流式(Stream)API 和丰富的高级配置。
二、使用
2.1 主题
const Kafka = require("node-rdkafka");

// Single source of truth for the topic name, so the log output always
// matches the topic that was actually requested.
const topicName = "kafka-topic-1";

// Admin client used for the one-off topic creation below.
const adminClient = Kafka.AdminClient.create({
  "client.id": "kafka-admin",
  "metadata.broker.list": "localhost:29092",
});

// Request a topic with 3 partitions and a replication factor of 1.
adminClient.createTopic(
  {
    topic: topicName,
    num_partitions: 3,
    replication_factor: 1,
  },
  (error) => {
    if (error) {
      console.error(`创建 Topic 失败: ${error}`);
    } else {
      console.log(`Topic ${topicName} 创建成功`);
    }
    // The admin client is only needed for this single request; release it
    // once the result (success or failure) has been reported.
    adminClient.disconnect();
  }
);
2.2 生产者
const Kafka = require("node-rdkafka");

// Producer configured for durable, idempotent delivery with gzip compression.
const producer = new Kafka.Producer({
  // Wait for acknowledgement from all in-sync replicas.
  acks: -1,
  // Emit "delivery-report" events for produced messages.
  dr_cb: true,
  "enable.idempotence": true,
  "socket.keepalive.enable": true,
  "compression.codec": "gzip",
  "client.id": "kafka-producer-client-1",
  "metadata.broker.list": "localhost:29092",
  "retry.backoff.ms": 1000,
  "message.timeout.ms": 10000,
  "batch.num.messages": 1000000,
  "queue.buffering.max.ms": 1000,
  "message.send.max.retries": 10,
  "queue.buffering.max.messages": 100000,
});

producer.connect();

// Poll regularly so that delivery reports and other queued events are emitted.
producer.setPollInterval(100);

// Once connected, publish a numbered greeting every 4 seconds.
producer.on("ready", () => {
  console.log("Producer is ready!!!");
  let index = 0;
  setInterval(() => {
    const message = `Hello Kafka ${index++}`;
    try {
      // Arguments: topic, partition (null = let the partitioner choose),
      // payload, key, timestamp.
      producer.produce("kafka-topic-1", null, Buffer.from(message), null, Date.now());
      console.log(`Message Sent: ${message}`);
    } catch (err) {
      // produce() throws synchronously when the message cannot be enqueued
      // (for example when the local queue is full). Without this guard the
      // exception would escape the timer callback and crash the process.
      console.error("Failed to enqueue message:", err);
    }
  }, 4000);
});

// Per-message delivery outcome (enabled by `dr_cb: true` above).
producer.on("delivery-report", (err, report) => {
  if (err) {
    console.error("Message delivery failed:", err);
  } else {
    console.log("Message delivered:", report);
  }
});

// Client-level errors reported by the underlying library.
producer.on("event.error", (err) => {
  console.error("Producer error:", err);
});
2.3 消费者
const Kafka = require("node-rdkafka");

// Consumer with manual offset commits and explicit rebalance handling.
const consumer = new Kafka.KafkaConsumer({
  // Offsets are committed explicitly in the "data" handler below.
  "enable.auto.commit": false,
  "group.id": "kafka-consumer-group-1",
  "metadata.broker.list": "localhost:29092",
  // Called on group rebalances; `err.code` tells whether partitions are
  // being assigned to or revoked from this consumer.
  rebalance_cb: (err, assignment) => {
    if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
      try {
        consumer.assign(assignment);
        console.log("Consumer assigned partitions:", assignment);
      } catch (ex) {
        console.error("Error during assign:", ex);
      }
    } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
      try {
        consumer.unassign();
        console.log("Consumer unassigned partitions");
      } catch (ex) {
        console.error("Error during unassign:", ex);
      }
    } else {
      console.error("Consumer rebalance error:", err);
    }
  },
  // Reports the outcome of each offset commit.
  offset_commit_cb: (err, topicPartitions) => {
    if (err) {
      console.error("Consumer offset commit error:", err);
    } else {
      console.log("Consumer offset commit successful:", topicPartitions);
    }
  },
});

consumer.connect();

// Once connected, subscribe and start consuming in flowing mode
// (messages are delivered through the "data" event).
consumer.on("ready", () => {
  console.log("Consumer is ready.");
  consumer.subscribe(["kafka-topic-1"]);
  consumer.consume();
});

// Log each message, then commit it.
consumer.on("data", (message) => {
  // A record may carry no payload (e.g. a tombstone); avoid calling
  // toString() on null in that case.
  const text = message.value === null || message.value === undefined
    ? null
    : message.value.toString();
  console.log(`Received message: ${text}`);
  try {
    // commitMessage() commits `message.offset + 1`, i.e. the next offset to
    // read. commit(message) would commit the message's own offset and cause
    // this message to be delivered again after a restart or rebalance.
    consumer.commitMessage(message);
  } catch (err) {
    console.error("Consumer commit failed:", err);
  }
});

// Client-level errors reported by the underlying library.
consumer.on("event.error", (err) => {
  console.error("Consumer error:", err);
});