温馨提示×

Kafka配置中的消息保留策略如何设置

小樊
63
2025-08-07 04:02:43
栏目: 大数据

Kafka消息保留策略可通过以下方式设置:

  1. 配置文件设置
    server.properties中设置全局默认策略:

    • log.retention.ms:以毫秒为单位设置消息保留时间(如log.retention.ms=604800000保留7天)。
    • log.retention.bytes:设置分区最大保留字节数(如log.retention.bytes=2147483648保留2GB)。
    • log.cleanup.policy:设置清理策略(delete按时间/大小删除,compact保留每个key的最新消息)。
  2. 命令行工具设置

    • 创建Topic时指定:
      kafka-topics.sh --create --topic my-topic --config retention.ms=86400000 --config retention.bytes=1073741824(保留1天+1GB)。
    • 修改已有Topic配置:
      kafka-configs.sh --alter --topic my-topic --add-config retention.ms=172800000(修改为2天)。
  3. 动态API设置
    使用Kafka AdminClient动态修改Topic配置(需注意部分参数不可动态修改):

    Properties props = new Properties();  
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  
    try (AdminClient admin = AdminClient.create(props)) {  
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");  
        ConfigEntry retentionEntry = new ConfigEntry("retention.ms", "172800000");  
        admin.incrementalAlterConfigs(Collections.singletonMap(resource, Collections.singletonList(new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET)))).all().get();  
    }  
    

注意

  • retention.msretention.bytes可同时设置,满足“时间+大小”双重限制。
  • 启用compact策略时,仅保留每个key的最新消息,适用于状态更新场景。
  • 修改配置后需重启Kafka或等待日志段滚动生效。

0