Ubuntu 上定位与解决 Kafka 消息丢失的实用方案
一 快速排查清单
- 检查生产者是否使用了回调并正确处理异常,避免“发后即焚”;确认关键配置:acks、retries、retry.backoff.ms、max.in.flight.requests.per.connection。
- 检查 Broker:主题副本数是否≥3,是否开启 min.insync.replicas≥2,以及 unclean.leader.election.enable=false。
- 检查消费者:是否关闭自动提交(enable.auto.commit=false),是否“处理完成后再提交”,以及 session.timeout.ms、max.poll.interval.ms 是否过小导致频繁重平衡。
- 检查运行环境:磁盘是否写满(如 df -h /var/lib/kafka),网络是否抖动,ZooKeeper/KRaft 是否稳定。
以上任一项不当都可能导致消息丢失或“已提交却不可用”。
二 生产者端加固
- 使用带回调的发送方式(send(record, callback)),在回调中记录失败并重试或落盘补偿,避免异步无回调导致“以为成功”。
- 将 acks=all,配合 Broker 的 min.insync.replicas≥2,确保写入到 ISR 的多数副本后才认为成功。
- 合理重试:设置 retries(如 3–5)与 retry.backoff.ms(如 1000 ms),避免无效重试风暴。
- 需要严格顺序时,设置 max.in.flight.requests.per.connection=1;若允许吞吐优先,可适当放宽并配合幂等。
- 启用幂等生产者:enable.idempotence=true(需同时满足 acks=all、retries>0、max.in.flight.requests.per.connection≤5),可在可恢复错误下避免重复且降低丢数风险。
- 避免消息过大被拒收:对齐 message.max.bytes(Broker)与 max.request.size(Producer)。
三 Broker 端加固
- 主题副本数:建议 replication.factor≥3,提升容错。
- 持久化语义:设置 min.insync.replicas=2(与 acks=all 配套),保证“已提交”的强语义。
- 禁止脏选主:设置 unclean.leader.election.enable=false,避免落后太多的副本被选为 Leader 导致数据空洞。
- 刷盘策略:Kafka 默认依赖 OS 页缓存与定时/定量刷盘,性能优先;若业务要求更强持久化,可结合业务调优 log.flush.interval.messages / log.flush.interval.ms,但需权衡吞吐。
- 容量与保留:监控并扩容磁盘,合理设置 log.retention.hours / log.retention.bytes,避免因磁盘写满导致写入失败或数据不可用。
四 消费者端加固
- 关闭自动提交:enable.auto.commit=false;采用“拉取→处理→同步提交(commitSync)”的模式,确保处理成功再提交位移。
- 处理时长控制:合理设置 max.poll.records 与 max.poll.interval.ms,避免单次处理过久触发重平衡;必要时优化消费逻辑或并行度。
- 稳定性:适当增大 session.timeout.ms,确保心跳与健康检查稳定。
- 语义取舍:关闭自动提交并“先处理再提交”可避免丢数,但可能带来重复;建议业务侧实现幂等(如数据库唯一键、去重表、业务状态机等)。
五 Ubuntu 现场操作命令示例
- 查看磁盘与 Inode 使用(先排除“写满导致写入失败”)
- df -h /var/lib/kafka
- df -i /var/lib/kafka
- 创建高可靠主题(示例:3 副本、2 个分区)
- bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic my_topic --partitions 2 --replication-factor 3
- 调整最小同步副本数(Broker 配置 server.properties)
- 禁止脏选主(Broker 配置 server.properties)
- unclean.leader.election.enable=false
- 在线增加分区数(扩容并行度)
- bin/kafka-topics.sh --alter --topic my_topic --partitions 6 --bootstrap-server localhost:9092
- 紧急清理过期日志(释放空间,谨慎操作)
- bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file cleanup.json
- 消费者手动提交位移(示例)
- enable.auto.commit=false
- 在消费循环结束后执行 consumer.commitSync()
以上命令可直接在 Ubuntu 的 Kafka 安装目录执行(路径以实际环境为准)。