Triton Inference Server推理引擎

https://github.com/triton-inference-server/server

Triton 从入门到精通

https://docs.nvidia.com/deeplearning/triton-inference-server/release-notes/index.html

深度学习部署神器——triton-inference-server入门教程指北

triton-inference-server的backend(一)——关于推理框架的一些讨论

Triton Inference Server是Nvida开源的机器学习推理引擎(可以理解为同TF Serving对等的产品),其提供了多种开箱即用的功能帮助我们快速落地AI模型到生产环境以提供业务使用。当我们团队人手资源受限或开发时间不足的情况下,没有能力自研一套机器学习推理引擎,Triton是一个非常好的选择。

从Triton的不断完善的进程来,其已经逐渐的具有了一个深度学习推理引擎应该所具有的的全部功能(当然还是有一些不够完善的地方)。

Triton Inference Server核心的几个功能:

  • 多框架支持:Triton支持了几乎所有主流的机器学习框架,例如Tensorflow、TensorRT、Pytorch、Python、ONNX等;同时也可以custom backend的方式来扩展解码引擎。
  • 高性能:Triton提供了dynamic batching、concurrent execution、optimal model configuration、model ensemble、dali model 等策略来提升在线推理的性能;同时也提供了perf analyze和model analyze工具来辅助我们进行性能调优。
  • MLOps:Triton提供了Prometheus exporter、模型在线更新、http server 、grpc server等多种在线服务策略以满足用户生产场景多样化的部署和运维需求

triton可以充当服务框架去部署你的深度学习模型,其他用户可以通过http或者grpc去请求你搭建的服务,相当于你用flask搭了个服务供别人请求,当然相比flask的性能高很多了。triton也可以摘出C-API充当多线程推理服务框架,去除http和grpc部分,适合本地部署多模型,比如你有很多模型要部署,然后分时段调用,或者有pipeline,有了triton就省去你处理显存、内存和线程的麻烦。

借用官方的图,triton的使用场景结构如下:

涉及到运维部分,我也不是很懂,抛去K8S后,结构清爽了些:

triton的一些优点

通过上述的两个结构图,可以大概知道triton的一些功能和特点

  • 支持HTTP/GRPC
  • 支持多backend,TensorRT、libtorch、onnx、paddle、tvm啥的都支持,也可以自己custom,所以理论上所有backend都可以支持
  • 单GPU、多GPU都可以支持,CPU也支持
  • 模型可以在CPU层面并行执行
  • 很多基本的服务框架的功能都有,模型管理比如热加载、模型版本切换、动态batch,类似于之前的tensorflow server
  • 开源,可以自定义修改,很多问题可以直接issue,官方回复及时
  • NVIDIA官方出品,对NVIDIA系列GPU比较友好,也是大厂购买NVIDIA云服务器推荐使用的框架
  • 很多公司都在用triton,真的很多,不管是互联网大厂还是NVIDIA的竞品都在用,用户多代表啥不用我多说了吧
大厂都在用triton

如何学习triton

两年前开始学习的时候,官方资料比较匮乏, 只能通过看源码来熟悉triton的使用方式,所幸知乎上有个关于TensorRT serving不错的教程[2],跟着看了几篇大致了解了triton的框架结构。那会triton叫做TensorRT serving,专门针对TensorRT设计的服务器框架,后来才变为triton,支持其他推理后端的。

现在triton的教程比较多了,官方的docs写着比较详细,还有issue中各种用例可以参考,B站上也有视频教程[3],比两年前的生态要好了不少。

当然,最重要的,还是上手使用,然后看源码, 然后客制化。

源码学习

从triton的源码中可以学到:

  • C++各种高级语法
  • 设计模式
  • 不同backend(libtorch、TensorRT、onnxruntime等)如何正确创建推理端,如何多线程推理
  • C++多线程编程/互斥/队列
  • API接口暴露/SDK设计
  • CMAKE高级用法

等等等等,不列举了,对于程序员来说,好的源码就是好的学习资料。当然,也可以看老潘的文章哈。

triton系列教程计划

triton相关系列也会写一些文章,目前大概规划是这些:

  • 什么是triton以及triton入门、triton编译、triton运行
  • triton管理模型、调度模型的方式
  • triton的backend介绍、自定义backend
  • 自定义客户端,python和c++
  • 高级特性、优先级、rate limiter等等

编译和安装

一般来说,如果想快速使用triton,直接使用官方的镜像[4]最快。但是官方镜像有个尴尬点,那就是编译好的镜像需要的环境一般都是最新的,和你的不一定一致

比如22.09版本的镜像需要的显卡驱动为520及以上,如果想满足自己的显卡驱动,就需要自行编译了。官方也提供了使用镜像的快速使用方法

# 第一步,创建 model repository 
git clone -b r22.09 https://github.com/triton-inference-server/server.git
cd server/docs/examples
./fetch_models.sh

# 第二步,从 NGC Triton container 中拉取最新的镜像并启动
docker run --gpus=1 --rm --net=host -v ${PWD}/model_repository:/models nvcr.io/nvidia/tritonserver:22.09-py3 tritonserver --model-repository=/models

# 第三步,发送
# In a separate console, launch the image_client example from the NGC Triton SDK container
docker run -it --rm --net=host nvcr.io/nvidia/tritonserver:22.09-py3-sdk
/workspace/install/bin/image_client -m densenet_onnx -c 3 -s INCEPTION /workspace/images/mug.jpg

# Inference should return the following
Image '/workspace/images/mug.jpg':
    15.346230 (504) = COFFEE MUG
    13.224326 (968) = CUP
    10.422965 (505) = COFFEEPOT

triton官方仓库

两年前的triton只有一个大仓库,tensorrt_backend也默认在triton主仓库中,但是现在tensorrt_backend被拆分出来了,很显然triton除了支持tensorrt还支持很多其他的后端,我们可以自定义使用很多后端。

现在是目前的triton包含的一些仓库:

  • [server[5]] triton服务外层框架,包含了http收发请求,服务内存分配等一些功能代码
  • [core[6]] triton主框架,如果处理请求、后端管理、模型调度啥的全在这里
  • [common[7]] 通用工具,没啥好说的,打日志的代码在这里
  • [backend[8]] backend后端框架代码,存放了一些后端通用父类,自定义后端可以集成这些类仿写新的后端
  • [third_party[9]] triton使用的第三方库的汇总,主要是cmake里头会包含
  • [tensorrt_backend[10]] tensorrt后端代码
  • [pytorch_backend[11]] libtorch后端代码

最开始的时候,server、core、common、backend这些代码仓库都是合在一起的,后来都拆分出来了,增加了triton的灵活性。

比如,上述的core仓库可以单独暴露出cAPI作为动态链接库供其他程序调用,去掉http、grpc的外层请求接口,直接一步到位调用。

一般来说,我们都是从最主要的server开始编,编译的时候会链接core、common、backend中的代码,其他自定义backend(比如tensorrt_backend)在编译的时候也需要带上common、core、backend这三个仓库,这些关系我们可以从相应的CMakeList中找到。

自行编译

如果想要研究源码,修改源码实现客制化,那么自行编译是必须的。

triton的编译和安装其实很简单,唯一的难点就是需要加速,因为triton在编译的时候会clone很多第三方库,第三方库也会克隆它们需要的第三方库,这些库当然都是国外的,所以有个好的网络环境很重要。

比如在编译triton的时候需要下载grpc这个库,grpc又依赖很多第三方其他库,网络不好的话,会经常遇到下面的问题:

Failed to recurse into submodule path 'third_party/bloaty'
CMake Error at /tmp/tritonbuild/tritonserver/build/_deps/repo-third-party-build/grpc-repo/tmp/grpc-repo-gitclone.cmake:52 (message):
  Failed to update submodules in:
  '/tmp/tritonbuild/tritonserver/build/_deps/repo-third-party-build/grpc-repo/src/grpc'


make[3]: *** [_deps/repo-third-party-build/CMakeFiles/grpc-repo.dir/build.make:99: _deps/repo-third-party-build/grpc-repo/src/grpc-repo-stamp/grpc-repo-download] Error 1
make[3]: Leaving directory '/tmp/tritonbuild/tritonserver/build'
make[2]: *** [CMakeFiles/Makefile2:590: _deps/repo-third-party-build/CMakeFiles/grpc-repo.dir/all] Error 2
make[2]: Leaving directory '/tmp/tritonbuild/tritonserver/build'
make[1]: *** [CMakeFiles/Makefile2:145: CMakeFiles/server.dir/rule] Error 2
make[1]: Leaving directory '/tmp/tritonbuild/tritonserver/build'

开加速是最好的办法,不管是UI还是命令行,都有相应的软件可以用,比如clash。

如果你的服务器实在是开不了加速,也有其他办法,那就是将triton库中大部分重量级库的git地址全换为国内的

怎么替换,我是在gitee中,同步github上的仓库,比如triton的core仓库,同步过来,就可以使用国内的地址了。

当然也需要将这些库的submodule中的库也修改为国内源,比如grpc这个库依赖很多第三方库,克隆的时候,这是要一个一个下载的:

改起来稍微麻烦,还需要注意,要改特定commit分支的git地址:

ver/build/_deps/repo-third-party-build/grpc-repo/src/grpc/third_part目录然后手动git clone xxx,然后执行一下git submodule init / git submodule update下就可以带进去。

示例:

root@64da25af2629:/tmp/tritonbuild/tritonserver/build/_deps/repo-third-party-build/grpc-repo/src/grpc# git submodule init
root@64da25af2629:/tmp/tritonbuild/tritonserver/build/_deps/repo-third-party-build/grpc-repo/src/grpc# git submodule update
Submodule path 'third_party/googletest': checked out 'c9ccac7cb7345901884aabf5d1a786cfa6e2f397'

太麻烦了,不过确实为一种办法呃呃。

还有一点,triton每次build都会clone,是因为其用了cmake中的ExternalProject_Add指令,假如我们已经有下载好的grpc,那么直接替换到server/build/_deps/repo-third-party-build/grpc-repo/src中然后将/data/oldpan/software/server/build/_deps/repo-third-party-src/CMakeLists.txt

注释掉git下载部分,修改自己本地的就行,就不需要每次再clone一遍了。

#
# Get the protobuf and grpc source used for the GRPC endpoint. We must
# use v1.25.0 because later GRPC has significant performance
# regressions (e.g. resnet50 bs128).
#
ExternalProject_Add(grpc-repo
  PREFIX grpc-repo
  # GIT_REPOSITORY "https://gitee.com/Oldpann/grpc.git"
  # GIT_TAG "v1.25.x"
  SOURCE_DIR "${CMAKE_CURRENT_BINARY_DIR}/grpc-repo/src/grpc"
  CONFIGURE_COMMAND ""
  BUILD_COMMAND ""
  INSTALL_COMMAND ""
  TEST_COMMAND ""
  PATCH_COMMAND python3 ${CMAKE_CURRENT_SOURCE_DIR}/tools/install_src.py --src <SOURCE_DIR> ${INSTALL_SRC_DEST_ARG} --dest-basename=grpc_1.25.0
)

说了这么多,总之,最好的办法当然还是开科学,全局一下就OK,省去那么多麻烦事儿。

搞定好网络问题,编译triton就很简单了!

git clone --recursive https://github.com/triton-inference-server/server.git
cd server
python build.py  --enable-logging --enable-stats --enable-tracing --enable-gpu  --endpoint=http --repo-tag=common:r22.06 --repo-tag=core:r22.06 --repo-tag=backend:r22.06 --repo-tag=thirdparty:r22.06 --backend=ensemble --backend=tensorrt

在克隆好的server的目录下执行以上命令(下面是我的设置,我们可以个根据自己的需求进行修改)就可以了。

执行这个命令后triton就会构建docker在docker中编译,最终会创建3个镜像:

  • tritonserver:latest
  • tritonserver_buildbase:latest
  • tritonserver_cibase:latest

最终编译好的tritonserver_buildbase:latest镜像,我们可以在其中开发,因为环境都帮忙配好了,只需要再执行编译命令,就可以编译了,我们也可以自定义源码进行个性功能的开发。

在镜像中开发

需要注意,在编译的时候需要pull官方默认的镜像,而这个镜像是有显卡驱动限制的,比如r22.06需要显卡驱动版本为470。

同志们看看自己的显卡驱动,别下了不能用hhh

可以通过triton镜像历史[12]查看镜像版本要求:

接上,我们不是编译好了triton镜像,直接进去就可以开发了:

docker run -v/home/oldpan/code:/code -v/home/oldpan/software:/software  -d tritonserver_buildbase:latest /usr/bin/sh -c "while true; do echo hello world; sleep 20;done"

在docker中修改triton的源码,继续执行以下命令就可以编译,和之前的区别就是加了 --no-container-build参数。

python build.py  --enable-logging --enable-stats --enable-tracing --enable-gpu  --endpoint=http --repo-tag=common:r22.06 --repo-tag=core:r22.06 --repo-tag=backend:r22.06 --repo-tag=thirdparty:r22.06 --backend=ensemble --no-container-build --build-dir=./build

我们如果想编译debug版本的triton,可以在命令中添加:--build-type=Debug

另外,原始triton镜像中已经有tensorrt,如果想换版本,可以删除原始docker中的旧的tensorrt,自行安装新的tensorrt即可:

  • https://docs.nvidia.com/deeplearning/tensorrt/install-guide/index.html[13]

说下运行流程吧!

讲了这么多铺垫,接下来简单说下运行流程。

这里通过代码简单梳理下triton运行的整体流程,之后的具体细节,放到接下来的篇章讲解

首先一开始,main函数在servers/main.cc下,triton在启动的时候会执行以下函数:

// src/servers/main.cc 经过简化
int
main(int argc, char** argv)
{
  // 解析参数
  TRITONSERVER_ServerOptions* server_options = nullptr;
  if (!Parse(&server_options, argc, argv)) {
    exit(1);
  }
  ...
  // 这里创建server
  TRITONSERVER_Server* server_ptr = nullptr;
  FAIL_IF_ERR(
      TRITONSERVER_ServerNew(&server_ptr, server_options), "creating server"); // 这里创建server
  FAIL_IF_ERR(
      TRITONSERVER_ServerOptionsDelete(server_options),
      "deleting server options");
  std::shared_ptr<TRITONSERVER_Server> server(
      server_ptr, TRITONSERVER_ServerDelete);
  ...
  // 启动HTTP, GRPC, 以及性能统计的端口
  if (!StartEndpoints(server, trace_manager, shm_manager)) {
    exit(1);
  }
  // Trap SIGINT and SIGTERM to allow server to exit gracefully
  signal(SIGINT, SignalHandler);
  signal(SIGTERM, SignalHandler);
  // 等待kill信号区关闭triton 
  while (!exiting_) {
   ...
      // 做一些监控模型仓库是否变动的操作
  }
  // 优雅地关闭triton
  TRITONSERVER_Error* stop_err = TRITONSERVER_ServerStop(server_ptr);
  // 如果无法优雅地关掉,旧直接exit即可
  if (stop_err != nullptr) {
    LOG_TRITONSERVER_ERROR(stop_err, "failed to stop server");
    exit(1);
  }
  // 停止监控http、grpc
  StopEndpoints();
  ...
  return 0;
}

TRITONSERVER_ServerNew这个函数中,会:

  • new一个triton类InferenceServer对象
  • 根据参数设置配置一下,执行一堆Set函数
  • 配置好参数后,Init服务,这里初始化服务的状态,校验参数
  • 创建各种模块,经常使用的有后端管理TritonBackendManager以及模型仓库管理ModelRepositoryManager
  • 再进行一些检查、配置一些状态

在启动过程中最重要的是模型仓库,运行triton当然你要有模型,要不然你开它干嘛?

这里我使用的模型仓库目录结构如下(是一个识别姿态的hrnet,hrnet官方有很多预训练模型,转tensorrt也很简单):

debug目录下有一个模型文件夹叫做hrnet-pose-estimate-debug的模型文件夹,这个文件夹地址(/path/to/hrnet-pose-estimate-debug)需要传给triton启动命令行,文件夹内的四个子模型文件夹,会被triton检测到并且一一加载。

需要注意的是,除了hrnet_pose_estimate这个其余三个在目录的1子目录下有个so或者model.plan,这代表hrnet-trt-staticimage_preprocess还有pose_postprocess都属于model,使用了backend,backend会在各自的config中指明:

name: "hrnet-trt-static"
backend: "tensorrt"

因为hrnet-trt-static是tensorrt的模型,所以backend设置为tensorrt,model.plan就是tensorrt的engine。其backend的so文件我放到了其他位置(放到和model.plan同目录也是可以的),而另外两个预处理和后处理的backend就放到了模型仓库中,也就是libtorch_image_preprocess.solibtriton_pose_postprocess,包含了你的backend代码,封装成so供triton调用

关于backend、model以及modelinstanc的关系,说实话稍微复杂点,各自有完整的生命周期,这个嘛,之后文章说,感兴趣的也可以提前看官方文档的介绍:

  • https://github.com/triton-inference-server/backend[14]

然后我们就启动triton吧!

# 执行以下函数,模型目录通过 --model-repository 指定     tensorrt的backend通过  --backend-directory 指定
./tritonserver --model-repository=/path/to/hrnet-pose-estimate-debug --backend-directory=/workspace/backends/tensorrt_backend/ 

模型加载成功之后会输出:

