Producer
一、创建 Producer
const Kafka = require("node-rdkafka");
const producer = new Kafka.Producer({
'client.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
'compression.codec': 'gzip',
'retry.backoff.ms': 200,
'message.send.max.retries': 10,
'socket.keepalive.enable': true,
'queue.buffering.max.messages': 100000,
'queue.buffering.max.ms': 1000,
'batch.num.messages': 1000000,
'dr_cb': true
});
-
acks
: 要求消息在所有同步副本(in-sync replicas
)确认后才返回成功,等同于设置为all
。确保消息在写入Kafka
时具有更高的持久性,降低数据丢失的风险。 -
client.id
: 设置当前Producer
客户端的标识。用于日志、监控和追踪请求来源,帮助区分不同的Producer
实例。便于在Kafka
监控和日志中识别不同的Producer
。在问题排查和监控上提供便利。 -
metadata.broker.list
: 指定Kafka
集群中Broker
的地址列表。Producer
启动时会从这些Broker
获取集群元数据(例如Topic
、分区信息)。 -
compression.codec
: 配置Producer
发送消息时使用的压缩算法。压缩可以减少网络传输的数据量,提高吞吐量。设置为gzip
,表示所有消息将使用GZIP
算法进行压缩。降低网络带宽消耗、提高吞吐量,对于高并发场景下有积极效果。 -
retry.backoff.ms
: 在发送消息失败后,Producer
重试之前等待的时间(单位:毫秒)。防止重试过于频繁,给Broker
一定的恢复时间。配置发送失败时的重试机制,提升消息可靠性。两次重试之间的退避时间为1000
毫秒。为自动重试提供合理间隔,有助于缓解短时网络抖动和Broker
问题。 -
message.send.max.retries
: 消息发送失败后允许重试的最大次数。在网络抖动或临时性故障时,确保Producer
有更多机会将消息成功发送。配置发送失败时的重试机制,提升消息可靠性。 -
socket.keepalive.enable
: 启用 TCP 保活机制,保证连接在长时间空闲后仍然活跃。保持与Broker
的连接活跃,及时检测网络故障或连接断开。有助于稳定长连接,减少因网络断开导致的消息发送中断。 -
queue.buffering.max.messages
:Producer
内部队列允许缓存的最大消息数。限制Producer
内部缓冲区的消息数量,避免因消息积压导致内存占用过高。设置为100000
条消息。控制Producer
内部消息缓冲区大小和消息等待发送的最长时间,支持批量发送。确保高并发情况下Producer
不会因缓冲队列溢出而丢失消息。 -
queue.buffering.max.ms
: 消息在Producer
内部队列中等待发送的最长时间(单位:毫秒)。用于控制消息批次发送的延迟,达到批量处理的效果。设置为1000
毫秒,即最多等待1
秒后发送。控制Producer
内部消息缓冲区大小和消息等待发送的最长时间,支持批量发送。 -
batch.num.messages
: 每个批次发送的最大消息数。控制每个批次中聚集的消息数量,影响批量发送的大小,从而优化吞吐量。设置为1000000
条消息,表示批量发送时可以包含最多100
万条消息。设置每个批次中允许的最大消息数,进一步优化批量发送性能。平衡吞吐量与延迟,同时支持批量发送来提高网络利用率。 -
dr_cb
: 开启Delivery Report
回调。在消息被成功发送或失败后,Producer
会触发回调函数(delivery report
),便于开发者监控消息状态。启用交付报告回调,确保Producer
能够获得消息发送结果并进行错误处理。有助于实时监控消息发送状态,从而可以在回调中进行重试或将失败消息转入死信队列(DLQ
)。 -
enable.idempotence
: 启用幂等性保证,即使发生重试,也不会重复写入相同的消息,确保“恰好一次”语义。减少重复消息,简化外部重试逻辑,并提高系统的整体可靠性。 -
metadata.broker.list
: 指定Kafka Broker
列表,本例中使用localhost
上的29092
端口。确保Producer
能够正确连接到Kafka
集群。 -
message.timeout.ms
: 消息在Producer
队列中最长等待时间为10
秒,超过这个时间消息会被丢弃或触发错误。确保失败消息能够及时反馈,使外部重试和DLQ
机制可以介入,防止长时间阻塞。
二、Producer 连接
producer.connect()
方法在连接成功时发出 ready
事件。否则,错误将通过回调传递。
const Kafka = require("node-rdkafka");
// 创建生产者实例
const producer = new Kafka.Producer({
"client.id": "kafka",
"metadata.broker.list": "localhost:29092",
"compression.codec": "gzip",
"retry.backoff.ms": 200,
"message.send.max.retries": 10,
"socket.keepalive.enable": true,
"queue.buffering.max.messages": 100000,
"queue.buffering.max.ms": 1000,
"batch.num.messages": 1000000,
dr_cb: true,
});
producer.connect();
producer.setPollInterval(100);
producer.on("ready", () => {
console.log("Producer is ready");
setInterval(() => {
const message = `Hello Kafka! ${new Date().toISOString()}`;
producer.produce(
"kafka-topic-test", // 主题
null, // 分区(null 表示自动分区)
Buffer.from(message), // 消息内容
null, // Key
Date.now() // 时间戳
);
console.log(`Message sent: ${message}`);
}, 4000);
});
producer.on("event.error", (err) => {
console.error("Producer error:", err);
});
producer.on("delivery-report", (err, report) => {
if (err) {
console.error("Message delivery failed:", err);
} else {
console.log("Message delivered:", report);
}
});
// 监听退出信号,确保应用退出前 flush 和 disconnect
function shutdown() {
console.log("Shutdown initiated. Flushing pending messages...");
// flush() 会等待指定毫秒数内的消息发送完成
producer.flush(10000, () => {
console.log("Flush completed. Disconnecting producer...");
producer.disconnect(() => {
console.log("Producer disconnected. Exiting process.");
process.exit(0);
});
});
}
// 捕获 CTRL+C 和其它退出信号
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);