跳到主要内容

Consumer

2025年02月23日
柏拉文
越努力,越幸运

一、认识


Kafka.KafkaConsumernode-rdkafka 提供的 高性能 Kafka 消费者(Consumer),用于订阅和消费 Kafka 主题中的消息。它基于 librdkafka,支持自动/手动分区分配、偏移提交等特性。

二、语法


const consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
'rebalance_cb': (err, assignment) => {

if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
// Note: this can throw when you are disconnected. Take care and wrap it in
// a try catch if that matters to you
this.assign(assignment);
} else if (err.code == Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS){
// Same as above
this.unassign();
} else {
// We had a real error
console.error(err);
}
},
'offset_commit_cb': (err, topicPartitions) => {

if (err) {
// There was an error committing
console.error(err);
} else {
// Commit went through. Let's log the topic partitions
console.log(topicPartitions);
}
}
})
  • group.id: 指定消费者所属的组。Kafka 会根据这个组标识将多个消费者组织在一起,并在组内实现分区的均衡分配group.id: 'kafka' 表示该消费者属于名称为 kafka 的消费者组。

  • metadata.broker.list: 指定 Kafka 集群中 Broker 的地址列表,用于建立连接和获取元数据。

  • rebalance_cb: 再均衡回调, 当消费者组内的分区发生变动(例如有新的消费者加入或现有消费者退出)时,Kafka 会重新分配分区,此时会触发该回调函数。当 err.codeKafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS 时,表示此次再均衡是分配新分区,此时调用 this.assign(assignment) 将分配到的分区绑定到当前消费者。当 err.codeKafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS 时,表示需要撤销当前消费者已分配的分区,此时调用 this.unassign()。若错误不是上述两种情况,则通过 console.error(err) 输出错误信息。再均衡处理, rebalance_cb 回调处理消费者组内分区的分配与撤销,确保在消费者变动时能够正确维护分区状态。

  • offset_commit_cb: 消费者在提交消费位移(offset)后,该回调会被触发,用于告知提交结果。如果 err 存在,说明提交 offset 时发生错误,此时通过 console.error(err) 输出错误信息。offset 提交监控,offset_commit_cb 回调帮助开发者跟踪 offset 提交的成功与失败情况,从而保障消费进度的可靠记录。

三、方法


3.1 consumer.commit()

consumer.commit() 不传参数调用时,表示提交当前消费者内部保存的所有已消费的 offset。这通常在你处理完一批消息后,想一次性提交所有分区的最新消费进度时使用。适用场景: 当你希望批量提交所有分区的最新 offset,或者消费者内部已记录了各个分区的最新 offset 时,可以直接调用 commit()。总结: consumer.commit() 提交当前内部所有已消费的 offset,适合批量提交。

consumer.on("data", (message)=>{
consumer.commit();
});

consumer.commit(topicPartition) 传入一个或多个 topic-partition 对象,用于提交指定分区的 offset适用场景: 当你希望更精细地控制某个主题或分区的 offset 提交,例如在处理过程中遇到错误或需要自定义 commit 逻辑时,可以构造特定的 topic-partition 对象进行提交。总结: consumer.commit(topicPartition) 针对特定主题和分区提交指定的 offset,更灵活,可用于自定义提交逻辑。每个 topic-partition 对象通常包含以下属性:

  • topic: 主题名称

  • partition: 分区号

  • offset: 要提交的 offset 值(通常是已成功消费的最后一条消息的 offset1)。

  • (可选)metadata: 附加元数据

consumer.on("data", (message)=>{
const topicPartition = [{
topic: message.topic,
partition: message.partition,
offset: message.offset + 1
}];
consumer.commit(topicPartition);
});

3.2 consumer.connect()

consumer.connect() 在成功连接后发出 ready 事件。否则,错误将通过回调传递。

3.3 consumer.consume()

node-rdkafka 中,consumer.consume 方法是用来消费 Kafka 中的消息的。通过不同的参数形式,consumer.consume 提供了四种使用方式,分别适用于不同的场景。

consumer.consume() 直接调用 consumer.consume() 不传递任何参数,会启动消费者的内部消息拉取机制,并进入流式模式。此时,消息会通过触发 data 事件的方式传递给你。使用场景: 适用于需要持续不断地处理消息,且希望利用事件驱动模型处理每条消息的情况。总结: 启动消费,消息通过 data 事件逐条推送,适合持续流式消费。

