确保Linux系统已安装Java运行环境(Kafka依赖Java),推荐使用OpenJDK 11及以上版本:
sudo apt update && sudo apt install -y openjdk-11-jdk
java -version # 验证Java安装
从Apache官网下载最新稳定版Kafka(如3.2.0),解压至指定目录(如/opt/kafka):
wget https://downloads.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
sudo tar -xzf kafka_2.13-3.2.0.tgz -C /opt
sudo mv /opt/kafka_2.13-3.2.0 /opt/kafka # 重命名便于管理
编辑Kafka配置文件/opt/kafka/config/server.properties,设置以下关键参数(决定消息持久化的核心行为):
log.dirs=/var/lib/kafka/logs # 推荐使用独立分区,避免磁盘空间不足
log.retention.hours=168 # 保留7天(168小时)
log.retention.bytes=10737418240 # 保留10GB(超过则删除旧数据)
log.segment.bytes=1073741824 # 每个日志段最大1GB
default.replication.factor需≤broker数量):default.replication.factor=3 # 每个分区3个副本
min.insync.replicas=2 # 写入时需至少2个副本确认(保障数据安全)
log.cleanup.policy=compact # 启用压缩模式(默认为delete,即按时间/大小删除)
Kafka需对log.dirs目录有读写权限,否则无法持久化数据:
sudo mkdir -p /var/lib/kafka/logs
sudo chown -R kafka:kafka /var/lib/kafka/logs # 假设Kafka以kafka用户运行
sudo chmod -R 755 /var/lib/kafka/logs
Kafka依赖ZooKeeper集群管理元数据(如Topic、分区信息),需先启动ZooKeeper再启动Kafka:
# 启动ZooKeeper(默认端口2181)
/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties > /dev/null 2>&1 &
# 启动Kafka(后台运行)
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /dev/null 2>&1 &
通过生产者-消费者流程验证消息是否能长期存储:
/opt/kafka/bin/kafka-topics.sh --create --topic persistent_topic \
--bootstrap-server localhost:9092 --partitions 3 --replication-factor 3
/opt/kafka/bin/kafka-console-producer.sh --topic persistent_topic \
--bootstrap-server localhost:9092
输入测试消息(如Hello, Kafka Persistence),按回车发送。/opt/kafka/bin/kafka-console-consumer.sh --topic persistent_topic \
--from-beginning --bootstrap-server localhost:9092
若能看到之前发送的消息,说明消息已持久化。log.dirs所在磁盘的使用情况(避免磁盘满导致数据丢失):df -h /var/lib/kafka/logs
ISR列表包含所有副本):/opt/kafka/bin/kafka-topics.sh --describe --topic persistent_topic --bootstrap-server localhost:9092
rsync或tar备份log.dirs目录(如备份至/backup/kafka):rsync -av /var/lib/kafka/logs /backup/kafka/
通过以上步骤,Kafka将在Linux系统上实现可靠的消息持久化,确保数据不会因Broker重启、故障或磁盘清理而丢失。实际生产环境中,还需根据业务需求调整副本数、保留策略等参数,并配合监控系统(如Prometheus+Grafana)实时跟踪集群状态。