Kafka集群在Debian上的扩容操作步骤
新节点环境配置
确保新节点与现有Kafka集群处于同一网络环境,且已安装与集群一致的Debian版本(如Debian 11/12)。安装必要的依赖:Java(JDK 8或更高版本,推荐OpenJDK)、ZooKeeper(若集群未使用Kafka内置ZooKeeper,需单独安装)。
sudo apt update
sudo apt install -y openjdk-11-jdk-headless zookeeperd # Debian 11+示例
下载并解压Kafka
在新节点上下载与集群版本一致的Kafka二进制包(避免版本冲突),解压至指定目录(如/opt/kafka):
wget https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz # 替换为集群版本
tar -xzf kafka_2.12-3.5.2.tgz
sudo mv kafka_2.12-3.5.2 /opt/kafka
配置环境变量
将Kafka路径添加至系统环境变量,方便命令调用:
echo 'export KAFKA_HOME=/opt/kafka' >> /etc/profile
echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> /etc/profile
source /etc/profile
分配唯一Broker ID
编辑新节点的Kafka配置文件($KAFKA_HOME/config/server.properties),修改broker.id为集群中未使用的唯一整数(如现有Broker ID为0-3,新节点设为4):
broker.id=4 # 必须唯一,否则会报错
设置监听地址
配置listeners(Broker监听的地址)和advertised.listeners(客户端连接的地址,需为公网或集群内部可达地址):
listeners=PLAINTEXT://0.0.0.0:9092 # 监听所有网络接口
advertised.listeners=PLAINTEXT://新节点IP:9092 # 替换为新节点实际IP
指向ZooKeeper集群
确保zookeeper.connect指向现有ZooKeeper集群的所有节点(逗号分隔,格式为host1:2181,host2:2181,...):
zookeeper.connect=节点1IP:2181,节点2IP:2181,节点3IP:2181 # 替换为集群ZooKeeper地址
创建日志目录
指定Kafka消息日志的存储路径(log.dirs),并确保目录存在且有写入权限:
log.dirs=/var/lib/kafka/logs # 推荐使用专用目录
sudo mkdir -p /var/lib/kafka/logs
sudo chown -R $USER:$USER /var/lib/kafka/logs # 赋予当前用户权限
启动Kafka服务
在新节点上启动Kafka服务,并设置为开机自启:
sudo systemctl start kafka
sudo systemctl enable kafka
检查集群状态
使用Kafka命令行工具查看集群中的Broker列表,确认新节点已成功加入:
kafka-broker-api-versions.sh --bootstrap-server 新节点IP:9092
# 或通过ZooKeeper查看(若未使用内置ZooKeeper)
kafka-topics.sh --describe --bootstrap-server 新节点IP:9092
验证服务连通性
从集群内任意Broker节点执行ping或telnet,确保新节点的网络可达性:
ping 新节点IP
telnet 新节点IP 9092
若需将现有Topic的分区数据迁移至新节点,可使用kafka-reassign-partitions.sh工具:
生成重分配方案
创建JSON文件(如expand-cluster.json),指定新节点的Broker ID,生成迁移计划:
cat > expand-cluster.json <<EOF
{
"version": 1,
"partitions": [
{"topic": "your_topic", "partition": 0, "replicas": ["0", "1", "4"]}, # 将分区0的副本扩展至新节点4
{"topic": "your_topic", "partition": 1, "replicas": ["1", "2", "4"]}
]
}
EOF
执行生成命令:
kafka-reassign-partitions.sh --bootstrap-server 新节点IP:9092 \
--reassignment-json-file expand-cluster.json --generate
执行数据迁移
运行迁移命令,开始将分区数据复制到新节点:
kafka-reassign-partitions.sh --bootstrap-server 新节点IP:9092 \
--reassignment-json-file expand-cluster.json --execute
验证迁移进度
检查迁移状态,确保所有分区已完成数据复制:
kafka-reassign-partitions.sh --bootstrap-server 新节点IP:9092 \
--reassignment-json-file expand-cluster.json --verify
server.properties)和ZooKeeper数据(若有),避免配置错误导致数据丢失。