温馨提示×

kafka怎么判断是否发送成功

小亿
552
2023-10-20 18:49:03
栏目: 大数据

在 Kafka 中,生产者发送消息后,可以通过以下几种方式判断消息是否发送成功:

  1. 同步发送方式:生产者调用 send() 方法后,会等待消息的确认返回,如果发送成功,send() 方法会返回一个 RecordMetadata 对象,其中包含了消息的元数据信息;如果发送失败,则可能抛出异常。
ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "key", "value");
try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("消息发送成功,offset:" + metadata.offset());
} catch (Exception e) {
    System.out.println("消息发送失败:" + e.getMessage());
}
  1. 异步发送方式:生产者调用 send() 方法后,可以传递一个回调函数,在消息发送完成后,会调用该回调函数,通过回调函数可以获取到发送结果。
ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "key", "value");
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            System.out.println("消息发送失败:" + e.getMessage());
        } else {
            System.out.println("消息发送成功,offset:" + metadata.offset());
        }
    }
});

无论是同步发送还是异步发送,如果发送失败,可以根据异常信息进行错误处理。

0