
Overview

April 19, 2024
柏拉文
The harder you work, the luckier you get.

1. Overview


node-rdkafka is a high-performance Kafka client library for Node.js built on librdkafka and suitable for production use. Compared with kafka-node it is more stable and exposes far more of librdkafka's advanced configuration options.
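
A minimal quick-start sketch, assuming node-rdkafka has been installed with npm install node-rdkafka and a broker is listening on localhost:29092 (the address used throughout this article); it only verifies connectivity by fetching cluster metadata:

const Kafka = require("node-rdkafka");

// A throwaway producer is enough to request cluster metadata.
const client = new Kafka.Producer({
  "metadata.broker.list": "localhost:29092",
});

client.connect();

client.on("ready", () => {
  client.getMetadata({ timeout: 5000 }, (err, metadata) => {
    if (err) {
      console.error("Metadata request failed:", err);
    } else {
      console.log("Connected. Topics:", metadata.topics.map((t) => t.name));
    }
    client.disconnect();
  });
});

client.on("event.error", (err) => {
  console.error("Connection error:", err);
});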

2. Usage


2.1 Topics

const Kafka = require("node-rdkafka");

// The AdminClient connects directly to the broker and manages topics.
const adminClient = Kafka.AdminClient.create({
  "client.id": "kafka-admin",
  "metadata.broker.list": "localhost:29092",
});

adminClient.createTopic(
  {
    topic: "kafka-topic-1",
    num_partitions: 3,
    replication_factor: 1,
  },
  (error) => {
    if (error) {
      console.error(`Failed to create topic: ${error}`);
    } else {
      console.log(`Topic kafka-topic-1 created successfully`);
    }
  }
);
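
The AdminClient holds its own broker connection, so it should be disconnected once the administrative work is done. A minimal sketch, assuming the adminClient created above, that also shows topic deletion:

// Delete the topic when it is no longer needed, then release the connection.
adminClient.deleteTopic("kafka-topic-1", (error) => {
  if (error) {
    console.error(`Failed to delete topic: ${error}`);
  } else {
    console.log("Topic kafka-topic-1 deleted");
  }
  adminClient.disconnect();
});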

2.2 Producer

const Kafka = require("node-rdkafka");

const producer = new Kafka.Producer({
  // Idempotence requires acks=all and protects against duplicate writes on retry.
  acks: -1,
  dr_cb: true, // enable delivery reports
  "enable.idempotence": true,
  "socket.keepalive.enable": true,

  "compression.codec": "gzip",
  "client.id": "kafka-producer-client-1",
  "metadata.broker.list": "localhost:29092",

  "retry.backoff.ms": 1000,
  "message.timeout.ms": 10000,
  "batch.num.messages": 1000000,
  "queue.buffering.max.ms": 1000,
  "message.send.max.retries": 10,
  "queue.buffering.max.messages": 100000,
});

producer.connect();
// Poll regularly so delivery reports and errors are emitted as events.
producer.setPollInterval(100);

producer.on("ready", () => {
  console.log("Producer is ready!!!");

  let index = 0;

  setInterval(() => {
    const message = `Hello Kafka ${index++}`;

    // produce(topic, partition, message, key, timestamp); a null partition lets librdkafka choose.
    producer.produce("kafka-topic-1", null, Buffer.from(message), null, Date.now());

    console.log(`Message Sent: ${message}`);
  }, 4000);
});

producer.on("delivery-report", (err, report) => {
  if (err) {
    console.error("Message delivery failed:", err);
  } else {
    console.log("Message delivered:", report);
  }
});

producer.on("event.error", (err) => {
  console.error("Producer error:", err);
});
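
Because produced messages sit in librdkafka's internal buffer until delivered, it is worth flushing before the process exits. A minimal shutdown sketch, assuming the producer above (the 10-second flush timeout is an arbitrary choice):

process.on("SIGINT", () => {
  // Wait for buffered messages to be delivered, then close the connection.
  producer.flush(10000, (err) => {
    if (err) {
      console.error("Flush failed:", err);
    }
    producer.disconnect(() => process.exit(0));
  });
});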

2.3 Consumer

const Kafka = require("node-rdkafka");

const consumer = new Kafka.KafkaConsumer({
  // Manual commits: offsets are committed explicitly after each message is handled.
  "enable.auto.commit": false,
  "group.id": "kafka-consumer-group-1",
  "metadata.broker.list": "localhost:29092",
  // Take over partition assignment so rebalances can be logged.
  rebalance_cb: (err, assignment) => {
    if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
      try {
        consumer.assign(assignment);
        console.log("Consumer assigned partitions:", assignment);
      } catch (ex) {
        console.error("Error during assign:", ex);
      }
    } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
      try {
        consumer.unassign();
        console.log("Consumer unassigned partitions");
      } catch (ex) {
        console.error("Error during unassign:", ex);
      }
    } else {
      console.error("Consumer rebalance error:", err);
    }
  },
  offset_commit_cb: (err, topicPartitions) => {
    if (err) {
      console.error("Consumer offset commit error:", err);
    } else {
      console.log("Consumer offset commit successful:", topicPartitions);
    }
  },
});

consumer.connect();

consumer.on("ready", () => {
  console.log("Consumer is ready.");
  consumer.subscribe(["kafka-topic-1"]);
  // Flowing mode: messages are delivered through the "data" event.
  consumer.consume();
});

consumer.on("data", (message) => {
  console.log(`Received message: ${message.value.toString()}`);
  // commitMessage commits message.offset + 1, the offset to resume from.
  consumer.commitMessage(message);
});

consumer.on("event.error", (err) => {
console.error("Consumer error:", err);
});
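
The example above uses flowing mode, where messages arrive through the data event. node-rdkafka also supports a non-flowing style in which a bounded batch is pulled per call. A minimal sketch, assuming the same consumer configuration as above and replacing the flowing-mode handlers (the batch size and polling interval are arbitrary):

consumer.on("ready", () => {
  consumer.subscribe(["kafka-topic-1"]);

  // Pull at most 10 messages per tick instead of streaming via the "data" event.
  const timer = setInterval(() => {
    consumer.consume(10, (err, messages) => {
      if (err) {
        console.error("Consume error:", err);
        return;
      }
      messages.forEach((message) => {
        console.log(`Received message: ${message.value.toString()}`);
        consumer.commitMessage(message);
      });
    });
  }, 1000);

  // Stop pulling and leave the consumer group on shutdown.
  process.on("SIGINT", () => {
    clearInterval(timer);
    consumer.disconnect(() => process.exit(0));
  });
});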