CentOS环境下Kafka数据迁移常见方法及步骤
当需要迁移Kafka的数据存储目录(log.dirs)或日志目录(logs)时,需通过以下步骤完成:
./kafka-server-stop.sh停止服务,避免迁移过程中数据损坏。/usr/local/kafka/data)和日志目录(如/usr/local/kafka/logs)的内容复制到新位置(如/mnt/kafka/data、/mnt/kafka/logs),使用cp -r命令确保文件完整性。server.properties,更新log.dirs参数为新路径(如log.dirs=/mnt/kafka/data);kafka-run-class.sh(或kafka-server-start.sh),将日志输出目录指向新位置(如LOG_DIR=/mnt/kafka/logs)。./kafka-server-start.sh -daemon server.properties,启动后通过kafka-topics.sh --list --bootstrap-server localhost:9092验证Topic是否正常。MirrorMaker是Kafka官方提供的跨集群数据同步工具,支持全量/增量迁移,适用于将数据从一个集群复制到另一个集群:
mirror-maker.properties,配置源集群和目标集群的连接信息及同步Topic(示例):# 源集群配置
consumer.bootstrap.servers=source-host:9092
# 目标集群配置
producer.bootstrap.servers=target-host:9092
# 同步的Topic(支持正则,如test.*)
topics=test
# 消费者组ID
group.id=mirror-maker-group
./kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config mirror-maker.properties --producer.config mirror-maker.properties --whitelist test(--whitelist指定Topic,支持*通配符)。kafka-console-consumer.sh消费同步后的Topic,确认数据与源集群一致。当需要将Topic的分区从一个Broker迁移到另一个Broker(如集群扩容后平衡负载),可使用kafka-reassign-partitions.sh工具:
topics-to-move.json(指定要迁移的Topic),执行./kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3" --generate,生成包含当前分区分配和建议新分配的JSON文件。reassignment.json),执行./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassignment.json --execute。./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassignment.json --verify,检查分区是否迁移成功(状态为completed)。若目标集群为AutoMQ,可使用Kafka Linking工具实现零停机迁移,确保业务不中断:
automq cluster create kafka link <link-name> <source-topic> <target-topic>,建立源Topic到目标Topic的镜像链路。consumer groups promote命令将消费组从源集群切换到目标集群,消费者开始从新集群消费。promote topic停止同步,并将目标Topic设为生产可用。CloudCanal是一款数据集成工具,支持Kafka与其他数据源(如MySQL、Elasticsearch)的双向同步,也可用于Kafka集群间迁移:
--sync参数保证同步),避免数据丢失;迁移后验证目标集群的数据完整性(如消息数量、内容)。kafka-dump或kafka-console-consumer导出到文件),以便迁移失败时快速恢复。bootstrap.servers)指向目标集群,重启业务确保连接正常。