跳到主要内容

Producer

2025年02月23日
柏拉文
越努力,越幸运

一、认识


Kafka ProducerKafka 的核心组件之一,负责将数据(消息)发送到 Kafka 集群中的指定 Topic。它在数据流管道中起到消息生产者的作用,是整个消息传递系统的起点。消息可以是任意数据(文本、JSON、二进制数据等),并通过 Producer API 发送给指定的 Topic。通过优化的批量处理、分区路由、压缩与重试机制,能够高效、可靠地将数据发送到分布式集群中,为后续实时数据处理和消息传递提供坚实保障。

Kafka Producer 工作流程:

  1. 构建消息: 应用程序创建消息(通常为 Key-Value 形式),并指定目标 Topic 及(可选)消息 Key

  2. 消息缓冲: 消息进入 Producer 内部缓冲队列,根据配置(如 queue.buffering.max.msqueue.buffering.max.messages)进行缓存与批量聚合。Kafka Producer 异步发送 为了实现高吞吐量,Kafka Producer 通常采用异步发送方式,将多条消息批量发送。通过设置批量大小(batch size)和等待时间(linger.ms 或队列缓冲等待时间),Producer 能够将多条消息合并成一个批次,从而减少网络 IO 次数,提高传输效率。

  3. 分区选择: Producer 根据配置的分区策略,将消息分配到相应的 Partition。若指定了 Key,则 Kafka 会基于 Key 的哈希值进行分区分配。Kafka Producer 分区路由 KafkaTopic 通常由多个分区(Partition)构成。Producer 根据指定的 Key 或自定义的分区策略,将消息分发到不同的分区上。这样可以实现数据的并行写入和后续的并行消费,同时保证同一 Key 的消息在同一分区内有序。

  4. 消息压缩: 消息在达到一定批量大小或等待时间后,会被压缩(如果配置了压缩算法),然后批量发送到 Kafka BrokerKafka Producer 消息压缩: Producer 可以对消息进行压缩(如 gzipsnappylz4),减少网络传输的数据量,降低带宽压力,提升系统整体性能。

  5. Broker 处理: Broker 接收到消息后,会将消息存储到对应 Topic 的分区日志中,并根据副本策略进行同步备份。

  6. 交付报告: 一旦 Broker 确认消息存储完成,Producer 会收到交付报告,通知应用程序消息发送成功或失败。Kafka Producer 重试容错: 在消息发送失败的情况下,Producer 可以自动重试(通过参数配置重试次数与间隔时间)。这样可以应对暂时的网络问题或 Broker 不可用的情况,确保消息最终成功送达。Kafka Producer 回调机制: Producer 支持交付报告(Delivery Report)回调机制。通过该机制,Producer 能够确认每条消息是否被成功发送,以及在发送失败时获取错误信息,从而进行必要的处理或日志记录。

二、语法


const producer = new Kafka.Producer({
'client.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
'compression.codec': 'gzip',
'retry.backoff.ms': 200,
'message.send.max.retries': 10,
'socket.keepalive.enable': true,
'queue.buffering.max.messages': 100000,
'queue.buffering.max.ms': 1000,
'batch.num.messages': 1000000,
'dr_cb': true
});
  • client.id: 设置当前 Producer 客户端的标识。用于日志、监控和追踪请求来源,帮助区分不同的 Producer 实例。

  • metadata.broker.list: 指定 Kafka 集群中 Broker 的地址列表。Producer 启动时会从这些 Broker 获取集群元数据(例如 Topic、分区信息)。

  • compression.codec: 配置 Producer 发送消息时使用的压缩算法。压缩可以减少网络传输的数据量,提高吞吐量。设置为 gzip,表示所有消息将使用 GZIP 算法进行压缩。

  • retry.backoff.ms: 在发送消息失败后,Producer 重试之前等待的时间(单位:毫秒)。防止重试过于频繁,给 Broker 一定的恢复时间。配置发送失败时的重试机制,提升消息可靠性。

  • message.send.max.retries: 消息发送失败后允许重试的最大次数。在网络抖动或临时性故障时,确保 Producer 有更多机会将消息成功发送。配置发送失败时的重试机制,提升消息可靠性。

  • socket.keepalive.enable: 启用 TCP keepalive 机制。保持与 Broker 的连接活跃,及时检测网络故障或连接断开。

  • queue.buffering.max.messages: Producer 内部队列允许缓存的最大消息数。限制 Producer 内部缓冲区的消息数量,避免因消息积压导致内存占用过高。设置为 100000 条消息。控制 Producer 内部消息缓冲区大小和消息等待发送的最长时间,支持批量发送。

  • queue.buffering.max.ms: 消息在 Producer 内部队列中等待发送的最长时间(单位:毫秒)。用于控制消息批次发送的延迟,达到批量处理的效果。设置为 1000 毫秒,即最多等待 1 秒后发送。控制 Producer 内部消息缓冲区大小和消息等待发送的最长时间,支持批量发送。

  • batch.num.messages: 每个批次发送的最大消息数。控制每个批次中聚集的消息数量,影响批量发送的大小,从而优化吞吐量。设置为 1000000 条消息,表示批量发送时可以包含最多 100 万条消息。设置每个批次中允许的最大消息数,进一步优化批量发送性能。

  • dr_cb: 开启 Delivery Report 回调。在消息被成功发送或失败后,Producer 会触发回调函数(delivery report),便于开发者监控消息状态。启用交付报告回调,确保 Producer 能够获得消息发送结果并进行错误处理。

三、方法


3.1 producer.poll()

