Unverified Commit c8d4d6fb authored by Jingcheng Yu's avatar Jingcheng Yu Committed by GitHub
Browse files

[Bugfix] fix potential starving in socket receiver (#3176)


Co-authored-by: default avatarJingchengYu94 <jingchengyu94@gmail.com>
Co-authored-by: default avatarxiang song(charlie.song) <classicxsong@gmail.com>
parent 59a7d0d1
...@@ -190,6 +190,7 @@ bool SocketReceiver::Wait(const char* addr, int num_sender) { ...@@ -190,6 +190,7 @@ bool SocketReceiver::Wait(const char* addr, int num_sender) {
for (int i = 0; i < num_sender_; ++i) { for (int i = 0; i < num_sender_; ++i) {
msg_queue_[i] = std::make_shared<MessageQueue>(queue_size_); msg_queue_[i] = std::make_shared<MessageQueue>(queue_size_);
} }
mq_iter_ = msg_queue_.begin();
// Initialize socket and socket-thread // Initialize socket and socket-thread
server_socket_ = new TCPSocket(); server_socket_ = new TCPSocket();
// Bind socket // Bind socket
...@@ -222,16 +223,18 @@ bool SocketReceiver::Wait(const char* addr, int num_sender) { ...@@ -222,16 +223,18 @@ bool SocketReceiver::Wait(const char* addr, int num_sender) {
STATUS SocketReceiver::Recv(Message* msg, int* send_id) { STATUS SocketReceiver::Recv(Message* msg, int* send_id) {
// loop until get a message // loop until get a message
for (;;) { for (;;) {
for (auto& mq : msg_queue_) { for (; mq_iter_ != msg_queue_.end(); ++mq_iter_) {
*send_id = mq.first;
// We use non-block remove here // We use non-block remove here
STATUS code = msg_queue_[*send_id]->Remove(msg, false); STATUS code = mq_iter_->second->Remove(msg, false);
if (code == QUEUE_EMPTY) { if (code == QUEUE_EMPTY) {
continue; // jump to the next queue continue; // jump to the next queue
} else { } else {
*send_id = mq_iter_->first;
++mq_iter_;
return code; return code;
} }
} }
mq_iter_ = msg_queue_.begin();
} }
} }
......
...@@ -202,6 +202,7 @@ class SocketReceiver : public Receiver { ...@@ -202,6 +202,7 @@ class SocketReceiver : public Receiver {
* \brief Message queue for each socket connection * \brief Message queue for each socket connection
*/ */
std::unordered_map<int /* Sender (virtual) ID */, std::shared_ptr<MessageQueue>> msg_queue_; std::unordered_map<int /* Sender (virtual) ID */, std::shared_ptr<MessageQueue>> msg_queue_;
std::unordered_map<int, std::shared_ptr<MessageQueue>>::iterator mq_iter_;
/*! /*!
* \brief Independent thead for each socket connection * \brief Independent thead for each socket connection
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment