tcp_socket_test.cc 3.6 KB
Newer Older
Chao Ma's avatar
Chao Ma committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/*!
 *  Copyright (c) 2019 by Contributors
 * \file tcp_socket_test.cc
 * \brief Unit test for the TCPSocket class used by DGL distributed training.
 */
#include <gtest/gtest.h>
#include <signal.h>
#include <string.h>
#include <sys/wait.h>
#include <unistd.h>
#include <string>

#include "../src/graph/network/tcp_socket.h"

using std::string;
using dgl::network::TCPSocket;

// Size in bytes of the payload exchanged in the "Big block" phase of the
// test below (9,999,999 bytes, roughly 9.5 MiB).
const size_t kDataLength = 9999999;

TEST(TCPSocket, SendRecieve) {
  int pid = fork();
  const char * msg = "0123456789";
  ASSERT_GE(pid, 0);
  if (pid > 0) {  // parent: server
    TCPSocket server;
    TCPSocket client;
    string cl_ip;
    int cl_port;
    char serbuff[10];
    memset(serbuff, '\0', 10);
    server.SetTimeout(5 * 60 * 1000);

    ASSERT_TRUE(server.Bind("127.0.0.1", 2049));
    ASSERT_TRUE(server.Listen(3));
    ASSERT_TRUE(server.Accept(&client, &cl_ip, &cl_port));

    // Small block
    for (int i = 0; i < 9999; ++i) {
      int tmp;
      int recieved_bytes = 0;
      while (recieved_bytes < 10) {
        int max_len = 10 - recieved_bytes;
        tmp = client.Receive(&serbuff[recieved_bytes], max_len);
        ASSERT_GE(tmp, 0);
        recieved_bytes += tmp;
      }
      ASSERT_EQ(string("0123456789"), string(serbuff, 10));
      int sent_bytes = 0;
      while (sent_bytes < 10) {
        int max_len = 10 - sent_bytes;
        tmp = client.Send(&msg[sent_bytes], max_len);
        ASSERT_GE(tmp, 0);
        sent_bytes += tmp;
      }
    }

    // Big block
    char* bigdata = new char[kDataLength];
    char* bigbuff = new char[kDataLength];
    memset(bigdata, 'x', kDataLength);
    memset(bigbuff, '\0', kDataLength);
    int recieved_bytes = 0;
    while (recieved_bytes < kDataLength) {
      int max_len = kDataLength - recieved_bytes;
      int tmp = client.Receive(&bigbuff[recieved_bytes], max_len);
      ASSERT_GE(tmp, 0);
      recieved_bytes += tmp;
    }
    for (size_t i = 0; i < kDataLength; ++i) {
      ASSERT_EQ(bigbuff[i], 'x');
    }
    int sent_bytes = 0;
    while (sent_bytes < kDataLength) {
      int max_len = kDataLength - sent_bytes;
      int tmp = client.Send(&bigdata[sent_bytes], max_len);
      ASSERT_GE(tmp, 0);
      sent_bytes += tmp;
    }
  } else {  // child: client
    sleep(3);   // wait for server
    TCPSocket client;
    ASSERT_TRUE(client.Connect("127.0.0.1", 2049));
    char clibuff[10];
    memset(clibuff, '\0', 10);

    // Small block
    for (int i = 0; i < 9999; ++i) {
      int tmp;
      int sent_bytes = 0;
      while (sent_bytes < 10) {
        int max_len = 10 - sent_bytes;
        tmp = client.Send(&msg[sent_bytes], max_len);
        ASSERT_GE(tmp, 0);
        sent_bytes += tmp;
      }
      int recieved_bytes = 0;
      while (recieved_bytes < 10) {
      	int max_len = 10 - recieved_bytes;
        tmp = client.Receive(&clibuff[recieved_bytes], max_len);
        ASSERT_GE(tmp, 0);
        recieved_bytes += tmp;
      }
      ASSERT_EQ(string("0123456789"), string(clibuff, 10));
    }

    // Big block
    char* bigdata = new char[kDataLength];
    char* bigbuff = new char[kDataLength];
    memset(bigdata, 'x', kDataLength);
    memset(bigbuff, '\0', kDataLength);
    int sent_bytes = 0;
    while (sent_bytes < kDataLength) {
      int max_len = kDataLength - sent_bytes;
      int tmp = client.Send(&bigdata[sent_bytes], max_len);
      ASSERT_GE(tmp, 0);
      sent_bytes += tmp;
    }
    int recieved_bytes = 0;
    while (recieved_bytes < kDataLength) {
      int max_len = kDataLength - recieved_bytes;
      int tmp = client.Receive(&bigbuff[recieved_bytes], max_len);
      ASSERT_GE(tmp, 0);
      recieved_bytes += tmp;
    }
    for (size_t i = 0; i < kDataLength; ++i) {
      ASSERT_EQ(bigbuff[i], 'x');
    }
  }
  wait(0);
}