/*! * Copyright (c) 2019 by Contributors * \file msg_queue.cc * \brief Message queue for DGL distributed training. */ #include #include #include #include "../src/graph/network/socket_communicator.h" using std::string; using dgl::network::SocketSender; using dgl::network::SocketReceiver; void start_client(); bool start_server(); #ifndef WIN32 #include TEST(SocketCommunicatorTest, SendAndRecv) { std::thread client_thread(start_client); start_server(); client_thread.join(); } #else // WIN32 #include #include #pragma comment(lib, "ws2_32.lib") void sleep(int seconds) { Sleep(seconds * 1000); } DWORD WINAPI _ClientThreadFunc(LPVOID param) { start_client(); return 0; } DWORD WINAPI _ServerThreadFunc(LPVOID param) { return start_server() ? 1 : 0; } TEST(SocketCommunicatorTest, SendAndRecv) { HANDLE hThreads[2]; WSADATA wsaData; DWORD retcode, exitcode; ASSERT_EQ(::WSAStartup(MAKEWORD(2, 2), &wsaData), 0); hThreads[0] = ::CreateThread(NULL, 0, _ClientThreadFunc, NULL, 0, NULL); // client ASSERT_TRUE(hThreads[0] != NULL); hThreads[1] = ::CreateThread(NULL, 0, _ServerThreadFunc, NULL, 0, NULL); // server ASSERT_TRUE(hThreads[1] != NULL); retcode = ::WaitForMultipleObjects(2, hThreads, TRUE, INFINITE); EXPECT_TRUE((retcode <= WAIT_OBJECT_0 + 1) && (retcode >= WAIT_OBJECT_0)); EXPECT_EQ(::GetExitCodeThread(hThreads[1], &exitcode), TRUE); EXPECT_EQ(exitcode, 1); EXPECT_EQ(::CloseHandle(hThreads[0]), TRUE); EXPECT_EQ(::CloseHandle(hThreads[1]), TRUE); ::WSACleanup(); } #endif // WIN32 void start_client() { const char * msg = "123456789"; sleep(1); SocketSender sender; sender.AddReceiver("127.0.0.1", 2049, 0); sender.Connect(); sender.Send(msg, 9, 0); sender.Finalize(); } bool start_server() { char serbuff[10]; memset(serbuff, '\0', 10); SocketReceiver receiver; receiver.Wait("127.0.0.1", 2049, 1, 500 * 1024); receiver.Recv(serbuff, 9); receiver.Finalize(); return string("123456789") == string(serbuff); }