1. Environment Preparation and System Checks
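Before launching anything, confirm on every node that the GPUs are visible and that the installed PyTorch was built with the NCCL backend; a minimal per-node check (a sketch, assuming PyTorch is already installed):

# Run on each node: GPU visibility and NCCL backend availability
import torch
import torch.distributed as dist
print('CUDA available   :', torch.cuda.is_available())
print('GPUs on this node:', torch.cuda.device_count())
print('NCCL backend     :', dist.is_nccl_available())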
Stop the firewall so the nodes can reach each other: sudo systemctl stop firewalld (in production, open only the required ports instead of disabling the firewall entirely).
2. Option 1: Quick Start with Native PyTorch DDP
Save the script below as ddp_demo.py:
import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler, TensorDataset

def main(rank, world_size, local_rank):
    # 1) Initialize the process group (TCP rendezvous); fill in the real master IP and port
    dist.init_process_group(
        backend='nccl',
        init_method='tcp://<MASTER_IP>:<MASTER_PORT>',
        world_size=world_size,
        rank=rank
    )
    # Pick the GPU by *local* rank: on a multi-node job the global rank can exceed a node's GPU count
    torch.cuda.set_device(local_rank)

    # 2) Model wrapped in DDP
    model = nn.Linear(10, 1).to(local_rank)
    model = DDP(model, device_ids=[local_rank])

    # 3) Data (toy example); DistributedSampler gives every rank a disjoint shard
    ds = TensorDataset(torch.randn(1024, 10), torch.randn(1024, 1))
    sampler = DistributedSampler(ds, num_replicas=world_size, rank=rank)
    dl = DataLoader(ds, batch_size=32, sampler=sampler, num_workers=4, pin_memory=True)

    # 4) Training loop
    opt = optim.SGD(model.parameters(), lr=0.01)
    loss_fn = nn.MSELoss()
    for epoch in range(3):
        sampler.set_epoch(epoch)  # reshuffle so the shards differ between epochs
        for x, y in dl:
            x, y = x.to(local_rank), y.to(local_rank)
            opt.zero_grad()
            loss = loss_fn(model(x), y)
            loss.backward()
            opt.step()
    dist.destroy_process_group()

if __name__ == '__main__':
    # RANK / WORLD_SIZE / LOCAL_RANK are exported by the launcher (torchrun, or
    # torch.distributed.launch on PyTorch >= 1.9) for every spawned process
    rank = int(os.environ['RANK'])
    world_size = int(os.environ['WORLD_SIZE'])
    local_rank = int(os.environ['LOCAL_RANK'])
    main(rank, world_size, local_rank)
Launch one process per GPU with the built-in launcher, which exports RANK / WORLD_SIZE / LOCAL_RANK for every worker (on recent PyTorch, torchrun accepts the same flags and is the recommended replacement for torch.distributed.launch).

Single node, 4 GPUs (world size 4):
python -m torch.distributed.launch \
    --nproc_per_node=4 \
    --nnodes=1 --node_rank=0 \
    --master_addr=<MASTER_IP> --master_port=12345 \
    ddp_demo.py

Two nodes with 4 GPUs each (world size 8), run on node 0:
python -m torch.distributed.launch \
    --nproc_per_node=4 \
    --nnodes=2 --node_rank=0 \
    --master_addr=<MASTER_IP> --master_port=12345 \
    ddp_demo.py

The same command on node 1, changing only --node_rank:
python -m torch.distributed.launch \
    --nproc_per_node=4 \
    --nnodes=2 --node_rank=1 \
    --master_addr=<MASTER_IP> --master_port=12345 \
    ddp_demo.py
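Once all ranks come up, a quick way to confirm the wiring is to run one collective right after init_process_group(); a minimal sketch that could be dropped into main() of ddp_demo.py above:

# Sanity check: every rank joins a single all_reduce
t = torch.ones(1, device=f'cuda:{local_rank}')
dist.all_reduce(t)  # sums the tensor across all ranks
if rank == 0:
    print(f'world_size={dist.get_world_size()}, all_reduce sum={int(t.item())}')  # sum should equal world_size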
3. Option 2: Horovod for Multi-Node, Multi-GPU Training
Install with pip install horovod[pytorch] (OpenMPI or Gloo must be installed beforehand). Launch 8 processes across two 4-GPU nodes with mpirun:

mpirun -np 8 -H node0:4,node1:4 \
    -bind-to none -map-by slot \
    -x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH \
    python train_horovod.py
The core of train_horovod.py (Net and train_dataset stand in for your own model and dataset):

import torch
import horovod.torch as hvd
from torch.utils.data import DataLoader

hvd.init()
torch.cuda.set_device(hvd.local_rank())  # one GPU per process, chosen by local rank

model = Net().cuda()
# Scale the learning rate with the number of workers (standard Horovod practice)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01 * hvd.size())

# Broadcast initial parameters and optimizer state from rank 0 so every worker starts identically
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

# Wrap the optimizer so gradients are averaged across workers via allreduce
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())

# Data: each process reads only its own shard
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank()
)
train_loader = DataLoader(train_dataset, batch_size=..., sampler=train_sampler, ...)
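From here the epoch loop looks like single-GPU training; a minimal sketch, assuming you define loss_fn and epochs yourself:

# Minimal Horovod training loop (loss_fn and epochs are placeholders)
for epoch in range(epochs):
    train_sampler.set_epoch(epoch)  # reshuffle so shards differ between epochs
    for x, y in train_loader:
        x, y = x.cuda(non_blocking=True), y.cuda(non_blocking=True)
        optimizer.zero_grad()
        loss = loss_fn(model(x), y)
        loss.backward()
        optimizer.step()  # DistributedOptimizer averages gradients across workers before applying them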
4. Performance and Stability Tips
5. Troubleshooting Common Issues
If NCCL collectives time out or hang, set export NCCL_SOCKET_TIMEOUT=3600 and export NCCL_DEBUG=INFO, then check that the driver / CUDA / NCCL versions match across nodes and that the network between them is healthy. For launchers that need passwordless SSH between nodes (such as mpirun), distribute public keys with ssh-copy-id.
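To see which CUDA / cuDNN / NCCL versions the local PyTorch build ships with (handy when hunting version mismatches), a small sketch:

# Print the versions bundled with this PyTorch build; compare the output across nodes
import torch
print('torch        :', torch.__version__)
print('CUDA runtime :', torch.version.cuda)
print('cuDNN        :', torch.backends.cudnn.version())
print('NCCL         :', torch.cuda.nccl.version())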