温馨提示×

Ubuntu Kafka集群搭建步骤

小樊
60
2025-03-09 07:21:48
栏目: 智能运维

在Ubuntu上搭建Kafka集群的步骤如下:

1. 环境准备

  • 安装JDK
    sudo apt update
    sudo apt install openjdk-8-jdk
    java -version
    
  • 下载并解压Kafka
    wget https://downloads.apache.org/kafka/2.5.0/kafka_2.13-2.5.0.tgz
    tar -xvf kafka_2.13-2.5.0.tgz
    cd kafka_2.13-2.5.0
    

2. 配置Zookeeper

  • 创建Zookeeper目录
    sudo mkdir -p /opt/zookeeper/data
    sudo mkdir -p /opt/zookeeper/log
    
  • 修改Zookeeper配置文件
    cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
    nano /opt/zookeeper/conf/zoo.cfg
    
    修改以下配置:
    dataDir=/opt/zookeeper/data
    dataLogDir=/opt/zookeeper/log
    clientPort=2181
    server.1=zookeeper1:2888:3888
    server.2=zookeeper2:2888:3888
    server.3=zookeeper3:2888:3888
    
  • 启动Zookeeper
    bin/zookeeper-server-start.sh config/zoo.cfg
    

3. 配置Kafka

  • 修改Kafka配置文件

    cp config/server.properties config/server-1.properties
    cp config/server.properties config/server-2.properties
    cp config/server.properties config/server-3.properties
    

    修改以下配置:

    • broker.id:每台机器不同
    • listeners:例如 listeners=PLAINTEXT://192.168.1.100:9092
    • advertised.listeners:例如 advertised.listeners=PLAINTEXT://192.168.1.100:9092
    • zookeeper.connect:例如 zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
    • log.dirs:例如 /tmp/kafka-logs
    • num.network.threads:例如 3
    • num.io.threads:例如 8
    • socket.send.buffer.bytes:例如 1048576
    • socket.receive.buffer.bytes:例如 1048576
    • socket.request.max.bytes:例如 104857600
    • log.flush.interval.messages:例如 1000
    • log.flush.interval.ms:例如 1000
    • log.segment.bytes:例如 1073741824
    • log.retention.hours:例如 168
    • log.retention.check.interval.ms:例如 300000
    • log.cleaner.min.compaction.lag.ms:例如 10000
    • log.cleaner.max.compaction.lag.ms:例如 900000
    • log.cleaner.花椒.max.size:例如 10737418240
    • num.partitions:例如 1
    • default.replication.factor:例如 1
    • min.insync.replicas:例如 1
    • replica.lag.time.max.ms:例如 10000
    • replica.lag.max.messages:例如 100000
    • message.max.bytes:例如 1000000
    • replica.fetch.max.bytes:例如 1000000
    • fetch.min.bytes:例如 1048576
    • fetch.max.wait.ms:例如 100
    • max.partition.fetch.bytes:例如 1048576
    • num.recovery.threads.per.data.dir:例如 1
    • transaction.timeout.ms:例如 180000
    • transaction.state.log.replication.factor:例如 1
    • transaction.state.log.min.isr:例如 1
    • transaction.state.log.max.size.mb:例如 100
    • transactional.id.max.bytes:例如 900000000
    • auto.commit.interval.ms:例如 1000
    • auto.commit.delta.bytes:例如 1048576
    • group.initial.rebalance.delay.ms:例如 0
    • partition.assignment.strategy:例如 org.apache.kafka.common.cluster.assign.RangeAssignor
    • num.network.threads:例如 3
    • num.io.threads:例如 8
    • socket.send.buffer.bytes:例如 1048576
    • socket.receive.buffer.bytes:例如 1048576
    • socket.request.max.bytes:例如 104857600
    • log.flush.interval.messages:例如 1000
    • log.flush.interval.ms:例如 1000
    • log.segment.bytes:例如 1073741824
    • log.retention.hours:例如 168
    • log.retention.check.interval.ms:例如 300000
    • log.cleaner.min.compaction.lag.ms:例如 10000
    • log.cleaner.max.compaction.lag.ms:例如 900000
    • log.cleaner.花椒.max.size:例如 10737418240
    • num.partitions:例如 1
    • default.replication.factor:例如 1
    • min.insync.replicas:例如 1
    • replica.lag.time.max.ms:例如 10000
    • replica.lag.max.messages:例如 100000
    • message.max.bytes:例如 1000000
    • replica.fetch.max.bytes:例如 1000000
    • fetch.min.bytes:例如 1048576
    • fetch.max.wait.ms:例如 100
    • max.partition.fetch.bytes:例如 1048576
    • num.recovery.threads.per.data.dir:例如 1
    • transaction.timeout.ms:例如 180000
    • transaction.state.log.replication.factor:例如 1
    • transaction.state.log.min.isr:例如 1
    • transaction.state.log.max.size.mb:例如 100
    • transactional.id.max.bytes:例如 900000000
    • auto.commit.interval.ms:例如 1000
    • auto.commit.delta.bytes:例如 1048576
    • group.initial.rebalance.delay.ms:例如 0
    • partition.assignment.strategy:例如 org.apache.kafka.common.cluster.assign.RangeAssignor
  • 启动Kafka

    bin/kafka-server-start.sh config/

0