温馨提示×

kafka消息centos如何处理

小樊
40
2025-12-23 11:56:42
栏目: 智能运维

在 CentOS 上处理 Kafka 消息的实用指南

一 环境准备与启动

  • 安装 Java 8+(Kafka 运行依赖 JVM),建议使用 OpenJDK 8
  • 启动 Zookeeper(Kafka 元数据管理),默认端口 2181
    • 下载解压后编辑 conf/zoo.cfg,设置 dataDir,执行:
      • 启动:bin/zkServer.sh start
      • 状态:bin/zkServer.sh status
  • 启动 Kafka Broker,默认端口 9092
    • 编辑 config/server.properties
      • 设置唯一 broker.id
      • 配置监听:listeners=PLAINTEXT://0.0.0.0:9092
      • 对外宣传地址:advertised.listeners=PLAINTEXT://<服务器IP>:9092
      • 日志目录:log.dirs=/var/lib/kafka/logs
      • Zookeeper 连接:zookeeper.connect=localhost:2181
    • 启动命令:
      • 前台:bin/kafka-server-start.sh config/server.properties
      • 后台:bin/kafka-server-start.sh -daemon config/server.properties
  • 防火墙放行端口(如 firewalld):
    • firewall-cmd --add-port=2181/tcp --permanent && firewall-cmd --add-port=9092/tcp --permanent && firewall-cmd --reload
  • 简单连通性自检:
    • nc -vz <服务器IP> 9092nc -vz <服务器IP> 2181 应返回 succeeded。

二 命令行收发消息

  • 创建主题(推荐用 bootstrap-server):
    • bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  • 查看主题:
    • bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  • 发送消息(控制台生产者):
    • bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
  • 消费消息(从最早开始):
    • bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  • 删除主题:
    • bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test

三 消费模式与关键机制

  • 消费模式:Kafka 采用 Pull 模式,消费者主动拉取,便于根据处理能力控制节奏。
  • 消费者组与分区:同一 消费者组 内,一个分区只会被该组内的 一个消费者 消费;一个消费者可消费多个分区,实现负载均衡。
  • 偏移量提交:消费进度(offset)提交到系统主题 __consumer_offsets 持久化;可自动或手动提交。
  • 重平衡(Rebalance):组内消费者数量或分区数变化会触发重平衡,协调者负责分区分配;需关注心跳与会话超时设置,避免频繁重平衡。
  • 常用拉取参数:fetch.min.bytesfetch.max.wait.msfetch.max.bytesmax.poll.records,用于平衡延迟与吞吐。

四 可靠性与顺序性配置要点

  • 生产者可靠性(acks):
    • acks=0:不等待确认,延迟最低、可靠性最差;
    • acks=1:等待 Leader 落盘后确认;
    • acks=all(或 -1):等待 Leader+所有 ISR 副本 落盘后确认,可靠性最高。
  • 重试与幂等:开启 retries 与幂等生产者(enable.idempotence=true)可降低失败重试导致的重复与乱序风险。
  • 顺序性:
    • 顺序保证在 分区内;将需要有序的消息使用相同 key 或自定义分区器路由到同一分区;
    • 避免并发消费同一分区;必要时使用事务或幂等消费逻辑。
  • 副本与容错:生产环境常用 replication.factor=3,提升可用性与数据耐久性。

五 编程消费示例 Java

  • 依赖(Maven):
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.6.1</version>
    </dependency>
    
  • 示例代码(自动提交,按需改为手动提交):
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class SimpleConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "test-group");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            props.put("auto.offset.reset", "earliest"); // 首次消费位置:earliest/latest
            props.put("enable.auto.commit", "true");     // 自动提交
            props.put("auto.commit.interval.ms", "5000");
    
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("test"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> r : records) {
                        System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                                r.topic(), r.partition(), r.offset(), r.key(), r.value());
                    }
                    // 若 enable.auto.commit=false,在此处手动提交:
                    // consumer.commitSync();
                }
            }
        }
    }
    
  • 运行前确认:
    • 主题已创建;
    • 服务器 9092 端口可达;
    • 如需远程访问,确保 advertised.listeners 配置为服务器 IP 而非 localhost

0