Commit 3f12093b authored by Antoine Kaufmann's avatar Antoine Kaufmann
Browse files

dist/sockets: more fixes after refactor

parent 088c846e
...@@ -331,6 +331,16 @@ int BasePeerEvent(struct Peer *peer, uint32_t events) { ...@@ -331,6 +331,16 @@ int BasePeerEvent(struct Peer *peer, uint32_t events) {
if (!(peer->shm_base = ShmMap(peer->shm_fd, &peer->shm_size))) if (!(peer->shm_base = ShmMap(peer->shm_fd, &peer->shm_size)))
return 1; return 1;
struct SimbricksProtoListenerIntro *li =
(struct SimbricksProtoListenerIntro *) peer->intro_local;
peer->local_base = (void *) ((uintptr_t) peer->shm_base + li->l2c_offset);
peer->local_elen = li->l2c_elen;
peer->local_enum = li->l2c_nentries;
peer->cleanup_base = (void *) ((uintptr_t) peer->shm_base + li->c2l_offset);
peer->cleanup_elen = li->c2l_elen;
peer->cleanup_enum = li->c2l_nentries;
} else { } else {
/* as a listener, we use our local shm region, so no fd is sent to us */ /* as a listener, we use our local shm region, so no fd is sent to us */
ret = recv(peer->sock_fd, peer->intro_local, ret = recv(peer->sock_fd, peer->intro_local,
...@@ -358,8 +368,6 @@ int BasePeerEvent(struct Peer *peer, uint32_t events) { ...@@ -358,8 +368,6 @@ int BasePeerEvent(struct Peer *peer, uint32_t events) {
} }
static inline void PollPeerTransfer(struct Peer *peer, bool *report) { static inline void PollPeerTransfer(struct Peer *peer, bool *report) {
// XXX: consider batching this to forward multiple entries at once if possible
uint32_t n; uint32_t n;
for (n = 0; n < kPollMax && peer->local_pos + n < peer->local_enum; n++) { for (n = 0; n < kPollMax && peer->local_pos + n < peer->local_enum; n++) {
// stop if we would pass the cleanup position // stop if we would pass the cleanup position
...@@ -367,7 +375,7 @@ static inline void PollPeerTransfer(struct Peer *peer, bool *report) { ...@@ -367,7 +375,7 @@ static inline void PollPeerTransfer(struct Peer *peer, bool *report) {
peer->local_pos_cleaned) { peer->local_pos_cleaned) {
#ifdef DEBUG #ifdef DEBUG
fprintf(stderr, "PollPeerTransfer: waiting for cleanup (%u %u)\n", fprintf(stderr, "PollPeerTransfer: waiting for cleanup (%u %u)\n",
pos, peer->local_pos_cleaned); n, peer->local_pos_cleaned);
#endif #endif
break; break;
} }
...@@ -381,6 +389,11 @@ static inline void PollPeerTransfer(struct Peer *peer, bool *report) { ...@@ -381,6 +389,11 @@ static inline void PollPeerTransfer(struct Peer *peer, bool *report) {
} }
if (n > 0) { if (n > 0) {
#ifdef DEBUG
fprintf(stderr, "PollPeerTransfer: transferring [%u,%u] (lpc=%u lpr=%u)\n",
peer->local_pos, peer->local_pos + n, peer->local_pos_cleaned,
peer->local_pos_reported);
#endif
BaseOpPassEntries(peer, peer->local_pos, n); BaseOpPassEntries(peer, peer->local_pos, n);
uint32_t newpos = peer->local_pos + n; uint32_t newpos = peer->local_pos + n;
peer->local_pos = (newpos < peer->local_enum ? peer->local_pos = (newpos < peer->local_enum ?
...@@ -444,6 +457,10 @@ void BasePoll() { ...@@ -444,6 +457,10 @@ void BasePoll() {
void BaseEntryReceived(struct Peer *peer, uint32_t pos, void *data) void BaseEntryReceived(struct Peer *peer, uint32_t pos, void *data)
{ {
#ifdef DEBUG
fprintf(stderr, "BaseEntryReceived: pos=%u (cpr=%u cpl=%u)\n",
pos, peer->cleanup_pos_reported, peer->cleanup_pos_last);
#endif
uint64_t off = (uint64_t) pos * peer->cleanup_elen; uint64_t off = (uint64_t) pos * peer->cleanup_elen;
void *entry = peer->cleanup_base + off; void *entry = peer->cleanup_base + off;
......
...@@ -99,7 +99,6 @@ extern struct Peer *peers; ...@@ -99,7 +99,6 @@ extern struct Peer *peers;
int BaseInit(const char *shm_path_, size_t shm_size_, int epfd_); int BaseInit(const char *shm_path_, size_t shm_size_, int epfd_);
bool BasePeerAdd(const char *path, bool listener); bool BasePeerAdd(const char *path, bool listener);
struct Peer *BasePeerLookup(uint32_t id);
int BaseListen(void); int BaseListen(void);
int BaseConnect(void); int BaseConnect(void);
void BasePoll(void); void BasePoll(void);
......
...@@ -489,6 +489,8 @@ int BaseOpPassIntro(struct Peer *peer) { ...@@ -489,6 +489,8 @@ int BaseOpPassIntro(struct Peer *peer) {
return 1; return 1;
msg->msg_len = offsetof(struct SockMsg, intro.data) + peer->intro_local_len; msg->msg_len = offsetof(struct SockMsg, intro.data) + peer->intro_local_len;
if (msg->msg_len < sizeof(*msg))
msg->msg_len = sizeof(*msg);
msg->id = peer - peers; msg->id = peer - peers;
msg->msg_type = kMsgIntro; msg->msg_type = kMsgIntro;
msg->intro.payload_len = peer->intro_local_len; msg->intro.payload_len = peer->intro_local_len;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment