1. Environment preparation
sudo apt update && sudo apt install -y openjdk-11-jdk
java -version   # verify the installation; this should print the Java version
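The version check above can also be scripted. The sketch below defines `parse_java_major`, a hypothetical helper (not part of any Kafka or JDK tooling) that extracts the major version from a `java -version` style line, and only runs the live check when a JDK is actually installed:

```shell
# Sketch: confirm the JDK is version 11 or newer.
# parse_java_major is a hypothetical helper, shown for illustration only.
parse_java_major() {
  # Accepts a line like: openjdk version "11.0.21"  or  java version "1.8.0_392"
  v=$(printf '%s\n' "$1" | sed -n 's/.*version "\([0-9][0-9]*\.[0-9][0-9]*\).*/\1/p')
  case "$v" in
    1.*) printf '%s\n' "${v#1.}" ;;   # legacy 1.x scheme (Java 8 and older)
    *)   printf '%s\n' "${v%%.*}" ;;
  esac
}

# Only run the live check when a JDK is actually present:
if command -v java >/dev/null 2>&1; then
  major=$(parse_java_major "$(java -version 2>&1 | head -n1)")
  if [ "$major" -ge 11 ]; then
    echo "JDK $major is new enough"
  else
    echo "JDK $major is too old"
  fi
fi
```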
2. Traditional mode (ZooKeeper-based) configuration steps
Download and extract Kafka:
Download a recent stable Kafka release (e.g. 3.7.0) from the Apache site and extract it to the target directory:
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar -xzf kafka_2.13-3.7.0.tgz -C /opt/
sudo mv /opt/kafka_2.13-3.7.0 /opt/kafka
Configure the ZooKeeper cluster (required for Kafka versions below 3.3):
Edit /opt/kafka/config/zookeeper.properties and define the cluster nodes:
tickTime=2000
initLimit=5
syncLimit=2
dataDir=/var/lib/zookeeper   # ZooKeeper data directory
clientPort=2181
server.1=node1_ip:2888:3888  # cluster node 1 (replace with the real IP)
server.2=node2_ip:2888:3888  # cluster node 2
server.3=node3_ip:2888:3888  # cluster node 3
Create a myid file under /var/lib/zookeeper containing that node's ID (1 on node1, 2 on node2, and so on):
echo "1" | sudo tee /var/lib/zookeeper/myid   # run on node1
Start ZooKeeper on every node, then verify the ensemble:
/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
echo stat | nc node1_ip 2181   # should report Leader/Follower information
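The edits above can be scripted per node. In the sketch below, `gen_zk_config` is a name chosen here (not Kafka tooling), and the local output paths stand in for /opt/kafka/config/zookeeper.properties and /var/lib/zookeeper:

```shell
# Sketch: write zookeeper.properties plus the per-node myid file in one step.
# gen_zk_config and the example paths are illustrative, not Kafka tooling.
gen_zk_config() {
  node_id=$1; conf=$2; data_dir=$3
  mkdir -p "$data_dir"
  cat > "$conf" <<EOF
tickTime=2000
initLimit=5
syncLimit=2
dataDir=$data_dir
clientPort=2181
server.1=node1_ip:2888:3888
server.2=node2_ip:2888:3888
server.3=node3_ip:2888:3888
EOF
  echo "$node_id" > "$data_dir/myid"
}

# On node1 (use /opt/kafka/config/zookeeper.properties and /var/lib/zookeeper for real):
gen_zk_config 1 ./zookeeper.properties ./zk-data
```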
Configure the Kafka brokers:
Make one copy of server.properties per node and adjust broker.id and listeners:
cd /opt/kafka/config
cp server.properties server-1.properties   # node1
cp server.properties server-2.properties   # node2
cp server.properties server-3.properties   # node3
Edit server-1.properties (node1 shown as the example):
broker.id=1   # unique per broker; must not repeat
listeners=PLAINTEXT://node1_ip:9092   # listen on this host's IP
advertised.listeners=PLAINTEXT://node1_ip:9092   # address exposed to clients
log.dirs=/var/lib/kafka-logs-1   # log storage directory (create it beforehand)
zookeeper.connect=node1_ip:2181,node2_ip:2181,node3_ip:2181   # ZooKeeper cluster addresses
Create the log directories and grant access:
sudo mkdir -p /var/lib/kafka-logs-{1,2,3}   # one directory per broker
sudo chown -R $USER:$USER /var/lib/kafka-logs-*   # give the current user ownership
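Since the three files differ only in the broker id and IP, the per-node settings can be generated. `gen_broker_overrides` below is a hypothetical helper that emits just the lines to change in each server-N.properties:

```shell
# Sketch: emit the per-node overrides for server-N.properties.
# gen_broker_overrides is a hypothetical helper; in practice, merge its
# output into the copied server.properties rather than replacing the file.
gen_broker_overrides() {
  id=$1; ip=$2
  cat <<EOF
broker.id=$id
listeners=PLAINTEXT://$ip:9092
advertised.listeners=PLAINTEXT://$ip:9092
log.dirs=/var/lib/kafka-logs-$id
zookeeper.connect=node1_ip:2181,node2_ip:2181,node3_ip:2181
EOF
}

for i in 1 2 3; do
  gen_broker_overrides "$i" "node${i}_ip" > "overrides-$i.properties"
done
```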
Start the Kafka cluster:
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server-1.properties   # node1
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server-2.properties   # node2
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server-3.properties   # node3
ps -ef | grep kafka   # run on each node; its Kafka process should appear
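Beyond ps, a quick reachability check against each broker port can be scripted. The sketch below uses bash's built-in /dev/tcp redirection (no nc needed); `port_open` is a name chosen here, and node1_ip etc. remain placeholders:

```shell
#!/usr/bin/env bash
# Sketch: check that each broker answers on port 9092 via bash's /dev/tcp.
# port_open is an illustrative helper, not part of Kafka.
port_open() {
  (exec 3<>"/dev/tcp/$1/$2") 2>/dev/null
}

for host in node1_ip node2_ip node3_ip; do
  if port_open "$host" 9092; then
    echo "$host: broker port 9092 open"
  else
    echo "$host: broker port 9092 NOT reachable"
  fi
done
```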
3. KRaft mode (no ZooKeeper; recommended for Kafka 3.3+) configuration steps
Generate a cluster ID:
Generate the cluster-wide unique ID on any one broker:
/opt/kafka/bin/kafka-storage.sh random-uuid
# prints a 22-character base64-encoded ID, e.g. J7s9e8PPTKOO47PxzI39VA
Configure the Kafka brokers (KRaft mode):
Edit /opt/kafka/config/kraft/server.properties (overriding the defaults), node1 shown as the example:
process.roles=broker,controller   # acts as both broker and controller (run at least 3 controllers per cluster)
node.id=1   # unique per node
controller.quorum.voters=1@node1_ip:9093,2@node2_ip:9093,3@node3_ip:9093   # controller quorum list
listeners=PLAINTEXT://node1_ip:9092,CONTROLLER://node1_ip:9093   # broker and controller listeners
advertised.listeners=PLAINTEXT://node1_ip:9092   # broker address exposed to clients
log.dirs=/var/lib/kafka-logs-1   # log storage directory
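As in the ZooKeeper setup, the lines that differ per node can be generated. `gen_kraft_overrides` below is an illustrative helper (not Kafka tooling); its output would be merged into /opt/kafka/config/kraft/server.properties on each node:

```shell
# Sketch: emit the per-node KRaft settings for node N.
# gen_kraft_overrides is a hypothetical helper; merge its output into
# /opt/kafka/config/kraft/server.properties on the matching node.
gen_kraft_overrides() {
  id=$1; ip=$2
  cat <<EOF
process.roles=broker,controller
node.id=$id
controller.quorum.voters=1@node1_ip:9093,2@node2_ip:9093,3@node3_ip:9093
listeners=PLAINTEXT://$ip:9092,CONTROLLER://$ip:9093
advertised.listeners=PLAINTEXT://$ip:9092
log.dirs=/var/lib/kafka-logs-$id
EOF
}

# Example for node2:
gen_kraft_overrides 2 node2_ip > kraft-node-2.properties
```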
Start the KRaft cluster:
Format the storage directory with the generated cluster ID, then start the broker (run both commands on every node):
/opt/kafka/bin/kafka-storage.sh format -t <generated cluster ID> -c /opt/kafka/config/kraft/server.properties
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/kraft/server.properties
4. Verifying the cluster
Create a test topic:
Use kafka-topics.sh to create a replicated topic (KRaft mode shown; the command is the same in ZooKeeper mode):
# --replication-factor must be <= the number of brokers; set --partitions as needed
/opt/kafka/bin/kafka-topics.sh --create \
  --bootstrap-server node1_ip:9092,node2_ip:9092,node3_ip:9092 \
  --replication-factor 3 \
  --partitions 1 \
  --topic test_topic
List topics:
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server node1_ip:9092
# should print test_topic
Produce and consume messages:
/opt/kafka/bin/kafka-console-producer.sh --topic test_topic --bootstrap-server node1_ip:9092
# type messages (e.g. "Hello Kafka Cluster"), then press Ctrl+C to exit
/opt/kafka/bin/kafka-console-consumer.sh --topic test_topic --from-beginning --bootstrap-server node1_ip:9092
# should print the messages sent by the producer
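For scripted (non-interactive) verification, the console tools can be piped. This sketch assumes the cluster above is running and the binaries live under /opt/kafka (it skips gracefully when they are absent); --max-messages and --timeout-ms are standard kafka-console-consumer.sh options:

```shell
# Sketch: non-interactive produce/consume round trip.
# BOOTSTRAP and KAFKA_BIN defaults match the setup above; adjust as needed.
BOOTSTRAP=${BOOTSTRAP:-node1_ip:9092}
KAFKA_BIN=${KAFKA_BIN:-/opt/kafka/bin}

if [ -x "$KAFKA_BIN/kafka-console-producer.sh" ]; then
  echo "Hello Kafka Cluster" | "$KAFKA_BIN/kafka-console-producer.sh" \
    --topic test_topic --bootstrap-server "$BOOTSTRAP"
  # Read one message back, giving up after 10 s:
  "$KAFKA_BIN/kafka-console-consumer.sh" --topic test_topic --from-beginning \
    --max-messages 1 --timeout-ms 10000 --bootstrap-server "$BOOTSTRAP"
else
  echo "Kafka binaries not found under $KAFKA_BIN; run this on a cluster node"
fi
```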
五、注意事项
advertised.listeners指向正确的公网/内网IP,防火墙开放9092(Broker)、2181(ZooKeeper)端口。replication.factor≥3(保证高可用),partitions根据吞吐量需求调整(如10-100)。