跳到主要内容

Producer

2025年02月23日
柏拉文
越努力,越幸运

一、创建 Producer


const Kafka = require("node-rdkafka");

const producer = new Kafka.Producer({
'client.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
'compression.codec': 'gzip',
'retry.backoff.ms': 200,
'message.send.max.retries': 10,
'socket.keepalive.enable': true,
'queue.buffering.max.messages': 100000,
'queue.buffering.max.ms': 1000,
'batch.num.messages': 1000000,
'dr_cb': true
});
  • acks: 要求消息在所有同步副本(in-sync replicas)确认后才返回成功,等同于设置为 all。确保消息在写入 Kafka 时具有更高的持久性,降低数据丢失的风险。

  • client.id: 设置当前 Producer 客户端的标识。用于日志、监控和追踪请求来源,帮助区分不同的 Producer 实例。便于在 Kafka 监控和日志中识别不同的 Producer。在问题排查和监控上提供便利。

  • metadata.broker.list: 指定 Kafka 集群中 Broker 的地址列表。Producer 启动时会从这些 Broker 获取集群元数据(例如 Topic、分区信息)。

  • compression.codec: 配置 Producer 发送消息时使用的压缩算法。压缩可以减少网络传输的数据量,提高吞吐量。设置为 gzip,表示所有消息将使用 GZIP 算法进行压缩。降低网络带宽消耗、提高吞吐量,对于高并发场景下有积极效果。

  • retry.backoff.ms: 在发送消息失败后,Producer 重试之前等待的时间(单位:毫秒)。防止重试过于频繁,给 Broker 一定的恢复时间。配置发送失败时的重试机制,提升消息可靠性。两次重试之间的退避时间为 1000 毫秒。为自动重试提供合理间隔,有助于缓解短时网络抖动和 Broker 问题。

  • message.send.max.retries: 消息发送失败后允许重试的最大次数。在网络抖动或临时性故障时,确保 Producer 有更多机会将消息成功发送。配置发送失败时的重试机制,提升消息可靠性。

  • socket.keepalive.enable: 启用 TCP 保活机制,保证连接在长时间空闲后仍然活跃。保持与 Broker 的连接活跃,及时检测网络故障或连接断开。有助于稳定长连接,减少因网络断开导致的消息发送中断。

  • queue.buffering.max.messages: Producer 内部队列允许缓存的最大消息数。限制 Producer 内部缓冲区的消息数量,避免因消息积压导致内存占用过高。设置为 100000 条消息。控制 Producer 内部消息缓冲区大小和消息等待发送的最长时间,支持批量发送。确保高并发情况下 Producer 不会因缓冲队列溢出而丢失消息。

  • queue.buffering.max.ms: 消息在 Producer 内部队列中等待发送的最长时间(单位:毫秒)。用于控制消息批次发送的延迟,达到批量处理的效果。设置为 1000 毫秒,即最多等待 1 秒后发送。控制 Producer 内部消息缓冲区大小和消息等待发送的最长时间,支持批量发送。

  • batch.num.messages: 每个批次发送的最大消息数。控制每个批次中聚集的消息数量,影响批量发送的大小,从而优化吞吐量。设置为 1000000 条消息,表示批量发送时可以包含最多 100 万条消息。设置每个批次中允许的最大消息数,进一步优化批量发送性能。平衡吞吐量与延迟,同时支持批量发送来提高网络利用率。

  • dr_cb: 开启 Delivery Report 回调。在消息被成功发送或失败后,Producer 会触发回调函数(delivery report),便于开发者监控消息状态。启用交付报告回调,确保 Producer 能够获得消息发送结果并进行错误处理。有助于实时监控消息发送状态,从而可以在回调中进行重试或将失败消息转入死信队列(DLQ)。

  • enable.idempotence: 启用幂等性保证,即使发生重试,也不会重复写入相同的消息,确保“恰好一次”语义。减少重复消息,简化外部重试逻辑,并提高系统的整体可靠性。

  • metadata.broker.list: 指定 Kafka Broker 列表,本例中使用 localhost 上的 29092 端口。确保 Producer 能够正确连接到 Kafka 集群。

  • message.timeout.ms: 消息在 Producer 队列中最长等待时间为 10 秒,超过这个时间消息会被丢弃或触发错误。确保失败消息能够及时反馈,使外部重试和 DLQ 机制可以介入,防止长时间阻塞。

二、Producer 连接


producer.connect() 方法在连接成功时发出 ready 事件。否则,错误将通过回调传递。

const Kafka = require("node-rdkafka");

// 创建生产者实例
const producer = new Kafka.Producer({
"client.id": "kafka",
"metadata.broker.list": "localhost:29092",
"compression.codec": "gzip",
"retry.backoff.ms": 200,
"message.send.max.retries": 10,
"socket.keepalive.enable": true,
"queue.buffering.max.messages": 100000,
"queue.buffering.max.ms": 1000,
"batch.num.messages": 1000000,
dr_cb: true,
});

producer.connect();
producer.setPollInterval(100);

producer.on("ready", () => {
console.log("Producer is ready");
setInterval(() => {
const message = `Hello Kafka! ${new Date().toISOString()}`;
producer.produce(
"kafka-topic-test", // 主题
null, // 分区(null 表示自动分区)
Buffer.from(message), // 消息内容
null, // Key
Date.now() // 时间戳
);
console.log(`Message sent: ${message}`);
}, 4000);
});

producer.on("event.error", (err) => {
console.error("Producer error:", err);
});

producer.on("delivery-report", (err, report) => {
if (err) {
console.error("Message delivery failed:", err);
} else {
console.log("Message delivered:", report);
}
});

// 监听退出信号,确保应用退出前 flush 和 disconnect
function shutdown() {
console.log("Shutdown initiated. Flushing pending messages...");
// flush() 会等待指定毫秒数内的消息发送完成
producer.flush(10000, () => {
console.log("Flush completed. Disconnecting producer...");
producer.disconnect(() => {
console.log("Producer disconnected. Exiting process.");
process.exit(0);
});
});
}

// 捕获 CTRL+C 和其它退出信号
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);