...
I1016 08:25:37.952055 51771 server.cc:587] 
+------------------+----------------------------------------------------------------+----------------------------------------------------------------+
| Backend          | Path                                                           | Config                                                         |
+------------------+----------------------------------------------------------------+----------------------------------------------------------------+
| image_preprocess | /workspace/triton-models/debug/hrnet-pose-estimate-debug/image | {"cmdline":{"auto-complete-config":"false","min-compute-capabi |
|                  | _preprocess/1/libtriton_image_preprocess.so                    | lity":"6.000000","backend-directory":"/workspace/backends/tens |
|                  |                                                                | orrt_backend/","default-max-batch-size":"4"}}     |
|                  |                                                                |                                                                |
| pose_postprocess | /workspace/triton-models/debug/hrnet-pose-estimate-debug/pose_ | {"cmdline":{"auto-complete-config":"false","min-compute-capabi |
|                  | postprocess/1/libtriton_pose_postprocess.so                    | lity":"6.000000","backend-directory":"/workspace/backends/tens |
|                  |                                                                | orrt_backend/","default-max-batch-size":"4"}}     |
|                  |                                                                |                                                                |
| tensorrt         | /workspace/backends/tensorrt_backend/li              | {"cmdline":{"auto-complete-config":"false","min-compute-capabi |
|                  | btriton_tensorrt.so                                            | lity":"6.000000","backend-directory":"/workspace/backends/tens |
|                  |                                                                | orrt_backend/","default-max-batch-size":"4"}}     |
|                  |                                                                |                                                                |
+------------------+----------------------------------------------------------------+----------------------------------------------------------------+

I1016 08:25:37.952252 51771 server.cc:630] 
+---------------------+---------+--------+
| Model               | Version | Status |
+---------------------+---------+--------+
| hrnet-trt-static    | 1       | READY  |
| hrnet_pose_estimate | 1       | READY  |
| image_preprocess    | 1       | READY  |
| pose_postprocess    | 1       | READY  |
+---------------------+---------+--------+

I1016 08:25:38.051742 51771 metrics.cc:650] Collecting metrics for GPU 0: NVIDIA GeForce RTX 3080
I1016 08:25:38.055197 51771 tritonserver.cc:2159] 
+----------------------------------+------------------------------------------------------------------------------------------------------------------+
| Option                           | Value                                                                                                            |
+----------------------------------+------------------------------------------------------------------------------------------------------------------+
| server_id                        | triton                                                                                                           |
| server_version                   | 2.23.0                                                                                                           |
| server_extensions                | classification sequence model_repository model_repository(unload_dependents) schedule_policy model_configuration |
|                                  |  system_shared_memory cuda_shared_memory binary_tensor_data statistics trace                                     |
| model_repository_path[0]         | /workspace/triton-models/debug/hrnet-pose-estimate-debug                                                         |
| model_control_mode               | MODE_NONE                                                                                                        |
| strict_model_config              | 1                                                                                                                |
| rate_limit                       | OFF                                                                                                              |
| pinned_memory_pool_byte_size     | 268435456                                                                                                        |
| cuda_memory_pool_byte_size{0}    | 300021772                                                                                                        |
| response_cache_byte_size         | 0                                                                                                                |
| min_supported_compute_capability | 6.0                                                                                                              |
| strict_readiness                 | 1                                                                                                                |
| exit_timeout                     | 30                                                                                                               |
+----------------------------------+------------------------------------------------------------------------------------------------------------------+

I1016 08:25:38.055627 51771 http_server.cc:3303] Started HTTPService at 0.0.0.0:8000
I1016 08:25:38.097213 51771 http_server.cc:178] Started Metrics Service at 0.0.0.0:8001

加载好之后,我们开启了http端口,端口号为8000,另一个是metric接口,端口号8001

此时可以使用http请求试一下。

简单请求

请求的话有http和grpc协议,我对http协议熟悉些,所以就搞http吧。

官方也提供了客户端,C++和python的都可以有,可以直接使用官方的,也可以根据官方提供的http协议构造自己的客户端,只要会构造body,一切都很简单。

请求协议可以参考官方:

  • https://github.com/kserve/kserve/blob/master/docs/predict-api/v2/required_api.md[15]

这里我们用python简单构造一个body

# 构造triton的输入body
json_buf = b'{\"inputs\":[{\"name\":\"INPUT\",\"datatype\":\"BYTES\",\"shape\":[1],\"parameters\":{\"binary_data_size\":' + \
        bytes(str(len(data)), encoding = "utf8") + b'}}],\"outputs\":[{\"name\":\"RESULT\",\"parameters\":{\"binary_data\":true}}]}'
push_data = json_buf + data

print("Inference-Header-Content-Length ",str(len(json_buf)), " Content-Length ",str(len(data) + len(json_buf)))
# 构造triton-header
header = {"Content-Type": "application/octet-stream", "Accept": "*/*",
          "Inference-Header-Content-Length":str(len(json_buf)),
          "Content-Length":str(len(data) + len(json_buf))}

server_url = "127.0.0.1:8000"
model_name = "hrnet_pose_estimate"

# 请求
response = post('http://' + server_url + '/v2/models/' + model_name + '/infer', data=push_data, headers=header)

就可以发送请求,结果也会传回response里。我们也可以使用curl命令,直接传递构造好的body(这个body将上述的push_data写到本地即可):

[oldpan@have-fun client]$ curl -v --max-time 1 --request POST 'http://192.168.1.102:9006/v2/models/hrnet_pose_estimate/infer' --header 'Inference-Header-Content-Length: 230' --header 'Content-Type: application/octet-stream' --data-binary '@data.txt' --output temp_res
Note: Unnecessary use of -X or --request, POST is already inferred.
*   Trying 192.168.1.102:9006...
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0* Connected to 172.29.210.105 (172.29.210.105) port 9006 (#0)
> POST /v2/models/aocr_cnprint_trt8p/infer HTTP/1.1
> Host: 192.168.1.102:9006
> User-Agent: curl/7.71.1
> Accept: */*
> Inference-Header-Content-Length: 230
> Content-Type: application/octet-stream
> Content-Length: 1573102
> Expect: 100-continue

* Mark bundle as not supporting multiuse
< HTTP/1.1 100 Continue
} [56480 bytes data]
* We are completely uploaded and fine
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Content-Type: application/json
< Inference-Header-Content-Length: 394
< Content-Length: 6794

{ [6794 bytes data]
100 1542k  100  6794  100 1536k   127k  28.8M --:--:-- --:--:-- --:--:-- 28.9M

结果就不发了,验证没啥问题。

关于如何使用curl直接请求triton,有一些相关链接可以参考:

  • https://github.com/triton-inference-server/server/issues/2563[16]
  • https://github.com/triton-inference-server/server/issues/1822[17]
参考资料
https://www.bilibili.com/video/BV1KS4y1v7zd/?spm_id_from=333.788&vd_source=eec038509607175d58cdfe2e824e8ba2[18]
https://github.com/triton-inference-server/server/releases[19]
https://docs.nvidia.com/deeplearning/triton-inference-server/release-notes/index.html[20]
https://zhuanlan.zhihu.com/p/462817679[21]
参考资料
[1]
triton: https://github.com/openai/triton

[2]
教程: https://www.zhihu.com/people/pwlazy/posts

[3]
视频教程: https://www.bilibili.com/video/BV1KS4y1v7zd/?spm_id_from=333.337.search-card.all.click&vd_source=eec038509607175d58cdfe2e824e8ba2

[4]
官方的镜像: https://docs.nvidia.com/deeplearning/triton-inference-server/release-notes/index.html

[5]
[server: https://github.com/triton-inference-server/server

[6]
[core: https://github.com/triton-inference-server/core

[7]
[common: https://github.com/triton-inference-server/common

[8]
[backend: https://github.com/triton-inference-server/backend

[9]
[third_party: https://github.com/triton-inference-server/third_party.git

[10]
[tensorrt_backend: https://github.com/triton-inference-server/tensorrt_backend

[11]
[pytorch_backend: https://github.com/triton-inference-server/pytorch_backend

[12]
triton镜像历史: https://docs.nvidia.com/deeplearning/triton-inference-server/release-notes/rel_22-06.html

[13]
https://docs.nvidia.com/deeplearning/tensorrt/install-guide/index.html: https://docs.nvidia.com/deeplearning/tensorrt/install-guide/index.html

[14]
https://github.com/triton-inference-server/backend: https://github.com/triton-inference-server/backend

[15]
https://github.com/kserve/kserve/blob/master/docs/predict-api/v2/required_api.md: https://github.com/kserve/kserve/blob/master/docs/predict-api/v2/required_api.md

[16]
https://github.com/triton-inference-server/server/issues/2563: https://github.com/triton-inference-server/server/issues/2563

[17]
https://github.com/triton-inference-server/server/issues/1822: https://github.com/triton-inference-server/server/issues/1822

[18]
https://www.bilibili.com/video/BV1KS4y1v7zd/?spm_id_from=333.788&vd_source=eec038509607175d58cdfe2e824e8ba2: https://www.bilibili.com/video/BV1KS4y1v7zd/?spm_id_from=333.788&vd_source=eec038509607175d58cdfe2e824e8ba2

[19]
https://github.com/triton-inference-server/server/releases: https://github.com/triton-inference-server/server/releases

[20]
https://docs.nvidia.com/deeplearning/triton-inference-server/release-notes/index.html: https://docs.nvidia.com/deeplearning/triton-inference-server/release-notes/index.html

[21]
https://zhuanlan.zhihu.com/p/462817679: https://zhuanlan.zhihu.com/p/462817679

讨论:如何用1024张显卡训练一个模型?

文章来源于一个知乎问题:如何判断候选人有没有千卡GPU集群的训练经验?确实对于普通开发者来说,大概率从未尝试过使用数千张GPU训练一个模型,这方面确实是一个很好的研究方向,也是成为顶尖算法工程师所必需的必经之路,因此记录下知乎的一些回答,用于学习和记录。虽然目前还没有机会能够调用数千张GPU用于模型训练,但对于目前几十张GPU进行并行训练也有帮助。

高赞回答1:如何用千卡进行训练

最近看到知乎一个回答,把千卡训练的难度吹上天了。但其实真正用过千卡就会发现也就那么几个点。于是想写一篇文章简单讲讲。

本文将包括三个部分:

  • 首先我们将讨论千卡训练的难题,以及应该在什么时候使用千卡训练;
  • 接着,我们将讨论如何在一千张卡上开始训练,如何让他达到近乎线性的性能提升;
  • 最后我们将展开讨论一些千卡训练当中仍然悬而未决(至少对于开源社区来说)的问题。

为什么千卡训练是困难的?

其实那篇回答在这部分说的没错。千卡训练和八卡训练的区别是—显卡多了一百多倍。

这意味着什么呢?

  1. 通信时间增加
  2. 故障概率增加

这俩问题都很好理解。

时间上,PyTorch 内部支持 NCCL / Gloo / MPI 三个通信后端(请务必使用 NCCL。)其中训网络最常用的 AllReduce 操作【从多个sender那里接收数据,最终combine到一个节点上】会根据具体硬件配置走 Ring AllReduce 和 Tree AllReducering allreduce和tree allreduce的具体区别是什么?。Ring 的时间复杂度是 O(pn),Tree 的时间复杂度是 O(log⁡pn)。就算是理论上 128 节点也比单节点慢至少七倍,实践当中跨节点通信要远比单节点慢得多。

故障上,一个节点出问题的概率是 p,128 个节点就是 1−(1−p128)。也就是说如果一个操作在一个训练当中的出错概率是 1%,那么在 128 节点当中的出错概率就是 72.37%。

此外,随着规模的增大,许多问题都会变得难以忍受。比如数据增强要花 0.1s,一亿条数据就是 278 个小时(当然这只是胡拆的一个数字,实际有各种机制所以不会有这么大影响。

因此,钱多烧手并不是使用千卡训练的理由。闲得蛋疼可能是,但你得多蛋疼才能想出这么折磨自己的 idea?

因此,千卡训练解决的问题是大模型&大数据问题如果你的训练时间没有超过 8192 GPU 日,那么你绝对不需要一千张显卡。

看到这里,绝大多数人已经可以关掉这篇文章了。除非你的模型和数据都以 B(十亿)来作为计量单位。当然如果你正在厕所里手机没电想看点儿东西解闷儿的话(虽然我很怀疑是否会有人把他打出来……那么可以继续往下看

如何使用一千张卡训练?

如何提高计算效率?

这件事情其实是一个 case by case 的事情。因为通信、计算速度啥的受硬件影响更多。同样是 A100 集群,我全 DGX 节点,每一张 A100 都是 SXM 接口并配一块儿专属的 IB 网卡。你一个小破普惠服务器插 8 张 PCI-E A100,IB 卡一个节点只给一张。那咱俩遇到的问题就完全不是一个问题。

因此,要讨论如何提高训练效率、减少训练耗时,我们首先要了解训练耗时在哪里。那么,一个训练步的耗时在哪里呢?需要谨记,没有 profile 的优化是没有意义的。

你可能会说,forward backward sync。很好,这说明你了解 PyTorch 的基本流程。不过现实当中要复杂得多。

  1. dataset 读取数据,构建输出
  2. dataloader collate 数据,进行数据预处理
  3. 模型 forward 计算输出
  4. loss compute
  5. 模型 backward 计算梯度
  6. 模型 sync 梯度
  7. 优化器 step 更新权重
  8. 打印 log

当然这是可以无限细分下去的,但一般这些就够了。需要注意的是,除了 4-7 的耗时是真耗时,其他都需要通过异步操作来盖掉。这也是我们的优化目标。

异步执行在 PyTorch 的 dataloader、CUDA 和分布式当中都存在。前者可以通过设置 num_workers 和 prefetch_count 为 0 来关闭,后两者可以通过 cuda.synchornize 和 dist.barrier 来执行手动同步。在 profile 时,我们需要首先需要测整个 step 的时长。然后再在每次测量前执行手动同步来计算每个部分的时长。如果前者的总耗时等于后者 4-7 的耗时之和,那么通常不需要执行任何操作。但这种情况在千卡操作中几乎不可能发生。

第 6 步通信往往需要耗费大量时间。因此,我们还需要进一步优化通信。

以下内容是对PyTorch Distributed的概括,有感兴趣的同学建议通读并背诵全文。

计算-通信重叠

在 PyTorch 当中,梯度的通信和反向传播是交叠进行的。也就是说,每完成一层的梯度计算,都会立即触发当前层的同步。实现起来也很简单,每个进程在完成自己第 k 层的梯度计算后都会触发一个钩子来给计数器+1s。当计数器达到进程数时开火进行梯度通信。有很多同学在计算梯度过程中遇到过 RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. 错误,这就是因为有的模块没有参与计算 loss,导致梯度同步卡住了。需要注意,当 find_unused_parameters=True 时,PyTorch 分布式使用 nn.Module.__init__ 当中定义子模块的反向顺序来作为梯度桶的构建顺序。因此,确保模块定义和调用的顺序一致是一个良好的实践。

梯度合桶

尽管理论上来说,同步发生的越及时,重合度越高,性能越好。但实际上每次发起通信都是有上头的。因此,现实当中梯度同步并不是越多越好越快越好。为此,PyTorch 引入了梯度合桶机制,通过把多个 Tensor 装在一个桶里再通信桶来减少通信次数从而减少总耗时。合桶的 bucket_cap_mb 默认是 25MiB,这对于绝大多数模型来说都是太小的。目前已经有提升这个默认值的特性需求,但是这个还是调一下更好。

梯度累加

当你做完所有操作之后,惊喜的发现 TMD 怎么同步时间还是单节点的好几倍。这其实是正常情况……实际上超过 256 卡的训练想要把通信盖掉就是一件不可能的事情。你说老师我看 FB 论文说他们 256 卡就是线性提升啊…那这里不得不提的一个策略就是梯度累加了。梯度累加会执行 k 次 forward+backward 之后再执行优化器步进。这有很多好处,首先对于大模型 batch size 通常不能开多大,梯度累加可以提升等效 batch size。其次累加期间的 backward 不需要通信梯度,加快了训练速度。

少即是快

Python 是一种很慢的语言。当然你说 JIT trace+torch.compile 有提升我也不反对,但对于最高效率来说,只有必须要存在的代码和不存在的代码两种。

抱抱脸的 Transformers 就是一个反例。两个子模块就能写完的 TransformerLayer 他们硬是能写出来一堆…偏偏他们还信奉 Single Model File Policy……我寻思你这完全不考虑继承的封这么多层是要搞鸡毛啊?正例反而是 PyTorch……(笑死,我竟然会夸脸书代码写得好。具体来说就是 nn.functional 当中的各种实现。你会发现他们第一行往往是 handle_torch_func。熟悉 Python 装饰器的小伙汁通常要问了,为啥这里不用个装饰器统一一下?因为装饰器会引入额外的函数调用,额外的函数调用就是额外的上头。

因此,如果你想确保最高的效率,写一个简单的训练代码和模型代码非常重要。毕竟,1%的效率提升,节省的可能是数百个 GPU 日。

如何平稳训练

这一段当中中咱们只讨论你能控制的问题。

捕捉不致命的异常

故障率高的问题其实很好解决。在训练当中,大部分异常都是非致命异常,接住他们就好了。我之前写过一个装饰器,catch,它的作用就是接住异常,然后调回调函数(默认当然就是把错误打印到 log 里)。所有你需要做的只是使用它来装饰所有非 fatal 的操作。

在实际应用当中,我们遇到的最常见的问题是存 ckpt 写满了磁盘(不准笑,从商汤到上海 AI Lab,这个问题在哪儿都日常出现。咱也不知道为啥肯买那么多显卡但不肯多插点儿硬盘,咱也不敢问)。接住所有保存操作,如果你有闲心可以在回调里删一下之前的 ckpt。没闲心的话…大不了重训一次嘛(逃。)第二常见的问题,你猜对了……存 log 写满了硬盘……所以所有 logging 操作也都是要 catch 的。这就是为啥我都用 tmux 然后开很长的缓存窗口,总是能抢救一些 log 出来的。

咳咳,说点儿正经的。任何联网操作都是需要 catch 的,常见的联网操作主要包括从 ceph 读取数据和…写 log 到远程(逃。其他就没啥了吧,我见过有大哥尝试恢复 OOM 的,但效果似乎不是很好,至少我自己没用过。简单来说,唯一不应捕捉的错误是集群炸了。

那有的大兄弟就说了,集群没爆炸,但是有两张卡突然掉了咋办。这个咱第三部分再讨论。

管好模型的输出

模型训着训着发散了几乎是每个训大模型的人都会遇到的问题。输出和 loss 只要有 nan 果断丢掉。梯度先 clip by value 再 clip by norm 都是常规操作。哦对了,还有初始化……关于大模型收敛性的论文有一堆,此处不再赘述。

比更大,还更大,再更大

弹性训练

实际上当你的训练超过 2048 个 GPU 日时,在整个训练过程当中发生单个 GPU 甚至单个节点下线是再正常不过的事情了。

PyTorch 在 1.10 就引入了 torchelastic 弹性训练机制。这个东西,用过的都骂娘。等下,让我先骂一遍。呸。ok 咱们继续吧。

我印象当中在微软的最后一轮面试当中被问到了这个问题:如何设计一个弹性分布式系统。

我的回答很教科书。每 k 分钟,系统会做一次 AllGather 来统计存活进程数,然后选举出一个主进程。主进程会计算好每个进程的 rank 和 local rank 然后广播给各个进程。所有进程每次前向传播开始时向主进程发送一个心跳包来汇报状态。主进程会根据心跳包来确定这一个 step 参与同步的机器有多少。

但很可惜,2024 年了。还是没人去写。他妈的。

层次化梯度同步

我一直认为梯度同步不应该以 GPU/进程为单位。而应该分为大同步(节点间同步)和小同步(节点内同步)。小同步可以更高频的进行,大同步则可以更慢的执行。这样不仅能提高实际的梯度同步频率,降低同步总耗时,并且还能天然的去结合小 batch 和大 batch 训练的优点—节点内小 batch 关注个体,节点间大 batch 关注整体。

有没有发现所有东西都很简单?
是这样的,千卡训练是任何一个普通CS本科生花三个月就能学会的东西。没有任何复杂的地方。

延伸阅读

PyTorch DDP 设计笔记

PyTorch 微调菜谱

分析与优化使用 PyTorch 训练机器学习模型

使用 Nsight Systems 来分析 GPU 负载

DLProf

NVProf

NCCL AllReduce 设计

Spike No More

Understanding the difficulty of training deep feedforward neural networks

高赞回答2:关于千卡训练的难度分析

千卡,其实是个相对模糊的概念,对于多数人而言,这就跟你告诉你,我有1兆资产和10兆资产一样,你只知道很多很多,但是完全傻傻分不清楚,这到底是多少,能买多少东西。千卡,也是一样。按照常见的一机8卡GPU的类型来看,用过125台机器训练的,就算得上是千卡了。从这个角度上说,其实门槛也没有那么那么的高。可事实呢?真正的大模型要用多少机器训练?答案是远超千卡 —— 看看下面的GPT-4信息,这是千卡吗?比万卡还要多!!!

How much compute was used to train GPT-4?
2.15e25 floating

Some key facts about how this enormous model was trained: Used 25,000 Nvidia A100 GPUs simultaneously. Trained continuously for 90–100 days. Total compute required was 2.15e25 floating point operations.Sep 27, 2023

另外,从根本上说,千卡训练是一个复合体。很多时候,大家就知道一件事,“牛逼”,除了知道喊666之外,就少有了解到底牛逼在哪里。以至于说,觉得非常的高大上。我可以这么说一句,千卡及其以上的训练对于绝大多数人和企业而言,这就是个屠龙术 —— 除了某些个,用手指头数的出来的地方,别的地方完全没有这样的需求,也没有这样的资源来进行千卡训练。这是什么意思?意思就是。如果真有这样经验的人,流了出来,大概率很难对口的找工作,因为他在千卡集训中的训练经验和工程实践,大概率根本别的地方用不上。另一层意思是,如果你只是个一般人,那你想想就得了,就跟你可以意淫下某个自己喜欢的明星,但别真去追求,真要去了,你大概是连榜一大哥的待遇都不会有,注定了人财两空。我当然知道有人会说,那难道流出来的人,不能去那几个指头数的出来的地方吗?可以的,但是别着急,你们往后面看就知道了 —— 去还是能去的,但是如果他不是跟着大佬一起跑,到了新地方他们真不见的能继续干。

再来说说,千卡训练是个什么复合体 —— 至少是,科学,工程和人情世故。先看大模型训练中的任务怎么从“量变到质变”的 —— 任何一个小规模训练上的问题,放大几百几千倍之后,都有可能成为不可忽视的问题。比如,数据预处理,小的时候,你也许完全不在意,这到底是多少个毫秒搞定的。但是,如果你现在有上百T的数据要处理,手一抖,写个不那么高效的算法,多处理个几天,甚至几周都有可能。当然,更可怕的就不是慢,而是坏了 —— 一个小bug可以坏了整条pipeline。有的时候,你甚至都不能说是bug,但是反正不爽就是了。比如,两个人写预处理,一个人把图片弄成了BGR,一个弄成了RGB —— 又不是不能用,但是就是膈应人;又比如,数据原图太大了,要统一缩放,然后有人做的时候直接就缩成了方形,然后呢?我们之后需要的模型要正常长宽比的数据又该怎么办呢?再来一次嘛?再搞两个礼拜?你说这是个什么问题?可以看成是科学,当然也可以看成是工程的一部分,这两个就是紧密结合在一起的,单单你会调模型,在这个千卡训练的事情上,你是玩不转的。因为很多问题卡脖子的问题,根本就是,或者大概率是工程问题 —— 什么网络通信,什么磁盘空间,什么读取速度,什么数值稳定,小规模的时候,你都可以不用管,想怎么搞怎么搞,怎么搞可以怎么有。可是上了规模之后,很多东西都被限定死了,根本不是你想怎么干就能怎么干的。我说的这个话,大概很多用pytorch+cuda的朋友也不见的认同,毕竟这套组合下,没有太多技术支持的团队也干成了这样的事情。但是,这背后是因为,使用能支持这样训练的云服务本身就意味着,付了更多的钱,在已经白嫖了nvidia一波的前提下,外加meta(pytorch),外加微软(deepspeed),外加……,又变相雇佣了一个专门的支持团队。但是,这些都不改变一个事实 —— 那就是,这都是你跟着前人的脚步前进,有人替你已经把这条路上的坑,踩的差不多了。可是,如果你要做些原创性的工作呢?必然是会遇到很多前人都不会有的问题。

多说一句,也许有人会说,“我不关心别的,我就只关心pytorch+cuda下,做训练的经验”。那我告诉你,这本质上这跟你单机单卡训练就不应该有什么不一样,跟是不是pytorch,用不用cuda都没什么大关系 —— 你想想最理想情况下,这是不是就应该跟单机单卡训练一样么,无非就是现在的这个“单机”的GPU内存是所有的机器GPU内存的总和,能让你用一个更大的batch size和学习率。至于,GPU内部怎么通信,数据怎么通信,各个机器怎么通信,gradient传播怎么实现,需要你这个训模师知道吗?你在单机单卡的时候都不用知道,在单机多卡的时候不用知道,在小规模分布式训练的时候不用知道,那为什么到了千卡的时候,你就应该知道了?理想情况下,就算到了百万卡,也不用做建模的你去知道这里的各种工程实践。

那千卡训练到底难在哪里了?首先,就是难在之前提及的工程上面了 —— 简单假设一个卡在一天内不挂掉的概率是p,那么现在现在千卡同时一天内不挂掉的概率是多少?算算你就知道,对于p^1000,其实有机器挂掉才是正常的。如果是万卡呢?你可见的是,机器N越多,p^N就越小,这个事情就是越难。有人要说“我单机训练的时候,几年都遇不到问题,老黄的GPU稳定的一塌糊涂。”对此,我也只有呵呵,没人告诉你训练不下去都是GPU的问题。你大概是选择性忘记了各种自己训练中遇到的事情 —— 比如,上次实验中断,GPU进程没杀干净,还占着内存;和人共享的服务器上,有个卧龙觉得你训练的时候CPU占用率低了点,给你加了点任务;好巧不巧,默认的缓存地址没地方了,包装不上了,预训练模型下不来了…… 说这么多看似和训练无关的事情是因为,所有这些都要能自动化,因为里面有一个地方翻车了,你训练就进行不下去了。通信连不上,磁盘满了,遇上了奇葩的GPU,IO巨慢 …… 不管什么原因挂掉了,关键的是之后应该怎么办?有没有可能对这个出问题的机器进行热替换?怎么办才能最大程度不影响训练进程?怎么样才能在下次避免同样的问题。当然,实际情况可以更加复杂,GPU不见的是同批次的,模型也可以大到,哪怕在A100上也只能这个机器放一部分,那个机器放一部分……

但是也别误解,以为千卡训练,就对训模师而言,其实没什么挑战。这样的理解显然是错的。这对训模师的实操来说,肯定是一个巨大的挑战。完全是拿着卖白菜钱(想想你年薪才多少,算你年薪百万好了),操着卖白粉的心(这千卡训练要花多少钱?你年薪都不够它的一个零头)。因为这机器一开,实在是太烧金了。而且可见的是,你必然是要去debug的 —— 为什么小模型的时候,训练的挺好的,一变大就翻车了?或者说,虽然没翻车,但是为什么性能就涨了一丢丢?或者为什么前面训练挺稳定的,到了后面的loss curve就会有很大的spike?有经验的训模师能更早,更快的发现问题。也能更快和更好的解决问题。很多时候,也真不见的看log就能看出来点啥的,看数据,看gradient的大小分布,和其他模型的训练进行记录做比对,甚至做可视化,都是很有必要的。而这所有的一切,都需要你很有经验 —— 同样的log,有人就能一眼看出来问题在哪里,有人就只能对着发呆,或者机械性的说“换一组参数再试一下”。同样觉得可能哪里有问题,有人就能知道应该来验证这个猜想是对是错,有人就只能天马行空的给出一堆,谁也不知道对不对的原因。所以,一旦这条路线被摸索出来之后,其实也就没什么难度了 —— 数据,脚本,机器都在那里了,我就问你,我在服务器上run那条千卡训练命令,跟你run的能有什么不同?所以,真正的关键不是在于有没有用过千卡GPU训练过模型,而是有没有从头至尾,一路披荆斩棘的自己淌出来一条可重复的模型技术路线!!

当然,如果你要以为,这事情就只是技术,那也是太年轻了点。机器一开,要多少钱,这账真要算准从训模师的角度说是很难的,毕竟具体价格都是大公司之间协议的,属于商业机密,但是估算个大概的数目不难。按照aws的p4d算(8卡A100,见下图),便宜的算法,千卡训练一个月,需要花费 $11.57/每台小时*24小时/天*125台*30天 = $1,041,340;按照阿里云的算法,单卡年费¥170533.8,也就是¥19.47每小时,但是算上多卡的费用,这实际上比上面aws的价格更贵。当然,你也许能用更便宜的价格拿到机器,比如别找这么大的云服务平台,找个小的,但是再少还能少多少,算打5折,这都是50多万美刀,350多万人民币一个月。要知道,这可是训练一次的价格哦。一个能用的模型背后,可是5x,甚至10x更多的不能用模型哦,所以烧个几千万,真跟玩一样。

正是因为这么贵,所以也同样表明了,为什么一定要找有经验的训模师 —— 你要知道,你自己的每个实验决定,都是变相的花出去十几,几十甚至上百万的美金。早发现问题,早停下来;早解决问题,早开始;知道怎么偷懒,什么样的ablation study可以跳什么必须做,什么时候可以用小模型替代,什么时候可以用一个老的checkpoint来个jump start,什么时候直接白嫖论文上的结论就行……,所有这些都和花多少钱才能把这个事情办了,输出一个达标的模型,直接相关 。相反的,万一你要找个拉垮的训模师,前面不知道怎么计划,代码不知道怎么验证,训练起来了不懂怎么有效监控,有了异常不知道如何排除,……,最后都要靠着模型训练全完了之后做evaluation才知道行不行的那种。那么就算预算全花完了,什么都没有训练出来,我也没有什么好奇怪的。

铺垫这么多,终于可以来谈谈最后一个层面 —— 人情世故了。你看,千卡训练这个事情,有这么大的风险翻车,要花这么多的预算,那么现在问题来了,你要是部门领导,你让谁来干这个事情?哦,你想放权给下面的经理,让他来找人?又或者找个刚来的博士?找个顶校+顶会的博士?不管你怎么找,可问题是,你就这么信得过他吗?你怎么保证他,能干这个,能干好这个,不会中途跑路,不会磨洋工…… 要知道,这样大的项目和预算,如果要真干塌了,不说整个部门要一起完蛋,至少这一条线的人员必然是要担责的,哪怕是主要领导也跑不掉。所以喽,关键的关键是,你必须找自己信的过的人,还要找确实有能力可以担当重任的人 —— 真正最后来干这个千卡训练的人,不但自己技术要过硬,更是团队的中坚力量,至少也要得到一两个大头目的支持,而且还要得到小头目支持。你再牛逼,没信任没大佬支持,这事情不说完全不可能,也是基本没可能。你再牛逼,要是真的小头目给你上眼药,比如,跟上面吹风,“好像看见你在看招聘网站”,你想大头目心里会不会有阴影?所以,别给我扯什么,老子有多少顶会顶刊,老子导师是谁谁谁,这在绝大多数情况,都不好使。所以,刚毕业的,或者做实习的,或者刚工作的,如果宣称自己有这个经验,就是一眼丁真。因为上面是绝对不会找不信任的人来这样重要的工作,这跟你有没有相关的工作经验无关。这同样意味着,真正干这些事情的人也很难流出来 —— 因为对于嫡系来说,加薪升职,在干好了的前提下,那还不都是so easy吗?所以,是你,你愿意出来吗?出来了,就算你牛逼,但是获取信任,成为嫡系也要一个时间,不是吗?

NCCL–多卡训练后端[持续补充]

本文主要记录和学习pytorch后端NCCL相关的知识点,为后续大模型训练打好基础

https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/overview.html

https://developer.nvidia.com/nccl

NCCL” 代表 “NVIDIA Collective Communications Library”,”NVIDIA 集体通信库“,它是一种由 NVIDIA 开发的用于高性能计算通信库。NCCL 专门设计用于加速 GPU 群集之间的通信,以便在并行计算深度学习等领域中提供更好的性能。

NVIDIA 集合通信库 (NCCL) 可实现针对 NVIDIA GPU 和网络进行性能优化的多 GPU 和多节点通信基元。NCCL 提供了 all-gather、all-reduce、broadcast、reduce、reduce-scatter、point-to-point send 和 receive 等例程,这些例程均经过优化,可通过节点内的 PCIe 和 NVLink 高速互联以及节点间的 NVIDIA Mellanox 网络实现高带宽和低延迟。

NCCL相关环境变量说明 :

【https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage.html】

  1. NCCL_TIMEOUT:设置集合操作超时阈值,单位毫秒;如果常见超时错误,适当增大该值,但不能太大NCCL_TIMEOUT 环境变量用于设置 NCCL 集体通信操作的超时时间。通过调整这个值,你可以更好地处理网络延迟和不稳定的问题,确保 NCCL 通信的稳定性和可靠性。如果在集体通信过程中遇到超时问题,可以尝试调整此环境变量以解决问题。

设置超时时间:

  • NCCL_TIMEOUT 用于定义 NCCL 集体通信操作的超时时间。超时时间是 NCCL 在执行操作时等待响应的最长时间,超出此时间将触发超时错误。

解决网络问题:

  • 在高性能计算和大规模分布式训练中,网络延迟或不稳定可能导致集体通信操作超时。设置合适的 NCCL_TIMEOUT 可以帮助调节容错设置,避免训练过程中因超时错误而中断。

性能调优:

  • 根据你的集群配置和网络状况,适当调整 NCCL_TIMEOUT 可以帮助优化通信性能和稳定性。
  1. NCCL_ALGO:选择集合通信算法,如Ring, Tree;不同拓扑适合不同算法,测试选更优算法
  2. NCCL_CHUNK_SIZE:定义环形传输缓冲区大小;合理设置可提速,但也会增加内存消耗
  3. NCCL_DEBUG:打开NCCL调试日志;出现问题时打开调试,但会降低速度,不要在生产环境使用
  4. NCCL_DEBUG_FILE设置一个文件地址,变量用于将NCCL的调试日志输出到文件中。有助于调试nccl。
  5. NCCL_P2P_LEVEL:设置点对点通信优化级别;增加该值可减少P2P次数,提高某些操作效率
  6. NCCL_P2P_DISABLE:禁用点对点通信,强制使用集合通信。在某些情况下,P2P 通信可能会导致性能问题或出现错误。禁用 P2P 通信可以帮助解决这些问题。如果你遇到与 P2P 通信相关的错误或不稳定性,禁用 P2P 可能有助于恢复系统的稳定性。
  7. NCCL_PXN_DISABLE:禁用使用非本地 NIC 的节点间通信,使用 NVLink 和一个中间 GPU。建议设置成1。在PyTorch中进行跨节点all-to-all通信时,如果该环境变量是0会出现异常。
  8. NCCL_SOCKET_IFNAME:选择网络接口。
  9. NCCL_SOCKET_NTHREADS 增加它的数量可以提高socker传输的效率,但是会增加CPU的负担
  10. NCCL_NET_GDR_LEVEL:设置GPUDirect RDMA的使用级别。
  11. NCCL_MAX_NRINGS:定义支持的最大NCCL环路数。
  12. NCCL_MIN_NRINGS:定义最小环路数。
  13. NCCL_BUFFSIZE:设置scratch空间大小。
  14. NCCL_BUFFLE_SIZE 缓存数据量,缓存越大一次ring传输的数据就越大自然对带宽的压力最大,但是相应的总延迟次数会少。默认值是4M(4194304),注意设置的时候使用bytes(字节大小)
  15. NCCL_NTHREADS:设置NCCL内部使用的线程数。
  16. NCCL_VERSION:显示NCCL版本信息。
  17. NCCL_MAX/MIN_NCHANNELS 最小和最大的rings,rings越多对GPU的显存、带宽的压力都越大,也会影响计算性能
  18. NCCL_CHECKS_DISABLE 在每次集合通信进行前对参数检验校对,这会增加延迟时间,在生产环境中可以设为1.默认是0
  19. NCCL_CHECK_POINTERS 在每次集合通信进行前对CUDA内存 指针进行校验,这会增加延迟时间,在生产环境中可以设为1.默认是0
  20. NCCL_NET_GDR_LEVEL GDR触发的条件,默认是当GPU和NIC挂载一个swith上面时使用GDR
  21. NCCL_IGNORE_CPU_AFFINITY 忽略CPU与应用的亲和性使用GPU与nic的亲和性为主
  22. NCCL_IB_DISABLE:禁用InfiniBand传输。

禁用 InfiniBand: 设置 NCCL_IB_DISABLE=1 会禁用 NCCL 在 InfiniBand 设备上的使用。这意味着 NCCL 将不会利用 InfiniBand 网络进行数据传输,而是回退到其他网络接口(例如以太网或其他网络接口)。

调试和兼容性: 禁用 InfiniBand 可能用于调试目的,或在系统中 InfiniBand 网络出现问题时回退到其他网络接口。如果你遇到与 InfiniBand 相关的错误或兼容性问题,禁用 InfiniBand 可能有助于解决这些问题。

  1. NCCL_IB_HCA 代表IB使用的设备:Mellanox mlx5系列的HCA设备NCCL_IB_HCA=mlx5 会默认轮询所有的设备。NCCL_IB_HCA=mlx5_0:1 指定其中一台设备。
  2. NCCL_IB_TIMEOUT 改变量用于控制InfiniBand Verbs超时。取值范围1-22。超时时间的计算公式为4.096微秒 * 2 ^ timeout,正确的值取决于网络的大小。增加该值可以在非常大的网络上提供帮助,例如 NCCL在调用ibv_poll_cq时出现错误12时。建议在大模型训练任务中设置成最大值22,可以减少不少nccl timeout异常。设置超时时间: NCCL_IB_TIMEOUT 用于控制 InfiniBand 网络操作的超时时间。通过调整这个值,你可以控制 NCCL 在遇到通信延迟或网络问题时的容忍度。解决网络问题: 在高性能计算和大规模分布式训练中,网络延迟或不稳定可能导致超时错误。调整 NCCL_IB_TIMEOUT 可以帮助你在遇到网络问题时更好地调节超时设置,避免训练过程被中断。
  1. NCCL_IB_RETRY_CNT变量控制 InfiniBand 的重试次数。建议在大模型训练任务中设置成13,尽可能多重试。
  2. NCCL_DEBUG_FILE设置一个文件地址,变量用于将NCCL的调试日志输出到文件中。有助于调试nccl。
  3. NCCL_IB_PCI_RELAXED_ORDERING启用 IB Verbs 传输的Relaxed Ordering。Relaxed Ordering可以极大地提高虚拟化环境下 InfiniBand 网络的性能。设置为 2,如果可用,自动使用Relaxed Ordering。设置为 1,强制使用Relaxed Ordering,如果不可用则失败。设置为 0,禁用使用Relaxed Ordering。默认值为 2。建议值为1

PyTorch 提速

摘自:https://github.com/lartpang/PyTorchTricks?tab=readme-ov-file

Note

原始文档:https://www.yuque.com/lart/ugkv9f/ugysgn

声明: 大部分内容来自知乎和其他博客的分享, 这里只作为一个收集罗列. 欢迎给出更多建议.

知乎回答 (欢迎点赞哦):

预处理提速

  • 尽量减少每次读取数据时的预处理操作, 可以考虑把一些固定的操作, 例如 resize , 事先处理好保存下来, 训练的时候直接拿来用。
  • 将预处理搬到 GPU 上加速。
    • Linux 可以使用 NVIDIA/DALI
    • 使用基于 Tensor 的图像处理操作。

IO 提速

使用更快的图片处理

  • opencv 一般要比 PIL 要快 。
    • 请注意,PIL 的惰性加载的策略使得其看上去 open 要比 opencv 的 imread 要快,但是实际上那并没有完全加载数据。可以对 open 返回的对象调用其 load() 方法,从而手动加载数据,这时的速度才是合理的。
  • 对于 jpeg 读取, 可以尝试 jpeg4py
  • 存 bmp 图 (降低解码时间)。
  • 关于不同图像处理库速度的讨论:Python 的各种 imread 函数在实现方式和读取速度上有何区别? – 知乎

整合数据为单个连续文件 (降低读取次数)

对于大规模的小文件读取,可以保存为一个可以连续读取的连续文件格式。可以选择考虑 TFRecord (Tensorflow) , recordIOhdf5pthn5lmdb

预读取数据

预读取下一次迭代需要的数据。使用案例:

借助内存

  • 直接载到内存里面。
    • 将图片读取后存到一个固定的容器对象中。
  • 把内存映射成磁盘。

借助固态

机械硬盘换成 NVME 固态。参考自 如何给你 PyTorch 里的 Dataloader 打鸡血 – MKFMIKU 的文章 – 知乎

训练策略

低精度训练

在训练中使用低精度 ( FP16 甚至 INT8 、二值网络、三值网络) 表示取代原有精度 ( FP32 ) 表示。

可以节约一定的显存并提速, 但是要小心一些不安全的操作如 mean 和 sum。

更大的 batch

更大的 batch 在固定的 epoch 的情况下往往会带来更短的训练时间。但是大的 batch 面临着超参数的设置、显存占用问题等诸多考量,这又是另一个备受关注的领域了。

代码层面

库设置

  • 在训练循环之前设置 torch.backends.cudnn.benchmark = True 可以加速计算。由于计算不同内核大小卷积的 cuDNN 算法的性能不同,自动调优器可以运行一个基准来找到最佳算法。当你的输入大小不经常改变时,建议开启这个设置。如果输入大小经常改变,那么自动调优器就需要太频繁地进行基准测试,这可能会损害性能。它可以将向前和向后传播速度提高 1.27x 到 1.70x。
  • 使用页面锁定内存,即在 DataLoader 中设定 pin_memory=True
  • 合适的 num_worker,细节讨论可见 Pytorch 提速指南 – 云梦的文章 – 知乎
  • optimizer.zero_grad(set_to_none=False 这里可以通过设置 set_to_none=True 来降低的内存占用,并且可以适度提高性能。但是这也会改变某些行为,具体可见文档。通过 model.zero_grad() 或 optimizer.zero_grad() 将对所有参数执行 memset,并通过读写操作更新梯度。但是,将梯度设置为 None 将不会执行 memset,并且将使用“只写”操作更新梯度。因此,设置梯度为 None 更快。
  • 反向传播期间设定使用 eval 模式并使用 torch.no_grad 关闭梯度计算。
  • 可以考虑使用 channels_last 的内存格式。
  • DistributedDataParallel代替DataParallel。对于多 GPU 来说,即使只有单个节点,也总是优先使用 DistributedDataParallel 而不是 DataParallel ,因为 DistributedDataParallel 应用于多进程,并为每个 GPU 创建一个进程,从而绕过 Python 全局解释器锁 (GIL) 并提高速度。

模型

  • 不要初始化任何用不到的变量,因为 PyTorch 的初始化和 forward 是分开的,他不会因为你不去使用,而不去初始化。
  • @torch.jit.script,使用 PyTroch JIT 将逐点运算融合到单个 CUDA kernel 上。PyTorch 优化了维度很大的张量的运算操作。在 PyTorch 中对小张量进行太多的运算操作是非常低效的。所以有可能的话,将计算操作都重写为批次(batch)的形式,可以减少消耗和提高性能。而如果没办法自己手动实现批次的运算操作,那么可以采用 TorchScript 来提升代码的性能。TorchScript 是一个 Python 函数的子集,但经过了 PyTorch 的验证,PyTorch 可以通过其 just in time(jtt) 编译器来自动优化 TorchScript 代码,提高性能。但更好的做法还是手动实现批次的运算操作。
  • 在使用混合精度的 FP16 时,对于所有不同架构设计,设置尺寸为 8 的倍数。
  • BN 之前的卷积层可以去掉 bias。因为在数学上,bias 可以通过 BN 的均值减法来抵消。我们可以节省模型参数、运行时的内存

数据

  • 将 batch size 设置为 8 的倍数,最大化 GPU 内存的使用。
  • GPU 上尽可能执行 NumPy 风格的操作。
  • 使用 del 释放内存占用。
  • 避免不同设备之间不必要的数据传输。
  • 创建张量的时候,直接指定设备,而不要创建后再传输到目标设备上。
  • 使用 torch.from_numpy(ndarray) 或者 torch.as_tensor(data, dtype=None, device=None),这可以通过共享内存而避免重新申请空间,具体使用细节和注意事项可参考对应文档。如果源设备和目标设备都是 CPU,torch.from_numpy 和 torch.as_tensor 不会拷贝数据。如果源数据是 NumPy 数组,使用 torch.from_numpy 更快。如果源数据是一个具有相同数据类型和设备类型的张量,那么 torch.as_tensor 可以避免拷贝数据,这里的数据可以是 Python 的 list, tuple,或者张量。
  • 使用非阻塞传输,即设定 non_blocking=True。这会在可能的情况下尝试异步转换,例如,将页面锁定内存中的 CPU 张量转换为 CUDA 张量。

对优化器的优化

模型设计

CNN

  • ShuffleNetV2,论文
    • 卷积层输入输出通道一致: 卷积层的输入和输出特征通道数相等时 MAC(内存访问消耗时间, memory access cost 缩写为 MAC ) 最小, 此时模型速度最快
    • 减少卷积分组: 过多的 group 操作会增大 MAC, 从而使模型速度变慢
    • 减少模型分支: 模型中的分支数量越少, 模型速度越快
    • 减少 element-wise 操作: element-wise 操作所带来的时间消耗远比在 FLOPs 上的体现的数值要多, 因此要尽可能减少 element-wise 操作。 depthwise convolution 也具有低 FLOPs 、高 MAC 的特点。

Vision Transformer

  • TRT-ViT: TensorRT-oriented Vision Transformer,论文解读
    • stage-level:Transformer block 适合放置到模型的后期,这可以最大化效率和性能的权衡。
    • stage-level:先浅后深的 stage 设计模式可以提升性能。
    • block-level:Transformer 和 BottleNeck 的混合 block 要比单独的 Transformer 更有效。
    • block-level:先全局再局部的 block 设计模式有助于弥补性能问题。

通用思路

  • 降低复杂度: 例如模型裁剪和剪枝, 减少模型层数和参数规模
  • 改模型结构: 例如模型蒸馏, 通过知识蒸馏方法来获取小模型

推理加速

半精度与权重量化

在推理中使用低精度 ( FP16 甚至 INT8 、二值网络、三值网络) 表示取代原有精度 ( FP32 ) 表示。

  • TensorRT 是 NVIDIA 提出的神经网络推理 (Inference) 引擎, 支持训练后 8BIT 量化, 它使用基于交叉熵的模型量化算法, 通过最小化两个分布的差异程度来实现
  • Pytorch1.3 开始已经支持量化功能, 基于 QNNPACK 实现, 支持训练后量化, 动态量化和量化感知训练等技术
  • 另外 Distiller 是 Intel 基于 Pytorch 开源的模型优化工具, 自然也支持 Pytorch 中的量化技术
  • 微软的 NNI 集成了多种量化感知的训练算法, 并支持 PyTorch/TensorFlow/MXNet/Caffe2 等多个开源框架

更多细节可参考 有三 AI:【杂谈】当前模型量化有哪些可用的开源工具?

操作融合

重参数化(Re-Parameterization)

时间分析

  • Python 自带了几个性能分析的模块 profile , cProfile 和 hotshot , 使用方法基本都差不多, 无非模块是纯 Python 还是用 C 写的。
  • PyTorch Profiler 是一种工具,可在训练和推理过程中收集性能指标。Profiler 的上下文管理器 API 可用于更好地了解哪种模型算子成本最高,检查其输入形状和堆栈记录,研究设备内核活动并可视化执行记录。

项目推荐

  • 基于 Pytorch 实现模型压缩:
    • 量化:8/4/2 bits(dorefa)、三值/二值 (twn/bnn/xnor-net)。
    • 剪枝: 正常、规整、针对分组卷积结构的通道剪枝。
    • 分组卷积结构。
    • 针对特征二值量化的 BN 融合。

扩展阅读

PyTorch 节省显存

原始文档:https://www.yuque.com/lart/ugkv9f/nvffyf

整理自: Pytorch 有什么节省内存 (显存) 的小技巧? – 知乎 https://www.zhihu.com/question/274635237

使用 In-Place 操作

  • 对于默认支持 inplace 的操作尽量启用。比如 relu 可以使用 inplace=True 。
  • 可以将 batchnorm 和一些特定的激活函数打包成 inplace_abn

损失函数

每次循环结束时删除 loss, 可以节约很少显存, 但聊胜于无。可见 Tensor to Variable and memory freeing best practices

混合精度

可以节约一定的显存并提速, 但是要小心一些不安全的操作如 mean 和 sum。

管理不需要反向传播的操作

显存清理

  • torch.cuda.empty_cache() 这是 del 的进阶版, 使用 nvidia-smi 会发现显存有明显的变化. 但是训练时最大的显存占用似乎没变. 大家可以试试: How can we release GPU memory cache?
  • 可以使用 del 删除不必要的中间变量, 或者使用 replacing variables 的形式来减少占用.

梯度累加(Gradient Accumulation)

把一个 batchsize=64 分为两个 32 的 batch,两次 forward 以后,backward 一次。但会影响 batchnorm 等和 batchsize 相关的层。

在 PyTorch 的文档 中提到了梯度累加与混合精度并用的例子。

使用梯度累加技术可以对分布式训练加速,这可以参考:[原创][深度][PyTorch] DDP 系列第三篇:实战与技巧 – 996 黄金一代的文章 – 知乎

梯度检查点(Gradient Checkpointing)

PyTorch 中提供了 torch.utils.checkpoint。这是通过在反向传播期间,在每个检查点位置重新执行一次前向传播来实现的。

论文 Training Deep Nets with Sublinear Memory Cost 基于梯度检查点技术,将显存从 O(N) 降到了 O(sqrt(N))。对于越深的模型, 这个方法省的显存就越多, 且速度不会明显变慢。

相关工具

参考资料

其他技巧

重现

可关注文档中 相关章节

强制确定性操作

避免使用非确定性算法

PyTorch 中,torch.use_deterministic_algorithms() 可以强制使用确定性算法而不是非确定性算法,并且如果已知操作是非确定性的(并且没有确定性的替代方案),则会抛出错误。

设置随机数种子

def seed_torch(seed=1029):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed) # if you are using multi-GPU.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

seed_torch()

参考自https://www.zdaiot.com/MLFrameworks/Pytorch/Pytorch%E9%9A%8F%E6%9C%BA%E7%A7%8D%E5%AD%90/

PyTorch 1.9 版本前 DataLoader 中的隐藏 BUG

具体细节可见 可能 95%的人还在犯的 PyTorch 错误 – serendipity 的文章 – 知乎

解决方法可参考 文档

def seed_worker(worker_id):
    worker_seed = torch.initial_seed() % 2**32
    numpy.random.seed(worker_seed)
    random.seed(worker_seed)

DataLoader(..., worker_init_fn=seed_worker)

DDP分布式训练–数据加载和训练NCCL

深度学习的发展证明了大数据和大模型的价值。无论是在CV还是NLP领域,在大规模的计算资源上训练模型的能力变得日益重要。GPU以比CPU更快的矩阵乘法和加法运算,加速了模型训练。但随着数据量和模型参数的增长,单块GPU很快变得不够用。因此我们必须找到合适的方法,实现数据和模型在多个GPU甚至多个计算节点间的划分和复制,从而实现更短的训练周期和更大的模型参数量。

DDP大致的流程如下:

  1. 初始化进程组。
  2. 创建分布式并行模型,每个进程都会有相同的模型和参数。
  3. 创建数据分发Sampler,使每个进程加载一个mini batch中不同部分的数据。
  4. 网络中相邻参数分桶,一般为神经网络模型中需要进行参数更新的每一层网络。
  5. 每个进程前向传播并各自计算梯度。
  6. 模型某一层的参数得到梯度后会马上进行通讯并进行梯度平均。
  7. 各GPU更新模型参数。

今天主要来研究 3创建数据分发和Sampler :主要由三部分组成:torch.utils.data.Dataset【可以自定义】、torch.utils.data.DataLoader、以及torch.utils.data.distributed.DistributedSampler【可以自己定义】。

DistributedSampler 确保每个进程(或 GPU)处理数据集的不同部分。DataLoader 使用 DistributedSampler 生成的数据索引来分批数据,并进行数据加载和预处理。

1、 Dataset :

Dataset 是一个抽象类,用于表示数据集。你需要继承这个类并实现其方法,以定义你自己的数据集。它的主要功能包括:

  • 定义数据访问:通过实现 __getitem__ 方法,定义如何访问数据集中单个数据项。
  • 数据集大小:通过实现 __len__ 方法,返回数据集中样本的总数。
class MyDataset(torch.utils.data.Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return self.data[index]

2、DataLoader:

DataLoader 是一个数据加载器,它负责从 Dataset 中批量加载数据。它提供了对数据的批量处理、随机打乱、并行加载等功能。DataLoader 主要功能包括:

  • 批量加载:将数据集分成多个批次,并在每次迭代中返回一个批次的数据。
  • 并行处理:使用多个工作线程(num_workers)来并行加载数据,提高数据加载速度。
  • 数据打乱:通过 shuffle 参数来随机打乱数据顺序。
  • 自动处理样本:使用 collate_fn 将单个样本组合成批次。

1. 数据加载和预处理

DataLoader 负责从数据集(Dataset)中加载数据,并进行必要的预处理操作。预处理可能包括数据增强、归一化等。它通过多线程或多进程的方式并行加载数据,减少了数据加载时间。

  • num_workers:指定用于数据加载的子进程数,帮助加快数据加载速度。

2. 数据分批

DataLoader 将数据集划分为多个批次(batches),以便于模型进行训练和评估。批次的大小可以通过 batch_size 参数进行设置。

  • batch_size:每个批次的数据量,这对于训练过程中每次迭代的数据量非常重要。

3. 分布式训练中的数据划分

在 DDP 下,DataLoader 结合 Sampler 来确保数据在各个进程之间的正确分配。Sampler 控制每个进程(或 GPU)获得数据集的哪一部分。

  • DistributedSampler:当进行分布式训练时,DistributedSampler 确保每个进程处理不同的数据子集,从而实现负载均衡和避免数据重复。

4. 数据的打乱和顺序

为了提高模型的泛化能力,数据通常在每个 epoch 开始时被打乱。DataLoader 提供了打乱数据的功能,这对于训练过程是非常重要的。

  • shuffle:指定是否在每个 epoch 开始时打乱数据,这有助于减少模型对数据顺序的过拟合。

5. 批次丢弃

在训练过程中,如果最后一个批次的样本数不足以构成完整的批次,可以选择丢弃这个批次,以保证每个批次的大小一致。

  • drop_last:指定是否丢弃最后一个批次(如果其大小小于 batch_size)。

6. Sampler 结合使用

DataLoader 可以与不同的 Sampler 结合使用,以支持各种数据加载策略。在 DDP 下,DistributedSampler 是常用的 Sampler,它将数据集划分为多个子集,每个进程处理一个子集。

  • batch_sampler:如果使用自定义的 Sampler,可以将其传递给 batch_sampler 参数来控制数据的分批方式。

data = [1, 2, 3, 4, 5]
dataset = MyDataset(data)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=2, shuffle=True, num_workers=2)

for batch in dataloader:
print(batch)

3、DistributedSampler:

DistributedSampler 用于在分布式训练中对数据进行采样。它的主要作用是确保每个进程(或 GPU)在分布式训练中获得数据的不同子集,从而避免数据重复和确保数据均匀分配。主要功能包括:

  • 分布式数据分配:根据进程的 rank 和总进程数,计算出每个进程应该处理的数据子集。
  • 随机打乱:支持在每个 epoch 重新打乱数据,以增加训练的随机性。
  • 同步:在多个进程之间协调数据的采样。

1. 数据分配

在分布式训练中,数据集被划分成多个子集,每个进程(或 GPU)处理数据集的一部分。Sampler 确保每个进程(或 GPU)得到不同的数据子集,以避免重复和数据丢失。

  • DistributedSampler:这是 PyTorch 提供的专门用于分布式训练的采样器。它根据当前进程的 rank 和总进程数 num_replicas 来划分数据集。每个进程获得数据集的不同部分,从而实现数据的有效分配和负载均衡。

2. 确保数据覆盖

在每个 epoch 中,每个进程需要获取数据集的不同部分,以确保整个数据集被覆盖。Sampler 可以帮助实现这种数据分配策略,避免数据遗漏和冗余。

  • 随机打乱DistributedSampler 还支持在每个 epoch 开始时打乱数据集,这对于训练模型具有更好的泛化能力是非常重要的。

3. 避免数据重复

如果不使用合适的 Sampler,多个进程可能会处理相同的数据,从而导致数据重复。这不仅浪费计算资源,还可能影响模型的训练效果。

  • 去重DistributedSampler 确保每个进程仅处理数据集的一部分,从而避免数据重复。

4. 适应批量大小

在分布式训练中,数据的分配和批处理需要适应分布式环境中的批量大小。Sampler 负责将数据分成适合训练的批次,并确保每个进程处理的数据量与其他进程一致。

  • BatchSamplerBatchSampler 将由 Sampler 生成的索引列表分成批次,以便用于训练。它与 DistributedSampler 结合使用时,可以确保每个进程处理的数据批次符合预期的批量大小。

5. 支持多样本处理策略

不同的任务和模型可能需要不同的数据处理策略,如排序、动态采样等。通过自定义 Sampler,可以实现特定的采样策略以满足任务需求。

  • 自定义采样器:可以实现自定义的 Sampler 类,来满足特定的需求,如按样本长度排序、动态调整批次大小等。
sampler = torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=4, rank=0)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=2, sampler=sampler)

动手实现一个采样器:

CustomDistributedBufferDynamicBatchSampler 是一个用于分布式训练的自定义数据采样器,它结合了动态批量大小和缓冲区的排序策略。它的目的是通过更复杂的策略来生成批量,以适应各种训练需求。下面是对这个采样器的详细解释:

__iter__ 方法生成数据批次,考虑到动态批量大小和缓冲区的排序:

数据打乱:如果 shuffle 为 True,数据将被打乱。缓冲区排序:数据被分成多个缓冲区,每个缓冲区的大小由 sort_size 控制,并按样本长度进行排序。批量生成:根据 batch_sizebatch_size_sample_max 生成批量。如果当前缓冲区中的数据无法满足批次大小,则将现有数据作为一个批次。数据重复和分配:确保每个进程获得相同数量的批次。如果总批次不足以均分,重复一些批次以满足每个进程的需求。

dataset: 数据集实例。batch_size: 批次大小。batch_type: 批次的类型(例如按 token 或样本)。num_replicas: 总的进程数。rank: 当前进程的 rank。rank_split: 是否分割 rank。shuffle: 是否打乱数据。drop_last: 是否丢弃最后一个批次。is_training: 是否处于训练模式。sort_size: 缓冲区的大小,用于排序数据。start_step: 起始步数(用于从特定步数开始训练)。

def __init__(
    self,
    dataset,
    batch_size,
    batch_type="token",
    num_replicas=None,
    rank=None,
    rank_split=False,
    shuffle=True,
    drop_last=False,
    is_training: bool = True,
    sort_size: int = 1024,
    start_step: int = 0,
    **kwargs,
):
    try:
        rank = dist.get_rank()
        num_replicas = dist.get_world_size()
    except:
        rank = 0
        num_replicas = 1

    self.rank = rank
    self.num_replicas = num_replicas
    self.dataset = dataset
    self.batch_size = batch_size
    self.batch_type = batch_type
    self.is_training = is_training
    self.shuffle = shuffle and is_training
    self.drop_last = drop_last

    self.total_size = len(self.dataset)
    self.num_samples = int(math.ceil(self.total_size / self.num_replicas))
    self.epoch = 0
    self.sort_size = sort_size * num_replicas
    self.max_token_length = kwargs.get("max_token_length", 2048)
    self.length_scale_source = kwargs.get("length_scale_source", 1.0)
    self.batch_size_sample_max = kwargs.get("batch_size_sample_max", 200)
    self.start_step = start_step
    self.batch_num = 1
    if self.start_step > 0:
        logging.info(f"Warning, start_step > 0, dataloader start from step: {self.start_step}")
def __iter__(self):
    if self.shuffle:
        g = torch.Generator()
        g.manual_seed(self.epoch)
        random.seed(self.epoch)
        indices = torch.randperm(len(self.dataset), generator=g).tolist()
    else:
        indices = list(range(len(self.dataset)))

    # Create sorted buffers and form batches
    buffer_batches = []
    for i in range(0, len(indices), self.sort_size):
        buffer = sorted(
            indices[i : i + self.sort_size], key=lambda idx: self.dataset.get_source_len(idx)
        )
        batch = []
        max_len_in_batch = 0
        count = 1
        for idx in buffer:
            original_sample_length = self.dataset.get_source_len(idx)
            if original_sample_length > self.max_token_length:
                continue
            sample_length = 1 if self.batch_type == "example" else original_sample_length
            potential_batch_length = max(max_len_in_batch, sample_length) * (len(batch) + 1)
            if potential_batch_length <= self.batch_size and count < self.batch_size_sample_max:
                batch.append(idx)
                max_len_in_batch = max(max_len_in_batch, sample_length)
                count += 1
            else:
                buffer_batches.append(batch)
                batch = [idx]
                max_len_in_batch = sample_length
                count = 1
        if batch:
            buffer_batches.append(batch)

    # Ensure each rank gets the same number of batches, duplicate data if needed
    batches_per_rank = math.ceil(len(buffer_batches) / self.num_replicas)
    total_batches_needed = batches_per_rank * self.num_replicas
    extra_batches = total_batches_needed - len(buffer_batches)
    buffer_batches += random.choices(buffer_batches, k=extra_batches)

    # Evenly distribute batches from buffer_batches to each rank
    rank_batches = [[] for _ in range(self.num_replicas)]
    for i, batch in enumerate(buffer_batches):
        rank_batches[i % self.num_replicas].append(batch)

    # Assign all batches for the current rank directly
    final_batches = rank_batches[self.rank][self.start_step :]
    self.batch_num = len(final_batches)

    logging.info(
        f"rank: {self.rank}, dataloader start from step: {self.start_step}, batch_num: {len(rank_batches[self.rank])}, after: {self.batch_num}"
    )
    return iter(final_batches)

CustomDistributedBufferDynamicBatchSampler 通过以下方式增强了数据采样:

  • 动态批量大小:根据数据的实际长度动态调整批量大小。
  • 缓冲区排序:使用排序缓冲区策略提高数据处理效率。
  • 数据均匀分配确保每个进程获得相同数量的批次,避免数据不均衡。

这些特性使得 CustomDistributedBufferDynamicBatchSampler 能够更好地处理大规模数据集,并在分布式训练中提供高效的数据加载和批次生成策略。

数据均匀分配至关重要:如果分配不均,会导致某个节点的GPU显存爆炸,导致短筒效应,所以需要对数据进行平均分配:

分布式训练的时候 如何定义自己的samper,如何保证不同的节点使用不同的数据训练?

根据rank数量将索引分成不同的rank份。 分割数据以确保每个进程获取不同的索引

        if self.num_replicas is not None and self.rank is not None:
            # 每个进程处理的数据索引范围
            num_samples = int(np.ceil(len(indices) / self.num_replicas))
            start = self.rank * num_samples
            end = min(start + num_samples, len(indices))
            indices = indices[start:end]

1. 定义自定义Sampler

自定义Sampler需要继承torch.utils.data.Sampler并实现__iter__方法,返回数据索引的迭代器。以下是一个简单的示例:

python复制代码import torch
import numpy as np

class CustomSampler(torch.utils.data.Sampler):
    def __init__(self, data_source, num_replicas=None, rank=None):
        self.data_source = data_source
        self.num_replicas = num_replicas
        self.rank = rank

    def __iter__(self):
        # 获取所有样本索引
        indices = np.arange(len(self.data_source))

        # 分割数据以确保每个进程获取不同的索引
        if self.num_replicas is not None and self.rank is not None:
            # 每个进程处理的数据索引范围
            num_samples = int(np.ceil(len(indices) / self.num_replicas))
            start = self.rank * num_samples
            end = min(start + num_samples, len(indices))
            indices = indices[start:end]

        # 打乱数据
        np.random.shuffle(indices)
        return iter(indices)

    def __len__(self):
        if self.num_replicas is not None and self.rank is not None:
            num_samples = int(np.ceil(len(self.data_source) / self.num_replicas))
            return num_samples
        return len(self.data_source)

2. 初始化分布式环境

在训练脚本中,初始化分布式环境并创建自定义采样器。

python复制代码import torch
import torch.distributed as dist

dist.init_process_group(backend='nccl')  # 或 'gloo'
local_rank = dist.get_rank()
world_size = dist.get_world_size()

# 数据集
from torchvision import datasets, transforms
transform = transforms.Compose([transforms.ToTensor()])
dataset = datasets.CIFAR10(root='./data', train=True, transform=transform, download=True)

# 创建自定义采样器
sampler = CustomSampler(dataset, num_replicas=world_size, rank=local_rank)

# 创建数据加载器
dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, sampler=sampler)

3. 在训练时设置采样器的epoch

如果你的自定义Sampler需要在每个epoch中更改数据顺序,可以在每个epoch开始时调用sampler.set_epoch(epoch)

python复制代码for epoch in range(num_epochs):
    sampler.set_epoch(epoch)  # 如果你的自定义Sampler支持这个方法
    for batch in dataloader:
        # 训练代码

这样,你就可以定义一个适合你需求的自定义Sampler,并在分布式训练中使用它。

DDP分布式训练时候 batchsize设置是指单卡还多卡所有的总batch?

在分布式数据并行(DDP)训练中,batch_size的设置是指每个单卡(即每个GPU)的batch size。总的batch size是每个单卡的batch size乘以GPU的数量。【在samper采样的时候,根据rank数量,将index 分割成 rank份,每一份里面进行batchsize的采样,所以bs指的是单个GPU的bs】

例如,如果你有4个GPU,并且每个GPU的batch size设置为32,那么总的batch size就是32 * 4 = 128。每个GPU在每次训练迭代中处理32个样本,所有4个GPU在每次训练迭代中处理总共128个样本。

如果你使用的是分布式数据并行的训练策略,确保将batch_size设置为每个GPU上希望的大小,而不是总的batch size。

datalaoder中设置的 number_work在DDP训练中如何工作的?

首先明确一点: num_works指的是单个GPU的num_works数据加载进程数量。

  • **num_workers**参数定义了并行数据加载的进程数量。每个进程独立地从数据集中读取和预处理数据。
  • **collate_fn**可以自定义如何将数据项组合成batch。
  • 数据加载进程将预处理后的数据批次传递给主进程,主进程将这些批次数据送入模型进行训练。

使用多个数据加载进程可以提高数据预处理的速度,减少GPU在训练时的等待时间,从而加快整体训练过程。

num_workers的作用

  • 数据加载: num_workers决定了用于加载数据的子进程的数量。更多的工作进程可以并行地读取和预处理数据,从而加快数据加载速度,减少GPU的等待时间。
  • 性能影响: 增加num_workers的数量通常可以提高数据加载速度,但也会增加系统的内存使用。合理设置num_workers的值可以在数据加载效率和系统资源使用之间找到平衡。

在DDP训练中的考虑

  1. 每个进程的num_workers: 每个分布式进程(即每个GPU)都有自己的数据加载子进程。这意味着总的num_workers会是每个GPU上num_workers的值乘以GPU的数量(分布式进程数)。例如,如果有4个GPU,并且每个GPU的num_workers设置为4,那么总的工作进程数将是4 * 4 = 16。
  2. 避免数据重叠: 在分布式训练中,需要确保每个进程处理的数据子集是不同的。使用DistributedSampler可以确保数据在各个进程间均匀分配,从而避免数据重复和丢失。
  3. 同步和通信开销: 增加num_workers的数量可能会增加进程间的同步和通信开销,特别是在多GPU的情况下。需要根据具体的硬件配置和数据集大小来调整num_workers
  4. 内存和CPU资源: 每增加一个工作进程,都会消耗额外的CPU资源和内存。确保你的系统有足够的资源来支持设置的num_workers值。

单个numberwork子进程单独负责一个batch的数据吗,然后多个进程负责加载多个不同batch数据?

在PyTorch中,DataLoader中的子进程(由num_workers参数指定)并不一定是每个进程负责一个完整的batch的数据。实际操作中,多个子进程负责并行地预处理数据并将其送入主进程。下面是详细的解释:

数据加载进程的工作方式

  1. 子进程的任务: 每个数据加载进程从数据集中提取样本,并执行预处理任务。子进程会从数据集中读取单个样本(或多个样本),进行必要的转换和预处理,然后将这些处理后的样本返回给主进程。
  2. 批次的生成: DataLoader在主进程中处理批次的生成。主进程负责将从各个子进程中接收到的样本聚合成一个完整的batch。具体来说,当主进程需要一个新的batch时,它会从子进程中获取样本,然后使用collate_fn将这些样本组合成一个batch。
  3. 数据并行处理: 如果设置了num_workers > 1DataLoader会启动多个子进程来并行地加载数据。这些子进程并不独立处理完整的batch,而是并行地从数据集中提取和预处理样本。主进程会从这些子进程中收集样本,并在主进程中将样本组成batch。

数据加载示例

假设你设置了num_workers=4,这意味着会有4个子进程并行地处理数据。数据加载的过程大致如下:

  1. 子进程读取数据:
    • 每个子进程从数据集中读取样本。
    • 子进程对样本进行预处理(如数据增强、标准化等)。
  2. 主进程收集数据:
    • 主进程从子进程中收集处理后的样本。
    • 主进程使用collate_fn将样本组合成一个batch。
  3. 生成批次:
    • 主进程将生成的batch传递给训练循环。

为什么numberwork设置大了会增加CPU内存?

设置较大的 num_workers 值会增加 CPU 内存使用的原因有几个方面:

1. 进程数量和内存占用

  • 每个子进程的内存消耗: 每个数据加载子进程(由 num_workers 定义)都会独立地运行,并加载一部分数据集。每个子进程会使用自己的内存来存储数据和进行预处理操作。
  • 内存需求: 如果 num_workers 设置得很高,系统将会启动多个子进程,这些进程会同时存在并占用内存。每个进程都需要一定的内存来存储数据和运行预处理代码,从而导致总的内存使用增加。

2. 数据预处理和缓存

  • 数据缓冲: DataLoader 使用子进程来并行加载和预处理数据。在预处理过程中,子进程可能会创建和维护缓存,这些缓存可能会消耗额外的内存。
  • 数据加载: 进程在数据加载过程中可能会在内存中保持一定量的数据,以提高数据处理效率。这种内存的占用也会随着 num_workers 的增加而增加。

3. 并发处理

  • 并发开销: 启动大量的子进程进行数据处理会增加系统的并发开销。操作系统需要为每个进程分配内存和管理资源,这会导致系统整体的内存使用增加。
  • 进程间通信: 多个子进程之间可能会有数据交换和同步操作,这些操作也可能增加内存开销。

大模型训练中的数据加载和NCCL通信问题

A、训练大模型时候,有两亿的数据,数据索引保存到了jsonl文件中,在torch dataloader 加载数据jsonl文件时候爆内存,如何解决

1. 使用分块加载(Chunk Loading)【法1】

将数据分块处理,而不是一次性加载所有数据。可以在Dataset类中实现这一点。示例代码如下:

import json
import torch
from torch.utils.data import Dataset, DataLoader

class LargeJSONLDataset(Dataset):
    def __init__(self, jsonl_file, chunk_size=1000):
        self.jsonl_file = jsonl_file
        self.chunk_size = chunk_size
        self.data = []
        self._load_chunk(0)

    def _load_chunk(self, chunk_index):
        start_line = chunk_index * self.chunk_size
        end_line = start_line + self.chunk_size
        self.data = []
        with open(self.jsonl_file, 'r') as f:
            for i, line in enumerate(f):
                if start_line <= i < end_line:
                    self.data.append(json.loads(line))
                if i >= end_line:
                    break

    def __len__(self):
        with open(self.jsonl_file, 'r') as f:
            return sum(1 for _ in f)

    def __getitem__(self, idx):
        chunk_index = idx // self.chunk_size
        self._load_chunk(chunk_index)
        local_idx = idx % self.chunk_size
        return self.data[local_idx]

# 创建 Dataset 和 DataLoader
dataset = LargeJSONLDataset('data.jsonl')
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

实现的逻辑:采样器sampler 获取 index = len(self.dataset),然后进行index随机抽样,将抽到的id送给dataloader加载器,dataloader根据这些id,去dataset类里面执行getitem。 dataset 不在需要加载所有的jsonl文件,只需要根据id//self.chunk_size判断数据在第几个chunk,然后对应需要加载目标chunk的数据即可,然后在id% self.chunk_size 得到在该chunk的真实id,读取。这样做缺点是每次都需要重新laod jsonl文件,加载时间变慢。

2. 使用内存映射

内存映射可以帮助将大文件映射到内存中而不是完全加载。jsonl格式通常不支持直接内存映射,但可以使用分块处理与内存映射结合的方法。

内存映射是一种将磁盘上的文件映射到内存中的方法。通过使用内存映射,我们可以在不将整个文件加载到内存中的情况下访问文件的内容。这对于处理大型数据集非常有用,因为它可以节省内存空间,并且可以快速访问文件的任意部分。

内存映射:将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存,而系统会自动回写脏页面到对应的文件磁盘上,即完成了对文件的操作而不必再调用 read、write 等系统调用函数。相反,内核空间对这段区域的修改也直接反映用户空间,从而可以实现不同进程间的文件共享。

使用内存映射有以下几个优点:

  1. 节省内存空间:通过内存映射,我们可以在不将整个文件加载到内存中的情况下访问文件的内容。这对于处理大型数据集非常有用,因为它可以节省大量的内存空间。
  2. 快速访问文件的任意部分:由于内存映射将文件映射到内存中,我们可以快速访问文件的任意部分,而不需要读取整个文件。这对于随机访问大型文件非常有用。
  3. 支持并发访问:多个进程可以同时访问内存映射文件,而不会发生冲突。这使得内存映射非常适合多进程的数据处理任务。

https://github.com/DACUS1995/pytorch-mmap-dataset

3. 优化数据存储格式

考虑将数据存储为其他格式,如HDF5或Parquet,这些格式支持更高效的分块读写和压缩。例如,可以使用pandas将JSONL文件转换为Parquet格式,然后使用pandas读取它们。

4. 使用数据流处理

使用生成器逐行读取数据,而不是将整个文件加载到内存中:

def data_generator(file_path):
    with open(file_path, 'r') as f:
        for line in f:
            yield json.loads(line)

# 在 DataLoader 中使用生成器
def collate_fn(batch):
    # 自定义你的批处理操作
    return batch

dataset = data_generator('data.jsonl')
dataloader = DataLoader(dataset, batch_size=32, collate_fn=collate_fn)

5. 多进程数据加载

使用torch.utils.data.DataLoadernum_workers参数来并行加载数据:

dataloader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=4)

6. 数据预处理

在数据加载之前进行预处理,将数据处理成更紧凑的格式或者将其划分为多个较小的文件进行分段加载。这样可以减少每次加载的数据量。

7. 使用分块加载【法2】

方法1 每次读取单个数据,都需要重新读取一边jsonl文件,大大增加了数据加载的时间,为了尽量不影响数据加载时间,我们考虑牺牲一部分随机性来提高速度。

具体方法为:jsonl数据被分成N份,在训练1轮中,数据datalaoder先加载第一份的jsonl数据,然后part1数据加载训练结束后,继续加载part2的jsonl数据…..直到所有的jsonl数据加载完成,训练1轮结束。这样做的好处是每次batch不需要重新读取jsonl,但缺点就是不同part的jsonl之间数据不互通,数据的随机性降低,具体代码实现参考:FunASR

1:在训练1个epoch时候:传递data_split_num【数据分成几份】 data_split_i 【当前第几分】

2、datalaoder的 build_iter代码实现:本质上就是 重新执行 torch.utils.data.Dataset【可以自定义】、torch.utils.data.DataLoader、以及torch.utils.data.distributed.DistributedSampler【可以自己定义】 ,需要向 Dataset 传递 data_split_i 参数;

    def build_iter(self, epoch=0, data_split_i=0, start_step=0, **kwargs):

        # reload dataset slice
        if self.data_split_num > 1:
            del self.dataset_tr
            self.dataset_tr = self.dataset_class(
                self.kwargs.get("train_data_set_list"),
                frontend=self.frontend,
                tokenizer=self.tokenizer,
                is_training=True,
                **self.kwargs.get("dataset_conf"),
                data_split_i=data_split_i,
            )

        # dataloader
        batch_sampler = self.kwargs["dataset_conf"].get("batch_sampler", "BatchSampler")
        batch_sampler_val = None
        if batch_sampler is not None:
            batch_sampler_class = tables.batch_sampler_classes.get(batch_sampler)
            batch_sampler = batch_sampler_class(
                self.dataset_tr, start_step=start_step, **self.kwargs.get("dataset_conf")
            )
            batch_sampler_val = batch_sampler_class(
                self.dataset_val, is_training=False, **self.kwargs.get("dataset_conf")
            )

        batch_sampler["batch_sampler"].set_epoch(epoch)
        batch_sampler_val["batch_sampler"].set_epoch(epoch)
        dataloader_tr = torch.utils.data.DataLoader(
            self.dataset_tr, collate_fn=self.dataset_tr.collator, **batch_sampler
        )
        dataloader_val = torch.utils.data.DataLoader(
            self.dataset_val, collate_fn=self.dataset_val.collator, **batch_sampler_val
        )

        return dataloader_tr, dataloader_val

3、 Dataset 的具体实现:

可以看出,AudioDataset里面实际上利用的index_ds来具体读取jsonl文件内容的。

4、index_ds的实现:只返回部分jsonl数据,虽然函数里面加载了整个文件,但函数结束file_list_all解释放掉了,最后只有file_list一直在占用内存。

8、pytorch pin_memory 设置为Fasle【牺牲时间换空间】

在PyTorch中,何时使用pin_memory?【CPU内存不足,建议关闭该功能】 当计算机的内存充足的时候,可以设置pin_memory=True。当系统卡住,或者交换内存使用过多的时候,设置pin_memory=False。

pin_memory就是锁页内存,创建DataLoader时,设置pin_memory=True,则意味着生成的Tensor数据最开始是属于内存中的锁页内存,这样将内存的Tensor转义到GPU的显存就会更快一些。pin_memory=False表示将load进数据放至非锁页内存区,速度会较慢。

当计算机的内存充足的时候,设置pin_memory=True。当系统卡住,或者交换内存使用过多的时候,设置pin_memory=False。

主机中的内存,有两种存在方式: 一是锁页,二是不锁页,

锁页内存存放的内容在任何情况下都不会与主机的虚拟内存进行交换(注:虚拟内存就是硬盘),而不锁页内存在主机内存不足时,数据会存放在虚拟内存中。显卡中的显存全部是锁页内存,当计算机的内存充足的时候,可以设置pin_memory=True。

在使用PyTorch进行数据加载时,pin_memory是一个可选的,它通常用于将数据存储在主机内存(RAM)中的固定内存页(pinned memory)上,以便更高效地将数据传输到GPU内存。

主要作用如下:

  1. 提高数据传输效率:当使用GPU进行训练时,通常需要将数据从主机内存传输到GPU内存。使用pin_memory可以将数据存储在固定内存页中,减少数据传输的时间和开销,提高数据传输的效率。
  2. 减少数据传输延迟:主机内存和GPU内存之间的数据传输通常涉及内存拷贝操作,而内存拷贝是一项相对较慢的操作。pin_memory可以在数据加载时将数据直接存放在固定内存页中,避免不必要的内存拷贝过程,从而减少数据传输的延迟。

需要注意的是,使用pin_memory会占用额外的主机内存,并且只在使用CUDA设备的情况下才有效果。

锁页内存和GPU显存之间的拷贝速度大约是6GB/s
可分页内存和GPU显存间的拷贝速度大约是3GB/s。
GPU内存间速度是30GB/s,CPU间内存速度是10GB/s

通常我们的主机处理器是支持虚拟内存系统的,也就是使用硬盘空间来代替内存。大多数系统中虚拟内存空间被划分成许多页,它们是寻址的单元,页的大小至少是4096个字节。虚拟寻址能使一个连续的虚拟地址空间映射到物理内存并不连续的一些页。

如果某页的物理内存被标记为换出状态,它就可以被更换到磁盘上,也就是说被踢出内存了。如果下次需要该页了,则重新加载到内存里。显然如果这一页切换的非常频繁,那么会浪费不少时间。

锁页(pinned page)是操作系统常用的操作,就是为了使硬件外设直接访问CPU内存,从而避免过多的复制操作。被锁定的页面会被操作系统标记为不可被换出的,所以设备驱动程序给这些外设编程时,可以使用页面的物理地址直接访问内存,CPU也可以访问上述锁页内存,但是此内存是不能移动或换页到磁盘上的。另外,在GPU上分配的内存默认都是锁页内存,这只是因为GPU不支持将内存交换到磁盘上。

Host(例如CPU)的数据分配默认是**pageable(可分页的)**,但是GPU是没法直接读取pageable内存里的数据的,所以需要先创建一个临时的缓冲区(pinned memory),把数据从pageable内存拷贝pinned内存上,然后GPU才能从pinned内存上读取数据,如上图(左)所示。

9、number_works降低参数值

从磁盘加载数据到 host 的page-locked内存. 采用多个 worker 进程并行地数据加载 ,会增加内存占用,因此为了降低内存占用,可以考虑number_work从低到高设置:2、4、8、16,知道训练速度达到最优。

每个进程的num_workers: 每个分布式进程(即每个GPU)都有自己的数据加载子进程。这意味着总的num_workers会是每个GPU上num_workers的值乘以GPU的数量(分布式进程数)。

例如,如果有4个GPU,并且每个GPU的num_workers设置为4,那么总的工作进程数将是4 * 4 = 16。

避免数据重叠: 在分布式训练中,需要确保每个进程处理的数据子集是不同的。使用DistributedSampler可以确保数据在各个进程间均匀分配,从而避免数据重复和丢失。

同步和通信开销: 增加num_workers的数量可能会增加进程间的同步和通信开销,特别是在多GPU的情况下。需要根据具体的硬件配置和数据集大小来调整num_workers

内存和CPU资源: 每增加一个工作进程,都会消耗额外的CPU资源和内存。确保你的系统有足够的资源来支持设置的num_workers值。

在给Dataloader设置worker数量(num_worker)时,到底设置多少合适?这个worker到底怎么工作的?

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)

参数详解:

每次dataloader加载数据时:dataloader一次性创建num_worker个worker,(也可以说dataloader一次性创建num_worker个工作进程,worker也是普通的工作进程),并用batch_sampler将指定第几个batch分配给指定worker,worker将它负责的batch加载进RAM。

然后,dataloader从RAM中找本轮迭代要用的batch,如果找到了,就使用。如果没找到,就要num_worker个worker继续加载batch到内存,直到dataloader在RAM中找到目标batch。一般情况下都是能找到的,因为batch_sampler指定batch时当然优先指定本轮要用的batch。

num_worker设置得大,好处是寻batch速度快,因为下一轮迭代的batch很可能在上一轮/上上一轮…迭代时已经加载好了。坏处是内存开销大,也加重了CPU负担(worker加载数据到RAM的进程是CPU复制的嘛)。num_workers的经验设置值是自己电脑/服务器的CPU核心数,如果CPU很强、RAM也很充足,就可以设置得更大些。

如果num_worker设为0,意味着每一轮迭代时,dataloader不再有自主加载数据到RAM这一步骤(因为没有worker了),而是在RAM中找batch,找不到时再加载相应的batch。缺点当然是速度更慢。

  1. 根据硬件配置调整: 在多核 CPU 环境下,设置较高的 num_workers(如 4 到 16)可以有效利用多核资源,提高数据加载速度。具体的最佳值需要根据系统的 CPU 核心数和内存情况来调整。
  2. 数据加载瓶颈: 如果你发现训练时 GPU 经常处于等待数据的状态,这可能是因为数据加载成为了瓶颈。增加 num_workers 可以帮助缓解这一问题。
  3. 系统负载: 在某些情况下,设置过高的 num_workers 可能会导致系统负载过高,影响其他任务或整体系统性能。因此需要找到一个平衡点。
  4. 实验调整: 实际应用中,最好的做法是从较小的值开始(如 2 或 4),然后逐步增加,观察训练过程中的数据加载速度和系统资源使用情况,从而确定最佳设置。

DistributedDataParallel 消除了 DataParallel 中上述不足. 其不再需要主 GPU,每个 GPU 分别进行各自任务. 每个 GPU 上的训练是其独立进程,而在 DataParallel 中是采用多线程(multi-thread) 的.

DistributedDataParallel 的工作过程如,

[1] – 从磁盘加载数据到 host 的page-locked内存. 采用多个 worker 进程并行地数据加载;其中,distributed data sampler 确保了加载的数据在跨进程间是不重叠的.

[2] – 将 mini-batch 数据由 page-locked 内存转移到 GPU. 不需要任何数据广播. 因为每个 GPU 分别有模型副本,因此也不需要模型广播.

[3] – 分别在各 GPU 独立进行前向计算和损失函数计算. 因此,也不需要收集各 GPUs 的输出.

[4] – 后向梯度计算,梯度是跨GPUs all-reduced的. 确保在后向传播结束时,每个 GPU 最终得到相同的平均梯度的副本.

[5] – 更新模型参数. 由于每个 GPU 是由相同的模型副本开始的,且梯度是 all-reduced 的,因此所有 GPUs 上的权重更新是相同的,无需再进行模型同步.

以上即完成了一次迭代. 这种设计确保了模型参数的更新是相同的,因此消除了每次开始时的模型同步.

B 、NCCL通信超时问题

[PG 1 Rank 9] Timeout at NCCL work: 957, last enqueued NCCL work: 957, last completed NCCL work: 956.
[rank9]:[E ProcessGroupNCCL.cpp:577] [Rank 9] Some NCCL operations have failed or timed out. Due to the asynchronous nature of CUDA kernels, subsequent GPU operations might run on corrupted/incomplete data.
[rank9]:[E ProcessGroupNCCL.cpp:583] [Rank 9] To avoid data inconsistency, we are taking the entire process down.

这种报错需要具体情况具体分析

1、尝试增加NCCL 超时时间/设置过NCCL变量

如何设置:

1、查看变量:查看环境变量 NCCL_IB_TIMEOUT 的值

echo $NCCL_IB_TIMEOUT # 如果环境变量已设置,这个命令将显示其值;如果没有设置,则不会有任何输出。

printenv 命令可以显示所有环境变量的值,也可以查看特定的环境变量:

printenv NCCL_IB_TIMEOUT #如果环境变量未设置,该命令不会输出任何内容。

也可以使用 env 命令来列出所有环境变量,并查找 NCCL_IB_TIMEOUT

env | grep NCCL_IB_TIMEOUT

NCCL相关环境变量说明 【https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage.html】

  1. NCCL_TIMEOUT:设置集合操作超时阈值,单位毫秒;如果常见超时错误,适当增大该值,但不能太大NCCL_TIMEOUT 环境变量用于设置 NCCL 集体通信操作的超时时间。通过调整这个值,你可以更好地处理网络延迟和不稳定的问题,确保 NCCL 通信的稳定性和可靠性。如果在集体通信过程中遇到超时问题,可以尝试调整此环境变量以解决问题。

设置超时时间:

  • NCCL_TIMEOUT 用于定义 NCCL 集体通信操作的超时时间。超时时间是 NCCL 在执行操作时等待响应的最长时间,超出此时间将触发超时错误。

解决网络问题:

  • 在高性能计算和大规模分布式训练中,网络延迟或不稳定可能导致集体通信操作超时。设置合适的 NCCL_TIMEOUT 可以帮助调节容错设置,避免训练过程中因超时错误而中断。

性能调优:

  • 根据你的集群配置和网络状况,适当调整 NCCL_TIMEOUT 可以帮助优化通信性能和稳定性。
  1. NCCL_ALGO:选择集合通信算法,如Ring, Tree;不同拓扑适合不同算法,测试选更优算法
  2. NCCL_CHUNK_SIZE:定义环形传输缓冲区大小;合理设置可提速,但也会增加内存消耗
  3. NCCL_DEBUG:打开NCCL调试日志;出现问题时打开调试,但会降低速度,不要在生产环境使用
  4. NCCL_DEBUG_FILE设置一个文件地址,变量用于将NCCL的调试日志输出到文件中。有助于调试nccl。
  5. NCCL_P2P_LEVEL:设置点对点通信优化级别;增加该值可减少P2P次数,提高某些操作效率
  6. NCCL_P2P_DISABLE:禁用点对点通信,强制使用集合通信。在某些情况下,P2P 通信可能会导致性能问题或出现错误。禁用 P2P 通信可以帮助解决这些问题。如果你遇到与 P2P 通信相关的错误或不稳定性,禁用 P2P 可能有助于恢复系统的稳定性。
  7. NCCL_PXN_DISABLE:禁用使用非本地 NIC 的节点间通信,使用 NVLink 和一个中间 GPU。建议设置成1。在PyTorch中进行跨节点all-to-all通信时,如果该环境变量是0会出现异常。
  8. NCCL_SOCKET_IFNAME:选择网络接口。
  9. NCCL_SOCKET_NTHREADS 增加它的数量可以提高socker传输的效率,但是会增加CPU的负担
  10. NCCL_NET_GDR_LEVEL:设置GPUDirect RDMA的使用级别。
  11. NCCL_MAX_NRINGS:定义支持的最大NCCL环路数。
  12. NCCL_MIN_NRINGS:定义最小环路数。
  13. NCCL_BUFFSIZE:设置scratch空间大小。
  14. NCCL_BUFFLE_SIZE 缓存数据量,缓存越大一次ring传输的数据就越大自然对带宽的压力最大,但是相应的总延迟次数会少。默认值是4M(4194304),注意设置的时候使用bytes(字节大小)
  15. NCCL_NTHREADS:设置NCCL内部使用的线程数。
  16. NCCL_VERSION:显示NCCL版本信息。
  17. NCCL_MAX/MIN_NCHANNELS 最小和最大的rings,rings越多对GPU的显存、带宽的压力都越大,也会影响计算性能
  18. NCCL_CHECKS_DISABLE 在每次集合通信进行前对参数检验校对,这会增加延迟时间,在生产环境中可以设为1.默认是0
  19. NCCL_CHECK_POINTERS 在每次集合通信进行前对CUDA内存 指针进行校验,这会增加延迟时间,在生产环境中可以设为1.默认是0
  20. NCCL_NET_GDR_LEVEL GDR触发的条件,默认是当GPU和NIC挂载一个swith上面时使用GDR
  21. NCCL_IGNORE_CPU_AFFINITY 忽略CPU与应用的亲和性使用GPU与nic的亲和性为主
  22. NCCL_IB_DISABLE:禁用InfiniBand传输。

禁用 InfiniBand: 设置 NCCL_IB_DISABLE=1 会禁用 NCCL 在 InfiniBand 设备上的使用。这意味着 NCCL 将不会利用 InfiniBand 网络进行数据传输,而是回退到其他网络接口(例如以太网或其他网络接口)。

调试和兼容性: 禁用 InfiniBand 可能用于调试目的,或在系统中 InfiniBand 网络出现问题时回退到其他网络接口。如果你遇到与 InfiniBand 相关的错误或兼容性问题,禁用 InfiniBand 可能有助于解决这些问题。

  1. NCCL_IB_HCA 代表IB使用的设备:Mellanox mlx5系列的HCA设备NCCL_IB_HCA=mlx5 会默认轮询所有的设备。NCCL_IB_HCA=mlx5_0:1 指定其中一台设备。
  2. NCCL_IB_TIMEOUT 改变量用于控制InfiniBand Verbs超时。取值范围1-22。超时时间的计算公式为4.096微秒 * 2 ^ timeout,正确的值取决于网络的大小。增加该值可以在非常大的网络上提供帮助,例如 NCCL在调用ibv_poll_cq时出现错误12时。建议在大模型训练任务中设置成最大值22,可以减少不少nccl timeout异常。设置超时时间: NCCL_IB_TIMEOUT 用于控制 InfiniBand 网络操作的超时时间。通过调整这个值,你可以控制 NCCL 在遇到通信延迟或网络问题时的容忍度。解决网络问题: 在高性能计算和大规模分布式训练中,网络延迟或不稳定可能导致超时错误。调整 NCCL_IB_TIMEOUT 可以帮助你在遇到网络问题时更好地调节超时设置,避免训练过程被中断。
  1. NCCL_IB_RETRY_CNT变量控制 InfiniBand 的重试次数。建议在大模型训练任务中设置成13,尽可能多重试。
  2. NCCL_DEBUG_FILE设置一个文件地址,变量用于将NCCL的调试日志输出到文件中。有助于调试nccl。
  3. NCCL_IB_PCI_RELAXED_ORDERING启用 IB Verbs 传输的Relaxed Ordering。Relaxed Ordering可以极大地提高虚拟化环境下 InfiniBand 网络的性能。设置为 2,如果可用,自动使用Relaxed Ordering。设置为 1,强制使用Relaxed Ordering,如果不可用则失败。设置为 0,禁用使用Relaxed Ordering。默认值为 2。建议值为1

2、增加 dist.init_process_group 超时时间,还要对应修改NCCL变量: export TORCH_NCCL_BLOCKING_WAIT !!

dist.init_process_group(backend=kwargs.get(“backend”, “nccl”), init_method=”env://”,timeout=timedelta(seconds=7200000)) # 7200s 等待2h


export TORCH_NCCL_BLOCKING_WAIT=1  # 是否堵塞等待某节点错误超时 “0” 不堵塞等待  “1” 堵塞等待
echo $TORCH_NCCL_BLOCKING_WAIT
printenv TORCH_NCCL_BLOCKING_WAIT  # 新版本torch

export TORCH_NCCL_ASYNC_ERROR_HANDLING=1 # 是否堵塞等待某节点错误超时 “0” 不堵塞等待  “1” 堵塞等待
echo $TORCH_NCCL_ASYNC_ERROR_HANDLING
printenv TORCH_NCCL_ASYNC_ERROR_HANDLING # 新版本torch

export NCCL_BLOCKING_WAIT=1
echo $NCCL_BLOCKING_WAIT
printenv NCCL_BLOCKING_WAIT      #旧版本torch

export NCCL_ASYNC_ERROR_HANDLING=1
echo $NCCL_ASYNC_ERROR_HANDLING
printenv NCCL_ASYNC_ERROR_HANDLING   #旧版本torch

在使用 torch.distributed.init_process_group 初始化分布式训练时,timeout 参数用于指定集群中进程之间进行集体通信操作时的超时时间。这个超时时间决定了分布式进程在等待其他进程响应时的最长时间。

torch.distributed.init_process_group(backend=Noneinit_method=Nonetimeout=Noneworld_size=-1rank=-1store=Nonegroup_name=”pg_options=Nonedevice_id=None)

说明文档:https://pytorch.org/docs/stable/distributed.html

新版本torch
旧版本torch

超时设置:

  • timeout 参数用于设置分布式通信操作的超时时间。超时时间是 timedelta 对象,表示在等待其他进程响应时的最长时间。
  • 在你提供的示例中,timeout 被设置为 timedelta(seconds=108000),即 30 小时。这意味着分布式通信操作将在 30 小时内等待其他进程响应。

用途:

  • 容错性: 提高容错性,确保在长时间等待期间不会因为网络延迟或通信问题导致进程失败。
  • 调试: 在调试和测试中,设置较长的超时时间可以帮助识别是否因为超时设置过短而导致的通信问题。
  • 防止死锁: 在复杂的分布式训练任务中,长时间的超时时间有助于防止因通信死锁而导致的进程失败。

超时处理:

  • 如果在指定的超时时间内没有收到预期的响应,init_process_group 将会引发超时错误。这通常表示进程之间的通信出现了问题,可能需要检查网络连接、进程配置或其他潜在问题。

TORCH_NCCL_BLOCKING_WAIT 是一个环境变量,用于控制 PyTorch 在使用 NCCL 后端时的通信等待策略。具体来说,它决定了 NCCL 操作是否使用阻塞等待方式来处理通信操作。

TORCH_NCCL_BLOCKING_WAIT 的作用

  • TORCH_NCCL_BLOCKING_WAIT=1:
    • 启用阻塞等待: 当设置为 1 时,PyTorch 在执行 NCCL 操作(如 all-reducebroadcast)时,会使用阻塞等待的方式。这意味着 PyTorch 会等待操作完全完成或超时之后才继续执行。这种设置可以帮助确保所有进程在继续之前都完成了通信,有助于解决因异步操作引起的数据同步问题或错误。
  • TORCH_NCCL_BLOCKING_WAIT=0:
    • 禁用阻塞等待: 默认情况下(即设置为 0),PyTorch 使用非阻塞等待方式。NCCL 操作在后台异步进行,可能会导致在操作完成之前程序继续执行。这种方式可能会在网络延迟或系统负载较高时引发通信超时或数据不一致的问题。

如何设置 TORCH_NCCL_BLOCKING_WAIT

你可以通过以下方式设置 TORCH_NCCL_BLOCKING_WAIT 环境变量:

  1. 临时设置: 在运行程序时,可以在命令行中临时设置环境变量:bash复制代码TORCH_NCCL_BLOCKING_WAIT=1 python your_training_script.py
  2. 永久设置: 在终端会话中,可以通过 export 命令永久设置:bash复制代码export TORCH_NCCL_BLOCKING_WAIT=1 这个设置会在当前终端会话中生效,直到会话结束或重新启动。
  3. 在脚本中设置: 如果你希望在 Python 脚本内部设置这个变量,可以在脚本的开头添加:python复制代码import os os.environ['TORCH_NCCL_BLOCKING_WAIT'] = '1'

使用场景

  • 调试和稳定性:
    • 启用阻塞等待有助于调试和解决 NCCL 操作中的同步问题。它确保所有通信操作完成后才继续执行,有助于提高系统的稳定性。
  • 网络不稳定和负载高:
    • 在网络延迟较高或系统负载较大的环境中,启用阻塞等待可以减少由于异步操作导致的超时和错误。

注意事项

  • 性能影响:
    • 阻塞等待可能会增加通信操作的等待时间,影响整体训练性能,特别是在大规模分布式训练任务中。
  • 超时问题:
    • 如果超时时间设置过短或网络状况较差,启用阻塞等待可能导致更多的超时错误。因此,需要平衡稳定性和性能。

总结

TORCH_NCCL_BLOCKING_WAIT 环境变量控制 PyTorch 使用 NCCL 后端时的通信等待策略。设置为 1 可以启用阻塞等待,有助于提高系统稳定性和调试能力,但可能会影响性能。根据具体的训练任务和环境,可以选择合适的设置来优化训练过程。

相关环境变量解释:

https://pytorch.org/docs/stable/torch_nccl_environment_variables.html

3、增加 num_workers 来加快处理数据【Dataloader阶段导致 NCCL超时】

如果是在数据加载的时间过长,导致NCCL通信超时,考虑增加num_workers来提高数据加载速度。

减少数据加载瓶颈:

  • 增加 num_workers 可以提高数据加载速度,减少训练过程中因数据加载而导致的等待时间。这可以间接减少由于数据处理缓慢而可能引发的 NCCL 超时问题。

提高训练效率:

  • 更高效的数据加载可以提高整体训练效率,使训练过程更加顺畅,从而可能减少由于系统负载不均导致的通信超时问题。

4、 DistributedSampler 采样阶段导致 NCCL超时:

如果分布式训练中 NCCL 超时问题发生在采样阶段(特别是在使用 DistributedSampler 或自定义的采样器时),可能表明存在某些潜在的问题,这些问题可能导致训练进程之间的同步或数据传输效率低下。以下是一些可能的原因和解决方法:

可能的原因

  1. 数据加载和采样速度问题:
    • 如果采样器的性能不佳,可能会导致数据加载速度变慢,从而影响训练过程。虽然这不会直接导致 NCCL 超时,但它会间接影响整体训练性能。
  2. 进程同步问题:
    • 在使用 DistributedSampler 时,所有进程需要同步以确保数据的一致性。如果采样器在某些进程中出现延迟或阻塞,可能会导致通信超时。
  3. 数据分布不均:
    • 如果数据分布不均,某些进程可能会比其他进程处理更多的数据,从而导致通信延迟和超时问题。
  4. 数据预处理复杂:数据预处理太复杂,会导致数据加载过慢,也有可能导致超时

解决方法:

  1. 优化采样器和数据加载:
    • 确保自定义采样器或 DistributedSampler 以高效的方式进行数据采样和分配。优化数据加载速度,确保每个进程在采样时不会长时间等待。
    • 使用 num_workers 设置合理的数量,以加快数据加载速度,但要注意 CPU 内存和系统负载。
  2. 调整超时时间:
    • 增加 NCCL_TIMEOUT 环境变量值或 dist.init_process_group 中的 timeout 参数,以允许更长的等待时间。

4、基于HugingFace的Trainer多级多卡训练LLM导致NCCL超时

  1. 启动命令前增加了OMP_NUM_THREADS=1 MKL_NUM_THREADS=1,避免多线程导致死锁;
  2. 去掉了加载数据时的tqdm;
  3. 记在数据的DataLoader的drop_last设置为True,pin_memory设置为True,num_workers设置为0;
  4. 设置训练批大小为auto/设置小一点

查阅了一些资料

  1. pytorch 多机多卡卡住问题汇总
  2. Script freezes with no output when using DistributedDataParallel
  3. PyTorch 训练时中遇到的卡住停住等问题
  4. PyTorch训练时,Dataloader卡死、挂起,跑一个epoch停了,问题解决方案
  5. 运行开始训练,卡住半小时,一直不动
  6. 关于炼丹,你是否知道这些细节?
  7. ultralytics/yolov5#7481
  8. https://www.zhihu.com/question/512132168
  9. https://discuss.pytorch.org/t/nccl-timed-out-when-using-the-torch-distributed-run/153276
  10. https://stackoverflow.com/questions/69693950/error-some-nccl-operations-have-failed-or-timed-out

深度学习常用linux命令

1、查看某个端口占用的进程号: lsof –i:端口号

2、查看当前 python 进程

ps命令是”process status”的缩写,它用于查看当前系统中正在运行的进程的状态和信息。

ps -ef | grep python或者ps aux | grep python | grep 用户名

查看用户 wuliyttaotao 的所用进程: ps aux|grep wuliyttaotao

实时查看所有进程:top 或者(同时能查看各个 CPU 利用率)htop

杀死某个进程:kill -9 进程号

暂停某个进程:kill -STOP 进程号

恢复进程执行:kill -CONT 进程号

3、杀死所有python 进程:

ps列出ttlsa的pid,然后依次kill掉,比较繁琐.【python 可以替换成 /mnt/conda/python 等等,过滤出想要kill的进程】

ps -ef | grep python | grep -v grep | awk ‘{print $2}’ | xargs kill -9

注意:不要用 ps -ef | grep bash名字 来杀死进程,这样是杀不掉进程的!!要用bash里面执行的python文件名称来杀进程!!!

如果不能杀死进程:

如果使用的是tmux:直接杀掉终端:tmux kill-session -t 0

或者使用htop 打印所有pid,然后kill掉

4、查看 GPU 利用率nvidia-smi

或者,使用 pip 安装 gpustat,之后使用 gpustat 查看 gpu 利用状态:(-cpu 分别是三个状态指示)gpustat -cpu

结合 watch 指令持续查看 GPU 状态:

watch -n 0.1 nvidia-smi

watch -n 0.1 –color gpustat -cpu –color

5、Linux CPU 利用率/CPU核数

在执行 top 指令之后,按数字 1 号键可以看到每个 CPU 的详细情况:top

htop 指令比 top 指令更加直观:htop

查看CPU情况:

1、cpu个数

grep ‘physical id’ /proc/cpuinfo | sort -u

2、核心数【当数据集较大时建议采用,num_works一般设置为(CPU核心数+-1)为最佳】

grep ‘core id’ /proc/cpuinfo | sort -u | wc -l

3、线程数

grep ‘processor’ /proc/cpuinfo | sort -u | wc -l

6、动态查看新增的log.txt日志文件:

tail -f log.txt  会把 filename 文件里的最尾部的内容显示在屏幕上,并且不断刷新,只要 filename 更新就可以看到最新的文件内容。

tail –f log.txt 追踪最新的内容

tail –n 5 log.txt 查看最后五行内容

tail 命令可用于查看文件的内容

-f 循环读取

-f 循环读取

-q 不显示处理信息

-v 显示详细的处理信息

-c<数目> 显示的字节数

-n<行数> 显示文件的尾部 n 行内容

–pid=PID -f合用,表示在进程ID,PID死掉之后结束

-q, –quiet, –silent 从不输出给出文件名的首部

-s, –sleep-interval=S 与-f合用,表示在每次反复的间隔休眠S秒

6.查看文件有多少行:

wc –l 文件名

  • 7 查看目录下文件数量

ls -l | grep “^-” | wc -l

pytorch分布式 训练参数设置

# 自己的数据获取
dataset = MyDataset(input_size, data_size)
 
# 使用 DistributedSampler
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset)
 
trainloader = DataLoader(dataset=dataset,
                         pin_memory=true,
                         shuffle=(train_sampler is None),   # 使用分布式训练 shuffle 应该设置为 False
                         batch_size=args.batch_size,
                         num_workers=args.workers,
                         sampler=train_sampler)

需要注意的几个参数:batch_size、num_workers、shuffle、pin_memory在进行多机多卡以及单机多卡的设置。

1、 Batch_size设置:

Dataparallel : 设置 batch_size 是指总多卡的Batch size,数据被直接划分到多个 GPU 上

DistributedDataParallel batch size 设置成单卡一样即可,因为各个GPU对应的进程独立从磁盘中加载数据这里的 Batch_size指的是单卡的。

2、shuffle设置:

shuffle:

Dataparallel  :设置 ‘shuffle’: True

DistributedDataParallel  :为了能够按顺序划分数据子集,拿到不同部分数据,所以数据集不能够进行随机打散,所以用了参数 ‘shuffle’: False

3、 pin_memory 设置:

是否提前申请CUDA内存(默认为False,但有说法除非数据集很小,否则在N卡上推荐总是打开)。

如果开了pin memory:
每个worker都需要缓存一个batch的数据.
batch size和num_workers都大, 显存会炸

为什么 设置 pip_memory=true, 看解释:
多GPU训练的时候注意机器的内存是否足够(一般内存为显卡显存x2),如果不够,建议关闭pin_memory(锁页内存)选项。
采用DistributedDataParallel多GPUs训练的方式比DataParallel更快一些,如果你的Pytorch编译时有nccl的支持,那么最好使用DistributedDataParallel方式。
关于什么是锁页内存:
pin_memory就是锁页内存,创建DataLoader时,设置pin_memory=True,则意味着生成的Tensor数据最开始是属于内存中的锁页内存,这样将内存的Tensor转义到GPU的显存就会更快一些。
主机中的内存,有两种存在方式,一是锁页,二是不锁页,锁页内存存放的内容在任何情况下都不会与主机的虚拟内存进行交换(注:虚拟内存就是硬盘),而不锁页内存在主机内存不足时,数据会存放在虚拟内存中。显卡中的显存全部是锁页内存,当计算机的内存充足的时候,可以设置pin_memory=True。当系统卡住,或者交换内存使用过多的时候,设置pin_memory=False。因为pin_memory与电脑硬件性能有关,pytorch开发者不能确保每一个炼丹玩家都有高端设备,因此pin_memory默认为False。

当计算机的内存充足的时候,可以设置pin_memory=True。当系统卡住,或者交换内存使用过多的时候,设置pin_memory=False。pin_memory默认为False。

4、 num_workers 设置:num_worker的设置值一般是所运行机子上的CPU核心数

可以设置set num_workers =4 x number of available GPUs 

um_worker大: 下一轮迭代的batch可能在上一轮/上上一轮…迭代时已经加载好了。 坏处是GPU memory开销大 (这是开了pin memory的情况吧) ,也加重了CPU负担。

CPU的物理个数:grep ‘physical id’ /proc/cpuinfo | sort | uniq | wc -l 结果为2,说明CPU有两个。 每个CPU的核数:cat /proc/cpuinfo |grep “cores”|uniq 10,说明每个10核。 cpu核数 = 2×10

1、cpu个数

grep ‘physical id’ /proc/cpuinfo | sort -u

2、核心数【当数据集较大时建议采用,num_works一般设置为(CPU 核心数 +-1)为最佳】

grep ‘core id’ /proc/cpuinfo | sort -u | wc -l

3、线程数

grep ‘processor’ /proc/cpuinfo | sort -u | wc -l

一般建议 num_workers 的值接近 CPU 核心数,但不要超过,以免导致过多的上下文切换。

如果数据集较大且预处理复杂,较高的 num_workers 值可能会更有效。反之,如果数据集较小或者预处理简单,则可能不需要太多的工作线程。

Num workers:只要你的 GPU 计算占用没有用满,说明 GPU 要等数据准备。可以试着增加进程数目,同时观察是否是硬盘 IO 瓶颈,如果是多机训练,还要注意网络瓶颈。不过,最大也不能超过核心数,一般还要减一点,因为主进程,多卡多进程训练,都会占用核心。

num_worker通过影响数据加载速度,从而影响训练速度。 每轮dataloader加载数据时:dataloader一次性创建num_worker个worker,worker就是普通的工作进程。并用batch_sampler将指定batch分配给指定的worker,worker将它负责的batch加载进RAM。然后,dataloader从RAM中找本轮迭代要用的batch,如果找到了,就使用;如果没找到,就用num_worker个worker继续加载batch到RAM,直到dataloader在RAM中找到目标batch。

pytorch单机多卡训练【分布式数据并行 和 数据并行方案】

https://github.com/KaiiZhang/DDP-Tutorial/blob/main/DDP-Tutorial.md

数据并行和分布式数据并行方案:

第一: 数据并行 , 开一个进程(process),该进程下每个线程(threading)负责一部分数据,分别跑在不同卡上,前向传播,devices各玩各的,计算loss时候需要所有devices的输出输送到主GPU【默认device0】上计算梯度均值,并更新device0上的参数,然后将参数广播到其他device上。总结:单机-多线程,通过torch.nn.DataParallel 实现。
第二: 分布式数据并行,开多个进程,一个进程运行在一张卡上,每个进程负责一部分数据。在各进程梯度计算完成之后,各进程需要将梯度进行汇总平均,然后再由 rank=0 的进程,将其 broadcast 到所有进程。各进程用该梯度来更新参数。由于各进程中的模型,初始参数一致 (初始时刻进行一次 broadcast),而每次用于更新参数的梯度也一致,因此,各进程的模型参数始终保持一致。

总结:单机/多机-多进程,通过torch.nn.parallel.DistributedDataParallel 实现。

毫无疑问,第一种简单,第二种复杂,毕竟 进程间 通信比较复杂。

torch.nn.DataParallel 和 torch.nn.parallel.DistributedDataParallel,下面简称为DPDDP

总结: 两个函数主要用于在多张显卡上训练模型,也就是所谓的分布式训练

数据并行 torch.nn.DataParallel  :

原理:

  • 网络前向传播前,输入数据被分成几份送到不同显卡上,网络模型每个显卡上拷贝一份。
  • 前向传播时,devices各玩各的。
  • 前向传播完成后,每张显卡上的网络输出会送到主device上(默认第一张卡),在主device上计算loss。然后,loss送给每个device,每个device计算得到梯度,再把梯度送到主device上,主device对汇总得到的梯度求均值后,更新主device上的网络参数。最后,将更新后的网络权重广播(broadcast)到其它device上,实现所有device网络权重同步。
  • torch.nn.DataParallel是把每张卡的输出聚合到GPU0上,然后在GPU0上与label计算loss,根据计算图反向传播,让每张卡上获得自己的梯度。优化器则对梯度进行聚合,在主GPU更新模型参数,再把新的参数分发到每个GPU。

从上面介绍可知,DataParallel 对主device依赖较高,会造成负载不均衡,限制模型训练速度。

DP使用教程:

主程序DP_main.py中,下面这行代码实现数据并行化分布式训练。

相比单卡单机代码:只需要修改以下代码:

model_train = torch.nn.DataParallel(model)	

通过终端运行命令,

CUDA_VISIBLE_DEVICES=0,1 python3 DP_main.py

DP_main.py代码:

import torch
import torchvision
import torch.nn as nn
import torch.backends.cudnn as cudnn
import torchvision.transforms as transforms
from net import ToyModel
import torch.optim as optim


#---------------------------#
#   获得学习率
#---------------------------#
def get_lr(optimizer):
    for param_group in optimizer.param_groups:
        return param_group['lr']

#---------------------------#
#   获得数据集
#---------------------------#
def get_dataset():
    transform_train = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    CIFAR10_trainset = torchvision.datasets.CIFAR10(root='./data', train=True, 
        download=True, transform=transform_train)
    
    # ----------------------------------------------------------#
    #   num_workers:加载数据集使用的线程数
    #   pin_memory=True:锁页内存, 可以加速数据读取. (可能会导致Bug)
    # ----------------------------------------------------------#
    trainloader = torch.utils.data.DataLoader(CIFAR10_trainset, 
        batch_size=16, num_workers=2, pin_memory=True)
    return trainloader

#---------------------------#
#   训练
#---------------------------#
def train(model, device, trainloader, optimizer, loss_func, print_frequence, epoch):
    train_loss = 0
    correct = 0
    total = 0
    for batch_idx, (inputs, targets) in enumerate(trainloader):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_func(outputs, targets)
        loss.backward()
        optimizer.step()

        # loss.item()把其中的梯度信息去掉,没.item()可能会导致程序所占内存一直增长,然后被计算机killed
        train_loss += loss.item()       
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
        if batch_idx % print_frequence == print_frequence - 1 or print_frequence == trainloader.__len__() - 1:
            print('epoch: %d | Loss: %.3f | Acc: %.3f%% (%d/%d)' % (
                epoch, train_loss / (batch_idx + 1), 100. * correct / total, correct, total))
    torch.save(model.state_dict(), "%d.ckpt" % epoch)	
    # torch.save(model.module.state_dict(), "%d.ckpt" % epoch)	用双卡训练保存权重,重新加载时,也需要这样保存,否则,权重前面会多module
    
    # -------------------------------------#
    #   只是想看看lr有没有衰减
    # -------------------------------------#
    lr = get_lr(optimizer)
    print("lr:", lr)
    lr_scheduler.step()


if __name__ == '__main__':
    trainloader = get_dataset()
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = ToyModel()
    print(model)

    model_train = model.train()
    if torch.cuda.is_available():   
        model_train = torch.nn.DataParallel(model)  # 单GPU跑套DP的话,指标可能会降
        cudnn.benchmark = True
        model_train = model_train.cuda()            # 等效于model_train = model_train.to(device)

    loss_func = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model_train.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)
    # -------------------------------------#
    #   step_size控制多少个epoch衰减一次学习率
    # -------------------------------------#
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.1)   
    
    print_frequence = 500
    epochs = 100
    for epoch in range(0, epochs):
        train(model_train, device, trainloader, optimizer, loss_func, print_frequence, epoch)

分布式并行DistributedDataParallel 

  • 更快的训练速度
  • 多进程的运行方式
  • 支持单机多卡和多机多卡
  • 平衡的GPU使用

DDP原理:

先说分布式几个名词:
一个world里进程个数为world_size,全局看,每个进程都有一个序号rank;分开看,一个进程在每台机器里面也有序号local_rank。

  • group:进程组,默认一个组,即一个world
  • world_size:全局进程个数
  • rank:进程序号,用于进程间通信。rank=0为GPU主卡,主要用于多机多卡。本文中仅涉及到一台机器内多张卡。
  • locak_rank:进程(一台机器)内的GPU编号,通过指令torch.distributed.run自动指定,不需要用户输入该参数。

DDP 在每次迭代中,操作系统会为每个GPU创建一个进程,每个进程具有自己的 optimizer ,并独立完成所有的优化步骤,进程内与一般的训练无异。在各进程梯度计算完成之后,各进程需要将梯度进行汇总平均,然后再由 rank=0 的进程,将其 broadcast 到所有进程。各进程用该梯度来更新参数。由于各进程中的模型,初始参数一致 (初始时刻进行一次 broadcast),而每次用于更新参数的梯度也一致,因此,各进程的模型参数始终保持一致。

而在 DataParallel 中,全程维护一个 optimizer,对各 GPU 上梯度进行求和,在主 GPU 进行参数更新,之后再将模型参数 broadcast 到其他 GPU。相较于 DP,DDP传输的数据量更少,速度更快,效率更高。

DDP的流程示意图如上图所示,DDP需要额外的建立进程组阶段(Construction)。在Construction阶段需要首先明确通信协议和总进程数。通信协议是实现DDP的底层基础,我们在之后单独介绍。总进程数就是指有多少个独立的并行进程,被称为worldsize。根据需求每个进程可以占用一个或多个GPU,但并不推荐多个进程共享一个GPU,这会造成潜在的性能损失。为了便于理解,在本文的所有示例中我们假定每个进程只占用1个GPU,占用多个GPU的情况只需要简单的调整GPU映射关系就好。

并行组建立之后,每个GPU上会独立的构建模型,然后GPU-1中模型的状态会被广播到其它所有进程中以保证所有模型都具有相同的初始状态。值得注意的是Construction只在训练开始前执行,在训练中只会不断迭代前向和后向过程,因此不会带来额外的延迟。

相比于DataParallel,DDP的前向后向过程更加简洁。推理、损失函数计算,梯度计算都是并行独立完成的。DDP实现并行训练的核心在于梯度同步。梯度在模型间的同步使用的是allreduce通信操作,每个GPU会得到完全相同的梯度。如图中后向过程的步骤2,GPU间的通信在梯度计算完成后被触发(hook函数)。图中没有画出的是,通常每个GPU也会建立独立的优化器。由于模型具有同样的初始状态和后续相同的梯度,因此每轮迭代后不同进程间的模型是完全相同的,这保证了DDP的数理一致性。

为了优化性能,DDP中针对allreduce操作进行了更深入的设计。梯度的计算过程和进程间的通信过程分别需要消耗一定量的时间。等待模型所有的参数都计算完梯度再进行通信显然不是最优的。如下图所示,DDP中的设计是通过将全部模型参数划分为无数个小的bucket,在bucket级别建立allreduce。当所有进程中bucket0的梯度计算完成后就立刻开始通信,此时bucket1中梯度还在计算。这样可以实现计算和通信过程的时间重叠。这种设计能够使得DDP的训练更高效。

在最后我们对DDP的通信部分进行介绍。DDP后端的通信由多种CPP编写的协议支持,不同协议具有不同的通信算子的支持,在开发中可以根据需求选择。

对于CV和NLP常用GPU训练的任务而言,选择Gloo或NCCL协议即可。一个决定因素是你使用的计算机集群的网络环境:

  • 当使用的是Ethernet(以太网,大部分机器都是这个环境):那么优先选择NCCL,具有更好的性能;如果在使用中遇到了NCCL通信的问题,那么就选择Gloo作为备用。(经验:单机多卡直接NCCL;多机多卡先尝试NCCL,如果通信有问题,而且自己解决不了,那就Gloo。
  • 当使用的是InfiniBand:只支持NCCL。

另一个决定性因素是二者支持的算子范围不同,因此在使用时还需要结合代码里的功能来确定。下图记录了每种通信协议能够支持的算子,Gloo能够实现GPU中最基本的DDP训练,而NCCL能够支持更加多样的算子.

不同Backend的算子支持情况

DDP使用:

  • 设备间通信
    为了保证不同卡上的模型参数同步,设备间需要通讯。
    设备间通讯通过后端backend实现,GPU上用nccl,CPU上用gloo
torch.distributed.init_process_group('nccl')
  • 指定GPU
    指定使用哪些GPU,作用相当于CUDA_VISIBLE_DEVICES命令。
torch.cuda.set_device(args.local_rank)   
  • 构造模型
    构造DDP model,[args.local_rank]是一个list
model = DistributedDataParallel(model, device_ids=[args.local_rank], 
   										output_device=args.local_rank)
  • 构建数据集
    构建数据集中需要用到train_sampler来shuffle数据,继而实现把trainset中的样本随机分配到不同的GPU上,
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)
# ---------------------------------------------------------------#
#   sampler参数和shuffle参数是互斥的,两个传一个就好,都用于数据打乱。
# ----------------------------------------------------------------#
trainloader = torch.utils.data.DataLoader(trainset, 
        batch_size=16, num_workers=2, sampler=train_sampler)
  • 数据放到多卡上
    模型、损失函数、输入数据要放到多卡上,代码例如:
data = data.to(args.local_rank)		# 等效于data.cuda(args.local_rank)

通过终端运行命令,

# CUDA_VISIBLE_DEVICES="gpu_0, gpu1,..." python -m torch.distributed.launch --nproc_per_node n_gpus DDP_main.py
CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nproc_per_node=2 DDP_main.py # 因为是单机多卡,所以只需要指定nproc_per_node【GPU数量】即可。local_rank不需要设置。
大概内容就是,这个命令行参数“–loacl_rank”是必须声明的,但它不是由用户填写的,而是由pytorch为用户填写,也就是说这个值是会被自动赋值为当前进程在本机上的rank

DDP_main.py中内容如下:

import argparse         # 从命令行接受参数
from tqdm import tqdm   # 用于进度条
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
from net import ToyModel
import torchvision.transforms as transforms
# ---------------------------#
#   下面两个包用于分布式训练
# ---------------------------#
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

# ---------------------------#
#   获得数据集
# ---------------------------#
def get_dataset():
    transform = torchvision.transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])
    trainset = torchvision.datasets.CIFAR10(root='./data', train=True, 
        download=True, transform=transform)
    # -----------------------------------------------#
    #   train_sampler主要用于DataLoader中shuffle数据
    #       把trainset中的样本随机分配到不同的GPU上
    # -----------------------------------------------#
    train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)
    # ---------------------------------------------------------------#
    #   batch_size:每个进程(GPU/卡)下的batch_size。
    #       总batch_size = 这里的batch_size * 进程并行数
    #       全局进程个数world_size = 节点数量 * 每个节点上process数量
    #       总卡数                =  电脑数  * 每台电脑上有多少张卡
    #   sampler参数和shuffle参数是互斥的,两个传一个就好,都用于数据打乱。
    #   在DDP中,用sampler参数
    # ----------------------------------------------------------------#
    trainloader = torch.utils.data.DataLoader(trainset, 
        batch_size=16, num_workers=2, sampler=train_sampler)
    return trainloader

#---------------------------#
#   训练
#---------------------------#
def train(model, trainloader, optimizer, loss_func, lr_scheduler, epoch):
    model.train()
    iterator = tqdm(range(epoch))       # 为了进度条显示而已
    for epoch in iterator:
        # ------------------------------------------------------------------#
        #   设置sampler的epoch,DistributedSampler需要这个来指定shuffle方式,
        #   通过维持各个进程之间的相同随机数种子使不同进程能获得同样的shuffle效果。
        #   这一步是必须的,让数据充分打乱,训练效果更好
        # ------------------------------------------------------------------#
        trainloader.sampler.set_epoch(epoch)

        for data, label in trainloader:
            data, label = data.to(args.local_rank), label.to(args.local_rank)
            optimizer.zero_grad()
            prediction = model(data)
            loss = loss_func(prediction, label)
            loss.backward()
            iterator.desc = "loss = %0.3f" % loss
            optimizer.step()
        # ------------------------------------------------------------------#
        #   save模型的时候:保存的是model.module而不是model,
        #       因为model其实是DDP model,参数是被`model=DDP(model)`包起来的。
        #   只需要在进程0(local_rank=0)上保存一次就行了,避免多次重复保存。
        # ------------------------------------------------------------------#
        if dist.get_rank() == 0:        # 等效于 if local_rank == 0:
            torch.save(model.module.state_dict(), "%d.ckpt" % epoch)
        
        lr_scheduler.step()

# -----------------------------------------------#
# 初始化配置local_rank配置
# -----------------------------------------------#
parser = argparse.ArgumentParser()
# local_rank:当前这个节点上的第几张卡,从外部传入
#   该步骤必须有,launch会自动传入这个参数
parser.add_argument("--local_rank",help="local device id on current node", type=int)
args = parser.parse_args()
local_rank = args.local_rank        # 纯属想写代码时用local_rank还是args.local_rank都行
print('local_rank:', args.local_rank)
"""
local_rank: 0
local_rank: 1
"""


if __name__ == "__main__":
    # DDP 初始化
    torch.cuda.set_device(args.local_rank)   # 作用相当于CUDA_VISIBLE_DEVICES命令,修改环境变量
    dist.init_process_group(backend='nccl')  # 设备间通讯通过后端backend实现,GPU上用nccl,CPU上用gloo

    # 准备数据,要在DDP初始化之后进行
    trainloader = get_dataset()

    # 初始化model
    model = ToyModel().to(args.local_rank)    # 等效于model = ToyModel().cuda(args.local_rank)

    # Load模型参数要在构造DDP model之前,且只需要在 master卡 上加载即可
    ckpt_path = None
    if dist.get_rank() == 0 and ckpt_path is not None:
        model.load_state_dict(torch.load(ckpt_path))

    # 构造DDP model
    model = DDP(model, device_ids=[args.local_rank], output_device=args.local_rank)

    # 初始化optimizer,要在构造DDP model之后
    optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

    # 学习率衰减方式
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.1)   

    # 初始化loss
    loss_func = nn.CrossEntropyLoss().to(args.local_rank)

    # 模型训练
    train(model, trainloader, optimizer, loss_func, lr_scheduler, epoch=100)
# ----------------------------------------------------------------------------------#
#   CUDA_VISIBLE_DEVICES:来决定使用哪些GPU,个数和后面n_gpus相同
#   torch.distributed.launch:启动DDP模式,构建多个进程,也会向代码中传入local_rank参数,
#       没有CUDA_VISIBLE_DEVICES限制的话,传入为从 0 到 n_gpus-1 的索引
#   --nproc_per_node=n_gpus:单机多卡,用几个gpu
# -----------------------------------------------------------------------------------#
# 用 2 张卡跑
CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nproc_per_node 2 DDP_main.py
# 用 3 张卡跑     
CUDA_VISIBLE_DEVICES="1,2,3" python -m torch.distributed.launch --nproc_per_node 3 DDP_main.py  

pytorch多机多卡训练【DistributedDataParallel】

https://github.com/KaiiZhang/DDP-Tutorial/blob/main/DDP-Tutorial.md#distributeddataparallel

原理

DDP的流程示意图如上图所示,DDP需要额外的建立进程组阶段(Construction)。在Construction阶段需要首先明确通信协议和总进程数。通信协议是实现DDP的底层基础,我们在之后单独介绍。总进程数就是指有多少个独立的并行进程,被称为worldsize。根据需求每个进程可以占用一个或多个GPU,但并不推荐多个进程共享一个GPU,这会造成潜在的性能损失。为了便于理解,在本文的所有示例中我们假定每个进程只占用1个GPU,占用多个GPU的情况只需要简单的调整GPU映射关系就好。

并行组建立之后,每个GPU上会独立的构建模型,然后GPU-1中模型的状态会被广播到其它所有进程中以保证所有模型都具有相同的初始状态。值得注意的是Construction只在训练开始前执行,在训练中只会不断迭代前向和后向过程,因此不会带来额外的延迟。

相比于DataParallel,DDP的前向后向过程更加简洁。推理、损失函数计算,梯度计算都是并行独立完成的。DDP实现并行训练的核心在于梯度同步。梯度在模型间的同步使用的是allreduce通信操作,每个GPU会得到完全相同的梯度。如图中后向过程的步骤2,GPU间的通信在梯度计算完成后被触发(hook函数)。图中没有画出的是,通常每个GPU也会建立独立的优化器。由于模型具有同样的初始状态和后续相同的梯度,因此每轮迭代后不同进程间的模型是完全相同的,这保证了DDP的数理一致性。

为了优化性能,DDP中针对allreduce操作进行了更深入的设计。梯度的计算过程和进程间的通信过程分别需要消耗一定量的时间。等待模型所有的参数都计算完梯度再进行通信显然不是最优的。如下图所示,DDP中的设计是通过将全部模型参数划分为无数个小的bucket,在bucket级别建立allreduce。当所有进程中bucket0的梯度计算完成后就立刻开始通信,此时bucket1中梯度还在计算。这样可以实现计算和通信过程的时间重叠。这种设计能够使得DDP的训练更高效。

在最后我们对DDP的通信部分进行介绍。DDP后端的通信由多种CPP编写的协议支持,不同协议具有不同的通信算子的支持,在开发中可以根据需求选择。

对于CV和NLP常用GPU训练的任务而言,选择Gloo或NCCL协议即可。一个决定因素是你使用的计算机集群的网络环境:

  • 当使用的是Ethernet(以太网,大部分机器都是这个环境):那么优先选择NCCL,具有更好的性能;如果在使用中遇到了NCCL通信的问题,那么就选择Gloo作为备用。(经验:单机多卡直接NCCL;多机多卡先尝试NCCL,如果通信有问题,而且自己解决不了,那就Gloo。)
  • 当使用的是InfiniBand:只支持NCCL。

另一个决定性因素是二者支持的算子范围不同,因此在使用时还需要结合代码里的功能来确定。下图记录了每种通信协议能够支持的算子,Gloo能够实现GPU中最基本的DDP训练,而NCCL能够支持更加多样的算子

综上,得益于DDP的分布式并行设计,DDP并不受PythonGIL争用的影响,是以多进程的方式运行的。这也使得DDP可以支持多机多卡的训练。我们将DDP的优缺点概括如下:

不同Backend的算子支持情况

优点

  • 更快的训练速度
  • 多进程的运行方式
  • 支持单机多卡和多机多卡
  • 平衡的GPU使用

缺点

  • 需要更多的代码书写和设计

代码实现和参数讲解:

本文首先会基于MNIST图像分类建立一个最小原型,然后逐步改进它以实现多机多卡的训练和混合精度的支持。在讲述的思路上本文借鉴了Kevin Kaichuang Yang的教程,但在实现细节上有较大的差异。特别的是本文增加了对DDP启动方式的探讨,并且介绍了多进程通信操作的使用样例。

名词解释:一个world里进程个数为world_size【对于2卡2GPU, world_size =4】,全局看,每个进程都有一个序号rank【0为主机GPU主卡】;分开看,一个进程在每台机器里面也有序号local_rank。

  • group:进程组,默认一个组,即一个world
  • world_size:全局进程个数【对于2卡2GPU, world_size =4】
  • rank:进程序号,用于进程间通信。rank=0为GPU主卡,主要用于多机多卡。本文中仅涉及到一台机器内多张卡。
  • locak_rank:进程内的GPU编号,通过指令torch.distributed.run自动指定,不需要认为设置。

非多进程示例

首先引入了所有用到的库。

from datetime import datetime
import argparse
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
from tqdm import tqdm

定义一个简单的卷积神经网络模型。

class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7*7*32, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out

定义主函数,添加一些启动脚本的可选参数。

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-g', '--gpuid', default=0, type=int,
                        help="which gpu to use")
    parser.add_argument('-e', '--epochs', default=2, type=int, 
                        metavar='N',
                        help='number of total epochs to run')
    parser.add_argument('-b', '--batch_size', default=4, type=int, 
                        metavar='N',
                        help='number of batchsize')         

    args = parser.parse_args()
    train(args.gpuid, args)

然后给出训练函数的详细内容。

def train(gpu, args):
    model = ConvNet()
    model.cuda(gpu)
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().to(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)

    # Data loading code
    train_dataset = torchvision.datasets.MNIST(root='./data',
                                               train=True,
                                               transform=transforms.ToTensor(),
                                               download=True)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=args.batch_size,
                                               shuffle=True,
                                               num_workers=0,
                                               pin_memory=True,
                                               sampler=None)

    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(args.epochs):
        model.train()
        for i, (images, labels) in enumerate(tqdm(train_loader)):
            images = images.to(gpu)
            labels = labels.to(gpu)
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if (i + 1) % 100 == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch + 1, args.epochs, i + 1, total_step,
                                                                   loss.item()))
    print("Training complete in: " + str(datetime.now() - start))

最后确保主函数被启动。

if __name__ == '__main__':
    main()

以上是我们的MNIST图像分类最小原型,可以通过如下命令启动在指定单个GPU上的训练:

python train.py -g 0

多进程示例

在开始对最小原型的改造之前,我们还需要交代一些事情。在DDP的代码实现中,最重要的步骤之一就是初始化。所谓初始化对应于上文介绍的Construction阶段,每个进程中需要指明几个关键的参数:

  • backend:明确后端通信方式,NCCL还是Gloo
  • init_method:初始化方式,TCP还是Environment variable(Env),可以简单理解为进程获取关键参数的地址和方式
  • world_size:总的进程数有多少
  • rank:当前进程是总进程中的第几个

初始化方式不同会影响代码的启动部分。本文会分别给出TCP和ENV模式的样例。TCP模式

让我们先从TCP开始,注意那些标记被更改的代码部分:

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-g', '--gpuid', default=0, type=int,
                        help="which gpu to use")
    parser.add_argument('-e', '--epochs', default=1, type=int, 
                        metavar='N',
                        help='number of total epochs to run')
    parser.add_argument('-b', '--batch_size', default=4, type=int, 
                        metavar='N',
                        help='number of batchsize')   
    ##################################################################################
    parser.add_argument('--init_method', default='tcp://localhost:18888',            #
                        help="init-method")                                          #
    parser.add_argument('-r', '--rank', default=0, type=int,                         #
                    help='rank of current process')                                  #
    parser.add_argument('--world_size', default=2, type=int,                         #
                        help="world size")                                           #
    parser.add_argument('--use_mix_precision', default=False,                        #
                        action='store_true', help="whether to use mix precision")    #
    ##################################################################################                  
    args = parser.parse_args()
    train(args.gpuid, args)

在main函数中需要增加了以下参数:

  • args.init_method:url地址,用来指明的初始化方法。在tcp初始化方法中,其格式应为:tcp:[ IP ]:[ Port ] 。IP为rank=0进程所在的机器IP地址,Port为任意一个空闲的端口号。当采用的是单机多卡模式时,IP可以默认为//localhost
  • args.rank:当前进程在所有进程中的序号
  • args.world_size:进程总数【一共几块GPU】
  • args.use_mix_precision:布尔变量,控制是否使用混合精度
def train(gpu, args):
    ########################################    N1    ####################################################################
    dist.init_process_group(backend='nccl', init_method=args.init_method, rank=args.rank, world_size=args.world_size)    #
    ######################################################################################################################
    model = ConvNet()
    model.cuda(gpu)
    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().to(gpu)
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    # Wrap the model
    #######################################    N2    ########################
    model = nn.SyncBatchNorm.convert_sync_batchnorm(model)                  #
    model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])    #
    scaler = GradScaler(enabled=args.use_mix_precision)                   #
    #########################################################################
    # Data loading code
    train_dataset = torchvision.datasets.MNIST(root='./data',
                                               train=True,
                                               transform=transforms.ToTensor(),
                                               download=True)
    ####################################    N3    #######################################
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)      #
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,                   #
                                               batch_size=args.batch_size,              #
                                               shuffle=False,                           #
                                               num_workers=0,                           #
                                               pin_memory=True,                         #
                                               sampler=train_sampler)                   #
    #####################################################################################
    start = datetime.now()
    total_step = len(train_loader) # The number changes to orignal_length // args.world_size
    for epoch in range(args.epochs):
        ################    N4    ################
        train_loader.sampler.set_epoch(epoch)    #
        ##########################################
        model.train()
        for i, (images, labels) in enumerate(tqdm(train_loader)):
            images = images.to(gpu)
            labels = labels.to(gpu)
            # Forward pass
            ########################    N5    ################################
            with torch.cuda.amp.autocast(enabled=args.use_mix_precision):    #
                outputs = model(images)                                      #
                loss = criterion(outputs, labels)                            #
            ##################################################################  
            # Backward and optimize
            optimizer.zero_grad()
            ##############    N6    ##########
            scaler.scale(loss).backward()    #
            scaler.step(optimizer)           #
            scaler.update()                  #
            ##################################
            ################    N7    ####################
            if (i + 1) % 100 == 0 and args.rank == 0:    #
            ##############################################   
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch + 1, args.epochs, i + 1, total_step,
                                                                   loss.item()))            
    ############    N8    ###########
    dist.destroy_process_group()    #                                       
    if args.rank == 0:              #
    #################################
        print("Training complete in: " + str(datetime.now() - start))

在训练函数中增加/修改了以下内容:

  • N1:增加了DDP初始化的代码,需要指明backend、init_method、rank和world_size。其含义在前文都有介绍。
  • N2:在并行环境下,对于用到BN层的模型需要转换为同步BN层;其次,用DistributedDataParallel将模型封装为一个DDP模型,并复制到指定的GPU上。封装时不需要更改模型内部的代码;设置混合精度中的scaler,通过设置enabled参数控制是否生效。
  • N3:DDP要求定义distributed.DistributedSampler,通过封装train_dataset实现;在建立DataLoader时指定sampler。此外还要注意:shuffle=False。DDP的数据打乱需要通过设置sampler,参考N4。
  • N4:在每个epoch开始前打乱数据顺序。(注意total_step已经变为orignal_length // args.world_size。)
  • N5:利用torch.cuda.amp.autocast控制前向过程中是否使用半精度计算。
  • N6: 当使用混合精度时,scaler会缩放loss来避免由于精度变化导致梯度为0的情况。
  • N7:为了避免log信息的重复打印,可以只允许rank0号进程打印。
  • N8: 清理进程;然后,同上。

假设服务器环境为2台服务器(也称为2个node),每台服务器两块GPU。启动方式为:

# Node 0 : ip 192.168.1.201  port : 12345
# terminal-0
python mnist-tcp.py --init_method tcp://192.168.1.201:12345 -g 0 --rank 0 --world_size 4 --use_mix_precision
# terminal-1
python mnist-tcp.py --init_method tcp://192.168.1.201:12345 -g 1 --rank 1 --world_size 4 --use_mix_precision

# Node 1 : 
# terminal-0
python tcp_init.py --init_method tcp://192.168.1.201:12345 -g 0 --rank 2 --world_size 4 --use_mix_precision
# terminal-1
python tcp_init.py --init_method tcp://192.168.1.201:12345 -g 1 --rank 3 --world_size 4 --use_mix_precision

TCP模式启动很好理解,需要在bash中独立的启动每一个进程,并为每个进程分配好其rank序号。缺点是当进程数多的时候启动比较麻烦。完整的脚本文件见这里


ENV模式

ENV模式启动会更简洁,对于每个进程并不需要在dist.init_process_group中手动的指定其rank、world_size和url。程序会在环境变量中去寻找这些值。代码如下:

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-g', '--gpuid', default=0, type=int,
                        help="which gpu to use")
    parser.add_argument('-e', '--epochs', default=1, type=int, 
                        metavar='N',
                        help='number of total epochs to run')
    parser.add_argument('-b', '--batch_size', default=4, type=int, 
                        metavar='N',
                        help='number of batchsize')   
    ##################################################################################
    parser.add_argument("--local_rank", type=int,                                    #
                        help='rank in current node')                                 #
    parser.add_argument('--use_mix_precision', default=False,                        #
                        action='store_true', help="whether to use mix precision")    #
    ##################################################################################                  
    args = parser.parse_args()
    #################################
    train(args.local_rank, args)    #
    #################################
  • args.local_rank:这里指的是当前进程在当前机器中的序号,注意和在全部进程中序号的区别。在ENV模式中,这个参数是必须的,由启动脚本自动划分,不需要手动指定。要善用local_rank来分配GPU_ID。
  • train(args.local_rank, args):一般情况下保持local_rank与进程所用GPU_ID一致。
def train(gpu, args):
    ##################################################################
    dist.init_process_group(backend='nccl', init_method='env://')    #
    args.rank = dist.get_rank()                                      #
    ##################################################################
    model = ConvNet()
    ...
  • 训练函数中仅需要更改初始化方式即可。在ENV中只需要指定init_method='env://'。TCP所需的关键参数模型会从环境变量中自动获取,环境变量可以在程序外部启动时设定,参考启动方式。
  • 当前进程的rank值可以通过dist.get_rank()得到
  • 之后的代码与TCP完全相同

假设服务器环境为2台服务器(也称为2个node),每台服务器两块GPU。ENV模式的启动方式为:

# Node 0 : ip 192.168.1.201  port : 12345
# terminal-0
python -m torch.distributed.launch --nproc_per_node=2 --nnodes=2 --node_rank=0 --master_addr="192.168.1.201" --master_port=12345 mnist-env.py --use_mix_precision

# Node 1 : 
# terminal-0
python -m torch.distributed.launch --nproc_per_node=2 --nnodes=2 --node_rank=1 --master_addr="192.168.1.201" --master_port=12345 mnist-env.py --use_mix_precision

ENV模式可以使用pytorch中的启动脚本torch.distributed.launch启动。在启动命令中需要指明多个参数:

  • nproc_per_node: 每台机器中运行几个进程【每台机器几个GPU】
  • nnodes:一共使用多少台机器
  • node_rank:当前机器的序号【非GPU序号】
  • master_addr:0号机器的IP
  • master_port:0号机器的可用端口

可以看到无论一台机器中的进程数为多少,只需要一行命令就可以启动,相比于TCP模式启动方式更加简洁。

训练中对模型在验证集上进行验证也是必不可少的步骤之一,那么如何在上述demo中增加模型验证的代码呢?如何实现模型的并行验证?

####################################    N11    ##################################
def evaluate(model, gpu, test_loader, rank):
    model.eval()
    size = torch.tensor(0.).to(gpu)
    correct = torch.tensor(0.).to(gpu)
    with torch.no_grad():
        for i, (images, labels) in enumerate(tqdm(test_loader)):
            images = images.to(gpu)
            labels = labels.to(gpu)
            outputs = model(images)
            size += images.shape[0]
            correct += (outputs.argmax(1) == labels).type(torch.float).sum() 
    dist.reduce(size, 0, op=dist.ReduceOp.SUM) # 群体通信 reduce 操作 change to allreduce if Gloo
    dist.reduce(correct, 0, op=dist.ReduceOp.SUM) # 群体通信 reduce 操作 change to allreduce if Gloo
    if rank==0:
        print('Evaluate accuracy is {:.2f}'.format(correct / size))
 #################################################################################

def train(gpu, args):
    ...
    ####################################    N9    ###################################
    test_dataset = torchvision.datasets.MNIST(root='./data',                        #
                                               train=False,                         #
                                               transform=transforms.ToTensor(),     #
                                               download=True)                       #
    test_sampler = torch.utils.data.distributed.DistributedSampler(test_dataset)    #
    test_loader = torch.utils.data.DataLoader(dataset=test_dataset,                 #
                                               batch_size=args.batch_size,               #
                                               shuffle=False,                       #
                                               num_workers=0,                       #
                                               pin_memory=True,                     #
                                               sampler=test_sampler)                #
    #################################################################################
    start = datetime.now()
    total_step = len(train_loader) # The number changes to orignal_length // args.world_size
    for epoch in range(args.epochs):
        ...
        #####################    N10    #################
        evaluate(model, gpu, test_loader, args.rank)    #
        #################################################
    ...        

省略了代码不变的部分,完整的程序见脚本

  • N9:增加验证集的DataLoader,设置sampler实现数据的并行切分
  • N10:在每个epoch结束前验证模型
  • N11: 利用群体通信Reduce操作,将计算准确率所需的正确预测数和全局样本数收集到rank0进程中

只需要利用群体通信将验证集样本数和预测正确的样本数汇集在rank0中即可实现并行的模型验证,对于其它任务也可以参考这个思路实现。例如图像语义分割中计算mIoU只需要将每个进程的混淆矩阵汇总相加到rank0即可。

一些可能遇到的问题

网络防火墙有可能在首次多机多卡训练时造成计算节点间的通信失败。单机多卡成功运行的代码在扩展至多机多卡遇到问题后可以首先尝试将init_method切换为Gloo,能够回避掉一些潜在的问题。记录一下本人在实践中遇到的问题和解决方法。

address family mismatch 错误

解决方案是手动设置通信的网络端口。机器的网络端口通过ifconfig命令查询,有多个网口时可以都尝试一下。

当backend==NCCL

# Node 0 
# terminal-0
export NCCL_SOCKET_IFNAME=eth0
python ...

# Node 1 : 
# terminal-0
export NCCL_SOCKET_IFNAME=eth0
python ...

当backend==Gloo

# Node 0 
# terminal-0
export GLOO_SOCKET_IFNAME=eth0
python ...

# Node 1 : 
# terminal-0
export GLOO_SOCKET_IFNAME=eth0
python ...

参考

  1. https://pytorch.org/docs/stable/distributed.html#choosing-the-network-interface-to-use
  2. https://pytorch.org/tutorials/beginner/dist_overview.html
  3. Li, S., Zhao, Y., Varma, R., Salpekar, O., Noordhuis, P., Li, T., … & Chintala, S. (2020). Pytorch distributed: Experiences on accelerating data parallel training. arXiv preprint arXiv:2006.15704.
  4. https://zhuanlan.zhihu.com/p/76638962
  5. https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html
  6. https://medium.com/huggingface/training-larger-batches-practical-tips-on-1-gpu-multi-gpu-distributed-setups-ec88c3e51255

torchrun-torch多机多卡分布式训练命令

https://pytorch.org/docs/stable/elastic/run.html

torchrun 提供了作为 torch.distributed.launch 的功能的超集,并具有以下附加功能:

  1. Worker failures are handled gracefully by restarting all workers.
    通过重新启动所有工作进程来优雅地处理工作进程故障。
  2. Worker RANK and WORLD_SIZE are assigned automatically.
    工作器 RANK 和 WORLD_SIZE 自动分配。
  3. Number of nodes is allowed to change between minimum and maximum sizes (elasticity).
    允许节点数量在最小和最大大小之间变化(弹性)。

torchrun 是setup.py中 entry_points 配置中声明的主模块torch.distributed.run的Python控制台脚本。它相当于调用 python -m torch.distributed.run 。

torchrun启动单机多卡DDP并行训练:

启动方式:

  • 使用 torchrun 命令来启动程序
  • torchrun –standalone –nproc_per_node=gpu XXX.py
  1. --standalone 代表单机运行
  2. --nproc_per_node=gpu 代表使用所有可用GPU。等于号后也可写gpu数量n,这样会使用前n个GPU

如果想要进一步指定要运行的 GPU,可以通过 CUDA_VISIBLE_DEVICES 设置GPU可见性,比如:

CUDA_VISIBLE_DEVICES=2,3 torchrun –standalone –nproc_per_node=gpu multi_gpu_torchrun.py

多机多卡

torchrun –nproc_per_node=4 –nnodes=3 –node_rank=0 –master_addr=192.168.0.101 –master_port=29500 test_mpi.py

1.指定每个节点(机器)上的进程数,这里是4个。意味着每个机器将启动4个进程来参与分布式训练。 –nproc_per_node=4 【一般设置为为节点GPU数量】

2.指定总共的节点数,这里是3个。意味着总共有3个机器参与分布式训练。

--nnodes=3

3.指定当前节点(机器)的排名,这里是0。排名从0开始,用于在分布式环境中区分不同的节点。

--node_rank=0 【0代表主节点】

4.指定主节点的IP地址,这里是192.168.0.101(更根据实际修改)。主节点用于协调分布式训练过程。

--master_addr=192.168.0.101

5.指定主节点的端口号,这里是29500。主节点使用指定的端口来与其他节点进行通信。

–master_port=29500

6.单机运行

--standalone