producer.poll()Kafka Producer(通过 node‑rdkafka 实现)的一个方法,用于轮询并处理内部事件,例如处理消息发送的回调(delivery report)、错误事件和其他内部通知。具体来说: 事件处理: 当你启用了 dr_cbdelivery report 回调)后,Producer 内部会积累一些待处理的事件,例如消息是否成功发送或者发送失败的通知。调用 producer.poll() 可以将这些事件传递给相应的回调函数进行处理; 轮询机制: 由于 node‑rdkafka 是基于 librdkafka 的事件驱动模型,因此必须定期调用 producer.poll() 来驱动事件循环,确保消息发送的状态能够及时反馈到应用层。如果不调用 poll(),可能会导致回调函数无法被触发,影响对消息状态的监控; 通常可以传入一个超时时间(单位毫秒),例如 producer.poll(0) 表示立即返回。多数情况下,可以将 poll() 集成到应用的主事件循环中,或者通过定时器定期调用它来保持事件处理的活跃性。

setInterval(() => {
producer.poll();
}, 100);

3.2 producer.flush()

producer.flush() 刷新 librdkafka 内部队列,发送所有消息。默认超时为 500 毫秒

3.3 producer.connect()

producer.connect() 方法在连接成功时发出 ready 事件。否则,错误将通过回调传递。

3.4 producer.produce()

producer.produce() 用于将一条消息发送到 Kafka 指定的 Topic 中。

producer.produce(topic, partition, msg, key, timestamp, opaque)
  • topic: 指定消息发送的目标 Topic。消息会被发送到该 Topic 下。

  • partition: 指定消息要发送到的分区编号。如果设置为特定数字,则消息直接发送到该分区;如果设置为 null-1(取决于配置),则 Kafka 会根据默认的分区器(通常基于消息 key 或随机分配)来选择分区。

  • msg: 实际的消息内容。通常是一个 Buffer(或可以转换为 Buffer 的数据,如字符串),表示需要发送的数据负载。

  • key: 消息的 Key。用于决定消息的分区(当未指定 partition 时)和保持分区内消息顺序;相同的 key 会被分配到同一个分区,便于有序处理。

  • timestamp: 消息的时间戳。通常为 Unix 时间戳(毫秒级);如果不提供,则 Kafka 可能会自动使用当前时间。

  • opaque: 透传数据,用于在 delivery report 回调中标识或传递额外的上下文信息。这个参数可以是任意数据结构,便于在消息发送成功或失败时进行追踪、日志记录或关联特定的业务逻辑。

3.5 producer.disconnect()

producer.disconnect() 断开与代理的连接。disconnect() 方法在断开连接时发出 disconnected 事件。否则,错误将通过回调传递。

3.6 producer.setPollInterval()

producer.setPollInterval() 用于设置 Kafka Producer 内部自动轮询的时间间隔,单位为毫秒。在这个例子中,每隔 100 毫秒,Producer 会自动调用内部的 poll() 方法来处理事件。比如: 交付报告(delivery report, 处理消息发送的确认和错误反馈; 内部事件, 如心跳、错误、统计数据更新等。这种自动轮询机制可以免去手动使用定时器调用 producer.poll(),从而确保 Producer 的事件能够及时处理。需要注意的是,使用自动轮询可能在某些场景下不如手动调用轮询那样灵活,但对于大多数应用来说,它可以简化代码并减少出错概率。

const producer = new Kafka.Producer({
'client.id': 'my-client',
'metadata.broker.list': 'localhost:9092',
'dr_cb': true
});

producer.setPollInterval(100);

producer.on('delivery-report', (err, report) => {
console.log(report);
});

3.7 producer.beginTransaction()

producer.beginTransaction()

3.8 producer.initTransactions()

producer.initTransactions() 初始化事务性生产者。

3.9 producer.abortTransaction()

producer.abortTransaction() 中止正在进行的事务。

3.10 producer.commitTransaction()

producer.commitTransaction() 提交正在进行的事务。

3.11 producer.sendOffsetsToTransaction()

producer.sendOffsetsToTransaction() 将消耗的 topic-partition-offsets 发送到代理,该 broker 将与事务一起提交。

四、事件


4.1 ready

readyProducer 准备好发送消息时,将发出 ready 事件。

4.2 event

eventlibrdkafka 报告事件时,将发出 event 事件(如果您通过 event_cb 选项选择加入)。

4.3 event.log

event.log 当日志记录事件传入时,将发出 event.log 事件(如果您通过 event_cb 选项选择加入日志记录)。

4.4 event.stats

event.stats 事件在 librdkafka 报告统计信息时发出(如果您通过将 statistics.interval.ms 设置为非零值来选择加入)。

4.5 event.error

event.errorlibrdkafka 报告错误时,会发出 event.error 事件

4.6 disconnected

disconnected 当代理断开连接时,将发出 disconnected 事件。

4.7 event.throttle

event.throttlelibrdkafka 报告限制时发出 event.throttle 事件。

4.8 delivery-report

delivery-report 当你启用了 dr_cb(交付报告回调)后, delivery-report 来反馈每条消息的投递状态, 实时监控消息的发送结果, 处理发送失败的情况(如重试或记录日志), 获取消息在 Kafka 中的存储位置信息(分区和偏移量)。这对于监控消息是否被正确投递非常关键。在 node‑rdkafka 中,delivery-report 事件的回调依赖于内部的事件循环,这个事件循环需要通过调用 producer.poll() 来触发处理。使用 setInterval100 毫秒调用一次 poll(),确保内部事件(例如 delivery-report)能被及时处理,这是标准做法。

setInterval(() => {
producer.poll();
}, 100);

producer.on('delivery-report', (err, report) => {
if (err) {
console.error('Message delivery failed:', err);
} else {
console.log('Message delivered:', report);
// report 中通常包含 topic, partition, offset 等信息
}
});