温馨提示×

springboot自动配置方式有哪些

发布时间:2021-09-28 17:50:52 来源:亿速云 阅读:84 作者:小新 栏目:开发技术

这篇文章给大家分享的是有关springboot自动配置方式有哪些的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

    spring boot自动配置方式整合

    spring boot具有许多自动化配置,对于kafka的自动化配置当然也包含在内,基于spring boot自动配置方式整合kafka,需要做以下步骤。

    引入kafka的pom依赖包

    <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.2.RELEASE</version>
    </dependency>

    在配置文件中配置kafka相关属性配置,分别配置生产者和消费者的属性,在程序启动时,spring boot框架会自动读取这些配置的属性,创建相关的生产者、消费者等。下面展示一个简单的配置。

    #kafka默认消费者配置
    spring.kafka.consumer.bootstrap-servers=192.168.0.15:9092
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.consumer.auto-offset-reset=earliest
    #kafka默认生产者配置
    spring.kafka.producer.bootstrap-servers=192.168.0.15:9092
    spring.kafka.producer.acks=-1
    spring.kafka.client-id=kafka-producer
    spring.kafka.producer.batch-size=5

    当然,在实际生产中的配置肯定比上面的配置复杂,需要一些定制化的操作,那么spring boot的自动化配置创建的生产者或者消费者都不能满足我们时,应该需要自定义化相关配置,这个在后续举例,这里先分析自动化配置。

    在进行了如上配置之后,需要生产者时,使用方式为下代码所示。

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = {UserSSOApplication.class})
    public class UserSSOApplicationTests {
       @Resource
        //注入kafkatemplete,这个由spring boot自动创建
       KafkaTemplate kafkaTemplate;
       @Test
       public void testKafkaSendMsg() {
           //发送消息
          kafkaTemplate.send("test", 0,12,"1222");
       }
    }

    消费者的使用注解方式整合,代码如下。

    @Component
    @Slf4j
    public class KafkaMessageReceiver2 {
        //指定监听的topic,当前消费者组id
        @KafkaListener(topics = {"test"}, groupId = "receiver")
        public void registryReceiver(ConsumerRecord<Integer, String> integerStringConsumerRecords) {
            log.info(integerStringConsumerRecords.value());
        }
    }

    上面是最简单的配置,实现的一个简单例子,如果需要更加定制化的配置,可以参考类

    org.springframework.boot.autoconfigure.kafka.KafkaProperties这里面包含了大部分需要的kafka配置。针对配置,在properties文件中添加即可。

    spring boot自动配置的不足

    上面是依赖spring boot自动化配置完成的整合方式,实际上所有的配置实现都是在org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中完成。可以看出个是依赖于@Configuration完成bean配置,这种配置方式基本能够实现大部分情况,只要熟悉org.springframework.boot.autoconfigure.kafka.KafkaProperties中的配置即可。

    但是这种方式还有一个问题,就是org.springframework.boot.autoconfigure.kafka.KafkaProperties中并没有涵盖所有的org.apache.kafka.clients.producer.ProducerConfig中的配置,这就导致某些特殊配置不能依赖spring boot自动创建,需要我们手动创建Producer和comsumer。

    @Configuration
    @ConditionalOnClass(KafkaTemplate.class)
    @EnableConfigurationProperties(KafkaProperties.class)
    @Import(KafkaAnnotationDrivenConfiguration.class)
    public class KafkaAutoConfiguration {
       private final KafkaProperties properties;
       private final RecordMessageConverter messageConverter;
       public KafkaAutoConfiguration(KafkaProperties properties,
             ObjectProvider<RecordMessageConverter> messageConverter) {
          this.properties = properties;
          this.messageConverter = messageConverter.getIfUnique();
       }
       @Bean
       @ConditionalOnMissingBean(KafkaTemplate.class)
       public KafkaTemplate<?, ?> kafkaTemplate(
             ProducerFactory<Object, Object> kafkaProducerFactory,
             ProducerListener<Object, Object> kafkaProducerListener) {
          KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(
                kafkaProducerFactory);
          if (this.messageConverter != null) {
             kafkaTemplate.setMessageConverter(this.messageConverter);
          }
          kafkaTemplate.setProducerListener(kafkaProducerListener);
          kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
          return kafkaTemplate;
       }
       @Bean
       @ConditionalOnMissingBean(ConsumerFactory.class)
       public ConsumerFactory<?, ?> kafkaConsumerFactory() {
          return new DefaultKafkaConsumerFactory<>(
                this.properties.buildConsumerProperties());
       }
       @Bean
       @ConditionalOnMissingBean(ProducerFactory.class)
       public ProducerFactory<?, ?> kafkaProducerFactory() {
          DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
                this.properties.buildProducerProperties());
          String transactionIdPrefix = this.properties.getProducer()
                .getTransactionIdPrefix();
          if (transactionIdPrefix != null) {
             factory.setTransactionIdPrefix(transactionIdPrefix);
          }
          return factory;
       }
      //略略略
    }

    spring boot下手动配置kafka

    由于需要对某些特殊配置进行配置,我们可能需要手动配置kafka相关的bean,创建一个配置类如下,类似于

    org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration,这里创建了对应类型的bean之后,org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中的对应Bean定义将不起作用。

    所有的生产者配置可以参考ProducerConfig类,所有的消费者配置可以参考ConsumerConfig类。

    /**
     * kafka配置,实际上,在KafkaAutoConfiguration中已经有默认的根据配置文件信息创建配置,但是自动配置属性没有涵盖所有
     * 我们可以自定义创建相关bean,进行如下配置
     *
     * @author zhoujy
     * @date 2018年12月17日
     **/
    @Configuration
    public class KafkaConfig {
        @Value("${spring.kafka.consumer.bootstrap-servers}")
        private String bootstrapServers;
        //构造消费者属性map,ConsumerConfig中的可配置属性比spring boot自动配置要多
        private Map<String, Object> consumerProperties(){
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "activity-service");
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            return props;
        }
        /**
         * 不使用spring boot默认方式创建的DefaultKafkaConsumerFactory,重新定义创建方式
         * @return
         */
        @Bean("consumerFactory")
        public DefaultKafkaConsumerFactory consumerFactory(){
            return new DefaultKafkaConsumerFactory(consumerProperties());
        }
        @Bean("listenerContainerFactory")
        //个性化定义消费者
        public ConcurrentKafkaListenerContainerFactory listenerContainerFactory(DefaultKafkaConsumerFactory consumerFactory) {
            //指定使用DefaultKafkaConsumerFactory
            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
            factory.setConsumerFactory(consumerFactory);
            
     //设置消费者ack模式为手动,看需求设置  
            factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
            //设置可批量拉取消息消费,拉取数量一次3,看需求设置
            factory.setConcurrency(3);
            factory.setBatchListener(true);
            return factory;
        }
       /* @Bean
         //代码创建方式topic
        public NewTopic batchTopic() {
            return new NewTopic("topic.quick.batch", 8, (short) 1);
        }*/
        //创建生产者配置map,ProducerConfig中的可配置属性比spring boot自动配置要多
        private Map<String, Object> producerProperties(){
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.ACKS_CONFIG, "-1");
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);
            props.put(ProducerConfig.LINGER_MS_CONFIG, 500);
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            return props;
        }
        /**
         * 不使用spring boot的KafkaAutoConfiguration默认方式创建的DefaultKafkaProducerFactory,重新定义
         * @return
         */
        @Bean("produceFactory")
        public DefaultKafkaProducerFactory produceFactory(){
            return new DefaultKafkaProducerFactory(producerProperties());
        }
        /**
         * 不使用spring boot的KafkaAutoConfiguration默认方式创建的KafkaTemplate,重新定义
         * @param produceFactory
         * @return
         */
        @Bean
        public KafkaTemplate kafkaTemplate(DefaultKafkaProducerFactory produceFactory){
            return new KafkaTemplate(produceFactory);
        }
    }

    生产者的使用方式,跟自动配置一样,直接注入KafkaTemplate即可。主要是消费者的使用有些不同。

    批量消费消息

    上面的消费者配置配置了一个bean,@Bean(“listenerContainerFactory”),这个bean可以指定为消费者,注解方式中是如下的使用方式。

    containerFactory = "listenerContainerFactory"指定了使用listenerContainerFactory作为消费者。

    • 注意registryReceiver中的参数,ConsumerRecord对比之前的消费者,因为设置listenerContainerFactory是批量消费,因此ConsumerRecord是一个List,如果不是批量消费的话,相对应就是一个对象。

    • 注意第二个参数Acknowledgment,这个参数只有在设置消费者的ack应答模式为AckMode.MANUAL_IMMEDIATE才能注入,意思是需要手动ack。

    @Component
    @Slf4j
    public class KafkaMessageReceiver {   
        /**
         * listenerContainerFactory设置了批量拉取消息,因此参数是List<ConsumerRecord<Integer, String>>,否则是ConsumerRecord
         * @param integerStringConsumerRecords
         * @param acknowledgment
         */
        @KafkaListener(topics = {"test"}, containerFactory = "listenerContainerFactory")
        public void registryReceiver(List<ConsumerRecord<Integer, String>> integerStringConsumerRecords, Acknowledgment acknowledgment) {
            Iterator<ConsumerRecord<Integer, String>> it = integerStringConsumerRecords.iterator();
            while (it.hasNext()){
                ConsumerRecord<Integer, String> consumerRecords = it.next();
                //dosome
                 acknowledgment.acknowledge();
            }
        } 
    }

    如果不想要批量消费消息,那就可以另外定义一个bean类似于@Bean(“listenerContainerFactory”),如下,只要不设置批量消费即可。

    @Bean("listenerContainerFactory2")
        //个性化定义消费者
        public ConcurrentKafkaListenerContainerFactory listenerContainerFactory2(DefaultKafkaConsumerFactory consumerFactory) {
            //指定使用DefaultKafkaConsumerFactory
            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
            factory.setConsumerFactory(consumerFactory);
            
     //设置消费者ack模式为手动,看需求设置  
            factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }

    spring boot整合kafka报错

    Timeout expired while fetching topic metadata

    这种报错应检查 kafka连接问题,服务是否启动,端口是否正确。

    Kafka Producer error Expiring 10 record(s) for TOPIC:XXXXXX: xxx ms has passed since batch creation plus linger time

    这种报错要考虑kafka和spring对应的版本问题,我的springboot 2.1.2在使用kafka_2.12-2.1.1时出现此问题,将kafka版本换为2.11-1.1.1后,问题解决。

    感谢各位的阅读!关于“springboot自动配置方式有哪些”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!

    免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

    免费拨打  400 100 2938 免费拨打 400 100 2938
    24小时售后技术支持 24小时售后技术支持
    返回顶部 返回顶部