PyTorch is a popular open-source machine learning library. On Linux it supports several forms of parallel computation, including data parallelism, model parallelism, and distributed parallelism. The following is a brief introduction to each:
Data parallelism replicates the model on each GPU and splits every input batch across the GPUs, so each device processes a slice of the batch in parallel. PyTorch implements this with the torch.nn.DataParallel module.
import torch
import torch.nn as nn
from torch.utils.data import DataLoader

# Suppose we have a model and a dataset
model = nn.Sequential(
    nn.Linear(10, 50),
    nn.ReLU(),
    nn.Linear(50, 10)
)
dataset = ...  # your dataset
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Wrap the model in DataParallel when more than one GPU is available
if torch.cuda.device_count() > 1:
    print(f"Let's use {torch.cuda.device_count()} GPUs!")
    model = nn.DataParallel(model)
model.to('cuda')  # move the model (and its replicas) to the GPU

optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

# Training loop: DataParallel scatters each batch across the GPUs and gathers the outputs
for inputs, labels in dataloader:
    inputs, labels = inputs.to('cuda'), labels.to('cuda')
    optimizer.zero_grad()
    outputs = model(inputs)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()
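One detail that is easy to trip over: DataParallel wraps the original network, so the underlying module lives under the .module attribute. A minimal sketch of saving and reloading the weights without the wrapper (the file name model.pt is just an illustrative example):

# Save the underlying model's weights, not the DataParallel wrapper
state_dict = model.module.state_dict() if isinstance(model, nn.DataParallel) else model.state_dict()
torch.save(state_dict, 'model.pt')  # 'model.pt' is an arbitrary example path

# Reload into a plain (unwrapped) copy of the same architecture
restored = nn.Sequential(nn.Linear(10, 50), nn.ReLU(), nn.Linear(50, 10))
restored.load_state_dict(torch.load('model.pt', map_location='cpu'))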
Model parallelism places different parts of a model on different GPUs. PyTorch has no single drop-in module for this (the way DataParallel handles data parallelism), but it can be implemented by manually placing submodules on devices and moving tensors between them.
import torch
import torch.nn as nn

class ModelParallelModel(nn.Module):
    """Splits the network across two GPUs: the first layer on cuda:0, the second on cuda:1."""
    def __init__(self):
        super().__init__()
        self.part1 = nn.Linear(10, 50).to('cuda:0')
        self.part2 = nn.Linear(50, 10).to('cuda:1')

    def forward(self, x):
        x = self.part1(x.to('cuda:0'))
        x = self.part2(x.to('cuda:1'))
        return x

model = ModelParallelModel()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

# Training loop (reuses the dataloader from the data-parallel example)
for inputs, labels in dataloader:
    # labels must live on cuda:1, where the final output of the model ends up
    inputs, labels = inputs.to('cuda:0'), labels.to('cuda:1')
    optimizer.zero_grad()
    outputs = model(inputs)  # output tensor is on cuda:1
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()
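The naive split above keeps only one GPU busy at a time, because cuda:1 has to wait for cuda:0 to finish each batch. A common refinement, sketched below under the same two-GPU layout, is to pipeline micro-batches so both devices can work concurrently (the split size of 16 is an arbitrary choice for illustration):

class PipelinedModel(ModelParallelModel):
    def forward(self, x, split_size=16):
        # Split the batch into micro-batches and pipeline them across the two GPUs
        splits = iter(x.split(split_size, dim=0))
        s_next = next(splits)
        s_prev = self.part1(s_next.to('cuda:0')).to('cuda:1')
        outputs = []
        for s_next in splits:
            # part2 processes the previous micro-batch while part1 starts on the next one
            outputs.append(self.part2(s_prev))
            s_prev = self.part1(s_next.to('cuda:0')).to('cuda:1')
        outputs.append(self.part2(s_prev))
        return torch.cat(outputs, dim=0)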
Distributed parallelism runs the model in parallel across multiple nodes, each of which may have multiple GPUs. PyTorch implements it through the torch.distributed module, typically via DistributedDataParallel (DDP).
import os
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

def main(rank, world_size, num_epochs=10):
    # Each spawned process joins the process group; NCCL is the usual backend for GPUs
    dist.init_process_group(backend='nccl', init_method='env://', world_size=world_size, rank=rank)
    model = nn.Sequential(
        nn.Linear(10, 50),
        nn.ReLU(),
        nn.Linear(50, 10)
    ).to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    dataset = ...  # your dataset
    # DistributedSampler gives each process a disjoint shard of the dataset
    sampler = DistributedSampler(dataset)
    dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)
    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.01)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(num_epochs):
        sampler.set_epoch(epoch)  # reshuffle the shards each epoch
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(rank), labels.to(rank)
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()  # DDP synchronizes gradients across processes here
            optimizer.step()

    dist.destroy_process_group()

if __name__ == "__main__":
    # init_method='env://' expects these variables; localhost works for single-node training
    os.environ.setdefault('MASTER_ADDR', 'localhost')
    os.environ.setdefault('MASTER_PORT', '29500')
    world_size = torch.cuda.device_count()
    torch.multiprocessing.spawn(main, args=(world_size,), nprocs=world_size, join=True)
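Instead of spawning processes yourself, you can let the torchrun launcher start one process per GPU and fill in the rendezvous environment variables for you. A minimal sketch, assuming the training code lives in a hypothetical script named train_ddp.py:

# Hypothetical script train_ddp.py, launched with for example:
#   torchrun --nproc_per_node=4 train_ddp.py
import os
import torch
import torch.distributed as dist

dist.init_process_group(backend='nccl')  # torchrun supplies RANK, WORLD_SIZE, MASTER_ADDR via env vars
local_rank = int(os.environ['LOCAL_RANK'])
torch.cuda.set_device(local_rank)
# ... build the model, wrap it with DDP(model, device_ids=[local_rank]), and train as above ...
dist.destroy_process_group()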
Which form of parallelism to choose depends on your hardware and your specific requirements: DataParallel is the simplest option on a single machine, model parallelism helps when a model does not fit on one GPU, and DistributedDataParallel is the standard choice for multi-GPU and multi-node training.