// rpc_client.cc
#include <chrono>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

#include "rpc_base.h"

class RPCClient {
11
 public:
12
13
14
15
16
17
18
19
20
21
22
23
24
25
  explicit RPCClient(const std::string &ip_config) : ip_config_(ip_config) {
    ParseIPs();
  }

  void Run() {
    std::vector<std::thread> threads;
    for (int i = 0; i < kNumSender; ++i) {
      threads.push_back(std::thread(&RPCClient::StartClient, this));
    }
    for (auto &&t : threads) {
      t.join();
    }
  }

26
 private:
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
  void ParseIPs() {
    std::ifstream ifs(ip_config_);
    if (!ifs) {
      LOG(FATAL) << "Failed to open ip_config: " + ip_config_;
    }
    for (std::string line; std::getline(ifs, line);) {
      std::cout << line << std::endl;
      ips_.push_back(line);
    }
  }
  void StartClient() {
    dgl::rpc::TPSender sender(InitTPContext());
    int recv_id = 0;
    for (const auto &ip : ips_) {
      for (int i = 0; i < kNumReceiver; ++i) {
        const std::string ip_addr =
            std::string{"tcp://"} + ip + ":" + std::to_string(kPort + i);
        while (!sender.ConnectReceiver(ip_addr, recv_id)) {
          std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        ++recv_id;
      }
    }
    for (int i = 0; i < kNumMessage; ++i) {
      for (int n = 0; n < recv_id; ++n) {
        dgl::rpc::RPCMessage msg;
        msg.data = "123456789";
        const auto tensor =
            dgl::runtime::NDArray::FromVector(std::vector<int>(kSizeTensor, 1));
        for (int j = 0; j < kNumTensor; ++j) {
          msg.tensors.push_back(tensor);
        }
        sender.Send(msg, n);
      }
    }
    sender.Finalize();
  }
  const std::string ip_config_;
  std::vector<std::string> ips_;
};

int main(int argc, char **argv) {
  if (argc != 2) {
    LOG(FATAL)
        << "Invalid call. Please call like this: ./rpc_client ip_config.txt";
    return -1;
  }
  RPCClient client(argv[1]);
  client.Run();

  return 0;
}