"docs/git@developer.sourcefind.cn:SIYIXNI/vllm.git" did not exist on "0b98ba15c744f1dfb0ea4f2135e85ca23d572ae1"
Commit 99ec2047 authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

dist: change NetOpPassEntries signature to include position

parent a065584a
......@@ -387,7 +387,7 @@ static inline void PollPeerTransfer(struct Peer *peer, bool *report) {
}
if (n > 0) {
NetOpPassEntries(peer, n);
NetOpPassEntries(peer, peer->local_pos, n);
peer->local_pos += n;
if (peer->local_pos >= peer->local_enum)
peer->local_pos -= peer->local_enum;
......
......@@ -107,7 +107,7 @@ void NetEntryReceived(struct Peer *peer, uint32_t pos, void *data);
// To be implemented in proxy implementation
int NetOpPassIntro(struct Peer *peer);
int NetOpPassEntries(struct Peer *peer, size_t n);
int NetOpPassEntries(struct Peer *peer, uint32_t pos, uint32_t n);
int NetOpPassReport();
#endif // DIST_NET_RDMA_H_
......@@ -443,10 +443,10 @@ int NetOpPassIntro(struct Peer *peer) {
return 0;
}
int NetOpPassEntries(struct Peer *peer, size_t n) {
int NetOpPassEntries(struct Peer *peer, uint32_t pos, uint32_t n) {
#ifdef RDMA_DEBUG
fprintf(stderr, "NetOpPassEntries(%s,%u)\n", peer->sock_path,
peer->local_pos);
pos);
fprintf(stderr, " remote_base=%lx local_base=%p\n", peer->remote_base,
peer->local_base);
#endif
......@@ -456,9 +456,9 @@ int NetOpPassEntries(struct Peer *peer, size_t n) {
last_signaled = 0;
while (1) {
uint64_t pos = peer->local_pos * peer->local_elen;
uint64_t abs_pos = pos * peer->local_elen;
struct ibv_sge sge;
sge.addr = (uintptr_t) (peer->local_base + pos);
sge.addr = (uintptr_t) (peer->local_base + abs_pos);
sge.length = peer->local_elen * n;
struct ibv_mr *mr = peer->shm_opaque;
sge.lkey = mr->lkey;
......@@ -468,7 +468,7 @@ int NetOpPassEntries(struct Peer *peer, size_t n) {
send_wr.opcode = IBV_WR_RDMA_WRITE;
if (triggerSig)
send_wr.send_flags = IBV_SEND_SIGNALED;
send_wr.wr.rdma.remote_addr = peer->remote_base + pos;
send_wr.wr.rdma.remote_addr = peer->remote_base + abs_pos;
send_wr.wr.rdma.rkey = peer->remote_rkey;
send_wr.sg_list = &sge;
send_wr.num_sge = 1;
......
......@@ -487,14 +487,14 @@ int NetOpPassIntro(struct Peer *peer) {
return ret;
}
int NetOpPassEntries(struct Peer *peer, size_t n) {
int NetOpPassEntries(struct Peer *peer, uint32_t pos, uint32_t n) {
#ifdef SOCK_DEBUG
fprintf(stderr, "NetOpPassEntires(%s, n=%zu, pos=%u)\n", peer->sock_path, n,
peer->local_pos);
pos);
#endif
if (n * peer->local_elen > TXBUF_SIZE) {
fprintf(stderr,
"NetOpPassEntries: tx buffer too small (%u) for n (%zu) entries\n",
"NetOpPassEntries: tx buffer too small (%u) for n (%u) entries\n",
TXBUF_SIZE, n);
abort();
}
......@@ -506,11 +506,11 @@ int NetOpPassEntries(struct Peer *peer, size_t n) {
msg->id = peer - peers;
msg->msg_type = kMsgEntries;
msg->entries.num_entries = n;
msg->entries.pos = peer->local_pos;
msg->entries.pos = pos;
uint64_t pos = peer->local_pos * peer->local_elen;
uint64_t abs_pos = pos * peer->local_elen;
uint32_t len = n * peer->local_elen;
memcpy(msg->entries.data, peer->local_base + pos, len);
memcpy(msg->entries.data, peer->local_base + abs_pos, len);
#ifdef SOCK_DEBUG
/*fprintf(stderr, " data: ");
{
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment