<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
NCCL是英偉達開源的GPU通訊庫,支援集合通訊和對等通訊。
看下官方給的一個demo:
#include <stdio.h> #include "cuda_runtime.h" #include "nccl.h" #include "mpi.h" #include <unistd.h> #include <stdint.h> #define MPICHECK(cmd) do { int e = cmd; if( e != MPI_SUCCESS ) { printf("Failed: MPI error %s:%d '%d'n", __FILE__,__LINE__, e); exit(EXIT_FAILURE); } } while(0) #define CUDACHECK(cmd) do { cudaError_t e = cmd; if( e != cudaSuccess ) { printf("Failed: Cuda error %s:%d '%s'n", __FILE__,__LINE__,cudaGetErrorString(e)); exit(EXIT_FAILURE); } } while(0) #define NCCLCHECK(cmd) do { ncclResult_t r = cmd; if (r!= ncclSuccess) { printf("Failed, NCCL error %s:%d '%s'n", __FILE__,__LINE__,ncclGetErrorString(r)); exit(EXIT_FAILURE); } } while(0) static uint64_t getHostHash(const char* string) { // Based on DJB2a, result = result * 33 ^ char uint64_t result = 5381; for (int c = 0; string[c] != ' '; c++){ result = ((result << 5) + result) ^ string[c]; } return result; } static void getHostName(char* hostname, int maxlen) { gethostname(hostname, maxlen); for (int i=0; i< maxlen; i++) { if (hostname[i] == '.') { hostname[i] = ' '; return; } } } int main(int argc, char* argv[]) { int size = 32*1024*1024; int myRank, nRanks, localRank = 0; //initializing MPI MPICHECK(MPI_Init(&argc, &argv)); MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank)); MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks)); //calculating localRank which is used in selecting a GPU uint64_t hostHashs[nRanks]; char hostname[1024]; getHostName(hostname, 1024); hostHashs[myRank] = getHostHash(hostname); MPICHECK(MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD)); for (int p=0; p<nRanks; p++) { if (p == myRank) break; if (hostHashs[p] == hostHashs[myRank]) localRank++; } //each process is using two GPUs int nDev = 2; float** sendbuff = (float**)malloc(nDev * sizeof(float*)); float** recvbuff = (float**)malloc(nDev * sizeof(float*)); cudaStream_t* s = (cudaStream_t*)malloc(sizeof(cudaStream_t)*nDev); //picking GPUs based on localRank for (int i = 0; i < nDev; ++i) { CUDACHECK(cudaSetDevice(localRank*nDev + i)); CUDACHECK(cudaMalloc(sendbuff + i, size * sizeof(float))); CUDACHECK(cudaMalloc(recvbuff + i, size * sizeof(float))); CUDACHECK(cudaMemset(sendbuff[i], 1, size * sizeof(float))); CUDACHECK(cudaMemset(recvbuff[i], 0, size * sizeof(float))); CUDACHECK(cudaStreamCreate(s+i)); } ncclUniqueId id; ncclComm_t comms[nDev]; //generating NCCL unique ID at one process and broadcasting it to all if (myRank == 0) ncclGetUniqueId(&id); MPICHECK(MPI_Bcast((void *)&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD)); //initializing NCCL, group API is required around ncclCommInitRank as it is //called across multiple GPUs in each thread/process NCCLCHECK(ncclGroupStart()); for (int i=0; i<nDev; i++) { CUDACHECK(cudaSetDevice(localRank*nDev + i)); NCCLCHECK(ncclCommInitRank(comms+i, nRanks*nDev, id, myRank*nDev + i)); } NCCLCHECK(ncclGroupEnd()); //calling NCCL communication API. Group API is required when using //multiple devices per thread/process NCCLCHECK(ncclGroupStart()); for (int i=0; i<nDev; i++) NCCLCHECK(ncclAllReduce((const void*)sendbuff[i], (void*)recvbuff[i], size, ncclFloat, ncclSum, comms[i], s[i])); NCCLCHECK(ncclGroupEnd()); //synchronizing on CUDA stream to complete NCCL communication for (int i=0; i<nDev; i++) CUDACHECK(cudaStreamSynchronize(s[i])); //freeing device memory for (int i=0; i<nDev; i++) { CUDACHECK(cudaFree(sendbuff[i])); CUDACHECK(cudaFree(recvbuff[i])); } //finalizing NCCL for (int i=0; i<nDev; i++) { ncclCommDestroy(comms[i]); } //finalizing MPI MPICHECK(MPI_Finalize()); printf("[MPI Rank %d] Success n", myRank); return 0; }
在上邊的範例中,rank0會執行ncclGetUniqueId獲取Id,然後通過mpi廣播給其他rank,接下來看下UniqueId是怎麼產生的。
ncclResult_t ncclGetUniqueId(ncclUniqueId* out) { NCCLCHECK(ncclInit()); NCCLCHECK(PtrCheck(out, "GetUniqueId", "out")); return bootstrapGetUniqueId(out); }
然後執行initNet,用來初始化nccl所需要的網路,包括兩個,一個是bootstrap網路,另外一個是資料通訊網路,bootstrap網路主要用於初始化時交換一些簡單的資訊,比如每個機器的ip埠,由於資料量很小,而且主要是在初始化階段執行一次,因此bootstrap使用的是tcp;而通訊網路是用於實際資料的傳輸,因此會優先使用rdma(支援gdr的話會優先使用gdr)。
ncclResult_t initNet() { // Always initialize bootstrap network NCCLCHECK(bootstrapNetInit()); NCCLCHECK(initNetPlugin(&ncclNet, &ncclCollNet)); if (ncclNet != NULL) return ncclSuccess; if (initNet(&ncclNetIb) == ncclSuccess) { ncclNet = &ncclNetIb; } else { NCCLCHECK(initNet(&ncclNetSocket)); ncclNet = &ncclNetSocket; } return ncclSuccess; }
bootstrapNetInit就是bootstrap網路的初始化,主要就是通過findInterfaces遍歷機器上所有的網路卡資訊,通過prefixList匹配選擇使用哪些網路卡,將可用網路卡的資訊儲存下來,將ifa_name儲存到全域性的bootstrapNetIfNames,ip地址儲存到全域性bootstrapNetIfAddrs,預設除了docker和lo其他的網路卡都可以使用。
例如在測試機器上有三張網路卡,分別是xgbe0、xgbe1、xgbe2,那麼就會把這三個ifaname和對應的ip地址儲存下來,另外nccl提供了環境變數NCCL_SOCKET_IFNAME可以用來指定想用的網路卡名,例如通過export NCCL_SOCKET_IFNAME=xgbe0來指定使用xgbe0,其實就是通過prefixList來匹配做到的。
static int findInterfaces(const char* prefixList, char* names, union socketAddress *addrs, int sock_family, int maxIfNameSize, int maxIfs) { struct netIf userIfs[MAX_IFS]; bool searchNot = prefixList && prefixList[0] == '^'; if (searchNot) prefixList++; bool searchExact = prefixList && prefixList[0] == '='; if (searchExact) prefixList++; int nUserIfs = parseStringList(prefixList, userIfs, MAX_IFS); int found = 0; struct ifaddrs *interfaces, *interface; getifaddrs(&interfaces); for (interface = interfaces; interface && found < maxIfs; interface = interface->ifa_next) { if (interface->ifa_addr == NULL) continue; int family = interface->ifa_addr->sa_family; if (family != AF_INET && family != AF_INET6) continue; if (sock_family != -1 && family != sock_family) continue; if (family == AF_INET6) { struct sockaddr_in6* sa = (struct sockaddr_in6*)(interface->ifa_addr); if (IN6_IS_ADDR_LOOPBACK(&sa->sin6_addr)) continue; } if (!(matchIfList(interface->ifa_name, -1, userIfs, nUserIfs, searchExact) ^ searchNot)) { continue; } bool duplicate = false; for (int i = 0; i < found; i++) { if (strcmp(interface->ifa_name, names+i*maxIfNameSize) == 0) { duplicate = true; break; } } if (!duplicate) { strncpy(names+found*maxIfNameSize, interface->ifa_name, maxIfNameSize); int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6); memcpy(addrs+found, interface->ifa_addr, salen); found++; } } freeifaddrs(interfaces); return found; }
ncclNet_t結構體是一系列的函數指標,比如初始化,傳送,接收等;socket,IB等通訊方式都實現了自己的ncclNet_t,如ncclNetSocket,ncclNetIb,初始化通訊網路的過程就是依次看哪個通訊模式可用,然後賦值給全域性的ncclNet。
首先執行initNetPlugin,檢視是否有libnccl-net.so,測試環境沒有這個so,所以直接返回。
然後嘗試使用IB網路:
ncclResult_t ncclIbInit(ncclDebugLogger_t logFunction) { static int shownIbHcaEnv = 0; if(wrap_ibv_symbols() != ncclSuccess) { return ncclInternalError; } if (ncclParamIbDisable()) return ncclInternalError; if (ncclNIbDevs == -1) { pthread_mutex_lock(&ncclIbLock); wrap_ibv_fork_init(); if (ncclNIbDevs == -1) { ncclNIbDevs = 0; if (findInterfaces(ncclIbIfName, &ncclIbIfAddr, MAX_IF_NAME_SIZE, 1) != 1) { WARN("NET/IB : No IP interface found."); return ncclInternalError; } // Detect IB cards int nIbDevs; struct ibv_device** devices; // Check if user defined which IB device:port to use char* userIbEnv = getenv("NCCL_IB_HCA"); if (userIbEnv != NULL && shownIbHcaEnv++ == 0) INFO(NCCL_NET|NCCL_ENV, "NCCL_IB_HCA set to %s", userIbEnv); struct netIf userIfs[MAX_IB_DEVS]; bool searchNot = userIbEnv && userIbEnv[0] == '^'; if (searchNot) userIbEnv++; bool searchExact = userIbEnv && userIbEnv[0] == '='; if (searchExact) userIbEnv++; int nUserIfs = parseStringList(userIbEnv, userIfs, MAX_IB_DEVS); if (ncclSuccess != wrap_ibv_get_device_list(&devices, &nIbDevs)) return ncclInternalError; for (int d=0; d<nIbDevs && ncclNIbDevs<MAX_IB_DEVS; d++) { struct ibv_context * context; if (ncclSuccess != wrap_ibv_open_device(&context, devices[d]) || context == NULL) { WARN("NET/IB : Unable to open device %s", devices[d]->name); continue; } int nPorts = 0; struct ibv_device_attr devAttr; memset(&devAttr, 0, sizeof(devAttr)); if (ncclSuccess != wrap_ibv_query_device(context, &devAttr)) { WARN("NET/IB : Unable to query device %s", devices[d]->name); if (ncclSuccess != wrap_ibv_close_device(context)) { return ncclInternalError; } continue; } for (int port = 1; port <= devAttr.phys_port_cnt; port++) { struct ibv_port_attr portAttr; if (ncclSuccess != wrap_ibv_query_port(context, port, &portAttr)) { WARN("NET/IB : Unable to query port %d", port); continue; } if (portAttr.state != IBV_PORT_ACTIVE) continue; if (portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND && portAttr.link_layer != IBV_LINK_LAYER_ETHERNET) continue; // check against user specified HCAs/ports if (! (matchIfList(devices[d]->name, port, userIfs, nUserIfs, searchExact) ^ searchNot)) { continue; } TRACE(NCCL_INIT|NCCL_NET,"NET/IB: [%d] %s:%d/%s ", d, devices[d]->name, port, portAttr.link_layer == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE"); ncclIbDevs[ncclNIbDevs].device = d; ncclIbDevs[ncclNIbDevs].guid = devAttr.sys_image_guid; ncclIbDevs[ncclNIbDevs].port = port; ncclIbDevs[ncclNIbDevs].link = portAttr.link_layer; ncclIbDevs[ncclNIbDevs].speed = ncclIbSpeed(portAttr.active_speed) * ncclIbWidth(portAttr.active_width); ncclIbDevs[ncclNIbDevs].context = context; strncpy(ncclIbDevs[ncclNIbDevs].devName, devices[d]->name, MAXNAMESIZE); NCCLCHECK(ncclIbGetPciPath(ncclIbDevs[ncclNIbDevs].devName, &ncclIbDevs[ncclNIbDevs].pciPath, &ncclIbDevs[ncclNIbDevs].realPort)); ncclIbDevs[ncclNIbDevs].maxQp = devAttr.max_qp; ncclNIbDevs++; nPorts++; pthread_create(&ncclIbAsyncThread, NULL, ncclIbAsyncThreadMain, context); } if (nPorts == 0 && ncclSuccess != wrap_ibv_close_device(context)) { return ncclInternalError; } } if (nIbDevs && (ncclSuccess != wrap_ibv_free_device_list(devices))) { return ncclInternalError; }; } if (ncclNIbDevs == 0) { INFO(NCCL_INIT|NCCL_NET, "NET/IB : No device found."); } else { char line[1024]; line[0] = ' '; for (int d=0; d<ncclNIbDevs; d++) { snprintf(line+strlen(line), 1023-strlen(line), " [%d]%s:%d/%s", d, ncclIbDevs[d].devName, ncclIbDevs[d].port, ncclIbDevs[d].link == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE"); } line[1023] = ' '; char addrline[1024]; INFO(NCCL_INIT|NCCL_NET, "NET/IB : Using%s ; OOB %s:%s", line, ncclIbIfName, socketToString(&ncclIbIfAddr.sa, addrline)); } pthread_mutex_unlock(&ncclIbLock); } return ncclSuccess; }
首先第三行通過wrap_ibv_symbols載入動態庫libibverbs.so,然後獲取動態庫的各個函數。
然後通過wrap_ibv_fork_init避免fork引起rdma網路卡讀寫出錯。
後面會講到ib網路也會用到socket進行帶外網路的傳輸,所以這裡也通過findInterfaces獲取一個可用的網路卡儲存到ncclIbIfAddr。
通過ibv_get_device_list獲取所有rdma裝置到devices中,遍歷devices的每個device,因為每個HCA可能有多個物理port,所以對每個device遍歷每一個物理port,獲取每個port的資訊。
然後將相關資訊儲存到全域性的ncclIbDevs中,比如是哪個device的哪個port,使用的是IB還是ROCE,device的pci路徑,maxqp,device的name等,注意這裡也有類似bootstrap網路NCCL_SOCKET_IFNAME的環境變數,叫NCCL_IB_HCA,可以指定使用哪個IB HCA。
到這裡整個初始化的過程就完成了,一句話總結就是,獲取了當前機器上所有可用的IB網路卡和普通乙太網卡之後儲存下來。
ncclResult_t bootstrapCreateRoot(ncclUniqueId* id, bool idFromEnv) { ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id; void* listenComm; NCCLCHECK(bootstrapNetListen(idFromEnv ? dontCareIf : 0, netHandle, &listenComm)); pthread_t thread; pthread_create(&thread, NULL, bootstrapRoot, listenComm); return ncclSuccess; }
ncclNetHandle_t也是一個字元陣列,然後執行bootstrapNetListen。
static ncclResult_t bootstrapNetListen(int dev, ncclNetHandle_t* netHandle, void** listenComm) { union socketAddress* connectAddr = (union socketAddress*) netHandle; static_assert(sizeof(union socketAddress) < NCCL_NET_HANDLE_MAXSIZE, "union socketAddress size is too large"); // if dev >= 0, listen based on dev if (dev >= 0) { NCCLCHECK(bootstrapNetGetSocketAddr(dev, connectAddr)); } else if (dev == findSubnetIf) { ... } // Otherwise, handle stores a local address struct bootstrapNetComm* comm; NCCLCHECK(bootstrapNetNewComm(&comm)); NCCLCHECK(createListenSocket(&comm->fd, connectAddr)); *listenComm = comm; return ncclSuccess; }
依次看下這三個函數,通過bootstrapNetGetSocketAddr獲取一個可用的ip地址。
static ncclResult_t bootstrapNetGetSocketAddr(int dev, union socketAddress* addr) { if (dev >= bootstrapNetIfs) return ncclInternalError; memcpy(addr, bootstrapNetIfAddrs+dev, sizeof(*addr)); return ncclSuccess; }
此時dev是0, bootstrapNetIfs是初始化bootstrap網路的時候一共找到了幾個可用的網路卡,這裡就是獲取了第0個可用的ip地址。
然後通過bootstrapNetNewComm建立bootstrapNetComm,bootstrapNetComm其實就是fd,bootstrapNetNewComm其實就是new了一個bootstrapNetComm。
struct bootstrapNetComm { int fd; };
static ncclResult_t createListenSocket(int *fd, union socketAddress *localAddr) { /* IPv4/IPv6 support */ int family = localAddr->sa.sa_family; int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6); /* Create socket and bind it to a port */ int sockfd = socket(family, SOCK_STREAM, 0); if (sockfd == -1) { WARN("Net : Socket creation failed : %s", strerror(errno)); return ncclSystemError; } if (socketToPort(&localAddr->sa)) { // Port is forced by env. Make sure we get the port. int opt = 1; #if defined(SO_REUSEPORT) SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)), "setsockopt"); #else SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)), "setsockopt"); #endif } // localAddr port should be 0 (Any port) SYSCHECK(bind(sockfd, &localAddr->sa, salen), "bind"); /* Get the assigned Port */ socklen_t size = salen; SYSCHECK(getsockname(sockfd, &localAddr->sa, &size), "getsockname"); #ifdef ENABLE_TRACE char line[1024]; TRACE(NCCL_INIT|NCCL_NET,"Listening on socket %s", socketToString(&localAddr->sa, line)); #endif /* Put the socket in listen mode * NB: The backlog will be silently truncated to the value in /proc/sys/net/core/somaxconn */ SYSCHECK(listen(sockfd, 16384), "listen"); *fd = sockfd; return ncclSuccess; }
建立監聽fd,ip由localaddr指定,初始埠為0,bind時隨機找一個可用埠,並通過getsockname(sockfd, &localAddr->sa, &size)將ip埠寫回到localaddr,這裡localaddr就是UniqueId。
到這裡UniqueId也就產生了,其實就是當前機器的ip和port。
歡迎 Star、試用 OneFlow 最新版本:github.com/Oneflow-Inc…
以上就是NCCL原始碼解析之初始化及ncclUniqueId的產生詳解的詳細內容,更多關於NCCL初始化ncclUniqueId產生的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45