utils.cpp 4.83 KB
Newer Older
1
2
3
4
5
6
#ifndef VLLM_NUMA_DISABLED
  #include <numa.h>
  #include <unistd.h>
  #include <string>
  #include <sched.h>
#endif
7
8
9
10
11
#if __GLIBC__ == 2 && __GLIBC_MINOR__ < 30
  #include <unistd.h>
  #include <sys/syscall.h>
  #define gettid() syscall(SYS_gettid)
#endif
12

13
#include "cpu/utils.hpp"

#include <new>
14

15
16
17
18
19
20
21
22
23
24
#ifdef VLLM_NUMA_DISABLED
// Fallback compiled when VLLM_NUMA_DISABLED is defined. It performs no thread
// or memory binding: `cpu_ids` is ignored and a fixed warning message is
// returned to the caller instead.
std::string init_cpu_threads_env(const std::string& cpu_ids) {
  static_cast<void>(cpu_ids);  // Intentionally unused in this build.

  std::string notice =
      "Warning: NUMA is not enabled in this build. `init_cpu_threads_env` has ";
  notice += "no effect to setup thread affinity.";
  return notice;
}

#endif

#ifndef VLLM_NUMA_DISABLED
25
std::string init_cpu_threads_env(const std::string& cpu_ids) {
26
  bitmask* omp_cpu_mask = numa_parse_cpustring_all(cpu_ids.c_str());
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
  TORCH_CHECK(omp_cpu_mask->size > 0);
  std::vector<int> omp_cpu_ids;
  omp_cpu_ids.reserve(omp_cpu_mask->size);

  constexpr int group_size = 8 * sizeof(*omp_cpu_mask->maskp);

  for (int offset = 0; offset < omp_cpu_mask->size; offset += group_size) {
    unsigned long group_mask = omp_cpu_mask->maskp[offset / group_size];
    int i = 0;
    while (group_mask) {
      if (group_mask & 1) {
        omp_cpu_ids.emplace_back(offset + i);
      }
      ++i;
      group_mask >>= 1;
    }
  }

  // Memory node binding
  if (numa_available() != -1) {
    int mem_node_id = numa_node_of_cpu(omp_cpu_ids.front());
48
49
50
51
52
53
    std::set<int> node_ids;
    for (const auto& cpu_id : omp_cpu_ids) {
      int node_id = numa_node_of_cpu(cpu_id);
      if (node_id != -1) {
        node_ids.insert(node_id);
      }
54
55
56
57
58
59
60
      if (node_id != mem_node_id) {
        TORCH_WARN("CPU ", cpu_id, " is on NUMA node ", node_id, ", but CPU ",
                   omp_cpu_ids.front(), " is on NUMA node ", mem_node_id,
                   ". All CPUs should be on the same NUMA node for optimal "
                   "performance. Memory will be bound to NUMA node ",
                   mem_node_id, ".");
      }
61
    }
62
63
64
65
66
67
68
69
70
    // Concatenate all node_ids into a single comma-separated string
    if (!node_ids.empty()) {
      std::string node_ids_str;
      for (const int node_id : node_ids) {
        if (!node_ids_str.empty()) {
          node_ids_str += ",";
        }
        node_ids_str += std::to_string(node_id);
      }
71

72
73
      bitmask* mask = numa_parse_nodestring(node_ids_str.c_str());
      bitmask* src_mask = numa_get_membind();
74

75
76
77
78
79
80
81
82
83
84
      int pid = getpid();

      if (mask && src_mask) {
        // move all existing pages to the specified numa node.
        *(src_mask->maskp) = *(src_mask->maskp) ^ *(mask->maskp);
        int page_num = numa_migrate_pages(pid, src_mask, mask);
        if (page_num == -1) {
          TORCH_WARN("numa_migrate_pages failed. errno: " +
                     std::to_string(errno));
        }
85

86
87
88
89
90
91
92
93
94
95
96
        // restrict memory allocation node.
        numa_set_membind(mask);
        numa_set_strict(1);

        numa_free_nodemask(mask);
        numa_free_nodemask(src_mask);
      } else {
        TORCH_WARN("numa_parse_nodestring or numa_get_membind failed. errno: " +
                   std::to_string(errno));
      }
    }
97
98
99
100
101
102
103
  }

  // OMP threads binding
  omp_set_num_threads((int)omp_cpu_ids.size());
  torch::set_num_threads((int)omp_cpu_ids.size());
  TORCH_CHECK_EQ(omp_cpu_ids.size(), torch::get_num_threads());
  TORCH_CHECK_EQ(omp_cpu_ids.size(), omp_get_max_threads());
104
105
106
107
108
109

  std::vector<std::pair<int, int>> thread_core_mapping;
  thread_core_mapping.reserve(omp_cpu_ids.size());
  omp_lock_t writelock;
  omp_init_lock(&writelock);

110
  #pragma omp parallel for schedule(static, 1)
111
  for (size_t i = 0; i < omp_cpu_ids.size(); ++i) {
112
113
114
115
116
117
118
119
120
121
122
123
    cpu_set_t mask;
    CPU_ZERO(&mask);
    CPU_SET(omp_cpu_ids[i], &mask);
    int ret = sched_setaffinity(0, sizeof(cpu_set_t), &mask);
    if (ret == -1) {
      TORCH_CHECK(false,
                  "sched_setaffinity failed. errno: " + std::to_string(errno));
    }

    omp_set_lock(&writelock);
    thread_core_mapping.emplace_back(gettid(), omp_cpu_ids[i]);
    omp_unset_lock(&writelock);
124
125
  }

126
127
  omp_destroy_lock(&writelock);

128
  numa_free_nodemask(omp_cpu_mask);
129
130
131
132
133
134
135
136
137
138
139

  std::stringstream ss;
  ss << "OMP threads binding of Process " << getpid() << ":\n";
  std::sort(thread_core_mapping.begin(), thread_core_mapping.end(),
            [](auto&& a, auto&& b) { return a.second < b.second; });
  for (auto&& item : thread_core_mapping) {
    ss << "\t"
       << "OMP tid: " << item.first << ", core " << item.second << "\n";
  }

  return ss.str();
140
}
141
#endif  // VLLM_NUMA_DISABLED
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163

