Consumer
2025年02月23日
一、创建 Consumer
const consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
'rebalance_cb': (err, assignment) => {
if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
this.assign(assignment);
} else if (err.code == Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS){
this.unassign();
} else {
console.error(err);
}
},
'offset_commit_cb': (err, topicPartitions) => {
if (err) {
console.error(err);
} else {
console.log(topicPartitions);
}
}
})
-
group.id
: 指定消费者所属的组。Kafka
会根据这个组标识将多个消费者组织在一起,并在组内实现分区的均衡分配group.id: 'kafka'
表示该消费者属于名称为kafka
的消费者组。 -
metadata.broker.list
: 指定Kafka
集群中Broker
的地址列表,用于建立连接和获取元数据。 -
rebalance_cb
: 再均衡回调, 当消费者组内的分区发生变动(例如有新的消费者加入或现有消费者退出)时,Kafka
会重新分配分区,此时会触发该回调函数。当err.code
为Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS
时,表示此次再均衡是分配新分区,此时调用this.assign(assignment)
将分配到的分区绑定到当前消费者。当err.code
为Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS
时,表示需要撤销当前消费者已分配的分区,此时调用this.unassign()
。若错误不是上述两种情况,则通过console.error(err)
输出错误信息。再均衡处理,rebalance_cb
回调处理消费者组内分区的分配与撤销,确保在消费者变动时能够正确维护分区状态。 -
offset_commit_cb
: 消费者在提交消费位移(offset
)后,该回调会被触发,用于告知提交结果。如果err
存在,说明提交offset
时发生错误,此时通过console.error(err)
输出错误信息。offset
提交监控,offset_commit_cb
回调帮助开发者跟踪offset
提交的成功与失败情况,从而保障消费进度的可靠记录。
二、Consumer 连接
const Kafka = require("node-rdkafka");
const consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
})
consumer.connect();
consumer.on("ready", () => {
console.log("Consumer 连接成功!!!");
})
consumer.on("data", (data)=>{
console.log(`Received message: ${data.value.toString()}`);
});
consumer.on("event.error", (err) => {
console.error("Consumer error:", err);
});
三、Consumer 订阅消息
const Kafka = require("node-rdkafka");
const consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
})
consumer.connect();
consumer.on("ready", () => {
console.log("Consumer 连接成功!!!");
})
consumer.on("data", (data)=>{
console.log(`Received message: ${data.value.toString()}`);
});
consumer.on("event.error", (err) => {
console.error("Consumer error:", err);
});