socket_communicator_test.cc 6.6 KB
Newer Older
Chao Ma's avatar
Chao Ma committed
1
2
/*!
 *  Copyright (c) 2019 by Contributors
 * \file socket_communicator_test.cc
 * \brief Test SocketCommunicator
 */
#include <gtest/gtest.h>
#include <string.h>
#include <string>
9
10
#include <atomic>
#include <thread>
#include <vector>
Chao Ma's avatar
Chao Ma committed
11
12
#include <fstream>
#include <streambuf>
13
#include <chrono>
Chao Ma's avatar
Chao Ma committed
14
15
#include <stdlib.h>
#include <time.h>
Chao Ma's avatar
Chao Ma committed
16

17
18
#include "../src/rpc/network/msg_queue.h"
#include "../src/rpc/network/socket_communicator.h"
Chao Ma's avatar
Chao Ma committed
19
20

using std::string;
21

Chao Ma's avatar
Chao Ma committed
22
23
using dgl::network::SocketSender;
using dgl::network::SocketReceiver;
24
25
using dgl::network::Message;
using dgl::network::DefaultMessageDeleter;
Chao Ma's avatar
Chao Ma committed
26

27
// Queue-size argument passed to every SocketSender/SocketReceiver built in this file.
constexpr int64_t kQueueSize = 500 * 1024;
// Thread-count argument passed to every SocketSender/SocketReceiver built in this file.
constexpr int kThreadNum = 2;
// Argument passed to SocketSender::ConnectReceiverFinalize()
// (presumably the maximum number of connection attempts -- confirm against the sender API).
constexpr int kMaxTryTimes = 1024;
VoVAllen's avatar
VoVAllen committed
30
31
32

#ifndef WIN32

33
34
35
36
37
// Number of client threads started by the SendAndRecv test.
const int kNumSender = 3;
// Number of server threads started by the SendAndRecv test; server i uses ip_addr[i].
const int kNumReceiver = 3;
// Number of messages sent to each receiver in one send round.
const int kNumMessage = 10;

// Loopback endpoints used by the tests. Both the strings and the table itself
// are read-only.
const char* const ip_addr[] = {
  "tcp://127.0.0.1:50091",
  "tcp://127.0.0.1:50092",
  "tcp://127.0.0.1:50093"
};

// The tests index ip_addr with values in [0, kNumReceiver); keep both in sync.
static_assert(static_cast<int>(sizeof(ip_addr) / sizeof(ip_addr[0])) == kNumReceiver,
              "ip_addr must contain exactly kNumReceiver entries");

// Thread routines used by the SendAndRecv test; both are defined later in this file.
// start_client: connects a sender to every address in ip_addr and sends the test messages.
static void start_client();
// start_server: creates a receiver on ip_addr[id] and checks the messages it receives.
static void start_server(int id);
VoVAllen's avatar
VoVAllen committed
45

Chao Ma's avatar
Chao Ma committed
46
// Runs kNumSender client threads against kNumReceiver server threads and waits
// for all of them; the expectations live inside start_client()/start_server().
TEST(SocketCommunicatorTest, SendAndRecv) {
  // Threads are stored by value so that nothing is heap-allocated and leaked.
  // start kNumSender clients
  std::vector<std::thread> client_threads;
  client_threads.reserve(kNumSender);
  for (int i = 0; i < kNumSender; ++i) {
    client_threads.emplace_back(start_client);
  }
  // start kNumReceiver servers, one per entry of ip_addr
  std::vector<std::thread> server_threads;
  server_threads.reserve(kNumReceiver);
  for (int i = 0; i < kNumReceiver; ++i) {
    server_threads.emplace_back(start_server, i);
  }
  for (auto& client : client_threads) {
    client.join();
  }
  for (auto& server : server_threads) {
    server.join();
  }
}

