Kafka与Ubuntu集成的详细步骤
在开始集成前,请确保Ubuntu系统满足以下要求:
Kafka依赖Java运行环境(JRE/JDK),推荐安装OpenJDK 8或11(兼容性更好)。
# 更新软件包列表
sudo apt update
# 安装OpenJDK 11(或8)
sudo apt install openjdk-11-jdk -y
# 验证Java安装(需显示版本信息)
java -version
若输出类似openjdk version "11.0.xx"的信息,则说明安装成功。
Kafka使用ZooKeeper作为集群协调服务(管理broker注册、分区Leader选举等),需先安装并启动ZooKeeper。
# 下载ZooKeeper(以3.7.0为例)
wget https://downloads.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
# 解压到/opt目录(推荐)
tar -xzvf apache-zookeeper-3.7.0-bin.tar.gz
sudo mv apache-zookeeper-3.7.0 /opt/zookeeper
# 创建数据目录(用于存储ZooKeeper状态)
sudo mkdir -p /opt/zookeeper/data
# 配置ZooKeeper(编辑zoo.cfg)
sudo tee /opt/zookeeper/conf/zoo.cfg <<EOF
tickTime=2000
dataDir=/opt/zookeeper/data
clientPort=2181
EOF
# 启动ZooKeeper(前台模式,便于查看日志)
/opt/zookeeper/bin/zkServer.sh start
# 验证ZooKeeper是否运行(监听2181端口)
sudo netstat -tulnp | grep 2181
若输出包含2181/tcp,则说明ZooKeeper启动成功。
从Apache官网下载最新稳定版本的Kafka(如3.7.0),并解压到指定目录。
# 下载Kafka(以3.7.0为例)
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
# 解压到/opt目录
tar -xzvf kafka_2.13-3.7.0.tgz
sudo mv kafka_2.13-3.7.0 /opt/kafka
为方便使用,建议将Kafka的bin目录添加到系统环境变量(可选步骤):
# 编辑~/.bashrc文件
echo 'export KAFKA_HOME=/opt/kafka' >> ~/.bashrc
echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> ~/.bashrc
# 使环境变量生效
source ~/.bashrc
此时可直接使用kafka-server-start、kafka-topics等命令。
编辑Kafka的配置文件server.properties(位于/opt/kafka/config/目录),设置关键参数:
sudo nano /opt/kafka/config/server.properties
修改以下核心配置(根据实际需求调整):
# broker唯一标识(集群中需唯一)
broker.id=0
# 监听地址(0.0.0.0表示允许所有IP访问,生产环境建议指定本机IP)
listeners=PLAINTEXT://0.0.0.0:9092
# 对外公布的地址(若Kafka部署在NAT后,需设置为公网IP或域名)
advertised.listeners=PLAINTEXT://your_server_ip:9092
# 日志存储目录(需提前创建并授权)
log.dirs=/tmp/kafka-logs
# ZooKeeper连接地址(若ZooKeeper运行在本机,为localhost:2181;集群中需列出所有ZooKeeper节点)
zookeeper.connect=localhost:2181
保存并退出编辑器(Ctrl+O→Enter→Ctrl+X)。
使用以下命令启动Kafka(前台模式,便于查看日志):
/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
若需后台运行(生产环境推荐),可添加-daemon参数:
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
验证Kafka是否启动成功(监听9092端口):
sudo netstat -tulnp | grep 9092
若输出包含9092/tcp,则说明Kafka启动成功。
创建一个名为test-topic的Topic(1个分区、1个副本):
/opt/kafka/bin/kafka-topics.sh --create --topic test-topic --zookeeper localhost:2181 --partitions 1 --replication-factor 1
验证Topic是否创建成功:
/opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
启动生产者,向test-topic发送消息(在终端输入消息,按Ctrl+C结束):
/opt/kafka/bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
启动消费者,从test-topic接收消息(--from-beginning表示从最早的消息开始读取):
/opt/kafka/bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092 --from-beginning
在生产者终端输入消息后,消费者终端应同步显示该消息,说明Kafka功能正常。
若Ubuntu系统启用了防火墙(如ufw),需开放Kafka和ZooKeeper的端口:
# 允许ZooKeeper端口(2181)
sudo ufw allow 2181/tcp
# 允许Kafka端口(9092)
sudo ufw allow 9092/tcp
# 重新加载防火墙规则
sudo ufw reload
确保外部设备可通过这些端口访问Kafka服务(生产环境需限制访问IP)。
PLAINTEXT协议(启用SSL/TLS加密),并配置advertised.listeners为公网IP或域名;broker.id和listeners配置,然后分别启动;/opt/zookeeper/data)和Kafka日志目录(/tmp/kafka-logs),避免数据丢失。