// NOTE(review): This file appears to have been flattened onto very long lines, and the
// first two `#include` directives have lost their header names (only "proxy.h" survives).
// As laid out below, the whole line is a single malformed `#include` directive. Even if
// the directive were fixed, the `//` that follows `namespace bootstrap {} }` would comment
// out the closing braces of `hardware` and `sccl`.
// TODO: restore the original line breaks and header names from version control.
// Intended live content, reconstructed from the tokens present:
//   #include "proxy.h"
//   namespace sccl { namespace hardware { namespace topology {
//   namespace bootstrap {}   // currently empty
//   } } }
// Everything after that is commented-out proxy code. It is NOT compiled. Examples:
// NeedProxy, expectedProxyResponse*, ProxyAppend, scclProxyPost, scclProxyProgress.
#include #include #include "proxy.h" namespace sccl { namespace hardware { namespace topology { namespace bootstrap {} } // namespace topology } // namespace hardware } // namespace sccl // static bool NeedProxy(int type, int pattern, int root, struct scclRing* ring, int nranks) { // if(pattern == scclPatternRing || pattern == scclPatternRingTwice) // return true; // /* In chains, one rank does not need a proxy. Let's figure out which one it is */ // /* Which index in the reorganized rings should we compare root against */ // const int myrank = 0, nextrank = 1, prevrank = nranks - 1; // int index = pattern == scclPatternPipelineFrom ? // /* no recv / no send if root = */ // /* bcast */ (type == proxyRecv ? myrank : nextrank) // : // /* reduce */ (type == proxyRecv ? prevrank : myrank); // int rank = ring->userRanks[index]; // return (root != rank); // } // #define PROXYARGS_ALLOCATE_SIZE SCCL_MAX_OPS // struct scclProxyPool { // struct scclProxyPool* next; // struct scclProxyArgs elems[PROXYARGS_ALLOCATE_SIZE]; // }; // static void expectedProxyResponseFree(struct scclProxyState* state) { // struct scclExpectedProxyResponse* elem = state->expectedResponses; // struct scclExpectedProxyResponse* prev = NULL; // while(elem) { // prev = elem; // elem = elem->next; // free(prev->respBuff); // free(prev); // } // } // static scclResult_t expectedProxyResponseStore(struct scclProxyState* state, void* opId, void* respBuff, int respSize) { // struct scclExpectedProxyResponse* elem = state->expectedResponses; // while(elem) { // if(elem->opId == opId) { // if(respSize != elem->respSize) { // WARN("Mismatched response size for opId=%p", opId); // return scclInternalError; // } // if(elem->done) { // WARN("Storing response for already completed opId=%p", opId); // return scclInternalError; // } // memcpy(elem->respBuff, respBuff, respSize); // free(respBuff); // elem->done = true; // return scclSuccess; // } // elem = elem->next; // } // WARN("Proxy response for opId=%p 
doesn't match any expected response", opId); // return scclInternalError; // } // static scclResult_t expectedProxyResponseEnqueue(struct scclProxyState* state, void* opId, int respSize) { // struct scclExpectedProxyResponse* ex; // scclCHECK(scclCalloc(&ex, 1)); // ex->opId = opId; // // Pre-alloc response buffer // ex->respBuff = malloc(respSize); // ex->respSize = respSize; // ex->done = false; // // Enqueue // struct scclExpectedProxyResponse* list = state->expectedResponses; // if(list == NULL) { // state->expectedResponses = ex; // return scclSuccess; // } // while(list->next) // list = list->next; // list->next = ex; // return scclSuccess; // } // static scclResult_t expectedProxyResponseDequeue(struct scclProxyState* state, void* opId, void* respBuff, int* found) { // struct scclExpectedProxyResponse* elem = state->expectedResponses; // struct scclExpectedProxyResponse* prev = NULL; // *found = 0; // while(elem) { // if((elem->opId == opId) && elem->done) { // if(prev == NULL) { // state->expectedResponses = elem->next; // } else { // prev->next = elem->next; // } // memcpy(respBuff, elem->respBuff, elem->respSize); // free(elem->respBuff); // free(elem); // *found = 1; // return scclSuccess; // } // prev = elem; // elem = elem->next; // } // return scclSuccess; // } // static scclResult_t expectedProxyResponseRemove(struct scclProxyState* state, void* opId) { // struct scclExpectedProxyResponse* elem = state->expectedResponses; // struct scclExpectedProxyResponse* prev = NULL; // while(elem) { // if(elem->opId == opId) { // if(prev == NULL) { // state->expectedResponses = elem->next; // } else { // prev->next = elem->next; // } // free(elem->respBuff); // free(elem); // return scclSuccess; // } // prev = elem; // elem = elem->next; // } // WARN("Couldn't find opId=%p", opId); // return scclInternalError; // } // static scclResult_t asyncProxyOpEnqueue(struct scclProxyLocalPeer* peer, scclProxyAsyncOp* op) { // scclProxyAsyncOp* list = peer->asyncOps; // 
if(list == NULL) { // peer->asyncOps = op; // return scclSuccess; // } // while(list->next) // list = list->next; // list->next = op; // return scclSuccess; // } // static scclResult_t asyncProxyOpDequeue(struct scclProxyLocalPeer* peer, scclProxyAsyncOp* op) { // struct scclProxyAsyncOp* elem = peer->asyncOps; // struct scclProxyAsyncOp* prev = NULL; // while(elem) { // if(elem->opId == op->opId) { // if(prev == NULL) { // peer->asyncOps = elem->next; // } else { // prev->next = elem->next; // } // if(elem->reqBuff) { // free(elem->reqBuff); // } // if(elem->respBuff) { // free(elem->respBuff); // } // free(elem); // return scclSuccess; // } // prev = elem; // elem = elem->next; // } // if(op) { // WARN("Attempting to dequeue nonexistent async opId=%p", op->opId); // } else { // WARN("Attempting to dequeue null operation"); // } // return scclInternalError; // } // static scclResult_t allocateArgs(struct scclProxyProgressState* state, struct scclProxyArgs** argsptr) { // struct scclProxyArgs* elem; // if(state->pool == NULL) { // // Allocate a new pool of elements. Make sure we allocate the memory close // // to the network thread // struct scclProxyPool* newPool; // scclCHECK(scclCalloc(&newPool, 1)); // struct scclProxyArgs* newElems = newPool->elems; // // Chain newly allocated elements // for(int i = 0; i < PROXYARGS_ALLOCATE_SIZE; i++) { // if(i + 1 < PROXYARGS_ALLOCATE_SIZE) // newElems[i].next = newElems + i + 1; // } // // Add them all to the pool list // state->pool = newElems; // // Save the pool memory block for later resource release // newPool->next = state->pools; // state->pools = newPool; // } // elem = state->pool; // state->pool = state->pool->next; // elem->next = elem->nextPeer = NULL; // *argsptr = elem; // return scclSuccess; // } // // #define DEBUG_PROXY 1 // #ifdef DEBUG_PROXY // #define DEBUG_PROXY_PRINT printf // #else // #define DEBUG_PROXY_PRINT(...) // #endif // #define OP_INDEX(op) ((op) ? 
(op) - state->pools->elems : -1) // #define OP_SEEN 0x100000 // scclResult_t getOpIndex(struct scclProxyArgs* op, struct scclProxyProgressState* state, int* poolIndex, int* opIndex) { // struct scclProxyPool* pool = state->pools; // int p = 0; // while(pool) { // uint64_t o = op - pool->elems; // if(o < PROXYARGS_ALLOCATE_SIZE) { // *opIndex = o; // *poolIndex = p; // return scclSuccess; // } // pool = pool->next; // p++; // } // WARN("Could not find pool of op %p", op); // return scclInternalError; // } // scclResult_t printProxyOp(struct scclProxyArgs* op, int poolIndex, int opIndex) { // printf("[%d-%d|%ld| %s", poolIndex, opIndex, op->opCount, op->pattern == scclPatternSend ? "Send" : op->pattern == scclPatternRecv ? "Recv" : "Coll"); // for(int s = 0; s < op->nsubs; s++) { // struct scclProxySubArgs* sub = op->subs + s; // if(op->state == scclProxyOpProgress) { // char status = ' '; // if(op->pattern == scclPatternRecv) { // if(sub->posted < sub->nsteps && sub->posted < sub->done + SCCL_STEPS) // status = 'I'; // Init // else if(sub->received < sub->posted) // status = 'R'; // Receiving // else if(sub->received < sub->transmitted) // status = 'R'; // Receiving // else if(sub->transmitted < sub->received) // status = 'F'; // Flushing // else if(sub->done < sub->transmitted) // status = 'G'; // Waiting on GPU // else // status = 'D'; // Done // } else if(op->pattern == scclPatternSend) { // if(sub->posted < sub->nsteps && sub->posted < sub->done + SCCL_STEPS) // status = 'I'; // Init // else if(sub->transmitted < sub->posted) // status = 'G'; // Waiting on GPU // else if(sub->done < sub->transmitted) // status = 'S'; // Sending // else // status = 'D'; // Done // } // printf(" %d%c/%d", sub->peer, status, sub->channelId); // } else { // printf(" %d/%d", sub->peer, sub->channelId); // } // } // printf("]"); // return scclSuccess; // } // scclResult_t dumpProxyState(struct scclProxyProgressState* state) { // struct scclProxyArgs* op = state->active; // int 
poolIndex, opIndex; // printf("ACTIVE OPS\n"); // while(op) { // scclCHECK(getOpIndex(op, state, &poolIndex, &opIndex)); // if(op->state & OP_SEEN) { // WARN("List loop at element %d-%d", poolIndex, opIndex); // } // scclCHECK(printProxyOp(op, poolIndex, opIndex)); // op->state |= OP_SEEN; // printf("\n"); // struct scclProxyArgs* nextOp = op->nextPeer; // while(nextOp) { // scclCHECK(getOpIndex(nextOp, state, &poolIndex, &opIndex)); // if(nextOp->state & OP_SEEN) { // WARN("List loop at element %d-%d", poolIndex, opIndex); // } // printf("| `-> "); // scclCHECK(printProxyOp(nextOp, poolIndex, opIndex)); // nextOp->state |= OP_SEEN; // printf("\n"); // if(nextOp->next) { // WARN("Inactive op has next set!"); // } // nextOp = nextOp->nextPeer; // } // if(op->nextPeer == NULL) // printf("|\n"); // op = op->next; // printf("v\n"); // } // printf("[X]\n"); // #if 0 // printf("FREE OPS\n"); // op = state->pool; // while (op) { // scclCHECK(getOpIndex(op, state, &poolIndex, &opIndex)); // if (op->state & OP_SEEN) { // WARN("List loop at element %d-%d", poolIndex, opIndex); // } // scclCHECK(printProxyOp(op, poolIndex, opIndex)); // op->state |= OP_SEEN; // printf("->"); // op = op->next; // } // printf("[X]\n"); // #else // op = state->pool; // while(op) { // scclCHECK(getOpIndex(op, state, &poolIndex, &opIndex)); // if(op->state & OP_SEEN) { // WARN("List loop at element %d-%d", poolIndex, opIndex); // } // op->state |= OP_SEEN; // op = op->next; // } // #endif // struct scclProxyPool* pool = state->pools; // poolIndex = 0; // while(pool) { // struct scclProxyArgs* elem = pool->elems; // for(int e = 0; e < PROXYARGS_ALLOCATE_SIZE; e++, elem++) { // if((elem->state & OP_SEEN) == 0) { // printf("Elem %d-%d is not in any list:\n", poolIndex, e); // scclCHECK(printProxyOp(elem, poolIndex, e)); // printf("\n"); // } else { // elem->state -= OP_SEEN; // } // } // pool = pool->next; // poolIndex++; // } // return scclSuccess; // } // static scclResult_t 
scclProxyOpToArgs(struct scclProxyOp* op, struct scclProxyArgs* args, int subIndex) { // struct scclProxySubArgs* sub = args->subs + subIndex; // if(subIndex >= SCCL_PROXY_MAX_SUBS) { // WARN("Proxy append out of bounds"); // return scclInternalError; // } // // memset(sub, 0, sizeof(struct scclProxySubArgs)); // sub->connection = op->connection; // sub->channelId = op->channelId; // sub->nsteps = op->nsteps; // sub->nbytes = op->nbytes; // sub->peer = op->root; // args->nsubs = subIndex + 1; // if(subIndex) { // if((args->sliceSteps != op->sliceSteps) || (args->chunkSteps != op->chunkSteps) || (args->protocol != op->protocol) || (args->dtype != op->dtype) || // (args->redOp != op->redOp)) { // WARN("Proxy append mismatch"); // return scclInternalError; // } // if(args->state != scclProxyOpReady) { // WARN("Proxy append on running operation"); // return scclInternalError; // } // return scclSuccess; // } // // memset(&args->progress, 0, sizeof(struct scclProxyArgs)-offsetof(struct scclProxyArgs, progress)); // args->done = 0; // args->opCount = op->opCount; // args->sliceSteps = op->sliceSteps; // args->chunkSteps = op->chunkSteps; // args->chunkSize = op->chunkSize; // args->dtype = op->dtype; // args->redOp = op->redOp; // args->pattern = op->pattern; // args->protocol = op->protocol; // args->state = scclProxyOpReady; // args->progress = op->connection->tcomm->proxyProgress; // args->proxyAppendPtr = op->connection->proxyAppendPtr; // return scclSuccess; // } // static scclResult_t ProxyAppend(struct scclProxyProgressState* state, struct scclProxyOp* op) { // struct scclProxyConnection* connection = op->connection; // int shared = connection->shared; // struct scclProxyArgs* args = *connection->proxyAppendPtr; // if(args) { // if(shared && args->opCount == op->opCount) { // scclCHECK(scclProxyOpToArgs(op, args, args->nsubs)); // DEBUG_PROXY_PRINT("Insert (%d/%5ld/%5ld) as group with %5ld\n", shared, args->opCount, op->opCount, OP_INDEX(args)); // } else { // 
struct scclProxyArgs* prevArgs = args; // scclCHECK(allocateArgs(state, &args)); // scclCHECK(scclProxyOpToArgs(op, args, 0)); // prevArgs->nextPeer = args; // DEBUG_PROXY_PRINT( // "Insert %5ld (%d/%5ld/%5ld) as nextPeer of %5ld\n", OP_INDEX(args), shared, prevArgs->opCount, args->opCount, OP_INDEX(prevArgs)); // *(args->proxyAppendPtr) = args; // } // } else { // // Nothing running for that peer. Add to the list // scclCHECK(allocateArgs(state, &args)); // scclCHECK(scclProxyOpToArgs(op, args, 0)); // if(state->active == NULL) { // // Create the list // DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld) as first element\n", OP_INDEX(args), shared, args->opCount); // state->active = args; // } else { // // Append element at the end of the list // struct scclProxyArgs* last = state->active; // while(last->next) // last = last->next; // last->next = args; // DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld) as last element\n", OP_INDEX(args), shared, args->opCount); // } // *(args->proxyAppendPtr) = args; // } // return scclSuccess; // } // scclResult_t scclProxyPost(struct scclProxyOpsPool* pool, int nextOps, int nextOpsEnd) { // pthread_mutex_lock(&pool->mutex); // if(pool->nextOps == -1) { // pool->nextOps = nextOps; // pthread_cond_signal(&pool->cond); // } else { // pool->ops[pool->nextOpsEnd].next = nextOps; // } // pool->nextOpsEnd = nextOpsEnd; // pthread_mutex_unlock(&pool->mutex); // return scclSuccess; // } // static scclResult_t scclLocalOpAppend(struct scclComm* comm, struct scclProxyConnector* proxyConn, struct scclProxyOp* proxyOp) { // int tpLocalRank = comm->topParentLocalRanks[comm->localRank]; // struct scclProxyOps* proxyOps = comm->proxyState->proxyOps; // if(proxyOps == NULL) // return scclInternalError; // proxyOps += proxyConn->tpLocalRank; // struct scclProxyOpsPool* pool = proxyOps->pool; // TIME_START(0); // int opIndex = proxyOps->freeOp; // struct scclProxyOp* op; // if(opIndex != -1) { // op = pool->ops + opIndex; // proxyOps->freeOp = op->next; // } else 
{ // int freeOp; // while((freeOp = pool->freeOps[tpLocalRank]) == -1) // sched_yield(); // int freeOpNew; // while((freeOpNew = __sync_val_compare_and_swap(pool->freeOps + tpLocalRank, freeOp, -1)) != freeOp) // freeOp = freeOpNew; // opIndex = freeOp; // op = pool->ops + opIndex; // proxyOps->freeOp = op->next; // } // if(op->next != -1) // __builtin_prefetch(pool->ops + op->next); // Prefetch next free op // memcpy(op, proxyOp, sizeof(struct scclProxyOp)); // op->next = -1; // op->connection = proxyConn->connection; // if(proxyOps->nextOps == -1) { // proxyOps->nextOps = proxyOps->nextOpsEnd = opIndex; // } else { // pool->ops[proxyOps->nextOpsEnd].next = opIndex; // proxyOps->nextOpsEnd = opIndex; // } // if(++proxyOps->count == MAX_OPS_PER_PEER) { // // Post what we have so far to free some ops in the pool // // Do not post last operations as we could have more coming with the same opCount, and posting // // them in different batches would break proxyArgs aggregation with subs. // uint64_t lastOpCount = pool->ops[proxyOps->nextOpsEnd].opCount; // int lastOp = -1; // int toSend = 0; // int ops = 0; // for(int op = proxyOps->nextOps; op != proxyOps->nextOpsEnd; op = pool->ops[op].next) { // ops++; // if(pool->ops[op].opCount != lastOpCount) { // lastOp = op; // toSend = ops; // } // } // if(lastOp == -1) { // WARN("Unable to post incomplete proxy op chain %d..%d (opCount %ld)", proxyOps->nextOps, proxyOps->nextOpsEnd, lastOpCount); // return scclInternalError; // } // // Cut chain at lastOp // int nextOps = proxyOps->nextOps; // proxyOps->nextOps = pool->ops[lastOp].next; // pool->ops[lastOp].next = -1; // scclCHECK(scclProxyPost(proxyOps->pool, nextOps, lastOp)); // proxyOps->count -= toSend; // } // TIME_STOP(0); // return scclSuccess; // } // static scclResult_t // SaveProxy(struct scclComm* comm, struct scclChannel* channel, int type, int peer, struct scclProxyOp* op, int connIndex, bool* justInquire) { // if(peer < 0) // return scclSuccess; // struct 
scclChannelPeer* peerComm = channel->peers[peer]; // struct scclConnector* connector = type == proxyRecv ? peerComm->recv + connIndex : peerComm->send + connIndex; // if(connector->transportComm == NULL) { // WARN("Rank %d has no transport for %s peer %d on channel %d/%d", comm->rank, type == proxyRecv ? "recv" : "send", peer, channel->id, connIndex); // return scclInternalError; // } // if(connector->transportComm->proxyProgress == NULL) // return scclSuccess; // if(justInquire) // *justInquire = true; // else { // scclCHECK(scclLocalOpAppend(comm, &connector->proxyConn, op)); // } // return scclSuccess; // } // scclResult_t mscclSaveProxy(struct scclComm* comm, struct scclChannel* channel, int type, int peer, struct scclProxyOp* op, int connIndex) { // scclCHECK(SaveProxy(comm, channel, type, peer, op, connIndex, nullptr)); // return scclSuccess; // } // // justInquire != nullptr means don't actually do anything, just assertain need of // // scclProxySaveOp for this op. // scclResult_t scclProxySaveOp(struct scclComm* comm, struct scclProxyOp* op, bool* justInquire) { // struct scclChannel* channel = &comm->channels[op->channelId]; // if(justInquire) // *justInquire = false; // switch(op->pattern) { // case scclPatternRing: // case scclPatternRingTwice: // case scclPatternPipelineFrom: // case scclPatternPipelineTo: { // struct scclRing* ring = &channel->ring; // if(NeedProxy(proxyRecv, op->pattern, op->root, ring, comm->nRanks)) { // scclCHECK(SaveProxy(comm, channel, proxyRecv, ring->prev, op, op->connIndex, justInquire)); // } // if(NeedProxy(proxySend, op->pattern, op->root, ring, comm->nRanks)) { // scclCHECK(SaveProxy(comm, channel, proxySend, ring->next, op, op->connIndex, justInquire)); // } // } break; // case scclPatternTreeUp: // case scclPatternTreeDown: // case scclPatternTreeUpDown: { // if(op->pattern != scclPatternTreeDown) { // Tree up // struct scclTree* tree = &channel->tree; // for(int i = 0; i < SCCL_MAX_TREE_ARITY; i++) { // 
scclCHECK(SaveProxy(comm, channel, proxyRecv, tree->down[i], op, 0, justInquire)); // } // scclCHECK(SaveProxy(comm, channel, proxySend, tree->up, op, 0, justInquire)); // } // if(op->pattern != scclPatternTreeUp) { // Tree down // struct scclTree* tree = &channel->tree; // for(int i = 0; i < SCCL_MAX_TREE_ARITY; i++) { // scclCHECK(SaveProxy(comm, channel, proxySend, tree->down[i], op, 0, justInquire)); // } // scclCHECK(SaveProxy(comm, channel, proxyRecv, tree->up, op, 0, justInquire)); // } // } break; // case scclPatternCollnetChain: { // scclCHECK(SaveProxy(comm, channel, proxySend, channel->collnetChain.up, op, 1, justInquire)); // scclCHECK(SaveProxy(comm, channel, proxyRecv, channel->collnetChain.up, op, 0, justInquire)); // } break; // case scclPatternCollnetDirect: { // scclCHECK(SaveProxy(comm, channel, proxySend, channel->collnetDirect.out, op, 1, justInquire)); // scclCHECK(SaveProxy(comm, channel, proxyRecv, channel->collnetDirect.out, op, 0, justInquire)); // } break; // case scclPatternNvls: { // scclCHECK(SaveProxy(comm, channel, proxySend, channel->nvls.out, op, 1, justInquire)); // scclCHECK(SaveProxy(comm, channel, proxyRecv, channel->nvls.out, op, 0, justInquire)); // } break; // case scclPatternNvlsTree: { // scclCHECK(SaveProxy(comm, channel, proxyRecv, channel->nvls.treeDown[1], op, 0, justInquire)); // scclCHECK(SaveProxy(comm, channel, proxyRecv, channel->nvls.treeDown[2], op, 0, justInquire)); // scclCHECK(SaveProxy(comm, channel, proxySend, channel->nvls.treeUp, op, 0, justInquire)); // scclCHECK(SaveProxy(comm, channel, proxySend, channel->nvls.treeDown[1], op, 0, justInquire)); // scclCHECK(SaveProxy(comm, channel, proxySend, channel->nvls.treeDown[2], op, 0, justInquire)); // scclCHECK(SaveProxy(comm, channel, proxyRecv, channel->nvls.treeUp, op, 0, justInquire)); // } break; // case scclPatternSend: // case scclPatternRecv: { // if(op->root == comm->rank) // return scclSuccess; // scclCHECK(SaveProxy(comm, channel, op->pattern == 
scclPatternSend ? proxySend : proxyRecv, op->root, op, op->connIndex, justInquire)); // } break; // } // return scclSuccess; // } // SCCL_PARAM(ChunkSize, "CHUNK_SIZE", 0); // scclResult_t scclProxyComputeP2p(struct scclInfo* info, struct scclProxyOp* op) { // memset(op, 0, sizeof(struct scclProxyOp)); // int channelId = info->channelId; // struct scclChannel* channel = info->comm->channels + channelId; // op->channelId = channelId; // op->sliceSteps = P2P_SLICESTEPS; // op->chunkSteps = P2P_CHUNKSTEPS; // op->dtype = info->datatype; // op->protocol = info->protocol; // int stepSize = info->comm->buffSizes[op->protocol] / SCCL_STEPS; // if(op->protocol == SCCL_PROTO_SIMPLE) // stepSize = info->comm->p2pChunkSize; // #ifdef HCU_SDMA_FEATURE // info->chunkSize = info->comm->p2pRealChunkSize; // #else // info->chunkSize = stepSize; // #endif // op->root = info->root; // struct scclChannelPeer* peer = channel->peers[op->root]; // if(info->coll == scclFuncSend) { // op->pattern = scclPatternSend; // if(op->root != info->comm->rank && peer->send[1].transportComm == &netTransport.send) { // // Tune chunk size for the network // if(info->count < stepSize) // info->chunkSize /= 4; // else if(info->count < 8 * stepSize) // info->chunkSize /= 2; // } // } else if(info->coll == scclFuncRecv) { // op->pattern = scclPatternRecv; // if(op->root != info->comm->rank && peer->recv[1].transportComm == &netTransport.recv) { // // Tune chunk size for the network // if(info->count < stepSize) // info->chunkSize /= 4; // else if(info->count < 8 * stepSize) // info->chunkSize /= 2; // } // } else { // WARN("P2p operation is neither send or recv"); // return scclInternalError; // } // if(scclParamChunkSize() != 0) { // info->chunkSize = scclParamChunkSize(); // } // op->chunkSize = info->chunkSize; // // Compute nSteps for proxies // int chunkEffectiveSize = op->chunkSize; // if(op->protocol == SCCL_PROTO_LL) { // chunkEffectiveSize /= 2; // } // op->nbytes = stepSize; // op->nsteps = 
DIVUP(info->count, chunkEffectiveSize); // if(op->nsteps == 0) // op->nsteps = 1; // return scclSuccess; // } // static scclResult_t removeOp(struct scclProxyProgressState* state, struct scclProxyArgs** opPtr, struct scclProxyArgs** prevOpPtr) { // struct scclProxyArgs* freeOp = *opPtr; // struct scclProxyArgs* next = freeOp->next; // DEBUG_PROXY_PRINT("Remove %ld -> %ld -> %ld\n", OP_INDEX(*prevOpPtr), OP_INDEX(freeOp), OP_INDEX(next)); // *opPtr = next; // if(freeOp->nextPeer) { // // replace op by nextPeer // struct scclProxyArgs* nextPeer = freeOp->nextPeer; // if(*prevOpPtr) { // (*prevOpPtr)->next = nextPeer; // } else { // state->active = nextPeer; // } // nextPeer->next = next; // *(prevOpPtr) = nextPeer; // } else { // *(freeOp->proxyAppendPtr) = NULL; // if(*prevOpPtr) { // (*prevOpPtr)->next = next; // } else { // state->active = next; // } // } // freeOp->next = state->pool; // state->pool = freeOp; // DEBUG_PROXY_PRINT("Removed %5ld (%5ld) : ", OP_INDEX(freeOp), OP_INDEX(*freeOp->proxyAppendPtr)); // #ifdef DEBUG_PROXY // scclCHECK(dumpProxyState(state)); // #endif // return scclSuccess; // } // static scclResult_t progressOps(struct scclProxyState* proxyState, struct scclProxyProgressState* state, struct scclProxyArgs* opStart, int* idle) { // struct scclProxyArgs* prevOp = NULL; // struct scclProxyArgs* op = opStart; // while(op) { // if(op->state == scclProxyOpNone) // return scclInternalError; // TIME_START(0); // TIME_START(1); // scclCHECK(op->progress(proxyState, op)); // if(op->idle) { // TIME_STOP(1); // TIME_CANCEL(0); // } else { // TIME_CANCEL(1); // TIME_STOP(0); // } // *idle &= op->idle; // if(op->state == scclProxyOpNone) { // TIME_START(2); // scclCHECK(removeOp(state, &op, &prevOp)); // TIME_STOP(2); // } else { // prevOp = op; // op = op->next; // } // } // return scclSuccess; // } // SCCL_PARAM(ProxyAppendBatchSize, "PROXY_APPEND_BATCH_SIZE", 16); // static scclResult_t scclProxyGetPostedOps(struct scclProxyState* proxyState, int* 
added) { // struct scclProxyProgressState* state = &proxyState->progressState; // if(state->opsPool == NULL) // return scclInternalError; // struct scclProxyOpsPool* pool = state->opsPool; // struct scclProxyArgs profArgs; // Only used for profiling purposes // if(state->nextOps != -1) // goto process_nextops; // // If we have ops to progress, no need to block waiting for something to arrive or even wait for the lock // // to be available. Exit, continue progress, and come back later. // if(state->active != NULL && (pool->nextOps == -1 || pthread_mutex_trylock(&pool->mutex) != 0)) // return scclSuccess; // if(state->active == NULL) { // pthread_mutex_lock(&pool->mutex); // while(pool->nextOps == -1 && !state->stop) { // struct scclProxyArgs profArgs; // Only used for profiling purposes // scclProfilingRecord(&profArgs, 0, 0, scclProxyProfileSleep); // pthread_cond_wait(&pool->cond, &pool->mutex); // scclProfilingRecord(&profArgs, 0, 0, scclProxyProfileWakeup); // } // if(state->stop) { // We might have been woken up to stop. 
// pthread_mutex_unlock(&pool->mutex); // return scclSuccess; // } // } // state->nextOps = pool->nextOps; // pool->nextOps = pool->nextOpsEnd = -1; // pthread_mutex_unlock(&pool->mutex); // if(state->nextOps == -1) // return scclInternalError; // process_nextops: // scclProfilingRecord(&profArgs, 0, 0, scclProxyProfileAppend); // TIME_START(2); // int freeOp[SCCL_MAX_LOCAL_RANKS]; // int freeOpEnd[SCCL_MAX_LOCAL_RANKS]; // for(int i = 0; i < proxyState->tpLocalnRanks; i++) // freeOp[i] = -1; // uint64_t lastOpCount = 0; // int lastPeer = -1; // int count = 0; // for(int opIndex = state->nextOps; opIndex != -1;) { // struct scclProxyOp* peerOp = pool->ops + opIndex; // int peer = opIndex / MAX_OPS_PER_PEER; // if((lastOpCount && peerOp->opCount != lastOpCount) || ((lastPeer != -1) && peer != lastPeer)) // count++; // if(count == scclParamProxyAppendBatchSize() + 1) // break; // lastOpCount = peerOp->opCount; // lastPeer = peer; // if(peerOp->connection == NULL) // return scclInternalError; // if(peerOp->next != -1) // __builtin_prefetch(pool->ops + peerOp->next); // scclCHECK(ProxyAppend(state, peerOp)); // (*added)++; // int lastOpIndex = opIndex; // opIndex = peerOp->next; // // Return op to peer pool // if(freeOp[peer] == -1) { // freeOpEnd[peer] = lastOpIndex; // } else { // peerOp->next = freeOp[peer]; // } // freeOp[peer] = lastOpIndex; // state->nextOps = opIndex; // } // for(int i = 0; i < proxyState->tpLocalnRanks; i++) { // if(freeOp[i] == -1) // continue; // int newFree = freeOp[i]; // int oldFree = pool->freeOps[i]; // pool->ops[freeOpEnd[i]].next = oldFree; // if(oldFree == -1) { // // Nothing for the main thread to consume, we can set it. // pool->freeOps[i] = newFree; // } else { // // The main thread may recycle free ops at any time, replace the freeOps value atomically and check it worked. 
// int swap = __sync_val_compare_and_swap(pool->freeOps + i, oldFree, newFree); // if(swap != oldFree) { // if(swap != -1) // return scclInternalError; // // Ops were recycled while we were trying to swap, just set the value directly now. // pool->ops[freeOpEnd[i]].next = -1; // pool->freeOps[i] = newFree; // } // } // } // profArgs.opCount = *added; // scclProfilingRecord(&profArgs, 0, 0, scclProxyProfileAppendEnd); // TIME_STOP(2); // return scclSuccess; // } // #include // static scclProxyProgressState* scclLastProxyState; // void scclDumpProxyState(int signal) { dumpProxyState(scclLastProxyState); } // SCCL_PARAM(CreateThreadContext, "CREATE_THREAD_CONTEXT", 0); // static int setProxyThreadContext(struct scclProxyState* proxyState) { // #if CUDART_VERSION >= 11030 // static int createThreadContext = -1; // if(createThreadContext == -1) { // createThreadContext = scclParamCreateThreadContext(); // if(createThreadContext) { // if(CUPFN(cuCtxCreate) == nullptr || CUPFN(cuCtxDestroy) == nullptr || CUPFN(cuCtxSetCurrent) == nullptr) { // WARN("Unable to create thread context due to old driver, disabling."); // createThreadContext = 0; // } // } // } // if(createThreadContext) { // if(proxyState->cudaCtx == NULL) { // if(CUPFN(cuCtxCreate(&proxyState->cudaCtx, CU_CTX_SCHED_SPIN | CU_CTX_MAP_HOST, proxyState->cudaDev)) != CUDA_SUCCESS) { // WARN("Failed to create CUDA context on device %d", proxyState->cudaDev); // createThreadContext = 0; // } // } else { // if(CUPFN(cuCtxSetCurrent(proxyState->cudaCtx)) != CUDA_SUCCESS) { // WARN("Failed to set CUDA context on device %d", proxyState->cudaDev); // return 0; // } // return 1; // } // } // #endif // return 0; // } // // Set to SIGUSR1 or SIGUSR2 to help debug proxy state during hangs // SCCL_PARAM(ProxyDumpSignal, "PROXY_DUMP_SIGNAL", -1); // SCCL_PARAM(ProgressAppendOpFreq, "PROGRESS_APPENDOP_FREQ", 8); // void* scclProxyProgress(void* proxyState_) { // struct scclProxyState* proxyState = (struct 
scclProxyState*)proxyState_; // if(setProxyThreadContext(proxyState)) { // INFO(SCCL_INIT, "[Proxy Progress] Created CUDA context on device %d", proxyState->cudaDev); // } else if(cudaSetDevice(proxyState->cudaDev) != cudaSuccess) { // WARN("[Proxy Progress] Failed to set CUDA device %d", proxyState->cudaDev); // } // // if (CPU_COUNT(&comm->cpuAffinity)) sched_setaffinity(0, sizeof(cpu_set_t), &comm->cpuAffinity); // struct scclProxyProgressState* state = &proxyState->progressState; // state->nextOps = -1; // const int sig = scclParamProxyDumpSignal(); // if(sig != -1) // signal(sig, scclDumpProxyState); // scclLastProxyState = state; // char threadName[SCCL_THREAD_NAMELEN]; // snprintf(threadName, SCCL_THREAD_NAMELEN, "sccl Progress%2d", proxyState->cudaDev); // nvtxNameOsThreadA(syscall(SYS_gettid), threadName); // int lastIdle = 0; // /* Too frequent call of scclProxyGetPostedOps() will result in perf regression for small message // * communication. proxyOpAppendCounter is a counter that helps us decide if we need to append proxy ops. // * After each progress, proxyOpAppendCounter will increase by 1 and compare with environment variable // * scclParamProgressAppendOpFreq(). If they are equal, we will append proxy ops. This will decrease the // * frequency of calling scclProxyGetPostedOps() and reduce the perf impact. 
*/ // int proxyOpAppendCounter = 0; // struct scclProxyArgs profArgs; // Only used for profiling purposes // while((state->stop == false || (state->stop == true && state->active)) && *proxyState->abortFlag == 0) { // int idle = 1; // scclResult_t ret = progressOps(proxyState, state, state->active, &idle); // if(ret != scclSuccess) { // INFO(SCCL_ALL, "%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret); // return NULL; // } // if(lastIdle == 0 && idle == 1) // scclProfilingRecord(&profArgs, 0, 0, scclProxyProfileIdle); // if(lastIdle == 1 && idle == 0) // scclProfilingRecord(&profArgs, 0, 0, scclProxyProfileActive); // if(idle || (++proxyOpAppendCounter == scclParamProgressAppendOpFreq())) { // int added = 0; // proxyOpAppendCounter = 0; // TIME_START(3); // if(state->stop == false) // ret = scclProxyGetPostedOps(proxyState, &added); // if(added) { // TIME_STOP(3); // } else { // TIME_CANCEL(3); // } // if(ret != scclSuccess) { // INFO(SCCL_ALL, "%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret); // } // if(added == 0) { // sched_yield(); // No request progressed. Let others run. 
// } // } // lastIdle = idle; // } // return NULL; // } // scclResult_t scclProxyStart(struct scclComm* comm) { // struct scclProxyOps* proxyOps = comm->proxyState->proxyOps; // if(proxyOps == NULL) // return scclSuccess; // TIME_START(1); // for(int r = 0; r < comm->sharedRes->tpNLocalRanks; r++) { // struct scclProxyOps* ops = proxyOps + r; // if(ops->pool == NULL || ops->nextOps == -1) // continue; // scclCHECK(scclProxyPost(ops->pool, ops->nextOps, ops->nextOpsEnd)); // ops->nextOps = ops->nextOpsEnd = -1; // ops->count = 0; // } // comm->opCount++; // TIME_STOP(1); // return scclSuccess; // } // static scclResult_t scclProxyProgressCreate(struct scclProxyState* proxyState) { // struct scclProxyProgressState* state = &proxyState->progressState; // if(!state->thread) { // pthread_create(&state->thread, NULL, scclProxyProgress, proxyState); // scclSetThreadName(state->thread, "sccl Progress%2d", proxyState->tpLocalnRanks); // } // return scclSuccess; // } // scclResult_t scclProxyProgressDestroy(struct scclProxyState* proxyState) { // struct scclProxyProgressState* state = &proxyState->progressState; // // Request the proxy to stop and then wake it // if(state->opsPool) { // pthread_mutex_lock(&state->opsPool->mutex); // state->stop = true; // pthread_cond_signal(&state->opsPool->cond); // pthread_mutex_unlock(&state->opsPool->mutex); // pthread_join(state->thread, NULL); // } // // Free off any memory allocated for the proxy arg pools // while(state->pools != NULL) { // struct scclProxyPool* next = state->pools->next; // free(state->pools); // state->pools = next; // } // scclProfilingDump(); // TIME_PRINT("Proxy"); // return scclSuccess; // } // #define SCCL_PROXY_CONN_POOL_SIZE_POW2 7 // #define SCCL_PROXY_CONN_POOL_SIZE (1 << (SCCL_PROXY_CONN_POOL_SIZE_POW2)) // #define SCCL_PROXY_CONN_POOL_MASK ((SCCL_PROXY_CONN_POOL_SIZE) - 1) // struct scclProxyConnectionPool { // struct scclProxyConnection** pools; // int banks; // int offset; // }; // static scclResult_t 
scclProxyNewConnection(struct scclProxyConnectionPool* pool, int* id) { // if(pool->offset == SCCL_PROXY_CONN_POOL_SIZE) { // scclCHECK(scclRealloc(&pool->pools, pool->banks, pool->banks + 1)); // scclCHECK(scclCalloc(pool->pools + pool->banks, SCCL_PROXY_CONN_POOL_SIZE)); // pool->banks++; // pool->offset = 0; // } // *id = ((pool->banks - 1) << SCCL_PROXY_CONN_POOL_SIZE_POW2) + pool->offset; // pool->offset++; // return scclSuccess; // } // static scclResult_t scclProxyGetConnection(struct scclProxyConnectionPool* pool, int id, struct scclProxyConnection** conn) { // int bank = id >> SCCL_PROXY_CONN_POOL_SIZE_POW2; // int offset = id & SCCL_PROXY_CONN_POOL_MASK; // if((pool->pools == NULL) || (bank > pool->banks) || (pool->pools[bank] == NULL)) // return scclInternalError; // *conn = pool->pools[bank] + offset; // return scclSuccess; // } // static scclResult_t proxyFree(struct scclProxyConnection* connection, struct scclProxyState* proxyState) { // if(connection->send) { // if(scclTransports[connection->transport]->send.proxyFree) { // scclCHECK(scclTransports[connection->transport]->send.proxyFree(connection, proxyState)); // } // } else { // if(scclTransports[connection->transport]->recv.proxyFree) { // scclCHECK(scclTransports[connection->transport]->recv.proxyFree(connection, proxyState)); // } // } // return scclSuccess; // } // static scclResult_t scclProxyFreeConnections(struct scclProxyConnectionPool* pool, struct scclProxyState* proxyState) { // for(int b = 0; b < pool->banks; b++) { // int max = b == pool->banks - 1 ? 
pool->offset : SCCL_PROXY_CONN_POOL_SIZE; // for(int i = 0; i < max; i++) { // scclProxyConnection* connection = pool->pools[b] + i; // if(connection->state != connUninitialized) { // scclCHECK(proxyFree(connection, proxyState)); // } // } // free(pool->pools[b]); // } // free(pool->pools); // return scclSuccess; // } // #include "transport.h" // struct scclProxyInitReq { // int transport; // int send; // int tpLocalRank; // int tpRank; // int sameProcess; // }; // struct scclProxyInitResp { // scclProxyConnection* connection; // char devShmPath[6]; // "XXXXXX" - May or may not be set // }; // scclResult_t scclProxyConnect(struct scclComm* comm, int transport, int send, int tpProxyRank, struct scclProxyConnector* proxyConn) { // struct scclSocket* sock; // int ready, proxyRank = -1; // struct scclProxyState* sharedProxyState = comm->proxyState; // // Keep one connection per local rank // for(int i = 0; i < comm->localRanks; ++i) { // /* find the proxy rank in comm. */ // if(comm->topParentRanks[comm->localRankToRank[i]] == tpProxyRank) { // proxyRank = comm->localRankToRank[i]; // break; // } // } // proxyConn->sameProcess = comm->peerInfo[proxyRank].pidHash == comm->peerInfo[comm->rank].pidHash ?
1 : 0; // // Keep one connection per local rank // proxyConn->connection = NULL; // proxyConn->tpRank = tpProxyRank; // if(sharedProxyState->peerSocks == NULL) { // scclCHECK(scclCalloc(&sharedProxyState->peerSocks, comm->sharedRes->tpNLocalRanks)); // scclCHECK(scclCalloc(&sharedProxyState->proxyOps, comm->sharedRes->tpNLocalRanks)); // scclCHECK(scclCalloc(&sharedProxyState->sharedDevMems, comm->sharedRes->tpNLocalRanks)); // for(int i = 0; i < comm->sharedRes->tpNLocalRanks; ++i) { // scclCHECK(scclSocketSetFd(-1, &sharedProxyState->peerSocks[i])); // } // } // proxyConn->tpLocalRank = comm->sharedRes->tpRankToLocalRank[proxyConn->tpRank]; // sock = sharedProxyState->peerSocks + proxyConn->tpLocalRank; // scclCHECK(scclSocketReady(sock, &ready)); // if(!ready) { // scclCHECK(scclSocketInit(sock, sharedProxyState->peerAddresses + proxyConn->tpRank, comm->sharedRes->magic, scclSocketTypeProxy, comm->abortFlag)); // scclCHECK(scclSocketConnect(sock)); // } // struct scclProxyInitReq req = {0}; // req.transport = transport; // req.send = send; // req.tpLocalRank = comm->topParentLocalRanks[comm->localRank]; // req.tpRank = comm->topParentRanks[comm->rank]; // req.sameProcess = proxyConn->sameProcess; // struct scclProxyInitResp resp = {0}; // // This usually sends proxyConn->connection to identify which connection this is. // // However, this is part of the response and therefore is ignored // scclCHECK(scclProxyCallBlocking(comm, proxyConn, scclProxyMsgInit, &req, sizeof(req), &resp, sizeof(resp))); // proxyConn->connection = resp.connection; // // If we need proxy progress, map progress ops // struct scclTransportComm* tcomm = send ? 
&scclTransports[transport]->send : &scclTransports[transport]->recv; // if(tcomm->proxyProgress) { // char poolPath[] = "/dev/shm/sccl-XXXXXX"; // strncpy(poolPath + sizeof("/dev/shm/sccl-") - 1, resp.devShmPath, sizeof("XXXXXX") - 1); // struct scclProxyOps* proxyOps = sharedProxyState->proxyOps + proxyConn->tpLocalRank; // if(proxyOps->pool == NULL) { // scclCHECK(scclShmOpen(poolPath, sizeof(struct scclProxyOpsPool), (void**)(&proxyOps->pool), NULL, 0, &proxyOps->handle)); // proxyOps->nextOps = proxyOps->nextOpsEnd = proxyOps->freeOp = -1; // } // } // INFO(SCCL_NET | SCCL_PROXY, "Connection to proxy localRank %d -> connection %p", proxyConn->tpLocalRank, proxyConn->connection); // return scclSuccess; // } // // cuMem API support // // The response is sent out-of-band using scclIpcSocket for this specific command // /** // * Converts a file descriptor, via the proxy connection, into a descriptor usable by the calling process. // * // * @param comm sccl communicator // * @param proxyConn proxy connector // * @param fd file descriptor to convert (sent to the proxy as an int) // * @param convertedFd output parameter that receives the converted file descriptor // * @return operation result (scclSuccess on success) // * // * Blocks until the conversion completes or fails. It first creates a UDS socket to receive the converted fd, // * then asks the proxy to perform the conversion, and finally polls the proxy response until the operation completes. // * If the scclProxyCallAsync request fails, it closes the socket, logs a warning, and returns the error code. // * NOTE(review): other failures return early via scclCHECK and may leave the socket open and/or opId unfreed. // */ // scclResult_t scclProxyClientConvertFdBlocking(struct scclComm* comm, struct scclProxyConnector* proxyConn, int fd, int* convertedFd) { // scclResult_t ret = scclSuccess; // scclResult_t res = scclInProgress; // struct scclIpcSocket ipcSock = {0}; // void* opId = malloc(1); // // Create a UDS socket to receive the converted fd // scclCHECK(scclIpcSocketInit(&ipcSock, comm->topParentLocalRanks[comm->localRank], (uint64_t)opId, comm->abortFlag)); // // Request the conversion of the fd over sockets // scclCHECKGOTO(scclProxyCallAsync(comm, proxyConn, scclProxyMsgConvertFd, &fd, sizeof(int), 0, opId), ret, error); // // Receive converted fd over UDS // scclCHECK(scclIpcSocketRecvFd(&ipcSock, convertedFd)); // TRACE(SCCL_PROXY, "UDS: ConvertFd rank %d returned %p %d", proxyConn->tpLocalRank, convertedFd, *convertedFd); //
scclCHECK(scclIpcSocketClose(&ipcSock)); // while(res == scclInProgress) { // res = scclPollProxyResponse(comm, proxyConn, NULL, opId); // } // free(opId); // return res; // error: // scclCHECK(scclIpcSocketClose(&ipcSock)); // WARN("scclProxyClientConvertFd call to top parent rank %d failed", proxyConn->tpRank); // return ret; // } // const char* scclProxyMsgTypeStr[] = {"Unknown", "Init", "SharedInit", "Setup", "Connect", "Start", "Close", "Abort", "Stop", "ConvertFd"}; // scclResult_t scclProxyCallAsync(struct scclComm* comm, struct scclProxyConnector* proxyConn, int type, void* reqBuff, int reqSize, int respSize, void* opId) // { // struct scclSocket* sock; // scclResult_t ret = scclSuccess; // struct scclProxyState* sharedProxyState = comm->proxyState; // if(sharedProxyState->peerSocks == NULL) // return scclInternalError; // sock = sharedProxyState->peerSocks + proxyConn->tpLocalRank; // if(sock == NULL) // return scclInternalError; // scclCHECKGOTO(scclSocketSend(sock, &type, sizeof(int)), ret, error); // scclCHECKGOTO(scclSocketSend(sock, &proxyConn->connection, sizeof(void*)), ret, error); // scclCHECKGOTO(scclSocketSend(sock, &reqSize, sizeof(int)), ret, error); // scclCHECKGOTO(scclSocketSend(sock, &respSize, sizeof(int)), ret, error); // if(reqSize) // scclCHECKGOTO(scclSocketSend(sock, reqBuff, reqSize), ret, error); // // Send opId to proxy // scclCHECKGOTO(scclSocketSend(sock, &opId, sizeof(opId)), ret, error); // // Add proxyOp to expected response queue // scclCHECK(expectedProxyResponseEnqueue(sharedProxyState, opId, respSize)); // return scclSuccess; // error: // return ret; // } // scclResult_t scclPollProxyResponse(struct scclComm* comm, struct scclProxyConnector* proxyConn, void* respBuff, void* opId) { // struct scclProxyState* sharedProxyState = comm->proxyState; // // Receive the connection pointer from the Proxy // if(*comm->abortFlag) { // WARN("Comm %p is in abort state", comm); // return scclInternalError; // } // 
if(sharedProxyState->peerSocks == NULL) // return scclInternalError; // // Check response queue // int found = 0; // scclCHECK(expectedProxyResponseDequeue(sharedProxyState, opId, respBuff, &found)); // if(found == 0) { // // Attempt to read in a new response header from the proxy thread // struct scclSocket* sock = sharedProxyState->peerSocks + proxyConn->tpLocalRank; // void* recvOpId; // int offset = 0; // if(scclSuccess != scclSocketProgress(SCCL_SOCKET_RECV, sock, &recvOpId, sizeof(recvOpId), &offset)) { // WARN("Socket recv failed while polling for opId=%p", opId); // return scclInternalError; // } // if(offset == 0) { // return scclInProgress; // // If we've returned a partial response, block to receive the rest of it // } else if(offset < sizeof(recvOpId)) { // while(offset < sizeof(recvOpId)) // scclCHECK(scclSocketProgress(SCCL_SOCKET_RECV, sock, &recvOpId, sizeof(recvOpId), &offset)); // } // INFO(SCCL_PROXY, "scclPollProxyResponse Received new opId=%p", recvOpId); // // Now do a blocking recv of the response size // int respSize = 0; // scclCHECK(scclSocketRecv(sock, &respSize, sizeof(respSize))); // // If there's a respSize to recv // if(respSize > 0) { // if(recvOpId != opId) { // // Unexpected response, need to buffer the socket data // respBuff = malloc(respSize); // } // assert(respBuff != NULL); // scclCHECK(scclSocketRecv(sock, respBuff, respSize)); // } // if(recvOpId == opId) { // INFO(SCCL_PROXY, "recvOpId=%p matches expected opId=%p", recvOpId, opId); // scclCHECK(expectedProxyResponseRemove(sharedProxyState, recvOpId)); // return scclSuccess; // } else { // INFO(SCCL_PROXY, "Queuing opId=%p respBuff=%p respSize=%d", recvOpId, respBuff, respSize); // // Store the result and mark response as completed // scclCHECK(expectedProxyResponseStore(sharedProxyState, recvOpId, respBuff, respSize)); // return scclInProgress; // } // } else { // INFO(SCCL_PROXY, "scclPollProxyResponse Dequeued cached opId=%p", opId); // } // return scclSuccess; // } // 
scclResult_t // scclProxyCallBlocking(struct scclComm* comm, struct scclProxyConnector* proxyConn, int type, void* reqBuff, int reqSize, void* respBuff, int respSize) { // // Alloc some memory to act as a handle // scclResult_t res = scclSuccess; // void* opId = malloc(1); // scclCHECKGOTO(scclProxyCallAsync(comm, proxyConn, type, reqBuff, reqSize, respSize, opId), res, fail); // do { // res = scclPollProxyResponse(comm, proxyConn, respBuff, opId); // } while(res == scclInProgress); // exit: // free(opId); // return res; // fail: // goto exit; // } // static scclResult_t proxyProgressInit(struct scclProxyState* proxyState) { // struct scclProxyProgressState* state = &proxyState->progressState; // if(state->opsPool == NULL) { // int size = sizeof(struct scclProxyOpsPool); // struct scclProxyOpsPool* pool = NULL; // char shmPath[sizeof("/dev/shm/sccl-XXXXXX")]; // shmPath[0] = '\0'; // scclCHECK(scclShmOpen(shmPath, size, (void**)&pool, NULL, proxyState->tpLocalnRanks + 1, &state->handle)); // // Init pool // pool->nextOps = -1; // for(int r = 0; r < proxyState->tpLocalnRanks; r++) { // pool->freeOps[r] = r * MAX_OPS_PER_PEER; // for(int i = 0; i < MAX_OPS_PER_PEER - 1; i++) // pool->ops[r * MAX_OPS_PER_PEER + i].next = r * MAX_OPS_PER_PEER + i + 1; // pool->ops[(r + 1) * MAX_OPS_PER_PEER - 1].next = -1; // } // // Setup mutex/cond to work inter-process // pthread_mutexattr_t mutexAttr; // pthread_mutexattr_init(&mutexAttr); // pthread_mutexattr_setpshared(&mutexAttr, PTHREAD_PROCESS_SHARED); // pthread_mutex_init(&pool->mutex, &mutexAttr); // pthread_condattr_t condAttr; // pthread_condattr_setpshared(&condAttr, PTHREAD_PROCESS_SHARED); // pthread_cond_init(&pool->cond, &condAttr); // state->opsPool = pool; // memcpy(state->opsPoolShmSuffix, shmPath + sizeof("/dev/shm/sccl-") - 1, sizeof("XXXXXX") - 1); // // All ops structures are created, we can start the progress thread // scclCHECK(scclProxyProgressCreate(proxyState)); // } // return scclSuccess; // } // static 
void proxyOpsFree(struct scclProxyState* proxyState) { // struct scclProxyProgressState* state = &proxyState->progressState; // if(scclShmClose(state->handle) != scclSuccess) { // WARN("[Service thread] shm close failed"); // } // } // scclResult_t scclProxyShmUnlink(struct scclComm* comm) { // struct scclProxyProgressState* state = &comm->proxyState->progressState; // if(state->opsPool == NULL) // return scclSuccess; // if(scclShmUnlink(state->handle) != scclSuccess) { // WARN("[Service thread] proxy ops shm unlink failed"); // } // return scclSuccess; // } // static scclResult_t proxyConnInit(struct scclProxyLocalPeer* peer, // struct scclProxyConnectionPool* connectionPool, // struct scclProxyState* proxyState, // scclProxyInitReq* req, // scclProxyInitResp* resp, // struct scclProxyConnection** connection) { // int id; // scclCHECK(scclProxyNewConnection(connectionPool, &id)); // scclCHECK(scclProxyGetConnection(connectionPool, id, connection)); // (*connection)->sock = &peer->sock; // (*connection)->transport = req->transport; // (*connection)->send = req->send; // (*connection)->tpLocalRank = req->tpLocalRank; // (*connection)->sameProcess = req->sameProcess; // peer->tpLocalRank = req->tpLocalRank; // peer->tpRank = req->tpRank; // resp->connection = *connection; // (*connection)->tcomm = (*connection)->send ? &scclTransports[(*connection)->transport]->send : &scclTransports[(*connection)->transport]->recv; // // If we need proxy progress, let's allocate ops and start the thread // if((*connection)->tcomm->proxyProgress) { // scclCHECK(proxyProgressInit(proxyState)); // struct scclProxyProgressState* state = &proxyState->progressState; // strncpy(resp->devShmPath, state->opsPoolShmSuffix, sizeof(resp->devShmPath)); // } // INFO(SCCL_NET | SCCL_PROXY, // "New proxy %s connection %d from local rank %d, transport %d", // (*connection)->send ? 
"send" : "recv", // id, // (*connection)->tpLocalRank, // (*connection)->transport); // __atomic_store_n(&(*connection)->state, connInitialized, __ATOMIC_RELEASE); // return scclSuccess; // } // // cuMem API support // static scclResult_t proxyConvertFd(struct scclProxyLocalPeer* peer, void* opId, struct scclProxyState* proxyState, int fd) { // struct scclIpcSocket ipcSock = {0}; // uint64_t hash = (uint64_t)opId; // INFO(SCCL_PROXY, "UDS proxyConvertFd received fd %d peer %d opId %lx", fd, peer->tpLocalRank, hash); // // Send back the converted fd using UDS // scclCHECK(scclIpcSocketInit(&ipcSock, proxyState->tpRank, hash ^ 1, proxyState->abortFlag)); // scclCHECK(scclIpcSocketSendFd(&ipcSock, fd, peer->tpLocalRank, hash)); // scclCHECK(scclIpcSocketClose(&ipcSock)); // return scclSuccess; // } // static scclResult_t proxyProgressAsync(struct scclProxyAsyncOp* op, // struct scclProxyState* proxyState, // int* asyncOpCount, // struct scclProxyLocalPeer* peer, // struct scclProxyConnectionPool* connectionPool) { // int done = 1; // if(op->type == scclProxyMsgSetup) { // TRACE(SCCL_PROXY, "proxyProgressAsync::proxySetup() opId=%p", op->opId); // scclCHECK(op->connection->tcomm->proxySetup(op->connection, proxyState, op->reqBuff, op->reqSize, op->respBuff, op->respSize, &done)); // } else if(op->type == scclProxyMsgConnect) { // TRACE(SCCL_PROXY, "proxyProgressAsync::proxyConnect() opId=%p op.reqBuff=%p", op->opId, op->reqBuff); // scclCHECK(op->connection->tcomm->proxyConnect(op->connection, proxyState, op->reqBuff, op->reqSize, op->respBuff, op->respSize, &done)); // } else if(op->type == scclProxyMsgSharedInit) { // int nChannels = (int)*op->reqBuff; // TRACE(SCCL_PROXY, "proxyProgressAsync::scclProxyMsgSharedInit opId=%p op.reqBuff=%p nChannels=%d", op->opId, op->reqBuff, nChannels); // if(op->connection->tcomm->proxySharedInit) // scclCHECK(op->connection->tcomm->proxySharedInit(op->connection, proxyState, nChannels)); // __atomic_store_n(&op->connection->state, 
connSharedInitialized, __ATOMIC_RELEASE); // } else if(op->type == scclProxyMsgConvertFd) { // int fd = *(int*)op->reqBuff; // TRACE(SCCL_PROXY, "proxyProgressAsync::scclProxyMsgConvertFd opId=%p op.reqBuff=%p fd=%d", op->opId, op->reqBuff, fd); // scclCHECK(proxyConvertFd(peer, op->opId, proxyState, fd)); // cuMem API support // } else if(op->type == scclProxyMsgInit) { // TRACE(SCCL_PROXY, "proxyProgressAsync::scclProxyMsgInit opId=%p op.reqBuff=%p", op->opId, op->reqBuff); // scclCHECK(proxyConnInit(peer, connectionPool, proxyState, (scclProxyInitReq*)op->reqBuff, (scclProxyInitResp*)op->respBuff, &op->connection)); // } else // return scclInternalError; // if(done) { // INFO(SCCL_PROXY, "proxyProgressAsync opId=%p op.type=%d op.reqBuff=%p op.respSize=%d done", op->opId, op->type, op->reqBuff, op->respSize); // if(op->type == scclProxyMsgSetup) // __atomic_store_n(&op->connection->state, connSetupDone, __ATOMIC_RELEASE); // else if(op->type == scclProxyMsgConnect) // __atomic_store_n(&op->connection->state, connConnected, __ATOMIC_RELEASE); // /* if setup or connect is done, we should not return any error at this point since // * scclSocketSend might already send the respBuff to the requester. If we still choose // * to abort and close the connection, it can cause segfault if the requester is using // * the respBuff. 
*/ // // Send the opId for referencing async operation // scclCHECK(scclSocketSend(op->connection->sock, &op->opId, sizeof(op->opId))); // // Send the response size // scclCHECK(scclSocketSend(op->connection->sock, &op->respSize, sizeof(op->respSize))); // if(op->respSize) { // // Send the response // scclCHECK(scclSocketSend(op->connection->sock, op->respBuff, op->respSize)); // } // asyncProxyOpDequeue(peer, op); // (*asyncOpCount)--; // return scclSuccess; // } else if(*proxyState->abortFlag != 0) { // return scclInternalError; // } // return scclInProgress; // } // static scclResult_t proxyServiceInitOp( // int type, struct scclProxyLocalPeer* peer, struct scclProxyConnectionPool* connectionPool, struct scclProxyState* proxyState, int* asyncOpCount) { // struct scclSocket* sock = &peer->sock; // struct scclProxyAsyncOp* asyncOp; // scclCHECK(scclCalloc(&asyncOp, 1)); // asyncOp->type = type; // scclCHECK(scclSocketRecv(sock, &asyncOp->connection, sizeof(void*))); // scclCHECK(scclSocketRecv(sock, &asyncOp->reqSize, sizeof(int))); // scclCHECK(scclSocketRecv(sock, &asyncOp->respSize, sizeof(int))); // if(asyncOp->reqSize) { // scclCHECK(scclCalloc(&asyncOp->reqBuff, asyncOp->reqSize)); // scclCHECK(scclSocketRecv(sock, asyncOp->reqBuff, asyncOp->reqSize)); // } // // Store opId for completion response // scclCHECK(scclSocketRecv(sock, &asyncOp->opId, sizeof(asyncOp->opId))); // if(asyncOp->respSize) // scclCHECK(scclCalloc(&asyncOp->respBuff, asyncOp->respSize)); // asyncProxyOpEnqueue(peer, asyncOp); // (*asyncOpCount)++; // scclCHECK(proxyProgressAsync(asyncOp, proxyState, asyncOpCount, peer, connectionPool)); // return scclSuccess; // } // #include // static bool proxyMatchOpType(int type) { // switch(type) { // case scclProxyMsgInit: // case scclProxyMsgSharedInit: // case scclProxyMsgSetup: // case scclProxyMsgConnect: // case scclProxyMsgConvertFd: return true; // default: return false; // } // } // void* scclProxyService(void* _args) { // struct 
scclProxyState* proxyState = (struct scclProxyState*)_args; // // if (CPU_COUNT(&comm->cpuAffinity)) sched_setaffinity(0, sizeof(cpu_set_t), &comm->cpuAffinity); // if(setProxyThreadContext(proxyState)) { // INFO(SCCL_INIT, "[Proxy Service] Created CUDA context on device %d", proxyState->cudaDev); // } else if(cudaSetDevice(proxyState->cudaDev) != cudaSuccess) { // WARN("[Proxy Service] Failed to set CUDA device %d", proxyState->cudaDev); // } // // if (CPU_COUNT(&comm->cpuAffinity)) sched_setaffinity(0, sizeof(cpu_set_t), &comm->cpuAffinity); // // Prepare poll descriptor // struct scclProxyConnectionPool connectionPool; // connectionPool.pools = NULL; // connectionPool.banks = 0; // connectionPool.offset = SCCL_PROXY_CONN_POOL_SIZE; // struct pollfd pollfds[SCCL_MAX_LOCAL_RANKS + 1]; // struct scclProxyLocalPeer peers[SCCL_MAX_LOCAL_RANKS]; // memset(&peers, 0, sizeof(struct scclProxyLocalPeer) * SCCL_MAX_LOCAL_RANKS); // for(int s = 0; s < SCCL_MAX_LOCAL_RANKS; s++) { // pollfds[s].fd = -1; // pollfds[s].events = POLLHUP | POLLIN; // } // if(scclSocketGetFd(proxyState->listenSock, &pollfds[SCCL_MAX_LOCAL_RANKS].fd) != scclSuccess) { // WARN("[Proxy Service] Get listenSock fd fails"); // return NULL; // }; // pollfds[SCCL_MAX_LOCAL_RANKS].events = POLLIN; // int maxnpeers = 0; // int npeers = 0; // int stop = 0; // int asyncOpCount = 0; // while(stop == 0 || (stop == 1 && npeers > 0)) { // /* Even if local comm aborts, we cannot let proxy thread exit if we still have peer // * connections. Need to wait until all other related comms call abort and safely exit // * together, or we could face segmentation fault. */ // if(*proxyState->abortFlag != 0) // stop = 1; // /* never let proxy service thread blocks in poll, or it cannot receive abortFlag. */ // int ret; // do { // ret = poll(pollfds, SCCL_MAX_LOCAL_RANKS + 1, asyncOpCount ? 
0 : 500); // } while(ret < 0 && errno == EINTR); // if(ret < 0) { // WARN("[Proxy Service] Poll failed: %s", strerror(errno)); // return NULL; // } // if(pollfds[SCCL_MAX_LOCAL_RANKS].revents) { // int s = 0; // while(s < SCCL_MAX_LOCAL_RANKS && pollfds[s].fd >= 0) // s++; // if(s == SCCL_MAX_LOCAL_RANKS) { // WARN("[Proxy service] Too many connections (%d max)", SCCL_MAX_LOCAL_RANKS); // return NULL; // } // if(maxnpeers < s + 1) // maxnpeers = s + 1; // if(scclSocketInit(&peers[s].sock) != scclSuccess) { // WARN("[Service thread] Initialize peers[%d].sock fails", s); // return NULL; // } // if(scclSocketAccept(&peers[s].sock, proxyState->listenSock) != scclSuccess) { // WARN("[Service thread] Accept failed %s", strerror(errno)); // } else { // if(scclSocketGetFd(&peers[s].sock, &pollfds[s].fd) != scclSuccess) { // WARN("[Service thread] Get peers[%d].sock fd fails", s); // return NULL; // } // npeers++; // peers[s].tpLocalRank = -1; // } // } // for(int s = 0; s < maxnpeers; s++) { // struct scclProxyLocalPeer* peer = peers + s; // struct scclSocket* sock = &peer->sock; // int closeConn = 0; // int type = 0; // scclResult_t res = scclSuccess; // if(pollfds[s].fd == -1) // continue; // // Progress all ops for this scclProxyLocalPeer // scclProxyAsyncOp* op = peer->asyncOps; // while(op != nullptr) { // scclProxyAsyncOp* opnext = op->next; /* in case op is freed in proxyProgressAsync */ // type = op->type; // res = proxyProgressAsync(op, proxyState, &asyncOpCount, peer, &connectionPool); // if(res == scclSuccess || res == scclInProgress) { // op = opnext; // } else { // // Res is a bad result // closeConn = 1; // WARN("[Service thread] Error encountered progressing operation=%s, res=%d, closing connection", scclProxyMsgTypeStr[type], res); // break; // } // } // // Check for additional ops coming in // if(pollfds[s].revents & POLLIN) { // int closed; // res = scclSocketTryRecv(sock, &type, sizeof(int), &closed, false /*blocking*/); // if(res != scclSuccess && res 
!= scclInProgress) { // WARN("[Service thread] Could not receive type from localRank %d, res=%u, closed=%d", peer->tpLocalRank, res, closed); // closeConn = 1; // } else if(closed) { // INFO(SCCL_INIT | SCCL_NET | SCCL_PROXY, "[Service thread] Connection closed by localRank %d", peer->tpLocalRank); // closeConn = 1; // } else if(res == scclSuccess) { // We received something from the sock // if(type == scclProxyMsgStop) { // stop = 1; // closeConn = 1; // } else if(type == scclProxyMsgClose) { // closeConn = 1; // } else if(proxyMatchOpType(type)) { // res = proxyServiceInitOp(type, peers + s, &connectionPool, proxyState, &asyncOpCount); // } else { // WARN("[Service thread] Unknown command %d from localRank %d", type, peer->tpLocalRank); // closeConn = 1; // } // INFO(SCCL_PROXY, "Received and initiated operation=%s res=%d", scclProxyMsgTypeStr[type], res); // } // } else if(pollfds[s].revents & POLLHUP) { // closeConn = 1; // } // if(res != scclSuccess && res != scclInProgress) { // WARN("[Proxy Service %d] Failed to execute operation %s from rank %d, retcode %d", // proxyState->tpRank, // scclProxyMsgTypeStr[type], // peer->tpRank, // res); // closeConn = 1; // } // if(closeConn) { // scclSocketClose(sock); // if(op != nullptr) { // asyncProxyOpDequeue(peer, op); // asyncOpCount--; // } // pollfds[s].fd = -1; // npeers--; // } // } // } // // Wait for all operations to complete and stop progress thread before freeing any resource // if(scclProxyProgressDestroy(proxyState) != scclSuccess) { // WARN("[Proxy Service] proxyDestroy failed"); // } // for(int s = 0; s < maxnpeers; s++) { // scclSocketClose(&peers[s].sock); // } // scclProxyFreeConnections(&connectionPool, proxyState); // scclSocketClose(proxyState->listenSock); // free(proxyState->listenSock); // proxyOpsFree(proxyState); // return NULL; // } // scclResult_t scclProxyInit(struct scclComm* comm, struct scclSocket* sock, union scclSocketAddress* peerAddresses) { // assert(comm->sharedRes->proxyState == 
NULL); // scclCHECK(scclCalloc(&comm->sharedRes->proxyState, 1)); // comm->proxyState = comm->sharedRes->proxyState; // comm->proxyState->refCount = 1; // comm->proxyState->listenSock = sock; // comm->proxyState->peerAddresses = peerAddresses; // return scclSuccess; // } // scclResult_t scclProxyCreate(struct scclComm* comm) { // /* proxyState is shared among parent comm and split comms. comm->proxyState->thread is // * pthread_join()'d by commFree() in init.cc when the refCount reduces down to 0. */ // struct scclProxyState* proxyState = comm->proxyState; // if(proxyState->refCount == 1) { // /* we have to make sure all following fields in comm have been initialized. */ // proxyState->tpRank = comm->rank; // proxyState->tpnRanks = comm->nRanks; // proxyState->tpLocalnRanks = comm->localRanks; // proxyState->cudaDev = comm->cudaDev; // proxyState->abortFlag = comm->abortFlag; // proxyState->p2pnChannels = comm->p2pnChannels; // proxyState->p2pChunkSize = comm->p2pChunkSize; // proxyState->nChannels = comm->nChannels; // proxyState->allocP2pNetLLBuffers = comm->allocP2pNetLLBuffers; // proxyState->dmaBufSupport = comm->dmaBufSupport; // proxyState->scclNet = comm->scclNet; // proxyState->scclCollNet = comm->scclCollNet; // memcpy(proxyState->buffSizes, comm->buffSizes, sizeof(comm->buffSizes)); // pthread_create(&comm->proxyState->thread, NULL, scclProxyService, comm->proxyState); // scclSetThreadName(comm->proxyState->thread, "sccl Service %2d", comm->cudaDev); // } // return scclSuccess; // } // scclResult_t scclProxyStop(struct scclComm* comm) { // if(comm->sharedRes && comm->sharedRes->proxyState) { // struct scclProxyState* sharedProxyState = comm->sharedRes->proxyState; // if((comm->proxyRefCountOld = scclAtomicRefCountDecrement(&sharedProxyState->refCount)) == 0) { // if(sharedProxyState->peerAddresses) { // if(*comm->abortFlag == 0) { // struct scclSocket sock; // int type = scclProxyMsgStop; // scclCHECK(scclSocketInit(&sock, // 
sharedProxyState->peerAddresses + comm->topParentRanks[comm->rank], // comm->sharedRes->magic, // scclSocketTypeProxy, // comm->abortFlag)); // scclCHECK(scclSocketConnect(&sock)); // scclCHECK(scclSocketSend(&sock, &type, sizeof(int))); // scclCHECK(scclSocketClose(&sock)); // } // } // if(sharedProxyState->peerSocks) { // int tplocalRanks = comm->sharedRes->tpNLocalRanks; // for(int i = 0; i < tplocalRanks; i++) { // int fd; // scclCHECK(scclSocketGetFd(sharedProxyState->peerSocks + i, &fd)); // if(fd >= 0) { // if(sharedProxyState->proxyOps[i].pool) { // scclCHECK(scclShmClose(sharedProxyState->proxyOps[i].handle)); // } // if(sharedProxyState->sharedDevMems[i]) { // if(!scclCuMemEnable()) { // CUDACHECK(cudaIpcCloseMemHandle(sharedProxyState->sharedDevMems[i])); // } // } // int type = scclProxyMsgClose; // if(*comm->abortFlag == 0) // scclCHECK(scclSocketSend(sharedProxyState->peerSocks + i, &type, sizeof(int))); // scclCHECK(scclSocketClose(sharedProxyState->peerSocks + i)); // } // } // } // } // } // return scclSuccess; // } // scclResult_t scclProxyDestroy(struct scclComm* comm) { // struct scclProxyState* sharedProxyState = comm->sharedRes->proxyState; // assert(sharedProxyState->refCount == 0); // free(sharedProxyState->peerAddresses); // free(sharedProxyState->peerSocks); // free(sharedProxyState->proxyOps); // free(sharedProxyState->sharedDevMems); // expectedProxyResponseFree(sharedProxyState); // free(sharedProxyState); // return scclSuccess; // }