温馨提示×

Kafka 在 Debian 上如何进行数据迁移

小樊
44
2025-11-16 13:10:17
栏目: 智能运维

在 Debian 上的 Kafka 数据迁移实操指南

一、迁移场景与总体策略

  • 跨集群迁移或上云:优先使用 MirrorMaker 2(MM2) 或厂商提供的复制工具(如云厂商的 Smart Connect),支持跨集群镜像、复制消费者偏移量、双向复制等,适合零停机或低停机迁移。
  • 不停机切换:采用“先同步、再切消费、最后切生产”的三段式,保证业务连续性。
  • 仅做存储层搬迁(同版本、同配置):可直接拷贝 log.dirs 数据目录到新集群对应路径,并调整 broker.id / log.dirs / listeners 后启动;务必确保无写入且做好备份。
  • 仅调整分区与副本分布:使用 kafka-reassign-partitions.sh 执行分区重分配,适合扩容、重均衡或新 broker 上线。
  • 从数据库到 Kafka 的初始化导入:使用 Kafka Connect + Debezium 捕获变更数据(CDC),适合一次性全量 + 持续增量同步。

二、跨集群迁移与不停机切换(推荐 MM2)

  • 步骤
    1. 准备新集群(Debian 上部署 Kafka,配置 broker.id / listeners / log.dirs / zookeeper.connect 或 KRaft 参数),确保网络互通与认证一致(如 SSL/SASL)。
    2. 使用 MirrorMaker 2 进行镜像:
      • 配置文件示例(mm2.properties):
        clusters = src, dst
        src.bootstrap.servers = PLAINTEXT://src-broker:9092
        dst.bootstrap.servers = PLAINTEXT://dst-broker:9092
        
        src->dst.enabled = true
        dst->src.enabled = false
        
        # 同步消费者组偏移量(可选)
        src->dst.sync.group.offsets.enabled = true
        src->dst.sync.group.offsets.interval.seconds = 60
        
        # 主题映射(可选:重命名或过滤)
        topics = .*
        
        # 复制限流(按带宽/分区调节)
        tasks.max = 8
        replication.factor = 3
        
      • 启动:
        bin/connect-mirror-maker.sh mm2.properties
        
    3. 校验数据:对比源/目标集群关键 topic 最新 offset / 消息条数,必要时在控制台或脚本中抽样校验。
    4. 切换消费端:将消费者连接地址改为新集群,分批重启,观察消费延迟与堆积。
    5. 切换生产端:确认消费稳定后,将生产者指向新集群并重启。
    6. 观察与回滚预案:持续监控 Lag / 错误率 / 吞吐,保留源集群只读一段时间以便回滚。
  • 说明
    • MM2 基于 Kafka Connect,支持分布式运行、容错与偏移量映射,适合生产环境。
    • 若使用云厂商工具(如 Smart Connect),流程通常为“先数据复制、再迁消费、最后迁生产”,并提示可能存在 0–100 条/分区 的少量重复消费,业务需具备幂等性。

三、同版本存储层搬迁与分区重分配

  • 存储层搬迁(同版本、同配置)
    1. 停止源集群写入(维护窗口或切到备集群)。
    2. 备份并同步 log.dirs 到新集群相同目录结构(保持 topic/partition 目录层级一致)。
    3. 在新集群设置正确的 broker.id / log.dirs / listeners,逐台启动并校验。
    4. 不建议跨版本直接拷贝数据目录,优先使用镜像或导出/导入方式。
  • 分区重分配(扩容/新 broker 上线/重均衡)
    1. 生成重分配计划:
      bin/kafka-reassign-partitions.sh --bootstrap-server <broker:9092> \
        --generate --topics-to-move-json-file topics.json --broker-list "<new-brokers>"
      
      topics.json 示例:
      {"topics": [{"topic": "my_topic"}], "version": 1}
      
    2. 执行重分配:
      bin/kafka-reassign-partitions.sh --bootstrap-server <broker:9092> \
        --execute --reassignment-json-file reassignment.json
      
    3. 查看进度:
      bin/kafka-reassign-partitions.sh --bootstrap-server <broker:9092> \
        --verify --reassignment-json-file reassignment.json
      
    4. 如需可视化辅助,可使用 Kafka Manager 生成分配计划并执行。

四、数据库到 Kafka 的初始化导入(CDC)

  • 适用:将 MySQL / PostgreSQL 等数据库的历史与增量数据导入 Kafka,作为迁移或同步的起点。
  • 步骤(Debian 上可用 Docker Compose 快速起环境)
    1. 安装 Docker 并启动服务:
      sudo apt-get update
      sudo apt-get install -y docker.io
      sudo systemctl start docker
      sudo systemctl enable docker
      
    2. 使用 docker-compose 启动 Zookeeper / Kafka / Kafka Connect / UI
      version: '2'
      services:
        zookeeper:
          image: quay.io/debezium/zookeeper:2.0
          ports: ["2181:2181","2888:2888","3888:3888"]
        kafka:
          image: quay.io/debezium/kafka:2.0
          ports: ["9092:9092"]
          links: ["zookeeper"]
        connect:
          image: quay.io/debezium/connect:2.0
          ports: ["8083:8083","5005:5005"]
          environment:
            - bootstrap.servers=kafka:9092
            - group.id=1
            - config.storage.topic=my_connect_configs
            - offset.storage.topic=my_connect_offsets
            - status.storage.topic=my_source_connect_statuses
        kafka-ui:
          image: provectuslabs/kafka-ui:latest
          ports: ["9093:8080"]
          environment:
            - KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS=kafka:9092
        debezium-ui:
          image: debezium/debezium-ui:2.0
          ports: ["8080:8080"]
          environment:
            - KAFKA_CONNECT_URIS=http://connect:8083
      
      启动:
      docker-compose -f docker-compose.yaml -p debezium up -d
      
    3. Debezium UI 或 Connect REST API 部署 Source Connector(如 MySQL/PostgreSQL),按需设置 snapshot.mode=initial 先做全量,再持续增量。
    4. 如需落库到目标存储,可再配置 Sink Connector(如到 PostgreSQL/ES)。
    5. 注意网络、权限、时区与 schema 注册表(如 Confluent Schema Registry)的一致性。

五、迁移验证与注意事项

  • 一致性校验:对比源/目标 topic 最新 offset / 消息计数 / 关键业务主键,必要时抽样回放校验。
  • 偏移量与幂等:启用 offset 同步 或业务侧幂等(如 幂等生产者 / 事务 / 去重),容忍少量重复的场景要明确策略。
  • 监控与限流:迁移期间关注 Lag / 吞吐 / 错误率,通过 tasks.max / replication.factor / 带宽 限流避免冲击生产。
  • 安全与网络:统一 SSL/SASL 认证与 ACL,确保跨集群端口(如 9092/8083)可达。
  • 回滚预案:保留源集群只读窗口与最近备份,出现异常可快速切回。
  • 版本与兼容性:跨大版本优先镜像而非拷贝数据目录;客户端依赖与序列化格式需兼容。

0