温馨提示×

Kafka生产者如何设置消息发送超时

小樊
32
2025-12-19 02:15:26
栏目: 大数据

在Kafka生产者中,可以通过设置request.timeout.ms参数来控制消息发送的超时时间。request.timeout.ms参数定义了生产者在放弃发送请求之前等待服务器响应的最长时间。如果在这个时间内没有收到服务器的响应,生产者将抛出一个异常。

要设置消息发送超时,可以在创建Kafka生产者时配置Properties对象。以下是一个示例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerTimeoutExample {
    public static void main(String[] args) {
        // 创建配置对象
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 设置消息发送超时时间为10秒
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);

        // 创建Kafka生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 创建一个生产者记录
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("my-topic", "key", "value");

        try {
            // 发送消息
            producer.send(record);
            System.out.println("消息发送成功");
        } catch (Exception e) {
            System.err.println("消息发送失败: " + e.getMessage());
        } finally {
            // 关闭生产者
            producer.close();
        }
    }
}

在这个示例中,我们将request.timeout.ms设置为10000毫秒(10秒)。如果在10秒内没有收到服务器的响应,生产者将抛出一个异常。

0