/************************************************************************* * Copyright (c) Microsoft Corporation. * Licensed under the MIT License. ************************************************************************/ #include #include #include #include "alloc.h" #include "npkit/npkit.h" #define NPKIT_DEBUG_FILE 1 #define BUFF_SIZE 50 uint64_t NpKit::rank_ = 0; NpKitEvent** NpKit::gpu_event_buffers_[RANK_NUM] = {0}; NpKitEvent** NpKit::cpu_event_buffers_[RANK_NUM] = {0}; int NpKit::gpu_rtc_rate_khz[RANK_NUM] = {0}; NpKitEventCollectContext* NpKit::gpu_collect_contexts_[RANK_NUM] = {0}; NpKitEventCollectContext* NpKit::cpu_collect_contexts_[RANK_NUM] = {0}; uint64_t* NpKit::cpu_timestamp_ = nullptr; pthread_mutex_t NpKit::npKitLock = PTHREAD_MUTEX_INITIALIZER; std::thread* NpKit::cpu_timestamp_update_thread_ = nullptr; volatile bool NpKit::cpu_timestamp_update_thread_should_stop_ = false; void NpKit::CpuTimestampUpdateThread() { uint64_t init_system_clock = std::chrono::system_clock::now().time_since_epoch().count(); uint64_t init_steady_clock = std::chrono::steady_clock::now().time_since_epoch().count(); uint64_t curr_steady_clock = 0; volatile uint64_t* volatile_cpu_timestamp_ = cpu_timestamp_; thread_bind_cpu(20); while (!cpu_timestamp_update_thread_should_stop_) { //curr_steady_clock = std::chrono::steady_clock::now().time_since_epoch().count(); //*volatile_cpu_timestamp_ = init_system_clock + (curr_steady_clock - init_steady_clock); *volatile_cpu_timestamp_ = get_now_ns(); } } ncclResult_t NpKit::Init(int rank) { uint64_t i = 0; NpKitEventCollectContext ctx; ctx.event_buffer_head = 0; rank_ = rank; // Init event data structures NCCLCHECK(ncclCalloc(&gpu_event_buffers_[rank], kNumGpuEventBuffers)); NCCLCHECK(ncclCudaCalloc(&gpu_collect_contexts_[rank], kNumGpuEventBuffers)); for (i = 0; i < kNumGpuEventBuffers; i++) { NCCLCHECK(ncclCudaCalloc(gpu_event_buffers_[rank] + i, kMaxNumGpuEventsPerBuffer)); ctx.event_buffer = gpu_event_buffers_[rank][i]; NCCLCHECK(ncclCudaMemcpy(gpu_collect_contexts_[rank] + i, &ctx, 1)); } NCCLCHECK(ncclCalloc(&cpu_event_buffers_[rank], kNumCpuEventBuffers)); NCCLCHECK(ncclCalloc(&cpu_collect_contexts_[rank], kNumCpuEventBuffers)); for (i = 0; i < kNumCpuEventBuffers; i++) { NCCLCHECK(ncclCalloc(cpu_event_buffers_[rank] + i, kMaxNumCpuEventsPerBuffer)); ctx.event_buffer = cpu_event_buffers_[rank][i]; cpu_collect_contexts_[rank][i] = ctx; } // Init timestamp pthread_mutex_lock(&npKitLock); if (cpu_timestamp_ == NULL) { ncclResult_t res = ncclCudaHostCalloc(&cpu_timestamp_, 1); if (res != ncclSuccess && res != ncclInProgress) { WARN("Fail to alloc cpu timestamp mem"); } volatile uint64_t* volatile_cpu_timestamp = cpu_timestamp_; //*volatile_cpu_timestamp = std::chrono::system_clock::now().time_since_epoch().count(); *volatile_cpu_timestamp = get_now_ns(); cpu_timestamp_update_thread_should_stop_ = false; cpu_timestamp_update_thread_ = new std::thread(CpuTimestampUpdateThread); } pthread_mutex_unlock(&npKitLock); int wallClkRate = 0; //in kilohertz int deviceId = 0; if (hipGetDevice(&deviceId) != hipSuccess) { WARN("Fail to get deviceId"); } if (hipDeviceGetAttribute(&wallClkRate, hipDeviceAttributeWallClockRate, deviceId) == hipSuccess) { gpu_rtc_rate_khz[rank] = wallClkRate; } else { WARN("Fail to get WallClockRate"); } if (wallClkRate == 0) { hipDeviceProp_t devProp; CUDACHECK(hipGetDeviceProperties(&devProp, 0)); if (devProp.gcnArch/10 == 94) gpu_rtc_rate_khz[rank] = 100000; else gpu_rtc_rate_khz[rank] = 25000; } INFO(NCCL_COLL, "npkit init success rank:%d deviceId:%d wallClkRate:%d", rank, deviceId, gpu_rtc_rate_khz[rank]); return ncclSuccess; } ncclResult_t NpKit::Dump(const std::string& dump_dir, int rank) { uint64_t i = 0; std::string dump_file_path; if (rank < 0 || rank >= RANK_NUM) { WARN("npkit dump invalid rank:%d", rank); return ncclSuccess; } INFO(NCCL_COLL, "npkit dump start rank:%d", rank); // Dump CPU events for (i = 0; i < kNumCpuEventBuffers; i++) { dump_file_path = dump_dir; dump_file_path += "/cpu_events_rank_"; dump_file_path += std::to_string(rank); dump_file_path += "_channel_"; dump_file_path += std::to_string(i); if (cpu_collect_contexts_[rank][i].event_buffer_head == 0) { continue; } auto cpu_trace_file = std::fstream(dump_file_path, std::ios::out | std::ios::binary); cpu_trace_file.write(reinterpret_cast(cpu_event_buffers_[rank][i]), cpu_collect_contexts_[rank][i].event_buffer_head * sizeof(NpKitEvent)); cpu_trace_file.close(); #if NPKIT_DEBUG_FILE // dump cpu event txt file char cpu_buffer[BUFF_SIZE]; std::string debug_cpu_file_path; debug_cpu_file_path = dump_dir; debug_cpu_file_path += "/debug_cpu_events_rank_"; debug_cpu_file_path += std::to_string(rank); debug_cpu_file_path += "_buf_"; debug_cpu_file_path += std::to_string(i); auto debug_cpu_file = std::fstream(debug_cpu_file_path, std::ios::out); for (int j = 0; j < cpu_collect_contexts_[rank][i].event_buffer_head; j++) { memset(cpu_buffer, 0, sizeof(cpu_buffer)); snprintf(cpu_buffer, sizeof(cpu_buffer), "%u %u %u %lu\n", cpu_event_buffers_[rank][i][j].fields.type, cpu_event_buffers_[rank][i][j].fields.size, cpu_event_buffers_[rank][i][j].fields.rsvd, cpu_event_buffers_[rank][i][j].fields.timestamp); debug_cpu_file.write(cpu_buffer, strlen(cpu_buffer)); } debug_cpu_file.close(); #endif } // Dump CPU clock info dump_file_path = dump_dir; dump_file_path += "/cpu_clock_period_num_rank_"; dump_file_path += std::to_string(rank); std::string clock_period_num_str = std::to_string(std::chrono::steady_clock::duration::period::num); auto clock_period_num_file = std::fstream(dump_file_path, std::ios::out); clock_period_num_file.write(clock_period_num_str.c_str(), clock_period_num_str.length()); clock_period_num_file.close(); dump_file_path = dump_dir; dump_file_path += "/cpu_clock_period_den_rank_"; dump_file_path += std::to_string(rank); std::string clock_period_den_str = std::to_string(std::chrono::steady_clock::duration::period::den); auto clock_period_den_file = std::fstream(dump_file_path, std::ios::out); clock_period_den_file.write(clock_period_den_str.c_str(), clock_period_den_str.length()); clock_period_den_file.close(); // Dump GPU events, reuse CPU struct for (i = 0; i < kNumGpuEventBuffers; i++) { dump_file_path = dump_dir; dump_file_path += "/gpu_events_rank_"; dump_file_path += std::to_string(rank); dump_file_path += "_buf_"; dump_file_path += std::to_string(i); NCCLCHECK(ncclCudaMemcpy(cpu_collect_contexts_[rank], gpu_collect_contexts_[rank] + i, 1)); if (cpu_collect_contexts_[rank][0].event_buffer_head == 0) { continue; } NCCLCHECK(ncclCudaMemcpy(cpu_event_buffers_[rank][0], gpu_event_buffers_[rank][i], kMaxNumGpuEventsPerBuffer)); auto gpu_trace_file = std::fstream(dump_file_path, std::ios::out | std::ios::binary); gpu_trace_file.write(reinterpret_cast(cpu_event_buffers_[rank][0]), cpu_collect_contexts_[rank][0].event_buffer_head * sizeof(NpKitEvent)); gpu_trace_file.close(); #if NPKIT_DEBUG_FILE // dump gpu event txt file char gpu_buffer[BUFF_SIZE]; std::string debug_gpu_file_path; debug_gpu_file_path = dump_dir; debug_gpu_file_path += "/debug_gpu_events_rank_"; debug_gpu_file_path += std::to_string(rank); debug_gpu_file_path += "_buf_"; debug_gpu_file_path += std::to_string(i); auto debug_gpu_file = std::fstream(debug_gpu_file_path, std::ios::out); for (int j = 0; j < cpu_collect_contexts_[rank][0].event_buffer_head; j++) { memset(gpu_buffer, 0, sizeof(gpu_buffer)); snprintf(gpu_buffer, sizeof(gpu_buffer), "%u %u %u %lu\n", cpu_event_buffers_[rank][0][j].fields.type, cpu_event_buffers_[rank][0][j].fields.size, cpu_event_buffers_[rank][0][j].fields.rsvd, cpu_event_buffers_[rank][0][j].fields.timestamp); debug_gpu_file.write(gpu_buffer, strlen(gpu_buffer)); } debug_gpu_file.close(); #endif } // Dump GPU clockRate dump_file_path = dump_dir; dump_file_path += "/gpu_clock_rate_rank_"; dump_file_path += std::to_string(rank); std::string clock_rate_str = std::to_string(gpu_rtc_rate_khz[rank]); auto gpu_clock_rate_file = std::fstream(dump_file_path, std::ios::out); gpu_clock_rate_file.write(clock_rate_str.c_str(), clock_rate_str.length()); gpu_clock_rate_file.close(); return ncclSuccess; } ncclResult_t NpKit::Shutdown(int rank) { uint64_t i = 0; INFO(NCCL_COLL, "npkit shutdown stop:%d rank:%d", cpu_timestamp_update_thread_should_stop_, rank); if (rank < 0 || rank >= RANK_NUM) { WARN("npkit shutdown invalid rank:%d", rank); return ncclSuccess; } // Stop CPU timestamp updating thread pthread_mutex_lock(&npKitLock); if (!cpu_timestamp_update_thread_should_stop_) { cpu_timestamp_update_thread_should_stop_ = true; cpu_timestamp_update_thread_->join(); } pthread_mutex_unlock(&npKitLock); // Free CPU event data structures for (i = 0; i < kNumCpuEventBuffers; i++) { free(cpu_event_buffers_[rank][i]); } free(cpu_event_buffers_[rank]); free(cpu_collect_contexts_[rank]); // Free GPU event data structures for (i = 0; i < kNumGpuEventBuffers; i++) { CUDACHECK(hipFree(gpu_event_buffers_[rank][i])); } free(gpu_event_buffers_[rank]); CUDACHECK(hipFree(gpu_collect_contexts_[rank])); // Free timestamp pthread_mutex_lock(&npKitLock); if (cpu_timestamp_) { ncclCudaHostFree(cpu_timestamp_); cpu_timestamp_ = nullptr; } pthread_mutex_unlock(&npKitLock); return ncclSuccess; } NpKitEventCollectContext* NpKit::GetGpuEventCollectContexts(int rank) { return gpu_collect_contexts_[rank]; } void NpKit::CollectCpuEvent(int rank, uint8_t type, int64_t size, uint32_t rsvd, uint64_t timestamp, int channel_id) { uint64_t event_buffer_head = cpu_collect_contexts_[rank][channel_id].event_buffer_head; if (event_buffer_head < kMaxNumCpuEventsPerBuffer) { NpKitEvent& event = cpu_collect_contexts_[rank][channel_id].event_buffer[event_buffer_head]; event.fields.type = type; event.fields.size = size < 0 ? 0 : size; event.fields.rsvd = rsvd; event.fields.timestamp = timestamp; cpu_collect_contexts_[rank][channel_id].event_buffer_head++; TRACE(NCCL_COLL, "npkit cpu event rank:%d type:%d timestamp:%lu buff_head:%lu", rank, type, timestamp, cpu_collect_contexts_[rank][channel_id].event_buffer_head); } } uint64_t* NpKit::GetCpuTimestamp() { return cpu_timestamp_; } uint64_t NpKit::GetCpuTimeNs() { return get_now_ns(); }