在 CentOS 上设置 Kafka 消费者的完整步骤
一 环境准备
- 安装 Java(Kafka 运行依赖 Java):执行命令:sudo yum install java-1.8.0-openjdk-devel -y,完成后用 java -version 验证。
- 下载并解压 Kafka(示例为 3.2.0):
- 下载:wget https://downloads.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
- 解压:tar -xzf kafka_2.13-3.2.0.tgz && cd kafka_2.13-3.2.0
- 将 Kafka 的 bin 目录加入 PATH(便于命令行使用):
- 写入:echo ‘export PATH=$PATH:/opt/kafka/bin’ >> ~/.bashrc
- 生效:source ~/.bashrc
- 启动依赖服务(Zookeeper 与 Kafka Broker):
- 启动 Zookeeper:bin/zookeeper-server-start.sh config/zookeeper.properties
- 启动 Kafka:bin/kafka-server-start.sh config/server.properties
以上步骤完成后,Kafka 环境即具备运行消费者的基础条件。
二 消费者配置与命令行消费
- 使用自带控制台消费者进行快速验证(无需写代码):
- 订阅主题并从最早位置消费:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
- 如需指定消费者组,可加参数:–group test-group
- 常用关键配置项(建议写入配置文件或作为命令行参数使用):
- bootstrap.servers:Kafka Broker 地址列表,如:broker1:9092,broker2:9092
- group.id:消费者组标识,如:my-consumer-group
- key.deserializer / value.deserializer:反序列化器,常用:org.apache.kafka.common.serialization.StringDeserializer
- auto.offset.reset:无初始位移或位移越界时的策略,earliest / latest / none
- enable.auto.commit / auto.commit.interval.ms:是否自动提交位移及提交间隔
- max.poll.records:单次 poll 最大记录数
- fetch.min.bytes / fetch.max.wait.ms:一次拉取的最小字节数与等待超时
- session.timeout.ms / heartbeat.interval.ms:会话超时与心跳间隔(建议保持默认或按集群规模微调)
这些配置项覆盖了消费者端的大多数使用场景,命令行与配置文件均可设置。
三 安全认证与进阶设置
- SASL/SCRAM 动态认证(示例创建用户并分配权限):
- 创建管理员用户:./bin/kafka-configs.sh --zookeeper CentOS:2181 --alter --add-config ‘SCRAM-SHA-256=[password=admin-sec],SCRAM-SHA-512=[password=admin-sec]’ --entity-type users --entity-name admin
- 创建生产者用户:./bin/kafka-configs.sh --zookeeper CentOS:2181 --alter --add-config ‘SCRAM-SHA-256=[password=producer-sec],SCRAM-SHA-512=[password=producer-sec]’ --entity-type users --entity-name producer
- 创建消费者用户:./bin/kafka-configs.sh --zookeeper CentOS:2181 --alter --add-config ‘SCRAM-SHA-256=[password=consumer-sec],SCRAM-SHA-512=[password=consumer-sec]’ --entity-type users --entity-name consumer
- 查看用户:./bin/kafka-configs.sh --zookeeper CentOS:2181 --describe --entity-type users
- 客户端安全参数示例(用于配置文件或命令行):
- security.protocol=SASL_SSL
- sasl.mechanism=SCRAM-SHA-256(或 PLAIN)
- sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=“consumer” password=“consumer-sec”;
- 事务与隔离级别(读取已提交消息):isolation.level=read_committed
以上为在 CentOS 上对接启用认证的 Kafka 集群时,消费者端常见且必要的安全配置。
四 常见问题与快速排查
- 无法连接 Broker:确认 bootstrap.servers 指向可达的 9092 端口,且 listeners/advertised.listeners 对外可达;云服务器需开放安全组与防火墙。
- 启动报错或找不到位移:检查 group.id 是否一致;首次消费可设 auto.offset.reset=earliest;若使用安全认证,确认 security.protocol / sasl. / jaas* 配置正确。
- 消费延迟或吞吐不足:结合业务调整 fetch.min.bytes / fetch.max.wait.ms / max.poll.records;网络跨机房可适当增大 receive.buffer.bytes / send.buffer.bytes。
- 频繁重平衡:保持 session.timeout.ms / heartbeat.interval.ms 在合理范围,并确保单次 poll 处理能在超时前完成。
这些排查点覆盖了连接、认证、位移与性能的常见问题。