65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// One sender delivers two messages to one receiver. The receiver reads both,
// then expects RecvFrom() and Recv() called with a timeout argument of 1000
// to report QUEUE_EMPTY because nothing else was sent.
TEST(SocketCommunicatorTest, SendAndRecvTimeout) {
  // Set by the server once its timeout checks are done; releases the client.
  std::atomic_bool stop{false};
  // start 1 client, connect to 1 server, send 2 messages
  auto client = std::thread([&stop]() {
    SocketSender sender(kQueueSize, kThreadNum);
    sender.ConnectReceiver(ip_addr[0], 0);
    sender.ConnectReceiverFinalize(kMaxTryTimes);
    for (int i = 0; i < 2; ++i) {
      // Heap buffer handed to the message together with its deleter.
      char *str_data = new char[9];
      memcpy(str_data, "123456789", 9);
      Message msg = {str_data, 9};
      msg.deallocator = DefaultMessageDeleter;
      EXPECT_EQ(sender.Send(msg, 0), ADD_SUCCESS);
    }
    // Keep the sender alive until the server has finished its timeout checks,
    // sleeping between polls instead of spinning a core at 100%.
    while (!stop) {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    sender.Finalize();
  });
  // start 1 server, accept 1 client, receive 2 messages
  auto server = std::thread([&stop]() {
    SocketReceiver receiver(kQueueSize, kThreadNum);
    receiver.Wait(ip_addr[0], 1);
    Message msg;
    int recv_id = -1;
    // receive 1st message (timeout argument 0 -- presumably "wait until a
    // message arrives"; confirm against SocketReceiver)
    EXPECT_EQ(receiver.RecvFrom(&msg, 0, 0), REMOVE_SUCCESS);
    EXPECT_EQ(string(msg.data, msg.size), string("123456789"));
    msg.deallocator(&msg);
    // receive 2nd message
    EXPECT_EQ(receiver.Recv(&msg, &recv_id, 0), REMOVE_SUCCESS);
    EXPECT_EQ(string(msg.data, msg.size), string("123456789"));
    msg.deallocator(&msg);
    // nothing more was sent, so both receive calls must time out
    EXPECT_EQ(receiver.RecvFrom(&msg, 0, 1000), QUEUE_EMPTY);
    EXPECT_EQ(receiver.Recv(&msg, &recv_id, 1000), QUEUE_EMPTY);
    stop = true;
    receiver.Finalize();
  });
  // join
  client.join();
  server.join();
}

108
void start_client() {
109
  SocketSender sender(kQueueSize, kThreadNum);
110
  for (int i = 0; i < kNumReceiver; ++i) {
111
    sender.ConnectReceiver(ip_addr[i], i);
112
  }
113
  sender.ConnectReceiverFinalize(kMaxTryTimes);
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
  for (int i = 0; i < kNumMessage; ++i) {
    for (int n = 0; n < kNumReceiver; ++n) {
      char* str_data = new char[9];
      memcpy(str_data, "123456789", 9);
      Message msg = {str_data, 9};
      msg.deallocator = DefaultMessageDeleter;
      EXPECT_EQ(sender.Send(msg, n), ADD_SUCCESS);
    }
  }
  for (int i = 0; i < kNumMessage; ++i) {
    for (int n = 0; n < kNumReceiver; ++n) {
      char* str_data = new char[9];
      memcpy(str_data, "123456789", 9);
      Message msg = {str_data, 9};
      msg.deallocator = DefaultMessageDeleter;
      EXPECT_EQ(sender.Send(msg, n), ADD_SUCCESS);
    }
  }
  sender.Finalize();
}

void start_server(int id) {
136
  sleep(5);
137
  SocketReceiver receiver(kQueueSize, kThreadNum);
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
  receiver.Wait(ip_addr[id], kNumSender);
  for (int i = 0; i < kNumMessage; ++i) {
    for (int n = 0; n < kNumSender; ++n) {
      Message msg;
      EXPECT_EQ(receiver.RecvFrom(&msg, n), REMOVE_SUCCESS);
      EXPECT_EQ(string(msg.data, msg.size), string("123456789"));
      msg.deallocator(&msg);
    }
  }
  for (int n = 0; n < kNumSender*kNumMessage; ++n) {
    Message msg;
    int recv_id;
    EXPECT_EQ(receiver.Recv(&msg, &recv_id), REMOVE_SUCCESS);
    EXPECT_EQ(string(msg.data, msg.size), string("123456789"));
    msg.deallocator(&msg);
  }
  receiver.Finalize();
}

#else
VoVAllen's avatar
VoVAllen committed
158
159
160
161
162
163
164
165
166
167

#include <windows.h>
#include <winsock2.h>

#pragma comment(lib, "ws2_32.lib")

// Lets the Windows test code call sleep(seconds): converts the argument to
// milliseconds and forwards it to the Win32 Sleep() function.
void sleep(int seconds) {
  const int milliseconds = seconds * 1000;
  Sleep(milliseconds);
}

168
169
170
// Thread routines for the Windows SendAndRecv test; both are defined later in this file.
// start_client: reads the endpoint from addr.txt and sends one message to it.
static void start_client();
// start_server: receives one message; returns true when its payload is "123456789".
static bool start_server();

VoVAllen's avatar
VoVAllen committed
171
172
173
174
175
176
177
178
179
180
181
182
183
184
// CreateThread entry point for the client: runs start_client() and always
// exits with code 0.
DWORD WINAPI _ClientThreadFunc(LPVOID param) {
  (void)param;  // the thread argument is not used
  start_client();
  const DWORD exit_code = 0;
  return exit_code;
}

// CreateThread entry point for the server: the thread exit code is 1 when
// start_server() returns true and 0 otherwise.
DWORD WINAPI _ServerThreadFunc(LPVOID param) {
  (void)param;  // the thread argument is not used
  if (start_server()) {
    return 1;
  }
  return 0;
}

// Windows variant of the send/receive test: one client thread and one server
// thread exchange a single message over a loopback endpoint. The server's
// verdict is returned through its thread exit code (1 == payload matched).
TEST(SocketCommunicatorTest, SendAndRecv) {
  HANDLE hThreads[2] = {nullptr, nullptr};
  WSADATA wsaData;
  DWORD retcode = 0;
  DWORD exitcode = 0;

  // Pick a pseudo-random port in [3000, 5000] and publish the endpoint through
  // addr.txt, which start_client() and start_server() both read.
  srand((unsigned)time(NULL));
  int port = (rand() % (5000-3000+1))+ 3000;
  std::string ip_addr = "tcp://127.0.0.1:" + std::to_string(port);
  std::ofstream out("addr.txt");
  ASSERT_TRUE(out.is_open());
  out << ip_addr;
  out.close();
  // A failed write would leave the threads with an empty or partial address.
  ASSERT_FALSE(out.fail());

  ASSERT_EQ(::WSAStartup(MAKEWORD(2, 2), &wsaData), 0);

  hThreads[0] = ::CreateThread(nullptr, 0, _ClientThreadFunc, nullptr, 0, nullptr);  // client
  ASSERT_TRUE(hThreads[0] != nullptr);
  hThreads[1] = ::CreateThread(nullptr, 0, _ServerThreadFunc, nullptr, 0, nullptr);  // server
  ASSERT_TRUE(hThreads[1] != nullptr);

  // With bWaitAll == TRUE a successful wait returns a value in
  // [WAIT_OBJECT_0, WAIT_OBJECT_0 + 1]. retcode is unsigned and WAIT_OBJECT_0
  // is 0, so only the upper bound needs checking.
  retcode = ::WaitForMultipleObjects(2, hThreads, TRUE, INFINITE);
  EXPECT_LE(retcode, static_cast<DWORD>(WAIT_OBJECT_0 + 1));

  // Win32 BOOL results signal success with any non-zero value, not only TRUE.
  EXPECT_TRUE(::GetExitCodeThread(hThreads[1], &exitcode) != 0);
  EXPECT_EQ(exitcode, static_cast<DWORD>(1));

  EXPECT_TRUE(::CloseHandle(hThreads[0]) != 0);
  EXPECT_TRUE(::CloseHandle(hThreads[1]) != 0);

  ::WSACleanup();
}

211
static void start_client() {
Chao Ma's avatar
Chao Ma committed
212
213
214
215
  std::ifstream t("addr.txt");
  std::string ip_addr((std::istreambuf_iterator<char>(t)),
                       std::istreambuf_iterator<char>());
  t.close();
216
  SocketSender sender(kQueueSize, kThreadNum);
217
  sender.ConnectReceiver(ip_addr.c_str(), 0);
218
  sender.ConnectReceiverFinalize(kMaxTryTimes);
219
220
221
222
223
  char* str_data = new char[9];
  memcpy(str_data, "123456789", 9);
  Message msg = {str_data, 9};
  msg.deallocator = DefaultMessageDeleter;
  sender.Send(msg, 0);
VoVAllen's avatar
VoVAllen committed
224
225
226
  sender.Finalize();
}

227
static bool start_server() {
228
  sleep(5);
Chao Ma's avatar
Chao Ma committed
229
230
231
232
  std::ifstream t("addr.txt");
  std::string ip_addr((std::istreambuf_iterator<char>(t)),
                       std::istreambuf_iterator<char>());
  t.close();
233
  SocketReceiver receiver(kQueueSize, kThreadNum);
Chao Ma's avatar
Chao Ma committed
234
  receiver.Wait(ip_addr.c_str(), 1);
235
236
  Message msg;
  EXPECT_EQ(receiver.RecvFrom(&msg, 0), REMOVE_SUCCESS);
VoVAllen's avatar
VoVAllen committed
237
  receiver.Finalize();
238
  return string("123456789") == string(msg.data, msg.size);
VoVAllen's avatar
VoVAllen committed
239
}
240
241

#endif