温馨提示×

Ubuntu Kafka配置中的消费者设置

小樊
94
2025-03-29 11:28:22
栏目: 智能运维

在Ubuntu上配置Kafka消费者时,需要考虑多个方面,包括消费者组的ID、服务器地址、自动提交偏移量等。以下是一些关键步骤和配置参数:

Kafka消费者配置步骤

  1. 配置消费者属性:设置消费者组ID、服务器地址、自动提交偏移量等参数。
  2. 创建Kafka消费者实例:使用上述配置的属性创建一个Kafka消费者实例。
  3. 订阅主题:通过调用subscribe方法,订阅一个或多个主题。
  4. 拉取消息:调用poll方法从Kafka服务器拉取消息。
  5. 处理消息:对拉取到的消息进行业务处理。
  6. 手动提交偏移量(可选):根据需求选择手动提交偏移量,调用commitSynccommitAsync方法。

Kafka消费者重要参数

  • fetch.min.bytes:消费者一次拉取中拉取的最小数据量,默认值为1B。
  • fetch.max.bytes:消费者一次拉取中拉取的最大数据量,默认值为52428800B,即50MB。
  • fetch.max.wait.ms:指定Kafka的等待时间,默认值为500ms。
  • max.partition.fetch.bytes:配置从每个分区里返回给consumer的最大数据量。
  • max.poll.records:配置consumer在一次拉取请求中拉取的最大消息数,默认为500条。
  • connections.max.idle.ms:空连接超时限制。
  • exclude.internal.topics:指定Kafka中的内部主题是否可以向消费者公开,默认为true。
  • receive.buffer.bytes:设置socket接收消息缓冲区大小,默认值为65536B,即64KB。
  • send.buffer.bytes:设置socket发送消息缓冲区大小,默认值为131072B,即128KB。
  • request.timeout.ms:consumer等待请求响应的最长时间,默认为30000ms。
  • metadata.max.age.ms:元数据过期时间,默认300000ms,即5分钟。
  • reconnect.backoff.ms:尝试重新连接主机之前等待时间,默认50ms。
  • retry.backoff.ms:尝试重新发送失败的请求到指定主题分区之前的等待时间,默认100ms。
  • isolation.level:事务隔离级别,有“read_uncommitted”和“read_committed”两种,默认情况为“read_uncommitted”。

使用SpringBoot配置Kafka消费者的示例代码

@Component
@Slf4j
public class MessageReceiveListener {
    @KafkaListener(topics = "ifaas-test", containerFactory = "ifaasContainerFactory")
    public void receiveMessage1(List<ConsumerRecord> consumerRecords, Acknowledgment ack) {
        try {
            log.info("receiveMessage1 接收的kafka消息:" + consumerRecords.size());
            ack.acknowledge();
        } catch (Exception e) {
            log.error("kafka失败消息:{}", JSON.toJSONString(consumerRecords));
        }
    }

    @Bean("ifaasContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> ifaasContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        Map<String, Object> props = staticConsumerProps(ifaasServers, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setBatchListener(true);
        factory.setConcurrency(1);
        return factory;
    }

    private Map<String, Object> staticConsumerProps(String servers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 350000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 4000);
        return props;
    }
}

以上信息提供了在Ubuntu上配置Kafka消费者的一些基本步骤和关键参数,以及使用SpringBoot框架的示例代码。

0