consumer.on("ready", ()=>{
consumer.subscribe(["topic-name"]);
consume.consume();
});

consumer.on('data', (message) => {
console.log('Received message:', message.value.toString());
});

consumer.consume(cb) 当传入一个回调函数作为参数时,调用 consumer.consume(cb) 会尝试消费当前可用的所有消息,并在消费完成后通过回调返回消息数组。适用场景: 适用于你希望一次性获取当前所有可用消息,并通过回调来处理这批消息,而不是依赖事件流。总结: 无数量限制地拉取当前所有可用消息,并通过回调返回数组,适合一次性批量处理。

consumer.consume((err, messages) => {
if (err) {
console.error('Error consuming messages:', err);
return;
}
console.log('Received messages:', messages);
});

consumer.consume(number): 调用 consumer.consume(number) 时,传入的数字参数指定了消费请求中希望一次性拉取的最大消息数量。但因为没有提供回调函数,所以消息不会通过回调返回,而是通过消费者的流模式发送,即通过触发 data 事件来交付消息。适用场景: 当你希望使用流模式(即通过监听 data 事件来处理消息)而同时希望在内部请求时限定一次拉取的初始消息数量时,可以使用这种方式。例如,你可能希望在消费者启动时请求一定数量的消息,但消息的后续处理依旧依赖事件回调。

consumer.on("ready", ()=>{
consumer.subscribe(["topic-name"]);

setInterval(()=>{
consumer.consume(1);
},1000);
});

consumer.on("data", (message)=>{
console.log('Received message:', message.value.toString());
});

consumer.consume(number, cb) 传入一个数字参数和回调函数时,调用 consumer.consume(number, cb) 表示希望一次性最多拉取指定数量的消息。回调函数会返回一个消息数组,长度可能小于或等于传入的数量。适用场景: 当你需要对消费的消息数量做控制(例如批量处理固定数量的消息),以避免一次性拉取过多数据时使用。总结: 限定一次性消费的消息数量,并通过回调返回这些消息,适合对消费量做精确控制。

consumer.consume(5, (err, messages) => {
if (err) {
console.error('Error consuming messages:', err);
return;
}
messages.forEach((msg) => {
console.log('Received message:', msg.value.toString());
});
});

3.4 consumer.subscribe()

consumer.subscribe(topics)node‑rdkafka 中用于订阅 Kafka 主题的方法,它允许你指定一个或多个你感兴趣的主题,消费者在启动消费前就会自动订阅这些主题,以便在后续通过 consumer.consume() 拉取消息。调用时机, 通常在消费者实例连接并触发 ready 事件后调用,此时可以确定消费者已就绪,可以开始订阅和消费消息。订阅后,消费者会自动跟踪所订阅主题的分区信息和元数据。当你调用 consumer.consume() 时,拉取到的消息都来自于这些已订阅的主题。

consumer.on("ready", ()=>{
consumer.subscribe(["topic-name1", "topic-name2"]);
});

3.5 consumer.disconnect()

consumer.disconnect() 断开与代理的连接。disconnect() 方法在断开连接时发出 disconnected。否则,错误将通过回调传递。

3.6 consumer.unsubscribe()

consumer.unsubscribe() 取消订阅当前订阅的主题。如果不先调用 unsubscribe() 方法,则无法订阅不同的主题。

3.7 consumer.commitMessage()

consumer.commitMessage() 这是一个便捷方法,专门用于基于消费到的单个消息来提交 offset。内部会自动从传入的消息对象中提取 topicpartitionoffset 信息,并计算出下一个要消费的 offset(通常是 message.offset + 1)。使用场景: 当你以 逐条 消费消息并希望在处理完单个消息后立即提交其消费进度时,调用 commitMessage(message) 可以减少手动构造 topic-partition 对象的麻烦。总结: consumer.commitMessage() 便捷地基于单条消息进行 offset 提交,自动处理 offset 增量,适用于逐条处理的场景。

consumer.on("data", (message)=>{
consumer.commitMessage(message);
});

四、事件


4.1 data

node‑rdkafka 中,Consumerdata 事件是用于实时接收消费到的消息的关键事件。当你调用 consumer.consume()(不传入回调函数)时,消费者将进入流模式,此时拉取到的消息会通过触发 data 事件的方式传递给你。适用模式: 当你希望利用事件驱动方式连续处理消息时,推荐使用 data 事件。这种方式类似于 Node.js 的流(Stream)模式,非常适合实时处理数据。消息对象结构 通常包含以下属性:

  • topic: 消息所属的主题名称。

  • partition: 消息所属的分区编号。

  • offset: 消息在分区中的偏移量。

  • key: 消息的键(如果有设置)。

  • value: 消息的实际内容,通常为 Buffer 类型,可以通过 .toString() 转为字符串显示。

consumer.on("ready", ()=>{
consumer.subscribe(["topic-name1", "topic-name2"]);
consumer.consume(10);
});

consumer.on("data", (message)=>{
console.log(`接收到消息:${message.value.toString()}`);
console.log(`主题:${message.topic},分区:${message.partition},偏移量:${message.offset}`);
});

4.2 ready

node‑rdkafka 中,Consumerready 事件表示消费者已成功完成初始化,并与 Kafka Broker 建立连接,具备接收和处理消息的能力。当该事件触发后,就可以安全地进行主题订阅和消息消费等后续操作。触发时机, 当消费者连接成功、完成内部设置和分区元数据加载后,会触发 ready 事件,表示消费者已准备好工作。订阅与消费, 在 ready 事件回调中,你通常会调用 consumer.subscribe(topics) 订阅所需的主题,然后调用 consumer.consume() 开始消费消息。该事件为后续消费流程的启动信号,确保你不会在消费者尚未准备好时就调用消费相关的 API

consumer.on("ready", ()=>{
consumer.subscribe(["topic-name-1", "topic-name-2"]);
consumer.consume();
});

4.3 event

4.4 warning

warningFlowing 模式下使用时,如果出现 UNKNOWN_TOPIC_OR_PARTTOPIC_AUTHORIZATION_FAILED 错误,则会发出该事件。由于如果错误仍然存在,使用者将继续工作,因此警告事件应在下次元数据刷新后重新出现。要控制元数据刷新率,请设置 topic.metadata.refresh.interval.ms property。解决错误后,您可以手动调用 getMetadata 以加快使用者恢复速度。

4.5 rebalance

node‑rdkafka 中,通常通过在创建 KafkaConsumer 实例时配置 rebalance_cb 回调函数来处理 rebalancing 事件。该回调函数在发生 rebalancing 时被调用,参数中包含了错误信息和分区分配的数据。

const Kafka = require('node-rdkafka');

const consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
'rebalance_cb': (err, assignment) => {
if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
// 当分配新分区时,绑定新分区
try {
consumer.assign(assignment);
console.log('Assigned partitions:', assignment);
} catch (ex) {
console.error('Error during assign:', ex);
}
} else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
// 当撤销分区时,撤销绑定
try {
consumer.unassign();
console.log('Unassigned partitions');
} catch (ex) {
console.error('Error during unassign:', ex);
}
} else {
// 出现其他错误时,记录错误信息
console.error('Rebalance error:', err);
}
}
});

consumer.on('ready', () => {
console.log('Consumer is ready. Subscribing to topic...');
consumer.subscribe(['test-topic']);
consumer.consume();
});

consumer.on('data', (message) => {
console.log(`Received message: ${message.value.toString()}`);
});

consumer.on('event.error', (err) => {
console.error('Error from consumer:', err);
});

consumer.connect();

4.6 event.log

event.log 事件在发生日志记录事件时发出(如果您通过 event_cb 选项选择加入日志记录)。如果要发送信息,则需要为 debug 设置一个值。

4.7 event.stats

event.stats 事件在 librdkafka 报告统计信息时发出(如果您通过将 statistics.interval.ms 设置为非零值来选择加入)。

4.8 event.error

event.errorlibrdkafka 报告错误时,会发出 event.error 事件

4.9 disconnected

disconnected 当代理断开连接时,将发出 disconnected 事件。仅当调用 .disconnect 时,才会发出此事件。否则,包装器将始终尝试重新连接。

4.10 partition.eof

partition.eof 使用标准 API 并设置了配置选项 enable.partition.eof 时,将在此事件中发出 partition.eof 事件。该事件包含 topicpartitionoffset 属性。

4.11 event.throttle

event.throttlelibrdkafka 报告限制时,将发出 event.throttle 事件。