Kafka消费者组配置主要包含基础参数、消费行为控制、性能优化及安全相关配置,以下是核心要点:
bootstrap.servers:Kafka集群地址列表,格式为host1:port1,host2:port2,用于建立初始连接。group.id:消费者组唯一标识,组内消费者协同消费,实现负载均衡和容错。key.deserializer/value.deserializer:消息键值的反序列化器,如StringDeserializer。enable.auto.commit:是否自动提交偏移量,默认true。建议关键业务设为false,手动提交以避免消息丢失。auto.commit.interval.ms:自动提交间隔时间,默认5秒,仅在enable.auto.commit=true时生效。auto.offset.reset:无初始偏移量或偏移量失效时的处理策略:
earliest:从最早偏移量开始消费。latest:从最新偏移量开始消费(默认)。none:抛出异常,需手动处理。max.poll.records:单次poll拉取的最大消息数,默认500,可调整以避免处理超时。fetch.min.bytes:单次拉取的最小数据量,默认1字节,增大可减少网络请求次数。fetch.max.wait.ms:等待达到fetch.min.bytes的最长时间,默认500ms。session.timeout.ms:消费者心跳超时时间,默认10秒,超过则触发再平衡,需小于服务端配置的group.min.session.timeout.ms。heartbeat.interval.ms:心跳发送间隔,默认3秒,需小于session.timeout.ms的1/3。max.poll.interval.ms:两次poll调用的最大间隔,默认5分钟,超时会被认为消费者故障,触发再平衡。partition.assignment.strategy:分区分配策略,默认Range(按范围分配),可选RoundRobin(轮询)、Sticky(粘性分配,减少再平衡开销)。isolation.level:事务消息读取级别:
read_committed:仅读取已提交消息,跳过未提交事务中的消息。read_uncommitted(默认):读取所有消息,包括未提交事务中的消息。security.protocol:安全协议,如SASL_SSL、SSL,用于加密通信。sasl.mechanism:认证机制,如PLAIN、GSSAPI,需配合security.protocol使用。session.timeout.ms和max.poll.interval.ms,确保消费者处理能力与参数匹配。consumer.commitSync()或consumer.commitAsync()手动控制,确保消息不丢失。group.instance.id将消费者设为静态成员,减少不必要的再平衡。以上参数可根据业务场景调整,例如高吞吐场景可增大max.poll.records和fetch.max.bytes,低延迟场景需减小session.timeout.ms。