温馨提示×

Ubuntu Kafka消费者配置要点

小樊
51
2025-07-04 08:42:43
栏目: 智能运维

在Ubuntu上配置Kafka消费者涉及多个步骤,包括安装必要的软件、配置Kafka和Zookeeper、创建和订阅主题,以及编写和运行消费者代码。以下是详细的配置要点:

1. 安装Java和Zookeeper

  • 安装Java:Kafka需要Java运行时环境,首先确保你的系统上安装了Java。可以通过以下命令安装OpenJDK:

    sudo apt update
    sudo apt install openjdk-11-jdk
    
  • 安装Zookeeper:下载并解压Zookeeper,配置并启动Zookeeper服务。

    wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
    tar xvf zookeeper-3.4.6.tar.gz
    sudo mv zookeeper-3.4.6 /usr/local/zookeeper
    sudo cat > /usr/local/zookeeper/conf/zoo.cfg << EOF
    tickTime=2000
    dataDir=/var/lib/zookeeper
    clientPort=2181
    EOF
    sudo /usr/local/zookeeper/bin/zkServer.sh start
    

2. 安装Kafka

  • 下载并解压Kafka:从Apache Kafka官方网站下载最新版本的Kafka,并解压到指定目录。

    wget https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz
    tar -xzf kafka_2.12-3.5.2.tgz
    sudo mv kafka_2.12-3.5.2 /usr/local/kafka
    sudo mkdir /tmp/kafka-logs
    
  • 配置Kafka:编辑Kafka的配置文件server.properties,确保以下配置正确。

    sudo nano /usr/local/kafka/config/server.properties
    

    确保以下配置正确:

    listeners=PLAINTEXT://your_server_ip:9092
    zookeeper.connect=localhost:2181
    
  • 启动Kafka:启动Kafka服务器。

    sudo /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
    

3. 创建和订阅主题

  • 创建主题:使用Kafka命令行工具创建一个主题。

    sudo /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    
  • 订阅主题:使用Kafka命令行工具订阅主题。

    sudo /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    

4. 编写和运行消费者代码

  • 编写消费者配置文件:创建一个名为consumer.properties的文件,并添加以下配置:

    bootstrap.servers=localhost:9092
    group.id=your_group_id
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    auto.offset.reset=earliest
    enable.auto.commit=true
    auto.commit.interval.ms=1000
    
  • 编写消费者代码:使用你喜欢的编程语言编写消费者代码。以下是一个简单的Java消费者示例:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class SimpleConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "your_group_id");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("your_topic_name"));
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    records.forEach(record -> {
                        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    });
                }
            } finally {
                consumer.close();
            }
        }
    }
    
  • 运行消费者:编译并运行你的消费者程序。

    javac -cp $(find . -name "*.jar") SimpleConsumer.java
    java -cp .:$(find . -name "*.jar") SimpleConsumer
    

5. 高级配置

  • 消费者属性

    • auto.offset.reset:控制消费者在没有有效偏移量或分区不存在时从哪里开始读取。
    • enable.auto.commit:是否自动提交偏移量。
    • max.poll.records:每次poll调用返回的最大记录数。
    • fetch.min.bytes:每次fetch请求的最小字节数。
    • max.partition.fetch.bytes:每个分区返回给消费者的最大字节数。
    • session.timeout.ms:消费者发送心跳信号的时间间隔。

6. 开机自启配置

为了确保Kafka和Zookeeper在系统启动时自动启动,可以配置它们的开机自启。

  • Zookeeper服务配置

    sudo nano /lib/systemd/system/zookeeper.service
    

    添加内容:

    [Unit]
    Description=Zookeeper service
    After network.target
    
    [Service]
    Type=simple
    Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/kafka/bin"
    User=root
    Group=root
    ExecStart=/usr/local/zookeeper/bin/zookeeper-server-start.sh /usr/local/zookeeper/config/zookeeper.properties
    ExecStop=/usr/local/zookeeper/bin/zookeeper-server-stop.sh
    Restart=on-failure
    
    [Install]
    WantedBy=multi-user.target
    
  • Kafka服务配置

    sudo nano /lib/systemd/system/kafka.service
    

    添加内容:

    [Unit]
    Description=Apache Kafka server (broker)
    After zookeeper.service
    
    [Service]
    Type=simple
    Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/kafka/bin"
    User=root
    Group=root
    ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
    ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
    Restart=on-failure
    
    [Install]
    WantedBy=multi-user.target
    
  • 刷新配置并启动服务

    sudo systemctl daemon-reloads
    sudo systemctl enable
    

通过以上步骤,你应该能够在Ubuntu上成功配置Kafka消费者。如果有任何问题,请检查日志文件以获取更多信息。

0