java -version验证安装。log.dirs存储),确保网络带宽充足(如千兆及以上),避免跨数据中心的高延迟。zoo.cfg文件:tickTime=2000 # 心跳周期(毫秒)
initLimit=10 # 初始化同步超时(心跳数)
syncLimit=5 # 同步确认超时(心跳数)
dataDir=/var/lib/zookeeper # 数据目录(避免/tmp)
clientPort=2181
server.1=zoo1:2888:3888 # 节点ID与通信端口
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
在dataDir下创建myid文件(内容为对应节点ID,如1),启动集群:sudo systemctl start zookeeper && sudo systemctl enable zookeeper。server.properties,确保以下配置满足高可用需求:broker.id=1 # 每个Broker唯一标识(集群内不重复)
listeners=PLAINTEXT://:9092 # 监听地址(生产环境建议用SSL加密)
log.dirs=/var/lib/kafka # 日志目录(多块磁盘用逗号分隔,如/data1,/data2)
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181 # ZooKeeper集群地址
default.replication.factor=3 # Topic默认副本数(≥3保证冗余)
min.insync.replicas=2 # 写入时需同步的副本数(避免脑裂)
unclean.leader.election.enable=false # 禁止非同步副本成为Leader(防止数据丢失)
num.partitions=3 # Topic默认分区数(根据消费者线程数调整)
num.network.threads=8 # 网络收发线程数(≈CPU核心数×0.5)
num.io.threads=16 # 磁盘I/O线程数(≈CPU核心数×0.8)
log.segment.bytes=1073741824 # 分段文件大小(1GB,减少分段数量)
log.retention.hours=168 # 数据保留时间(7天,自动清理过期数据)
kafka-topics.sh工具创建Topic,确保replication-factor≥3且partitions与消费者线程数匹配:kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 \
--replication-factor 3 --partitions 3
kafka-topics.sh --describe --topic my_topic --bootstrap-server localhost:9092查看ISR(In-Sync Replicas)列表,确保副本数符合min.insync.replicas要求。batch.size(128KB-1MB,减少网络请求)、linger.ms(50-100ms,允许消息聚合)、compression.type(snappy/lz4,平衡压缩率与CPU开销)、acks(all,确保数据可靠性):batch.size=131072 # 128KB
linger.ms=100
compression.type=snappy
acks=all
fetch.min.bytes(1MB,减少拉取次数)、fetch.max.wait.ms(1000ms,提高拉取效率)、max.poll.records(1000,单次拉取更多消息):fetch.min.bytes=1048576 # 1MB
fetch.max.wait.ms=1000
max.poll.records=1000
num.partitions(与消费者线程数匹配)、num.io.threads(CPU核心数×0.8)、num.network.threads(CPU核心数×0.5)。UnderReplicatedPartitions(未同步分区数,>0表示副本同步异常)、RequestQueueTimeMs(请求队列延迟,反映Broker负载)、CPU使用率(>80%需扩容)、磁盘使用率(>90%需清理或扩容)。UnderReplicatedPartitions>0、CPU使用率>80%),通过邮件、短信通知管理员。unclean.leader.election.enable=false)。log.dirs中的数据(如使用rsync或Kafka自带的kafka-log-dirs工具),灾难发生时可通过备份恢复。read、write)。