Unverified Commit bdc1e649 authored by Chao Ma's avatar Chao Ma Committed by GitHub
Browse files

[RPC] Client will retry connection until the server starts. (#1758)

* update

* update

* update
parent d6b4e286
...@@ -68,13 +68,14 @@ bool SocketSender::Connect() { ...@@ -68,13 +68,14 @@ bool SocketSender::Connect() {
if (client_socket->Connect(ip, port)) { if (client_socket->Connect(ip, port)) {
bo = true; bo = true;
} else { } else {
LOG(ERROR) << "Cannot connect to Receiver: " << ip << ":" << port if (try_count % 10 == 0 && try_count != 0) {
<< ", try again ..."; LOG(INFO) << "Try to connect to: " << ip << ":" << port;
}
try_count++; try_count++;
#ifdef _WIN32 #ifdef _WIN32
Sleep(1); Sleep(5);
#else // !_WIN32 #else // !_WIN32
sleep(1); sleep(5);
#endif // _WIN32 #endif // _WIN32
} }
} }
......
...@@ -44,8 +44,6 @@ bool TCPSocket::Connect(const char * ip, int port) { ...@@ -44,8 +44,6 @@ bool TCPSocket::Connect(const char * ip, int port) {
sizeof(sa_server))) { sizeof(sa_server))) {
return true; return true;
} }
LOG(ERROR) << "Failed connect to " << ip << ":" << port;
return false; return false;
} }
......
...@@ -61,7 +61,6 @@ TEST(SocketCommunicatorTest, SendAndRecv) { ...@@ -61,7 +61,6 @@ TEST(SocketCommunicatorTest, SendAndRecv) {
} }
void start_client() { void start_client() {
sleep(2); // wait server start
SocketSender sender(kQueueSize); SocketSender sender(kQueueSize);
for (int i = 0; i < kNumReceiver; ++i) { for (int i = 0; i < kNumReceiver; ++i) {
sender.AddReceiver(ip_addr[i], i); sender.AddReceiver(ip_addr[i], i);
...@@ -89,6 +88,7 @@ void start_client() { ...@@ -89,6 +88,7 @@ void start_client() {
} }
void start_server(int id) { void start_server(int id) {
sleep(5);
SocketReceiver receiver(kQueueSize); SocketReceiver receiver(kQueueSize);
receiver.Wait(ip_addr[id], kNumSender); receiver.Wait(ip_addr[id], kNumSender);
for (int i = 0; i < kNumMessage; ++i) { for (int i = 0; i < kNumMessage; ++i) {
...@@ -164,7 +164,6 @@ TEST(SocketCommunicatorTest, SendAndRecv) { ...@@ -164,7 +164,6 @@ TEST(SocketCommunicatorTest, SendAndRecv) {
} }
static void start_client() { static void start_client() {
sleep(1);
std::ifstream t("addr.txt"); std::ifstream t("addr.txt");
std::string ip_addr((std::istreambuf_iterator<char>(t)), std::string ip_addr((std::istreambuf_iterator<char>(t)),
std::istreambuf_iterator<char>()); std::istreambuf_iterator<char>());
...@@ -181,6 +180,7 @@ static void start_client() { ...@@ -181,6 +180,7 @@ static void start_client() {
} }
static bool start_server() { static bool start_server() {
sleep(5);
std::ifstream t("addr.txt"); std::ifstream t("addr.txt");
std::string ip_addr((std::istreambuf_iterator<char>(t)), std::string ip_addr((std::istreambuf_iterator<char>(t)),
std::istreambuf_iterator<char>()); std::istreambuf_iterator<char>());
......
...@@ -110,6 +110,8 @@ def test_partition_policy(): ...@@ -110,6 +110,8 @@ def test_partition_policy():
def start_server(server_id, num_clients): def start_server(server_id, num_clients):
# Init kvserver # Init kvserver
print("Sleep 5 seconds to test client re-connect.")
time.sleep(5)
kvserver = dgl.distributed.KVServer(server_id=server_id, kvserver = dgl.distributed.KVServer(server_id=server_id,
ip_config='kv_ip_config.txt', ip_config='kv_ip_config.txt',
num_clients=num_clients) num_clients=num_clients)
...@@ -275,7 +277,6 @@ def test_kv_store(): ...@@ -275,7 +277,6 @@ def test_kv_store():
pserver = ctx.Process(target=start_server, args=(i, num_clients)) pserver = ctx.Process(target=start_server, args=(i, num_clients))
pserver.start() pserver.start()
pserver_list.append(pserver) pserver_list.append(pserver)
time.sleep(2)
for i in range(num_clients): for i in range(num_clients):
pclient = ctx.Process(target=start_client, args=(num_clients,)) pclient = ctx.Process(target=start_client, args=(num_clients,))
pclient.start() pclient.start()
......
...@@ -108,6 +108,8 @@ class HelloRequest(dgl.distributed.Request): ...@@ -108,6 +108,8 @@ class HelloRequest(dgl.distributed.Request):
return res return res
def start_server(num_clients, ip_config): def start_server(num_clients, ip_config):
print("Sleep 5 seconds to test client re-connect.")
time.sleep(5)
server_state = dgl.distributed.ServerState(None, local_g=None, partition_book=None) server_state = dgl.distributed.ServerState(None, local_g=None, partition_book=None)
dgl.distributed.register_service(HELLO_SERVICE_ID, HelloRequest, HelloResponse) dgl.distributed.register_service(HELLO_SERVICE_ID, HelloRequest, HelloResponse)
dgl.distributed.start_server(server_id=0, dgl.distributed.start_server(server_id=0,
...@@ -217,7 +219,6 @@ def test_multi_client(): ...@@ -217,7 +219,6 @@ def test_multi_client():
pclient = ctx.Process(target=start_client, args=("rpc_ip_config_mul_client.txt",)) pclient = ctx.Process(target=start_client, args=("rpc_ip_config_mul_client.txt",))
pclient_list.append(pclient) pclient_list.append(pclient)
pserver.start() pserver.start()
time.sleep(1)
for i in range(10): for i in range(10):
pclient_list[i].start() pclient_list[i].start()
for i in range(10): for i in range(10):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment