Consumer
一、认识
Kafka.KafkaConsumer
是 node-rdkafka
提供的 高性能 Kafka
消费者(Consumer
),用于订阅和消费 Kafka
主题中的消息。它基于 librdkafka
,支持自动/手动分区分配、偏移提交等特性。
二、语法
const consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
'rebalance_cb': (err, assignment) => {
if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
// Note: this can throw when you are disconnected. Take care and wrap it in
// a try catch if that matters to you
this.assign(assignment);
} else if (err.code == Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS){
// Same as above
this.unassign();
} else {
// We had a real error
console.error(err);
}
},
'offset_commit_cb': (err, topicPartitions) => {
if (err) {
// There was an error committing
console.error(err);
} else {
// Commit went through. Let's log the topic partitions
console.log(topicPartitions);
}
}
})
-
group.id
: 指定消费者所属的组。Kafka
会根据这个组标识将多个消费者组织在一起,并在组内实现分区的均衡分配group.id: 'kafka'
表示该消费者属于名称为kafka
的消费者组。 -
metadata.broker.list
: 指定Kafka
集群中Broker
的地址列表,用于建立连接和获取元数据。 -
rebalance_cb
: 再均衡回调, 当消费者组内的分区发生变动(例如有新的消费者加入或现有消费者退出)时,Kafka
会重新分配分区,此时会触发该回调函数。当err.code
为Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS
时,表示此次再均衡是分配新分区,此时调用this.assign(assignment)
将分配到的分区绑定到当前消费者。当err.code
为Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS
时,表示需要撤销当前消费者已分配的分区,此时调用this.unassign()
。若错误不是上述两种情况,则通过console.error(err)
输出错误信息。再均衡处理,rebalance_cb
回调处理消费者组内分区的分配与撤销,确保在消费者变动时能够正确维护分区状态。 -
offset_commit_cb
: 消费者在提交消费位移(offset
)后,该回调会被触发,用于告知提交结果。如果err
存在,说明提交offset
时发生错误,此时通过console.error(err)
输出错误信息。offset
提交监控,offset_commit_cb
回调帮助开发者跟踪offset
提交的成功与失败情况,从而保障消费进度的可靠记录。
三、方法
3.1 consumer.commit()
consumer.commit()
不传参数调用时,表示提交当前消费者内部保存的所有已消费的 offset
。这通常在你处理完一批消息后,想一次性提交所有分区的最新消费进度时使用。适用场景: 当你希望批量提交所有分区的最新 offset
,或者消费者内部已记录了各个分区的最新 offset
时,可以直接调用 commit()
。总结: consumer.commit()
提交当前内部所有已消费的 offset
,适合批量提交。
consumer.on("data", (message)=>{
consumer.commit();
});
consumer.commit(topicPartition)
传入一个或多个 topic-partition
对象,用于提交指定分区的 offset
。适用场景: 当你希望更精细地控制某个主题或分区的 offset
提交,例如在处理过程中遇到错误或需要自定义 commit
逻辑时,可以构造特定的 topic-partition
对象进行提交。总结: consumer.commit(topicPartition)
针对特定主题和分区提交指定的 offset
,更灵活,可用于自定义提交逻辑。每个 topic-partition
对象通常包含以下属性:
-
topic
: 主题名称 -
partition
: 分区号 -
offset
: 要提交的offset
值(通常是已成功消费的最后一条消息的offset
加1
)。 -
(可选)
metadata
: 附加元数据
consumer.on("data", (message)=>{
const topicPartition = [{
topic: message.topic,
partition: message.partition,
offset: message.offset + 1
}];
consumer.commit(topicPartition);
});
3.2 consumer.connect()
consumer.connect()
在成功连接后发出 ready
事件。否则,错误将通过回调传递。
3.3 consumer.consume()
在 node-rdkafka
中,consumer.consume
方法是用来消费 Kafka
中的消息的。通过不同的参数形式,consumer.consume
提供了四种使用方式,分别适用于不同的场景。
consumer.consume()
直接调用 consumer.consume()
不传递任何参数,会启动消费者的内部消息拉取机制,并进入流式模式。此时,消息会通过触发 data
事件的方式传递给你。使用场景: 适用于需要持续不断地处理消息,且希望利用事件驱动模型处理每条消息的情况。总结: 启动消费,消息通过 data
事件逐条推送,适合持续流式消费。
consumer.on("ready", ()=>{
consumer.subscribe(["topic-name"]);
consume.consume();
});
consumer.on('data', (message) => {
console.log('Received message:', message.value.toString());
});
consumer.consume(cb)
当传入一个回调函数作为参数时,调用 consumer.consume(cb)
会尝试消费当前可用的所有消息,并在消费完成后通过回调返回消息数组。适用场景: 适用于你希望一次性获取当前所有可用消息,并通过回调来处理这批消息,而不是依赖事件流。总结: 无数量限制地拉取当前所有可用消息,并通过回调返回数组,适合一次性批量处理。
consumer.consume((err, messages) => {
if (err) {
console.error('Error consuming messages:', err);
return;
}
console.log('Received messages:', messages);
});
consumer.consume(number)
: 调用 consumer.consume(number)
时,传入的数字参数指定了消费请求中希望一次性拉取的最大消息数量。但因为没有提供回调函数,所以消息不会通过回调返回,而是通过消费者的流模式发送,即通过触发 data
事件来交付消息。适用场景: 当你希望使用流模式(即通过监听 data
事件来处理消息)而同时希望在内部请求时限定一次拉取的初始消息数量时,可以使用这种方式。例如,你可能希望在消费者启动时请求一定数量的消息,但消息的后续处理依旧依赖事件回调。
consumer.on("ready", ()=>{
consumer.subscribe(["topic-name"]);
setInterval(()=>{
consumer.consume(1);
},1000);
});
consumer.on("data", (message)=>{
console.log('Received message:', message.value.toString());
});
consumer.consume(number, cb)
传入一个数字参数和回调函数时,调用 consumer.consume(number, cb)
表示希望一次性最多拉取指定数量的消息。回调函数会返回一个消息数组,长度可能小于或等于传入的数量。适用场景: 当你需要对消费的消息数量做控制(例如批量处理固定数量的消息),以避免一次性拉取过多数据时使用。总结: 限定一次性消费的消息数量,并通过回调返回这些消息,适合对消费量做精确控制。
consumer.consume(5, (err, messages) => {
if (err) {
console.error('Error consuming messages:', err);
return;
}
messages.forEach((msg) => {
console.log('Received message:', msg.value.toString());
});
});
3.4 consumer.subscribe()
consumer.subscribe(topics)
是 node‑rdkafka
中用于订阅 Kafka 主题的方法,它允许你指定一个或多个你感兴趣的主题,消费者在启动消费前就会自动订阅这些主题,以便在后续通过 consumer.consume()
拉取消息。调用时机, 通常在消费者实例连接并触发 ready
事件后调用,此时可以确定消费者已就绪,可以开始订阅和消费消息。订阅后,消费者会自动跟踪所订阅主题的分区信息和元数据。当你调用 consumer.consume()
时,拉取到的消息都来自于这些已订阅的主题。
consumer.on("ready", ()=>{
consumer.subscribe(["topic-name1", "topic-name2"]);
});
3.5 consumer.disconnect()
consumer.disconnect()
断开与代理的连接。disconnect()
方法在断开连接时发出 disconnected
。否则,错误将通过回调传递。
3.6 consumer.unsubscribe()
consumer.unsubscribe()
取消订阅当前订阅的主题。如果不先调用 unsubscribe()
方法,则无法订阅不同的主题。
3.7 consumer.commitMessage()
consumer.commitMessage()
这是一个便捷方法,专门用于基于消费到的单个消息来提交 offset
。内部会自动从传入的消息对象中提取 topic
、partition
和 offset
信息,并计算出下一个要消费的 offset
(通常是 message.offset + 1
)。使用场景: 当你以 逐条 消费消息并希望在处理完单个消息后立即提交其消费进度时,调用 commitMessage(message)
可以减少手动构造 topic-partition
对象的麻烦。总结: consumer.commitMessage()
便捷地基于单条消息进行 offset
提交,自动处理 offset
增量,适用于逐条处理的场景。
consumer.on("data", (message)=>{
consumer.commitMessage(message);
});
四、事件
4.1 data
在 node‑rdkafka
中,Consumer
的 data
事件是用于实时接收消费到的消息的关键事件。当你调用 consumer.consume()
(不传入回调函数)时,消费者将进入流模式,此时拉取到的消息会通过触发 data
事件的方式传递给你。适用模式: 当你希望利用事件驱动方式连续处理消息时,推荐使用 data
事件。这种方式类似于 Node.js
的流(Stream
)模式,非常适合实时处理数据。消息对象结构 通常包含以下属性:
-
topic
: 消息所属的主题名称。 -
partition
: 消息所属的分区编号。 -
offset
: 消息在分区中的偏移量。 -
key
: 消息的键(如果有设置)。 -
value
: 消息的实际内容,通常为Buffer
类型,可以通过.toString()
转为字符串显示。
consumer.on("ready", ()=>{
consumer.subscribe(["topic-name1", "topic-name2"]);
consumer.consume(10);
});
consumer.on("data", (message)=>{
console.log(`接收到消息:${message.value.toString()}`);
console.log(`主题:${message.topic},分区:${message.partition},偏移量:${message.offset}`);
});
4.2 ready
在 node‑rdkafka
中,Consumer
的 ready
事件表示消费者已成功完成初始化,并与 Kafka Broker
建立连接,具备接收和处理消息的能力。当该事件触发后,就可以安全地进行主题订阅和消息消费等后续操作。触发时机, 当消费者连接成功、完成内部设置和分区元数据加载后,会触发 ready
事件,表示消费者已准备好工作。订阅与消费, 在 ready
事件回调中,你通常会调用 consumer.subscribe(topics)
订阅所需的主题,然后调用 consumer.consume()
开始消费消息。该事件为后续消费流程的启动信号,确保你不会在消费者尚未准备好时就调用消费相关的 API
。
consumer.on("ready", ()=>{
consumer.subscribe(["topic-name-1", "topic-name-2"]);
consumer.consume();
});
4.3 event
4.4 warning
warning
在 Flowing
模式下使用时,如果出现 UNKNOWN_TOPIC_OR_PART
或 TOPIC_AUTHORIZATION_FAILED
错误,则会发出该事件。由于如果错误仍然存在,使用者将继续工作,因此警告事件应在下次元数据刷新后重新出现。要控制元数据刷新率,请设置 topic.metadata.refresh.interval.ms property
。解决错误后,您可以手动调用 getMetadata
以加快使用者恢复速度。
4.5 rebalance
在 node‑rdkafka
中,通常通过在创建 KafkaConsumer
实例时配置 rebalance_cb
回调函数来处理 rebalancing
事件。该回调函数在发生 rebalancing
时被调用,参数中包含了错误信息和分区分配的数据。
const Kafka = require('node-rdkafka');
const consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
'rebalance_cb': (err, assignment) => {
if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
// 当分配新分区时,绑定新分区
try {
consumer.assign(assignment);
console.log('Assigned partitions:', assignment);
} catch (ex) {
console.error('Error during assign:', ex);
}
} else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
// 当撤销分区时,撤销绑定
try {
consumer.unassign();
console.log('Unassigned partitions');
} catch (ex) {
console.error('Error during unassign:', ex);
}
} else {
// 出现其他错误时,记录错误信息
console.error('Rebalance error:', err);
}
}
});
consumer.on('ready', () => {
console.log('Consumer is ready. Subscribing to topic...');
consumer.subscribe(['test-topic']);
consumer.consume();
});
consumer.on('data', (message) => {
console.log(`Received message: ${message.value.toString()}`);
});
consumer.on('event.error', (err) => {
console.error('Error from consumer:', err);
});
consumer.connect();
4.6 event.log
event.log
事件在发生日志记录事件时发出(如果您通过 event_cb
选项选择加入日志记录)。如果要发送信息,则需要为 debug
设置一个值。
4.7 event.stats
event.stats
事件在 librdkafka
报告统计信息时发出(如果您通过将 statistics.interval.ms
设置为非零值来选择加入)。
4.8 event.error
event.error
当 librdkafka
报告错误时,会发出 event.error
事件
4.9 disconnected
disconnected
当代理断开连接时,将发出 disconnected
事件。仅当调用 .disconnect
时,才会发出此事件。否则,包装器将始终尝试重新连接。
4.10 partition.eof
partition.eof
使用标准 API
并设置了配置选项 enable.partition.eof
时,将在此事件中发出 partition.eof
事件。该事件包含 topic
、partition
和 offset
属性。
4.11 event.throttle
event.throttle
当 librdkafka
报告限制时,将发出 event.throttle
事件。