在Linux上部署Kafka集群需要遵循以下步骤:
sudo apt-get update
sudo apt-get install openjdk-8-jdk
wget https://downloads.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
tar -xzf kafka_2.13-3.2.0.tgz
cd kafka_2.13-3.2.0
在每个节点上编辑config/server.properties文件:
broker.id:每个broker的唯一ID。listeners:监听地址和端口。log.dirs:日志存储目录。zookeeper.connect:Zookeeper连接字符串。例如,在第一个节点上:
broker.id=1
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=zookeeper:2181
在第二个节点上:
broker.id=2
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs
zookeeper.connect=zookeeper:2181
如果你从头开始部署Zookeeper,需要配置config/zookeeper.properties:
dataDir:Zookeeper数据存储目录。clientPort:客户端连接端口。例如:
dataDir=/var/lib/zookeeper
clientPort=2181
在每个节点上启动Zookeeper服务:
bin/zookeeper-server-start.sh config/zookeeper.properties
在每个节点上启动Kafka Broker服务:
bin/kafka-server-start.sh config/server.properties
使用Kafka自带的工具验证集群状态:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
创建一个Topic来测试集群:
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 2 --partitions 3
编写简单的生产者和消费者脚本来测试集群功能。
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('test-topic', b'Hello, Kafka!')
producer.flush()
from kafka import KafkaConsumer
consumer = KafkaConsumer('test-topic', bootstrap_servers='localhost:9092')
for message in consumer:
print(message.value)
运行生产者和消费者脚本:
python producer.py
python consumer.py
通过以上步骤,你可以在Linux上成功部署一个Kafka集群。