#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "timeline/timeline.h" #include "alloc.h" #include "devcomm.h" #include "collectives.h" #include "comm.h" #if defined (ENABLE_TIMELINE) #define VEGA_GPU_RTC_FREQUENCY 2.5E7 extern const char* ncclDevRedOpStr[ncclNumDevRedOps]; extern const char *ncclTypeStr[ncclNumTypes]; const char *timeLineEvent[TIMELINE_EVENT_TOTAL_NUM/2] = {"COLL","P2P","ALLGATHER","ALLREDUCE","BROADCAST","REDUCE","REDUCE_SCATTER","SENDRECV","PRIM_SEND","PRIM_DIRECT_SEND","PRIM_RECV","PRIM_DIRECT_RECV","PRIM_COPY_SEND","PRIM_DIRECT_COPY_SEND","PRIM_RECV_COPY_SEND","PRIM_DIRECT_RECV_COPY_SEND","PRIM_RECV_REDUCE_COPY","PRIM_RECV_REDUCE_SEND","PRIM_RECV_REDUCE_COPY_SEND","PRIM_DIRECT_RECV_REDUCE_COPY_SEND","PRIM_SCATTER","PRIM_GATHER","PRIM_LL_SEND","PRIM_LL_RECV","PRIM_LL_RECV_REDUCE_SEND","PRIM_LL_RECV_REDUCE_COPY","PRIM_LL_COPY_SEND","PRIM_LL_RECV_COPY_SEND","PRIM_LL_RECV_REDUCE_COPY_SEND","PRIM_DIRECT_SEND_FROM_OUTPUT"}; void DumpTimelineEventThread(ncclComm_t comm); std::mutex Timeline::mtx; uint Timeline::obj_num = 0; uint64_t* Timeline::cpu_timestamp_ = nullptr; std::thread* Timeline::cpu_timestamp_update_thread_ = nullptr; volatile bool Timeline::cpu_timestamp_update_thread_should_stop_ = false; Timeline::Timeline() {}; void Timeline::CpuTimestampUpdateThread() { uint64_t init_system_clock = std::chrono::system_clock::now().time_since_epoch().count(); uint64_t init_steady_clock = std::chrono::steady_clock::now().time_since_epoch().count(); uint64_t curr_steady_clock = 0; volatile uint64_t* volatile_cpu_timestamp_ = cpu_timestamp_; while (!cpu_timestamp_update_thread_should_stop_) { curr_steady_clock = std::chrono::steady_clock::now().time_since_epoch().count(); *volatile_cpu_timestamp_ = init_system_clock + (curr_steady_clock - init_steady_clock); } } RCCL_PARAM(TimelineExportHost, "TIMELINE_EXPORT_HOST", 0); RCCL_PARAM(TimelineEventSkip, "TIMELINE_EVENT_SKIP", 0); RCCL_PARAM(TimelineMaxProfilingThreads, "TIMELINE_MAXPROFILING_THREADS", 1); RCCL_PARAM(TimelineMaxProfilingEvents, "TIMELINE_MAXPROFILING_EVENTS", 1024); ncclResult_t Timeline::Init(ncclComm_t comm) { comm_ = comm; rank_ = comm->rank; nChannel_ = std::max(comm->nChannels, comm->p2pnChannels); hostIsExport = rcclParamTimelineExportHost(); NCCLCHECK(ncclCudaHostCalloc(&gpu_event_context_, 1)); gpu_event_context_->curOpCount = 0; gpu_event_context_->gpuMaxProfilingThreads = rcclParamTimelineMaxProfilingThreads(); gpu_event_context_->gpuMaxProfilingEvents = rcclParamTimelineMaxProfilingEvents(); gpu_event_context_->isFull = 0; gpu_event_context_->skip = rcclParamTimelineEventSkip(); gpu_event_context_->skipped = 0; size_t size = nChannel_*gpu_event_context_->gpuMaxProfilingThreads; NCCLCHECK(ncclCudaHostCalloc(&gpu_event_context_->handle, size)); for (size_t i = 0; i < size; ++i) { TimelineGpuEventHandle& handle = gpu_event_context_->handle[i]; // handle.skip = rcclParamTimelineEventSkip(); // handle.skipped = 0; handle.event_buffer_head = 0; NCCLCHECK(ncclCudaHostCalloc(&handle.event_buffer, gpu_event_context_->gpuMaxProfilingEvents)); } NCCLCHECK(ncclCalloc(&cpu_event_context_, 1)); // cpu_event_context_ = new TimelineCpuEventContext; cpu_event_context_->cpuCollEvent = new std::map(); cpu_event_context_->cpuP2pEvent = new std::map(); // Init runing state dump if (dump_event_thread_ == nullptr) { dump_event_thread_ = new std::thread(DumpTimelineEventThread, comm); } mtx.lock(); // Init timestamp if (cpu_timestamp_update_thread_ == nullptr) { NCCLCHECK(ncclCudaHostCalloc(&cpu_timestamp_, 1)); cpu_timestamp_update_thread_should_stop_ = false; cpu_timestamp_update_thread_ = new std::thread(CpuTimestampUpdateThread); } obj_num++; mtx.unlock(); return ncclSuccess; } void getCpuEventInfo(std::map* cpuEventMap, uint16_t& key, char * funcName, uint64_t& beginTimestamp) { if (cpuEventMap->count(key) > 0) { std::map::iterator it = cpuEventMap->find(key); TimelineCpuEvent * cpuEvent = it->second; if (funcName) strcpy(funcName, cpuEvent->funcName); beginTimestamp = cpuEvent->beginTimestamp; return; } else if (funcName){ sprintf(funcName, "%d", key); } } void getFuncInfo(uint16_t funcIndex, char * funcName) { if(funcIndex == FUNC_INDEX_P2P) { strcpy(funcName, "ncclKernel_sendRecv_ring_simple_sum_int8_t"); } else if (funcIndex == FUNC_INDEX_P2P+1) { strcpy(funcName, "ncclKernel_alltoall_pivot_ring_simple_sum_int8_t"); } else if (funcIndex >= (FUNC_INDEX_P2P-ncclNumTypes) ) { int type = funcIndex - (FUNC_INDEX_P2P-ncclNumTypes); sprintf(funcName, "ncclFunction_OneRankReduce_PreMulSum_%s", ncclTypeStr[type]); }else{ int protocol = funcIndex%NCCL_NUM_PROTOCOLS; int algorithm = (funcIndex/NCCL_NUM_PROTOCOLS)%NCCL_NUM_ALGORITHMS; int type = ((funcIndex/NCCL_NUM_PROTOCOLS)/NCCL_NUM_ALGORITHMS)%ncclNumTypes; int redop = (((funcIndex/NCCL_NUM_PROTOCOLS)/NCCL_NUM_ALGORITHMS)/ncclNumTypes)%ncclNumOps; int func = (((funcIndex/NCCL_NUM_PROTOCOLS)/NCCL_NUM_ALGORITHMS)/ncclNumTypes)/ncclNumOps; // ncclKernel_##func##_##algo##_##proto##_##redop##_##type sprintf(funcName, "ncclKernel_%s_%s_%s_%s_%s", ncclFuncStr[func], ncclAlgoStr[algorithm], ncclProtoStr[protocol], ncclDevRedOpStr[redop], ncclTypeStr[type]); } } ncclResult_t Timeline::Dump(ncclComm_t comm) { if (comm->timeline->isDumped) return ncclSuccess; comm->timeline->isDumped = true; pid_t pid = getpid(); int rank_ = comm->rank; int nChannel_ = comm->timeline->nChannel_; char dump_file_path[128] = {0}; const char* dump_dir = getenv("RCCL_TIMELINE_DUMP_DIR"); if (!dump_dir) { sprintf(dump_file_path, "./timeline_%d_comm_%p_ranks_%d_rank_%d.json", pid, comm, comm->nRanks, rank_); } else { INFO(NCCL_ALL, "rccl dump dir set by environment to %s.", dump_dir); bool flag = false; while (opendir(dump_dir) == NULL) { if (rank_ == 0) { int ret = mkdir(dump_dir, S_IRWXU | S_IRWXG | S_IRWXO); if (ret != 0) { INFO(NCCL_ALL, "rccl make dump dir %s failed: %d", dump_dir, ret); return ncclSuccess; } } else { if (flag) { INFO(NCCL_ALL, "wait for make dump dir timeout"); return ncclSuccess; } flag = true; sleep(1); } } sprintf(dump_file_path, "%s/timeline_%d_comm_%p_ranks_%d_rank_%d.json", dump_dir, pid, comm, comm->nRanks, rank_); } TimelineCpuEventContext* cpu_event_context_ = comm->timeline->GetCpuEventContext(); TimelineGpuEventContext* gpu_event_context_ = comm->devComm->gpuEventContext; auto trace_file = std::fstream(dump_file_path, std::ios::out | std::ios::binary); trace_file << "["; if (comm->timeline->hostIsExport) { for (std::map::iterator it = cpu_event_context_->cpuCollEvent->begin(); it != cpu_event_context_->cpuCollEvent->end(); it++) { INFO(NCCL_ALL, "coll export %s, %d, opCount %d comm %p ctx %p", basename(__FILE__), __LINE__, it->first, comm, gpu_event_context_); TimelineCpuEvent* event = it->second; memset(event->funcName, 0x0, sizeof(event->funcName)); getFuncInfo(event->funcIndex, event->funcName); trace_file << "{ \"name\": \"" << event->funcName << "\", \"ph\": \"X\", \"pid\": \"" << pid << "\", \"tid\": \"" << pid << "\", \"ts\": " << event->beginTimestamp << ", \"dur\": " << event->endTimestamp-event->beginTimestamp << ", \"args\": { \"opCount\": \"" << event->opCount << "\", \"beginNs\": \"" << event->beginTimestamp << "\", \"endNs\": \"" << event->endTimestamp << "\", \"pid\": \"" << pid << "\", \"funcIndex\": \"" << event->funcIndex << "\", \"count\": \"" << event->count << "\", \"nBytes\": \"" << event->nBytes << "\", \"sendbuff\": \"" << event->sendbuff << "\", \"recvbuff\": \"" << event->recvbuff << "\", \"nChannels\": \"" << event->nChannels << "\", \"nThread\": \"" << event->nThreads << "\", \"root\": \"" << event->root << "\" } },"; } for (std::map::iterator it = cpu_event_context_->cpuP2pEvent->begin(); it != cpu_event_context_->cpuP2pEvent->end(); it++) { INFO(NCCL_ALL, "p2p export %s, %d, opCount %d comm %p ctx %p", basename(__FILE__), __LINE__, it->first, comm, gpu_event_context_); TimelineCpuEvent* event = it->second; memset(event->funcName, 0x0, sizeof(event->funcName)); strcpy(event->funcName, "ncclKernel_SendRecv_Ring_Simpile_Sum_int8_t"); trace_file << "{ \"name\": \"" << event->funcName << "\", \"ph\": \"X\", \"pid\": \"" << pid << "\", \"tid\": \"" << pid << "\", \"ts\": " << event->beginTimestamp << ", \"dur\": " << event->endTimestamp-event->beginTimestamp << ", \"args\": { \"opCount\": \"" << event->opCount << "\", \"beginNs\": \"" << event->beginTimestamp << "\", \"endNs\": \"" << event->endTimestamp << "\", \"pid\": \"" << pid << "\", \"funcIndex\": \"" << event->funcIndex << "\", \"count\": \"" << event->count << "\", \"nBytes\": \"" << event->nBytes << "\", \"sendbuff\": \"" << event->sendbuff << "\", \"recvbuff\": \"" << event->recvbuff << "\", \"nThread\": \"" << event->nThreads << "\", \"root\": \"" << event->root << "\" } },"; } trace_file << "{\"name\": \"process_name\", \"ph\": \"M\", \"pid\": \"" << pid << "\", \"ts\": 0, \"args\": {\"name\": \"cpu timeline\"} },"; } for (int i = 0; i < nChannel_; ++i) { for (int j = 0; j < gpu_event_context_->gpuMaxProfilingThreads; ++j) { TimelineGpuEventHandle& handle = gpu_event_context_->handle[i * gpu_event_context_->gpuMaxProfilingThreads + j]; // INFO(NCCL_ALL, "file %s, line %d, rank %d, channel %d, tid %d, buffer_head %lu skip %d skipped %d", basename(__FILE__), __LINE__, rank_, i, j, handle.event_buffer_head, handle.skip, handle.skipped); INFO(NCCL_ALL, "file %s, line %d, rank %d, channel %d, tid %d, buffer_head %lu skip %d skipped %d", basename(__FILE__), __LINE__, rank_, i, j, handle.event_buffer_head, gpu_event_context_->skip, gpu_event_context_->skipped); for (int k = 0; k < handle.event_buffer_head; ++k) { TimelineGpuEvent& gpuEvent = handle.event_buffer[k]; if (gpuEvent.type == TIMELINE_EVENT_COLL_ENTRY || gpuEvent.type == TIMELINE_EVENT_P2P_ENTRY){ std::map* cpuEventMap = gpuEvent.type == TIMELINE_EVENT_COLL_ENTRY ? cpu_event_context_->cpuCollEvent : cpu_event_context_->cpuP2pEvent; char funcName[128] = {0}; getFuncInfo(gpuEvent.funcIndex, funcName); trace_file << "{ \"name\": \"" << funcName << "\", \"ph\": \"B\", \"pid\": \"" << rank_ << "\", \"tid\": \"b" << i << ":t" << j << "\", \"ts\": " << gpuEvent.timestamp << ", \"args\": { \"opCount\": \"" << gpuEvent.opCount << "\", \"funcIndex\": \"" << gpuEvent.funcIndex << "\", \"beginGpuTimestamp\": \"" << gpuEvent.gpuTimestamp << "\", \"beginNs\": \"" << gpuEvent.timestamp << "\", \"headEntry\": \"" << k << "\" } }, "; if (comm->timeline->hostIsExport) { uint64_t beginTimestamp = 0; getCpuEventInfo(cpuEventMap, gpuEvent.opCount, NULL, beginTimestamp); if (beginTimestamp) { char id[16] = {0}; sprintf(id, "%d%02d%03d%04d", pid, i, j, gpuEvent.opCount); trace_file << "{ \"name\": \"connect\", \"ph\": \"s\", \"pid\": \"" << pid << "\", \"tid\": \"" << pid << "\", \"ts\": " << beginTimestamp << ", \"id\": \"" << id << "\" }, "; trace_file << "{ \"name\": \"connect\", \"ph\": \"f\", \"bp\": \"e\", \"pid\": \"" << rank_ << "\", \"tid\": \"b" << i << ":t" << j << "\", \"ts\": " << gpuEvent.timestamp << ", \"id\": \"" << id << "\" }, "; } } } else if (gpuEvent.type == TIMELINE_EVENT_COLL_EXIT || gpuEvent.type == TIMELINE_EVENT_P2P_EXIT){ std::map* cpuEventMap = gpuEvent.type == TIMELINE_EVENT_COLL_EXIT ? cpu_event_context_->cpuCollEvent : cpu_event_context_->cpuP2pEvent; char funcName[128] = {0}; uint64_t beginTimestamp = 0; getCpuEventInfo(cpuEventMap, gpuEvent.opCount, NULL, beginTimestamp); getFuncInfo(gpuEvent.funcIndex, funcName); trace_file << "{ \"name\": \"" << funcName << "\", \"ph\": \"E\", \"pid\": \"" << rank_ << "\", \"tid\": \"b" << i << ":t" << j << "\", \"ts\": " << gpuEvent.timestamp << ", \"args\": { \"endNs\": \"" << gpuEvent.timestamp << "\", \"endGpuTimestamp\": \"" << gpuEvent.gpuTimestamp << "\", \"headExit\": \"" << k << "\" } }, "; } else if (gpuEvent.type % 2 == 1){ trace_file << "{ \"name\": \"" << timeLineEvent[(gpuEvent.type-1)/2] << "\", \"ph\": \"B\", \"pid\": \"" << rank_ << "\", \"tid\": \"b" << i << ":t" << j << "\", \"ts\": " << gpuEvent.timestamp << ", \"args\": { \"beginGpuTimestamp\": \"" << gpuEvent.gpuTimestamp << "\", \"headEntry\": \"" << k << "\" } }, "; } else if (gpuEvent.type % 2 == 0){ double bandwidth = (gpuEvent.size/1.0E9)/(gpuEvent.gpuTimestamp/VEGA_GPU_RTC_FREQUENCY); trace_file << "{ \"name\": \"" << timeLineEvent[(gpuEvent.type-1)/2] << "\", \"ph\": \"E\", \"pid\": \"" << rank_ << "\", \"tid\": \"b" << i << ":t" << j << "\", \"ts\": " << gpuEvent.timestamp << ", \"args\": { \"size\": \"" << gpuEvent.size << " Bytes\", \"elapsed\": \"" << gpuEvent.gpuTimestamp << "\", \"headExit\": \"" << k << "\", \"bus bandwidth\": \"" << bandwidth << " GB/s\" } }, "; } } } trace_file << "{\"name\": \"process_name\", \"ph\": \"M\", \"pid\": \"" << rank_ << "\", \"ts\": 0, \"args\": {\"name\": \"gpu timeline\"} }"; if (i < nChannel_ - 1) trace_file << ","; } trace_file << "]"; trace_file.close(); INFO(NCCL_ALL, "rccl timeline dump done."); return ncclSuccess; } ncclResult_t Timeline::Shutdown() { // Stop dump thread Timeline::Dump(comm_); dump_event_thread_->join(); dump_event_thread_ = nullptr; mtx.lock(); obj_num--; if (obj_num <= 0) { // Stop CPU timestamp updating thread cpu_timestamp_update_thread_should_stop_ = true; cpu_timestamp_update_thread_->join(); cpu_timestamp_update_thread_ = nullptr; // Free timestamp NCCLCHECK(ncclCudaHostFree(cpu_timestamp_)); } mtx.unlock(); // Free GPU event data structures size_t size = nChannel_*gpu_event_context_->gpuMaxProfilingThreads; for (size_t i = 0; i < size; ++i) { TimelineGpuEventHandle& handle = gpu_event_context_->handle[i]; NCCLCHECK(ncclCudaHostFree(handle.event_buffer)); } NCCLCHECK(ncclCudaHostFree(gpu_event_context_)); // Free CPU event data structures for (std::map::iterator it = cpu_event_context_->cpuCollEvent->begin(); it != cpu_event_context_->cpuCollEvent->end(); it++) { free(it->second); } delete cpu_event_context_->cpuCollEvent; for (std::map::iterator it = cpu_event_context_->cpuP2pEvent->begin(); it != cpu_event_context_->cpuP2pEvent->end(); it++) { free(it->second); } delete cpu_event_context_->cpuP2pEvent; free(cpu_event_context_); // delete cpu_event_context_; return ncclSuccess; } TimelineCpuEventContext* Timeline::GetCpuEventContext() { return cpu_event_context_; } TimelineGpuEventContext* Timeline::GetGpuEventContext() { return gpu_event_context_; } uint64_t* Timeline::GetCpuTimestamp() { return cpu_timestamp_; } ncclResult_t Timeline::CollectCpuEvent(uint64_t beginTimestamp, uint64_t endTimestamp, struct ncclInfo* info, struct ncclKernelPlan* plan) { TimelineCpuEventContext* cpu_event_context_ = info->comm->timeline->cpu_event_context_; TimelineCpuEvent * event = nullptr; NCCLCHECK(ncclCalloc(&event, 1)); event->beginTimestamp = beginTimestamp; event->endTimestamp = endTimestamp; event->funcIndex = plan->workHead->header.funcIndex; event->opCount = event->funcIndex == FUNC_INDEX_P2P ? info->comm->sharedRes->p2pOpCount[0] : info->comm->sharedRes->collOpCount; event->count = info->count; event->nBytes = info->nBytes; event->sendbuff = info->sendbuff; event->recvbuff = info->recvbuff; event->nThreads = info->nThreads; event->nChannels = info->nChannels; event->root = info->root; // INFO(NCCL_COLL, "FILE %s LINE %d opCount %d funcIndex %d nRanks %d comm %p ctx %p", // basename(__FILE__), __LINE__, info->comm->args.op.opCount, info->comm->args.funcIndex, info->comm->nRanks, info->comm, info->comm->hostDevComm.gpuEventContext); std::map* eventMap = event->funcIndex == FUNC_INDEX_P2P ? cpu_event_context_->cpuP2pEvent : cpu_event_context_->cpuCollEvent; eventMap->insert(std::pair(event->opCount, event)); return ncclSuccess; } void AllTrim(char* line) { char* p = line; while(*p == ' ') p++; while(*(p+strlen(p)-1) == ' ') *(p+strlen(p)-1) = '\0'; int iLen = strlen(p); memcpy(line, p, iLen); line[iLen] = 0; } #define TIMELINE_DEBUG "RCCL_TIMELINE_DUMP" void DumpTimelineEventThread(ncclComm_t comm) { char* ptr = NULL; int64_t value = 0; char* etcFileName = getenv("RCCL_TIMELINE_CFG_FILENAME"); if (etcFileName) { do { int idle = 1; char line[1024] = {0}; FILE* fp = fopen(etcFileName, "r"); if (fp != NULL) { idle = 0; } if (idle) { if(comm->timeline->isDumped) break; else { sleep(10); //sleep 10s continue; } } while(fgets(line, sizeof line, fp)) { AllTrim(line); if (strlen(line) == 0 || line[0] == '#') continue; if ((ptr = strchr(line, '#'))) *ptr = 0; if ((ptr = strchr(line, '=')) == NULL) continue; *ptr = 0; AllTrim(line); if (strcmp(line, TIMELINE_DEBUG)) continue; AllTrim(ptr+1); int64_t v = strtoll(ptr+1, NULL, 0); if (errno) { INFO(NCCL_ALL, "Invalid value %s for %s, using default %lu.", ptr+1, TIMELINE_DEBUG, value); } else { value = v; INFO(NCCL_ALL, "%s set by cfg to %lu.", TIMELINE_DEBUG, value); } } fclose(fp); if (value == 1) { INFO(NCCL_ALL, "User config dump value is 1, dump immediately."); } else if (value == 2 && comm->timeline->GetGpuEventContext()->isFull) { INFO(NCCL_ALL, "User config dump value is 2, Gpu buffer is full, dump automatically."); } else { if (value != 0 && value != 2) INFO(NCCL_ALL, "Invalid value %lu for %s, using default 0.", value, TIMELINE_DEBUG); if(comm->timeline->isDumped) break; else { sleep(10); //sleep 10s continue; } } Timeline::Dump(comm); break; } while(1); } // pthread_exit(NULL); } #endif