MNIST 手写数字分类 Distributed Data Parallel (DDP)

The difference between DistributedDataParallel and DataParallel is: DistributedDataParallel uses multiprocessing where a process is created for each GPU, while DataParallel uses multithreading.
By using multiprocessing, each GPU has its dedicated process, this avoids the performance overhead caused by GIL of Python interpreter.
- DDP vs DP 的并发模式
DDP使用的是多进程multiprocessing
- 每个 GPU 对应一个独立的 Python 进程。
- 各 GPU/进程之间通过通信(比如 NCCL)同步梯度。
- 进程之间可以并行,每个进程独占一个 GPU,自由度高、效率高。
DP 使用的是多线程(multithreading)
- 一个 Python 主进程控制多个线程,每个线程对应一个 GPU 上的模型副本。
- 所有线程共享同一个 Python 解释器(主进程中的 GIL 环境)。
- 在多线程环境下,同一时刻只能有一个线程执行 Python 字节码
- GIL 的性能问题
Python 有个限制叫 GIL(Global Interpreter Lock):
- 在 Python 中,进程之间可以并行,线程之间只能并发。
- 在多线程环境下,同一时刻只能有一个线程执行 Python 字节码。
- 这意味着虽然多个线程运行在不同 GPU 上,但只要你涉及到 Python 层的逻辑(如 forward 调度、数据调度),就会被 GIL 限制,造成瓶颈。
DDP 的多进程模式就天然绕开了 GIL,每个进程有独立的 Python 解释器和 GIL,不会互相争抢锁。所以执行速度更快、效率更高、更适合大模型和多 GPU 并行。
To use DistributedDataParallel on a host with N GPUs, you should spawn up N processes, ensuring that each process exclusively works on a single GPU from 0 to N-1.
总结下,DDP 用多进程给每个 GPU 配一个独立的进程,这样就不用多个线程去抢 Python 的 GIL,避免了 DataParallel 由于多线程带来的性能开销。
分布式数据并行时,模型(model parameters)/优化器(optimizer states)每张卡都会拷贝一份(replicas),在整个训练过程中 DDP 始终在卡间维持着模型参数和优化器状态的同步一致性;
DDP 将 batch input,通过 DistributedSampler split & 分发到不同的 gpus 上,此时虽然模型/optimizer 相同,但因为数据输入不同,导致 loss 不同,反向传播时计算到的梯度也会不同,如何保证卡间,model/optimizer 的同步一致性,之前 DP 用的 parameter server,而它的问题就在于通信压力都在 server,所以 DDP 对这方面的改进是 ring all-reduce algorithm,将 Server 上的通讯压力均衡转到各个 Worker 上
注意有两个核心概念:
- All to one:reduce
- one to All:broadcast
方法名 | 通信结构 | 通信路径 | 数据流向 | 聚合策略 | 通信瓶颈位置 | 通信效率 | 适合场景 |
---|---|---|---|---|---|---|---|
Parameter Server | 中心化(星型) | 所有 Worker ⇄ PS | 上传全部梯度 → 聚合 → 下发参数 | PS 聚合 | PS 带宽和计算压力 | ❌ 低(集中式瓶颈) | 小规模训练,原型实验 |
Tree All-Reduce | 层次化(树型) | 节点间按树结构上传/下传 | 层层上传聚合 → 再层层广播 | 层次加和 & 广播 | 上层节点(树根) | ✅ 中( |
多机多卡,合理拓扑连接 |
Broadcast + Reduce | 两阶段(集中) | 所有 → 主节点(reduce) → 所有 | 所有上传 → 中心聚合 → 广播下发 | 单节点聚合 | 主节点 | ❌ 低 | 小规模单机多卡 |
Ring All-Reduce | 环形(对称) | 相邻节点之间点对点传输 | 均匀传递/聚合,每轮处理一块数据 | 分块加和 & 拼接 | 无集中瓶颈 | ✅✅ 高(带宽最优) | 大规模 GPU 并行,主流方案 |
Parameter Server(PS)和 Broadcast + Reduce 在通信机制上本质相似,区别只在于:
- PS 是显式设计了专门的“参数服务器”角色;
- Broadcast + Reduce 是“隐式指定”某个节点承担聚合与广播任务。
ring all-reduce:
- Reduce-scatter:首先将 gradient 分为 n 块,在第 i 轮 (0<= i < n-1),每个 gpu j 把 第 (i+j) % n 块的数据传给下一个 gpu (j+1 % n),即每个 gpu 都把自己一个块给下一个做加法,在 n 轮结束后,每个 gpu 上都有一个块是完整的聚合了所有不同 gpu 的 gradient。
- All-gather: 将每个 gpu 上的完整聚合后的 gradient 依次传给下一个 gpu,再传递 n-1 次就使所有 gpu 的每块 gradient 都是完整聚合的数据。
虽然传递的数据量还是和 PS 一样,但传输压力平均到每个 gpu 上,不需要单个 worker 承担明显大的压力。
概念/参数名 | 中文含义 | 含义解释 | 示例(2节点 × 每节点4GPU) |
---|---|---|---|
world |
全局进程空间 | 指整个分布式系统中参与训练的所有进程总和 | 2 节点 × 4 GPU = 8 个进程 |
world_size |
全局进程数 | world 中的进程总数,参与通信、同步、梯度聚合的总 worker 数 |
8 |
rank |
全局进程编号 | 当前进程在 world 中的唯一编号,范围是 |
第1节点是 0 |
node |
物理节点/机器 | 实际的服务器或物理机,每个节点运行多个进程,通常对应一台机器 | 2台服务器(假设每台4 GPU) |
node_rank |
节点编号 | 当前节点在所有节点中的编号,通常用于标识不同机器 | 第1台是 0,第2台是 1 |
local_rank |
本地GPU编号 | 当前进程在所在节点上的 GPU 编号,绑定 cuda(local_rank) |
每台机器上分别为 0~3 |
简洁点,world 代表所有服务器上的 gpu,rank 代表 world 视角下的 gpu 编号;node 代表某个具体的服务器,node_rank 代表 world 视角下的 node 编号,local_rank 代表 node 视角下的 gpu 编号。
引入 DDP 相关库
1 | import os |
ddp 对原始代码的修改
参数 | 作用说明 |
---|---|
MASTER_ADDR |
指定 主节点(rank=0 所在节点)的 IP 地址或主机名,作为所有进程连接的“服务器” |
MASTER_PORT |
指定主节点上用于通信监听的端口号,所有进程都通过这个端口进行连接与协调 |
为什么只需要指定主节点的地址和端口?所有进程必须“集合”在一起组成一个通信组(process group);这个过程需要一个 协调者,就像组织会议需要一个人发出会议链接一样;PyTorch DDP 把这个协调角色交给 rank == 0 的进程(主节点);其它进程只需要“知道去哪找这个协调者”就能完成初始化。
主节点负责协调组网,在 DDP 初始化时,所有节点主动连接主节点,每个节点都会告知主节点自己的地址和端口,主节点收集所有其他进程的网络信息,构建全局通信拓扑,将通信配置信息广播回每个进程,包括每个 rank 要连接哪些 peer,这样每个进程就可以进行后续的双向传输,而不再依赖主节点作为中转。
主节点(rank 0) | 工作节点(rank 1,2,…) |
---|---|
在 MASTER_PORT 启动一个监听服务(如 TCP server) |
主动连接 MASTER_ADDR:MASTER_PORT |
监听并接受连接,记录加入者信息 | 与主节点握手,注册自己的 rank 、地址等 |
构建通信拓扑,如 Ring 或 NCCL 分组等 | 一旦接入,就获得组网配置,与其他 worker 点对点通信 |
ddp 初始化和销毁进程
1 | def ddp_setup(rank, world_size): |
DDP 会在每个 GPU 上运行一个进程,每个进程中都有一套完全相同的 Trainer 副本(包括 model 和 optimizer),各个进程之间通过一个进程池进行通信。
ddp 包装 model
训练函数不需要多大的修改,使用 DistributedDataParallel 包装模型,这样模型才能在各个进程间同步参数。包装后 model 变成了一个 DDP 对象,要访问其参数得这样写 self.model.module.state_dict()
运行过程中单独控制某个进程进行某些操作,比如要想保存 ckpt,由于每张卡里都有完整的模型参数,所以只需要控制一个进程保存即可。需要注意的是:使用 DDP 改写的代码会在每个 GPU 上各自运行,因此需要在程序中获取当前 GPU 的 rank(gpu_id),这样才能对针对性地控制各个 GPU 的行为。
1 | class Trainer: |
在程序入口初始化进程池;在程序出口销毁进程池
1 | def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int): |
DistributedSampler
构造 Dataloader 时使用 DistributedSampler 作为 sampler,这个采样器可以自动将数量为 batch_size 的数据分发到各个GPU上,并保证数据不重叠。理解是可以是这样的,但实际是根据 rank 让每个 gpu 能索引到的数据不一样,每个 gpu 上也是有重复的 Dataloader 的,但每个gpu 上 rank 设置不同,Dataloader sample 先根据 shuffle 打乱顺序,再控制不同 rank 能索引到的数据,以实现类似分发的效果。
Rank 0 sees: [4, 7, 3, 0, 6] Rank 1 sees: [1, 5, 9, 8, 2]
1 | def prepare_dataloader(dataset: Dataset, batch_size: int): |
set_epoch(epoch) 用于设置当前训练 epoch,以确保在分布式训练中 每个进程对数据的打乱顺序一致,从而保证每个 rank 分到的数据是互不重叠且可复现的。
当 DistributedSampler 的 shuffle=True 时,它在每个 epoch 会用 torch.Generator().manual_seed(seed) 生成新的随机索引顺序。
但:
- 如果不调用 set_epoch(),每个进程将使用相同的默认种子;
- 会导致每个 epoch 每个进程打乱后的样本索引相同 → 重复取样,每个 epoch 的训练数据都一样 → 训练不正确!
你确实可以不手动设置 rank 和 world_size,因为 DistributedSampler 会自动从环境变量中获取它们。
如果你不传入 rank 和 num_replicas,PyTorch 会调用:
- torch.distributed.get_world_size() # 获取 world_size
- torch.distributed.get_rank() # 获取当前进程 rank
1 | import torch |
1 | Epoch 0 |
multiprocessing.spawn 创建多卡进程
使用 torch.multiprocessing.spawn 方法将代码分发到各个 GPU 的进程中执行。在当前机器上启动 nprocs=world_size 个子进程,每个进程执行一次 main() 函数,并由 mp.spawn 自动赋值第一个参数(目的是执行 nprocs 个进程,第一个参数为 0 ~ nprocs-1)。
1 | def start_process(i): |
可以执行执行以下代码,它展现了 mp 创建进程的效果
1 | import torch.multiprocessing as mp |
效果为:
1 | [Rank 0] Received message: hello world |
torchrun
torchrun 是 PyTorch 官方推荐的分布式训练启动工具,它用来 自动管理多进程启动、环境变量传递和通信初始化,替代早期的 torch.distributed.launch 工具。
- 它帮你在每个 GPU 上自动启动一个训练进程;
- 它设置好 DDP 所需的环境变量(如 RANK, WORLD_SIZE, LOCAL_RANK, MASTER_ADDR 等);
- 它会自动将这些参数传递给你的脚本中的 torch.distributed.init_process_group()。
torchrun == python -m torch.distributed.launch –use-env
参数名 | 类型 | 说明 |
---|---|---|
--nproc_per_node |
int |
每台机器上启动的进程数(默认值为 1) |
--nnodes |
int |
总节点(机器)数 |
--node_rank |
int |
当前节点编号(范围:0 ~ nnodes-1 ) |
--rdzv_backend |
str |
rendezvous 后端(默认 c10d ,一般不改) |
--rdzv_endpoint |
str |
rendezvous 主地址和端口,格式如 "localhost:29500" |
--rdzv_id |
str |
作业唯一标识,默认 "default" |
--rdzv_conf |
str |
可选的 kv 参数,用逗号分隔,如 "key1=val1,key2=val2" |
--max_restarts |
int |
失败时最多重启次数(默认 3) |
--monitor_interval |
float |
monitor 进程检查的间隔(秒) |
--run_path |
str |
若脚本是模块路径形式,比如 my_module.train ,则用此代替 script |
--tee |
str |
控制日志输出,可选值为 "stdout" 或 "stderr" |
--log_dir |
str |
日志输出目录(默认当前目录) |
--redirects |
str |
重定向日志,可选:all , none , rank ,如 all:stdout |
--no_python |
flag |
若已是 Python 脚本(不用再次 python 调用),可加这个 flag |
以上的 rendezvous 是每个进程通过 rendezvous 找到主节点,然后加入。之后的通信阶段用 backend, 即 NCCL,在 init_process_group 设置。
最常见的几个参数的用法是
1 | torchrun \ |
对比下是否使用 torchrun 时的行为差别
两种 DDP 启动模式的关键区别
对比项 | 不使用 torchrun (手动) |
使用 torchrun (推荐方式) |
---|---|---|
启动方式 | 使用 mp.spawn(fn, ...) |
使用 torchrun --nproc_per_node=N |
rank , world_size 设置方式 |
手动传入(通过 spawn 的参数) |
自动由 torchrun 设置环境变量 |
主节点地址 / 端口 | 你必须手动设置 MASTER_ADDR/PORT |
torchrun 会自动设置这些环境变量 |
是否需控制进程数量 | 手动使用 spawn 创建 |
自动由 torchrun 创建 |
是否读取环境变量 | ❌ 默认不会 | ✅ 自动从环境变量中读取(如 RANK , LOCAL_RANK ) |
脚本能否直接运行(python train.py ) |
❌ 通常不行,需要多进程协调 | ✅ 支持直接 torchrun train.py 运行 |
是否适用于多机 | ❌ 手动处理跨节点逻辑 | ✅ 内建 --nnodes , --node_rank , 可跨机运行 |
init_process_group()
的行为
情况 | 说明 |
---|---|
手动传 rank 和 world_size |
常用于 mp.spawn 场景(你在代码里传了参数) |
不传,内部读取环境变量 | 如果你用的是 torchrun ,环境变量如 RANK 、WORLD_SIZE 自动设置了 |
不传又没用 torchrun |
❌ 报错:因为 init_process_group 找不到必要的通信信息 |
当你运行:
1 | torchrun --nproc_per_node=4 --rdzv_endpoint=localhost:29500 train.py |
它在后台自动设置了以下环境变量(对每个进程):
1 | RANK=0 # 每个进程唯一编号 |
而 init_process_group(backend="nccl")
会自动从这些环境变量中解析配置,无需你显式传入。
非 torchrun 完整代码
1 | import os |
启动代码
1 | CUDA_VISIBLE_DEVICES=0,1 python mnist_ddp.py |
torchrun 完整代码
1 | import os |
启动代码
1 | CUDA_VISIBLE_DEVICES=0,1 torchrun --nproc_per_node=2 mnist_ddp_torchrun.py |