namespace cpu_utils {
// Constructs an empty manager and immediately requests an initial buffer of
// `allocation_unit * 128` bytes (subject to `round`).
ScratchPadManager::ScratchPadManager() : size_(0), ptr_(nullptr) {
  this->realloc(allocation_unit * 128);
}

// Ensures the scratchpad holds at least `new_size` bytes after rounding.
//
// The request is first passed through `round` (defined outside this file;
// presumably rounds up to a multiple of `allocation_unit` -- TODO confirm).
// If the rounded size does not exceed the current size, nothing happens.
// Otherwise the old buffer is freed and a fresh 64-byte-aligned buffer is
// allocated; the previous contents are NOT copied over.
//
// @throws std::bad_alloc if the new buffer cannot be allocated. In that case
//         the manager is left empty (`ptr_ == nullptr`, `size_ == 0`) so a
//         later call can retry, rather than recording a capacity with no
//         buffer behind it.
void ScratchPadManager::realloc(size_t new_size) {
  new_size = round(new_size);
  if (new_size <= size_) {
    return;
  }

  // Release the old buffer before allocating, keeping peak memory low.
  // std::free(nullptr) is a no-op, so no separate null test is needed.
  std::free(ptr_);
  ptr_ = nullptr;
  size_ = 0;

  ptr_ = std::aligned_alloc(64, new_size);
  if (ptr_ == nullptr) {
    throw std::bad_alloc();
  }
  size_ = new_size;
}

// Returns a pointer to the single function-local static manager, constructed
// on first call.
ScratchPadManager* ScratchPadManager::get_scratchpad_manager() {
  static ScratchPadManager manager;
  return &manager;
}
}  // namespace cpu_utils