A Hands-On Guide to Kafka Data Backup and Restore on Linux
1. Overview of Backup Methods and How to Choose
This guide walks through four approaches: logical export and import with the console tools (simple, suited to small topics), continuous cross-cluster mirroring with MirrorMaker 2 (the recommended option for disaster recovery), archiving to external storage via Kafka Connect (long-term retention in HDFS or S3), and raw copies of the log directory (fast, but the most fragile). Choose based on data volume, how current the copy must be, and whether the backup needs to live outside Kafka.
2. Method 1: Logical Export and Import (Console Tools)
# Export one topic to a flat text file, one "key|value" record per line
BACKUP_TOPIC=test
BACKUP_DIR=/tmp/backup
mkdir -p $BACKUP_DIR
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic $BACKUP_TOPIC \
--from-beginning \
--timeout-ms 10000 \
--property print.key=true \
--property key.separator="|" \
> $BACKUP_DIR/${BACKUP_TOPIC}.txt
# --timeout-ms makes the consumer exit after 10 s without new messages;
# without it the command never returns and the export never completes.
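As a sanity check, compare the dump's line count with the topic's end offsets (one line per message, assuming no message values contain embedded newlines). GetOffsetShell ships with Kafka; on older releases its connection flag is --broker-list rather than --bootstrap-server:
# Count exported lines, then print the end offset of each partition
wc -l $BACKUP_DIR/${BACKUP_TOPIC}.txt
kafka-run-class.sh kafka.tools.GetOffsetShell \
--bootstrap-server localhost:9092 \
--topic $BACKUP_TOPIC \
--time -1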
# Replay the dump into a topic; parse.key and key.separator must match the export
RESTORE_TOPIC=test
BACKUP_FILE=$BACKUP_DIR/${RESTORE_TOPIC}.txt
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic $RESTORE_TOPIC \
--property parse.key=true \
--property key.separator="|" \
< $BACKUP_FILE
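For a quick spot check, read back the first few restored records. Note two limits of this text round-trip: null keys are exported as the literal string "null", and records whose key or value contains the "|" separator will not survive intact:
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic $RESTORE_TOPIC \
--from-beginning \
--max-messages 10 \
--property print.key=true \
--property key.separator="|"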
3. Method 2: Continuous Cross-Cluster Mirroring with MirrorMaker 2 (Recommended)
# mirror-maker-2.properties
# Define both clusters and their bootstrap servers
clusters = source, target
source.bootstrap.servers = source-cluster:9092
target.bootstrap.servers = backup-cluster:9092
# Replication flow: mirror every topic from source to target
source->target.enabled = true
source->target.topics = .*
# Consumer groups to replicate (a literal name here; use .* to mirror all groups)
groups = mirror-group
# Replication factor for the mirrored topics MM2 creates on the target cluster
replication.factor = 3
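Beyond the basics, MM2 exposes switches for heartbeats, checkpoints, and consumer-offset syncing. The lines below are a minimal sketch, assuming Kafka 2.7 or newer (sync.group.offsets.enabled arrived in 2.7) and defaults everywhere else:
# Optional: emit heartbeats and checkpoints for monitoring and failover
emit.heartbeats.enabled = true
emit.checkpoints.enabled = true
# Translate and sync committed consumer offsets to the target cluster
sync.group.offsets.enabled = true
# Rescan the source for newly created topics every 60 seconds
refresh.topics.interval.seconds = 60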
# Launch MirrorMaker 2 with its dedicated driver script (ships with Kafka 2.4+)
bin/connect-mirror-maker.sh mirror-maker-2.properties
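To verify that the flow is active, list the topics on the target cluster; with MM2's default DefaultReplicationPolicy, mirrored topics carry the source cluster alias as a prefix:
kafka-topics.sh --bootstrap-server backup-cluster:9092 --list
# Mirrored topics show up as source.<topic>, e.g. source.test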
4. Method 3: Archiving to External Storage with Kafka Connect (HDFS/S3)
# Register the HDFS sink connector via the Kafka Connect REST API
curl -X POST -H "Content-Type: application/json" \
--data @hdfs-sink-connector.json http://localhost:8083/connectors
Example hdfs-sink-connector.json (note that ParquetFormat requires records with schemas, e.g. Avro):
{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "3",
    "topics": "your_topic",
    "hdfs.url": "hdfs://namenode:8020",
    "topics.dir": "/kafka/backup",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "flush.size": "100000"
  }
}
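After registering the connector, its state and any task errors can be inspected through the same Connect REST API:
curl http://localhost:8083/connectors/hdfs-sink/status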
5. Method 4: Copying the Data Directory, with Caveats
# Back up the broker's log directory to a remote host
rsync -avz /var/lib/kafka/logs/ user@remote:/backup/kafka_logs/
# Restore it back in place
rsync -avz user@remote:/backup/kafka_logs/ /var/lib/kafka/logs/
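A raw copy is only consistent if the broker is not writing while you copy, and a restored directory only works on a broker with the same broker.id, log.dirs, and cluster ID (recorded in meta.properties inside the log directory). A minimal sketch, assuming a systemd-managed broker whose unit is named kafka:
# Quiesce the broker so no segment is mid-write, then copy and restart
sudo systemctl stop kafka
rsync -avz /var/lib/kafka/logs/ user@remote:/backup/kafka_logs/
sudo systemctl start kafka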