温馨提示×

centos环境下kafka配置指南

小樊
53
2025-10-05 01:50:54
栏目: 智能运维

CentOS环境下Kafka配置指南

1. 环境准备

1.1 安装Java环境

Kafka依赖Java运行环境(建议使用JDK 8或更高版本),通过以下命令安装OpenJDK 1.8:

sudo yum install -y java-1.8.0-openjdk-devel

验证安装结果:

java -version

输出应包含java version "1.8.0_xxx",确认Java已正确安装。

1.2 下载并解压Kafka

从Apache Kafka官网下载稳定版本(如3.5.2),解压至指定目录(如/usr/local/kafka):

wget https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz
tar -xzf kafka_2.12-3.5.2.tgz
sudo mv kafka_2.12-3.5.2 /usr/local/kafka

切换至Kafka目录:

cd /usr/local/kafka

2. 配置Zookeeper(Kafka依赖组件)

Kafka使用Zookeeper管理集群元数据(如Broker信息、Topic分区等),需先配置Zookeeper。

2.1 配置Zookeeper参数

进入Kafka的config目录,编辑zookeeper.properties文件:

vi config/zookeeper.properties

修改以下关键参数:

dataDir=/usr/local/kafka/zookeeper_data  # Zookeeper数据存储目录
clientPort=2181                         # 客户端连接端口
initLimit=5                             # Leader与Follower初始同步时间(秒)
syncLimit=2                             # Leader与Follower同步超时时间(秒)
server.1=localhost:2888:3888            # 集群节点配置(单机环境仅需1个节点)

注意:若为集群环境,需将server.X替换为各节点的IP和端口(如server.1=192.168.1.1:2888:3888)。

2.2 启动Zookeeper

使用nohup命令后台启动Zookeeper:

nohup bin/zookeeper-server-start.sh config/zookeeper.properties > /dev/null 2>&1 &

验证Zookeeper是否运行:

echo stat | nc localhost 2181

输出应包含Mode: standalone(单机模式)或Mode: leader/follower(集群模式)。

3. 配置Kafka Broker

Kafka的核心配置文件为config/server.properties,需修改以下关键参数:

3.1 基础配置

broker.id=0                           # Broker唯一标识(集群中不可重复)
listeners=PLAINTEXT://your_server_ip:9092  # Broker监听地址(替换为服务器IP)
advertised.listeners=PLAINTEXT://your_server_ip:9092  # 对外暴露的地址(客户端连接用)
log.dirs=/usr/local/kafka/kafka_logs  # 日志存储目录(支持多目录逗号分隔)
zookeeper.connect=localhost:2181      # Zookeeper连接字符串(集群环境用逗号分隔)
num.partitions=3                      # Topic默认分区数(根据业务需求调整)
default.replication.factor=1          # Topic默认副本因子(生产环境建议≥2)
delete.topic.enable=true              # 允许删除Topic(默认false,生产环境建议开启)

3.2 性能调优(可选)

根据服务器资源调整以下参数,优化Kafka吞吐量和稳定性:

num.network.threads=3                 # 网络请求处理线程数(默认3,高并发可调整为8)
num.io.threads=8                      # 磁盘IO处理线程数(默认8,机械硬盘可调整为16)
socket.send.buffer.bytes=102400       # 发送缓冲区大小(默认100KB,可调整为1MB)
socket.receive.buffer.bytes=102400    # 接收缓冲区大小(默认100KB,可调整为1MB)
log.retention.hours=168               # 日志保留时间(默认7天,可调整为168小时=7天)
log.segment.bytes=1073741824          # Segment文件大小(默认1GB,可根据磁盘容量调整)

4. 启动Kafka服务

4.1 前台启动(测试用)

进入Kafka的bin目录,执行以下命令启动Kafka:

bin/kafka-server-start.sh config/server.properties

4.2 后台启动(生产用)

使用nohup命令后台启动,避免终端关闭导致服务停止:

nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &

4.3 设置开机自启

创建Systemd服务文件/etc/systemd/system/kafka.service

vi /etc/systemd/system/kafka.service

写入以下内容:

[Unit]
Description=Apache Kafka Server
After=network.target zookeeper.service

[Service]
Type=simple
User=root
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

启用并启动Kafka服务:

sudo systemctl enable kafka
sudo systemctl start kafka

验证Kafka状态:

sudo systemctl status kafka

输出应显示active (running),表示Kafka已成功启动。

5. 验证Kafka功能

5.1 创建Topic

使用kafka-topics.sh命令创建测试Topic(名称为test,分区数1,副本因子1):

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

5.2 查看Topic列表

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

输出应包含test,确认Topic创建成功。

5.3 发送消息

启动生产者,向test Topic发送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

输入任意消息(如Hello Kafka),按回车键发送。

5.4 消费消息

启动消费者,从test Topic消费消息:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

应能看到生产者发送的Hello Kafka消息,确认Kafka功能正常。

6. 防火墙配置

若Kafka需对外提供服务,需放通9092端口(Kafka默认端口):

sudo firewall-cmd --zone=public --add-port=9092/tcp --permanent
sudo firewall-cmd --reload

验证端口是否开放:

sudo firewall-cmd --zone=public --query-port=9092/tcp

输出yes,表示端口已放通。

7. 生产环境注意事项

  • 集群部署:至少部署3个Broker节点,配置server.properties中的broker.id为唯一值,zookeeper.connect指向集群所有Zookeeper节点。
  • 副本因子:设置default.replication.factor≥2,确保数据冗余和高可用。
  • 监控工具:使用Prometheus+Grafana监控Kafka集群(如Broker状态、Topic吞吐量、分区延迟等)。
  • 数据备份:定期备份log.dirs目录下的日志文件,防止数据丢失。

0