Producer
一、认识
Kafka Producer
是 Kafka
的核心组件之一,负责将数据(消息)发送到 Kafka
集群中的指定 Topic
。它在数据流管道中起到消息生产者的作用,是整个消息传递系统的起点。消息可以是任意数据(文本、JSON
、二进制数据等),并通过 Producer API
发送给指定的 Topic
。通过优化的批量处理、分区路由、压缩与重试机制,能够高效、可靠地将数据发送到分布式集群中,为后续实时数据处理和消息传递提供坚实保障。
Kafka Producer
工作流程:
-
构建消息: 应用程序创建消息(通常为
Key-Value
形式),并指定目标Topic
及(可选)消息Key
。 -
消息缓冲: 消息进入
Producer
内部缓冲队列,根据配置(如queue.buffering.max.ms
和queue.buffering.max.messages
)进行缓存与批量聚合。Kafka Producer
异步发送 为了实现高吞吐量,Kafka Producer
通常采用异步发送方式,将多条消息批量发送。通过设置批量大小(batch size
)和等待时间(linger.ms
或队列缓冲等待时间),Producer
能够将多条消息合并成一个批次,从而减少网络IO
次数,提高传输效率。 -
分区选择:
Producer
根据配置的分区策略,将消息分配到相应的Partition
。若指定了Key
,则Kafka
会基于Key
的哈希值进行分区分配。Kafka Producer
分区路由Kafka
的Topic
通常由多个分区(Partition
)构成。Producer
根据指定的Key
或自定义的分区策略,将消息分发到不同的分区上。这样可以实现数据的并行写入和后续的并行消费,同时保证同一Key
的消息在同一分区内有序。 -
消息压缩: 消息在达到一定批量大小或等待时间后,会被压缩(如果配置了压缩算法),然后批量发送到
Kafka Broker
。Kafka Producer
消息压缩:Producer
可以对消息进行压缩(如gzip
、snappy
、lz4
),减少网络传输的数据量,降低带宽压力,提升系统整体性能。 -
Broker
处理:Broker
接收到消息后,会将消息存储到对应Topic
的分区日志中,并根据副本策略进行同步备份。 -
交付报告: 一旦
Broker
确认消息存储完成,Producer
会收到交付报告,通知应用程序消息发送成功或失败。Kafka Producer
重试容错: 在消息发送失败的情况下,Producer
可以自动重试(通过参数配置重试次数与间隔时间)。这样可以应对暂时的网络问题或Broker
不可用的情况,确保消息最终成功送达。Kafka Producer
回调机制:Producer
支持交付报告(Delivery Report
)回调机制。通过该机制,Producer
能够确认每条消息是否被成功发送,以及在发送失败时获取错误信息,从而进行必要的处理或日志记录。
二、语法
const producer = new Kafka.Producer({
'client.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
'compression.codec': 'gzip',
'retry.backoff.ms': 200,
'message.send.max.retries': 10,
'socket.keepalive.enable': true,
'queue.buffering.max.messages': 100000,
'queue.buffering.max.ms': 1000,
'batch.num.messages': 1000000,
'dr_cb': true
});
-
client.id
: 设置当前Producer
客户端的标识。用于日志、监控和追踪请求来源,帮助区分不同的Producer
实例。 -
metadata.broker.list
: 指定Kafka
集群中Broker
的地址列表。Producer
启动时会从这些Broker
获取集群元数据(例如Topic
、分区信息)。 -
compression.codec
: 配置Producer
发送消息时使用的压缩算法。压缩可以减少网络传输的数据量,提高吞吐量。设置为gzip
,表示所有消息将使用GZIP
算法进行压缩。 -
retry.backoff.ms
: 在发送消息失败后,Producer
重试之前等待的时间(单位:毫秒)。防止重试过于频繁,给Broker
一定的恢复时间。配置发送失败时的重试机制,提升消息可靠性。 -
message.send.max.retries
: 消息发送失败后允许重试的最大次数。在网络抖动或临时性故障时,确保Producer
有更多机会将消息成功发送。配置发送失败时的重试机制,提升消息可靠性。 -
socket.keepalive.enable
: 启用TCP keepalive
机制。保持与Broker
的连接活跃,及时检测网络故障或连接断开。 -
queue.buffering.max.messages
:Producer
内部队列允许缓存的最大消息数。限制Producer
内部缓冲区的消息数量,避免因消息积压导致内存占用过高。设置为100000
条消息。控制Producer
内部消息缓冲区大小和消息等待发送的最长时间,支持批量发送。 -
queue.buffering.max.ms
: 消息在Producer
内部队列中等待发送的最长时间(单位:毫秒)。用于控制消息批次发送的延迟,达到批量处理的效果。设置为1000
毫秒,即最多等待1
秒后发送。控制Producer
内部消息缓冲区大小和消息等待发送的最长时间,支持批量发送。 -
batch.num.messages
: 每个批次发送的最大消息数。控制每个批次中聚集的消息数量,影响批量发送的大小,从而优化吞吐量。设置为1000000
条消息,表示批量发送时可以包含最多100
万条消息。设置每个批次中允许的最大消息数,进一步优化批量发送性能。 -
dr_cb
: 开启Delivery Report
回调。在消息被成功发送或失败后,Producer
会触发回调函数(delivery report
),便于开发者监控消息状态。启用交付报告回调,确保Producer
能够获得消息发送结果并进行错误处理。
三、方法
3.1 producer.poll()
producer.poll()
是 Kafka Producer
(通过 node‑rdkafka
实现)的一个方法,用于轮询并处理内部事件,例如处理消息发送的回调(delivery report
)、错误事件和其他内部通知。具体来说: 事件处理: 当你启用了 dr_cb
(delivery report
回调)后,Producer
内部会积累一些待处理的事件,例如消息是否成功发送或者发送失败的通知。调用 producer.poll()
可以将这些事件传递给相应的回调函数进行处理; 轮询机制: 由于 node‑rdkafka
是基于 librdkafka
的事件驱动模型,因此必须定期调用 producer.poll()
来驱动事件循环,确保消息发送的状态能够及时反馈到应用层。如果不调用 poll()
,可能会导致回调函数无法被触发,影响对消息状态的监控; 通常可以传入一个超时时间(单位毫秒),例如 producer.poll(0)
表示立即返回。多数情况下,可以将 poll()
集成到应用的主事件循环中,或者通过定时器定期调用它来保持事件处理的活跃性。
setInterval(() => {
producer.poll();
}, 100);
3.2 producer.flush()
producer.flush()
刷新 librdkafka
内部队列,发送所有消息。默认超时为 500
毫秒
3.3 producer.connect()
producer.connect()
方法在连接成功时发出 ready
事件。否则,错误将通过回调传递。
3.4 producer.produce()
producer.produce()
用于将一条消息发送到 Kafka
指定的 Topic
中。
producer.produce(topic, partition, msg, key, timestamp, opaque)
-
topic
: 指定消息发送的目标Topic
。消息会被发送到该Topic
下。 -
partition
: 指定消息要发送到的分区编号。如果设置为特定数字,则消息直接发送到该分区;如果设置为null
或-1
(取决于配置),则Kafka
会根据默认的分区器(通常基于消息key
或随机分配)来选择分区。 -
msg
: 实际的消息内容。通常是一个Buffer
(或可以转换为Buffer
的数据,如字符串),表示需要发送的数据负载。 -
key
: 消息的Key
。用于决定消息的分区(当未指定partition
时)和保持分区内消息顺序;相同的key
会被分配到同一个分区,便于有序处理。 -
timestamp
: 消息的时间戳。通常为Unix
时间戳(毫秒级);如果不提供,则Kafka
可能会自动使用当前时间。 -
opaque
: 透传数据,用于在delivery report
回调中标识或传递额外的上下文信息。这个参数可以是任意数据结构,便于在消息发送成功或失败时进行追踪、日志记录或关联特定的业务逻辑。
3.5 producer.disconnect()
producer.disconnect()
断开与代理的连接。disconnect()
方法在断开连接时发出 disconnected
事件。否则,错误将通过回调传递。
3.6 producer.setPollInterval()
producer.setPollInterval()
用于设置 Kafka Producer
内部自动轮询的时间间隔,单位为毫秒。在这个例子中,每隔 100
毫秒,Producer
会自动调用内部的 poll()
方法来处理事件。比如: 交付报告(delivery report
), 处理消息发送的确认和错误反馈; 内部事件, 如心跳、错误、统计数据更新等。这种自动轮询机制可以免去手动使用定时器调用 producer.poll()
,从而确保 Producer
的事件能够及时处理。需要注意的是,使用自动轮询可能在某些场景下不如手动调用轮询那样灵活,但对于大多数应用来说,它可以简化代码并减少出错概率。
const producer = new Kafka.Producer({
'client.id': 'my-client',
'metadata.broker.list': 'localhost:9092',
'dr_cb': true
});
producer.setPollInterval(100);
producer.on('delivery-report', (err, report) => {
console.log(report);
});
3.7 producer.beginTransaction()
producer.beginTransaction()
3.8 producer.initTransactions()
producer.initTransactions()
初始化事务性生产者。
3.9 producer.abortTransaction()
producer.abortTransaction()
中止正在进行的事务。
3.10 producer.commitTransaction()
producer.commitTransaction()
提交正在进行的事务。
3.11 producer.sendOffsetsToTransaction()
producer.sendOffsetsToTransaction()
将消耗的 topic-partition-offsets
发送到代理,该 broker
将与事务一起提交。
四、事件
4.1 ready
ready
当 Producer
准备好发送消息时,将发出 ready
事件。
4.2 event
event
当 librdkafka
报告事件时,将发出 event
事件(如果您通过 event_cb
选项选择加入)。
4.3 event.log
event.log
当日志记录事件传入时,将发出 event.log
事件(如果您通过 event_cb
选项选择加入日志记录)。
4.4 event.stats
event.stats
事件在 librdkafka
报告统计信息时发出(如果您通过将 statistics.interval.ms
设置为非零值来选择加入)。
4.5 event.error
event.error
当 librdkafka
报告错误时,会发出 event.error
事件
4.6 disconnected
disconnected
当代理断开连接时,将发出 disconnected
事件。
4.7 event.throttle
event.throttle
当 librdkafka
报告限制时发出 event.throttle
事件。
4.8 delivery-report
delivery-report
当你启用了 dr_cb
(交付报告回调)后, delivery-report
来反馈每条消息的投递状态, 实时监控消息的发送结果, 处理发送失败的情况(如重试或记录日志), 获取消息在 Kafka
中的存储位置信息(分区和偏移量)。这对于监控消息是否被正确投递非常关键。在 node‑rdkafka
中,delivery-report
事件的回调依赖于内部的事件循环,这个事件循环需要通过调用 producer.poll()
来触发处理。使用 setInterval
每 100
毫秒调用一次 poll()
,确保内部事件(例如 delivery-report
)能被及时处理,这是标准做法。
setInterval(() => {
producer.poll();
}, 100);
producer.on('delivery-report', (err, report) => {
if (err) {
console.error('Message delivery failed:', err);
} else {
console.log('Message delivered:', report);
// report 中通常包含 topic, partition, offset 等信息
}
});