Kafka配置中的线程数主要涉及服务端和客户端,具体设置如下:
网络请求处理线程
num.network.threads:处理网络请求的线程数,建议设置为CPU核数的2-3倍。num.network.threads=8(8核服务器可设为8-12)。磁盘IO处理线程
num.io.threads:处理磁盘读写的线程数,建议设置为CPU核数的50%,不超过总核数。num.io.threads=4(8核服务器可设为4-6)。分区恢复线程
num.recovery.threads.per.data.dir:每个数据目录的恢复线程数,通常设置为1-3,用于日志恢复和清理。num.recovery.threads.per.data.dir=2。生产者多线程
ExecutorService producerExecutor = Executors.newFixedThreadPool(10); // 10个生产者线程
producerExecutor.submit(() -> {
// 生产者发送逻辑
});
消费者多线程
createMessageStreams的topicCountMap设置每个主题的消费线程数,需确保线程数≤分区数。Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put("topic1", 3); // 3个线程消费topic1
ConcurrentKafkaListenerContainerFactory的concurrency参数,需与分区数匹配。@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(3); // 每个消费者实例的线程数(不超过分区数)
return factory;
}
buffer.memory)。参数来源: