跳到主要内容

Consumer

2025年02月23日
柏拉文
越努力,越幸运

一、创建 Consumer


const consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
'rebalance_cb': (err, assignment) => {

if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
this.assign(assignment);
} else if (err.code == Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS){
this.unassign();
} else {
console.error(err);
}
},
'offset_commit_cb': (err, topicPartitions) => {

if (err) {
console.error(err);
} else {
console.log(topicPartitions);
}
}
})
  • group.id: 指定消费者所属的组。Kafka 会根据这个组标识将多个消费者组织在一起,并在组内实现分区的均衡分配group.id: 'kafka' 表示该消费者属于名称为 kafka 的消费者组。

  • metadata.broker.list: 指定 Kafka 集群中 Broker 的地址列表,用于建立连接和获取元数据。

  • rebalance_cb: 再均衡回调, 当消费者组内的分区发生变动(例如有新的消费者加入或现有消费者退出)时,Kafka 会重新分配分区,此时会触发该回调函数。当 err.codeKafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS 时,表示此次再均衡是分配新分区,此时调用 this.assign(assignment) 将分配到的分区绑定到当前消费者。当 err.codeKafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS 时,表示需要撤销当前消费者已分配的分区,此时调用 this.unassign()。若错误不是上述两种情况,则通过 console.error(err) 输出错误信息。再均衡处理, rebalance_cb 回调处理消费者组内分区的分配与撤销,确保在消费者变动时能够正确维护分区状态。

  • offset_commit_cb: 消费者在提交消费位移(offset)后,该回调会被触发,用于告知提交结果。如果 err 存在,说明提交 offset 时发生错误,此时通过 console.error(err) 输出错误信息。offset 提交监控,offset_commit_cb 回调帮助开发者跟踪 offset 提交的成功与失败情况,从而保障消费进度的可靠记录。

二、Consumer 连接


const Kafka = require("node-rdkafka");

const consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
})

consumer.connect();

consumer.on("ready", () => {
console.log("Consumer 连接成功!!!");
})

consumer.on("data", (data)=>{
console.log(`Received message: ${data.value.toString()}`);
});

consumer.on("event.error", (err) => {
console.error("Consumer error:", err);
});

三、Consumer 订阅消息


const Kafka = require("node-rdkafka");

const consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
})

consumer.connect();

consumer.on("ready", () => {
console.log("Consumer 连接成功!!!");
})

consumer.on("data", (data)=>{
console.log(`Received message: ${data.value.toString()}`);
});

consumer.on("event.error", (err) => {
console.error("Consumer error:", err);
});