Kafka消息保留策略可通过以下方式设置:
配置文件设置
在server.properties中设置全局默认策略:
log.retention.ms:以毫秒为单位设置消息保留时间(如log.retention.ms=604800000保留7天)。log.retention.bytes:设置分区最大保留字节数(如log.retention.bytes=2147483648保留2GB)。log.cleanup.policy:设置清理策略(delete按时间/大小删除,compact保留每个key的最新消息)。命令行工具设置
kafka-topics.sh --create --topic my-topic --config retention.ms=86400000 --config retention.bytes=1073741824(保留1天+1GB)。kafka-configs.sh --alter --topic my-topic --add-config retention.ms=172800000(修改为2天)。动态API设置
使用Kafka AdminClient动态修改Topic配置(需注意部分参数不可动态修改):
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
ConfigEntry retentionEntry = new ConfigEntry("retention.ms", "172800000");
admin.incrementalAlterConfigs(Collections.singletonMap(resource, Collections.singletonList(new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET)))).all().get();
}
注意:
retention.ms和retention.bytes可同时设置,满足“时间+大小”双重限制。compact策略时,仅保留每个key的最新消息,适用于状态更新场景。