"vscode:/vscode.git/clone" did not exist on "384fcac6df9e536350f84fba4070d5dc5b01ab48"
Unverified Commit d22049e8 authored by Rhett Ying's avatar Rhett Ying Committed by GitHub
Browse files

[TensorpipeDeprecation] remove unused arguments from wait_for_senders() (#5917)

parent 070c9677
...@@ -172,7 +172,7 @@ def finalize_receiver(): ...@@ -172,7 +172,7 @@ def finalize_receiver():
_CAPI_DGLRPCFinalizeReceiver() _CAPI_DGLRPCFinalizeReceiver()
def wait_for_senders(ip_addr, port, num_senders, blocking=True): def wait_for_senders(ip_addr, port, num_senders):
"""Wait all of the senders' connections. """Wait all of the senders' connections.
This api will be blocked until all the senders connect to the receiver. This api will be blocked until all the senders connect to the receiver.
...@@ -185,10 +185,8 @@ def wait_for_senders(ip_addr, port, num_senders, blocking=True): ...@@ -185,10 +185,8 @@ def wait_for_senders(ip_addr, port, num_senders, blocking=True):
receiver's port receiver's port
num_senders : int num_senders : int
total number of senders total number of senders
blocking : bool
whether to wait blockingly
""" """
_CAPI_DGLRPCWaitForSenders(ip_addr, int(port), int(num_senders), blocking) _CAPI_DGLRPCWaitForSenders(ip_addr, int(port), int(num_senders))
def connect_receiver(ip_addr, port, recv_id, group_id=-1): def connect_receiver(ip_addr, port, recv_id, group_id=-1):
......
...@@ -206,7 +206,7 @@ def connect_to_server( ...@@ -206,7 +206,7 @@ def connect_to_server(
for server_id in range(num_servers): for server_id in range(num_servers):
rpc.send_request(server_id, register_req) rpc.send_request(server_id, register_req)
# wait server connect back # wait server connect back
rpc.wait_for_senders(client_ip, client_port, num_servers, blocking=True) rpc.wait_for_senders(client_ip, client_port, num_servers)
print( print(
"Client [{}] waits on {}:{}".format(os.getpid(), client_ip, client_port) "Client [{}] waits on {}:{}".format(os.getpid(), client_ip, client_port)
) )
......
...@@ -85,7 +85,7 @@ def start_server( ...@@ -85,7 +85,7 @@ def start_server(
print( print(
"Server is waiting for connections on [{}:{}]...".format(ip_addr, port) "Server is waiting for connections on [{}:{}]...".format(ip_addr, port)
) )
rpc.wait_for_senders(ip_addr, port, num_clients, blocking=True) rpc.wait_for_senders(ip_addr, port, num_clients)
rpc.set_num_client(num_clients) rpc.set_num_client(num_clients)
recv_clients = {} recv_clients = {}
while True: while True:
......
...@@ -202,10 +202,8 @@ void SocketSender::SendLoop( ...@@ -202,10 +202,8 @@ void SocketSender::SendLoop(
/////////////////////////////////////// SocketReceiver /////////////////////////////////////// SocketReceiver
////////////////////////////////////////////// //////////////////////////////////////////////
bool SocketReceiver::Wait( bool SocketReceiver::Wait(const std::string& addr, int num_sender) {
const std::string& addr, int num_sender, bool blocking) {
CHECK_GT(num_sender, 0); CHECK_GT(num_sender, 0);
CHECK_EQ(blocking, true);
std::vector<std::string> substring; std::vector<std::string> substring;
std::vector<std::string> ip_and_port; std::vector<std::string> ip_and_port;
SplitStringUsing(addr, "//", &substring); SplitStringUsing(addr, "//", &substring);
......
...@@ -159,12 +159,11 @@ class SocketReceiver : public Receiver { ...@@ -159,12 +159,11 @@ class SocketReceiver : public Receiver {
* @brief Wait for all the Senders to connect * @brief Wait for all the Senders to connect
* @param addr Networking address, e.g., 'tcp://127.0.0.1:50051', 'mpi://0' * @param addr Networking address, e.g., 'tcp://127.0.0.1:50051', 'mpi://0'
* @param num_sender total number of Senders * @param num_sender total number of Senders
* @param blocking whether wait blockingly
* @return True for success and False for fail * @return True for success and False for fail
* *
* Wait() is not thread-safe and only one thread can invoke this API. * Wait() is not thread-safe and only one thread can invoke this API.
*/ */
bool Wait(const std::string& addr, int num_sender, bool blocking = true); bool Wait(const std::string& addr, int num_sender);
/** /**
* @brief Recv RPCMessage from Sender. Actually removing data from queue. * @brief Recv RPCMessage from Sender. Actually removing data from queue.
......
...@@ -90,11 +90,10 @@ DGL_REGISTER_GLOBAL("distributed.rpc._CAPI_DGLRPCWaitForSenders") ...@@ -90,11 +90,10 @@ DGL_REGISTER_GLOBAL("distributed.rpc._CAPI_DGLRPCWaitForSenders")
std::string ip = args[0]; std::string ip = args[0];
int port = args[1]; int port = args[1];
int num_sender = args[2]; int num_sender = args[2];
bool blocking = args[3];
std::string addr; std::string addr;
addr = StringPrintf("tcp://%s:%d", ip.c_str(), port); addr = StringPrintf("tcp://%s:%d", ip.c_str(), port);
if (RPCContext::getInstance()->receiver->Wait( if (RPCContext::getInstance()->receiver->Wait(addr, num_sender) ==
addr, num_sender, blocking) == false) { false) {
LOG(FATAL) << "Wait sender socket failed."; LOG(FATAL) << "Wait sender socket failed.";
} }
}); });
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment