#include "infer_engine.hpp"

#include <stdexcept>

#include "spdlog/spdlog.h"

namespace infinilm::engine {

//------------------------------------------------------
// Constructor
//------------------------------------------------------
// Builds the engine: constructs the communication group from the distributed
// configuration and device type, stores the model configuration, optionally
// stores a copy of the cache configuration, and creates one RankWorker per rank.
//
// config             - model configuration, stored in model_config_.
// distributed_config - forwarded to the communication group.
// device_type        - forwarded to the communication group.
// cache_config       - may be nullptr; when non-null the result of its
//                      unique_copy() is stored in cache_config_.
InferEngine::InferEngine(
    const InfinilmModel::Config &config,
    const distributed::DistConfig &distributed_config,
    infinicore::Device::Type device_type,
    const cache::CacheConfig *cache_config)
    : communication_group_(distributed_config, device_type),
      model_config_(config) {

    // Store the engine's own copy instead of the caller's pointer.
    if (cache_config != nullptr) {
        cache_config_ = cache_config->unique_copy();
    }

    // One worker per rank reported by the communication group.
    const int rank_count = communication_group_.get_world_size();
    workers_.reserve(rank_count);
    for (int rank = 0; rank < rank_count; ++rank) {
        workers_.emplace_back(std::make_unique<RankWorker>(
            model_config_,
            communication_group_.get_rank_info(rank),
            cache_config_.get())); // get() already yields nullptr when no config is stored
    }
}

//------------------------------------------------------
// load_param
//------------------------------------------------------
// Forwards one named parameter to every rank worker.
//
// name  - parameter name, passed through unchanged.
// param - tensor handed to each worker's load_param().
void InferEngine::load_param(const std::string &name, const infinicore::Tensor &param) {
    for (auto it = workers_.begin(); it != workers_.end(); ++it) {
        (*it)->load_param(name, param);
    }
}

//------------------------------------------------------
// state_dict
//------------------------------------------------------
// Collects the state dict of every rank worker.
//
// Returns one name -> Parameter map per worker, in the order the workers are
// stored in workers_.
// Throws std::runtime_error when the engine holds no workers.
std::vector<std::unordered_map<std::string, infinicore::nn::Parameter>> InferEngine::state_dict() {
    if (workers_.empty()) {
        throw std::runtime_error(" Model object not found. ");
    }

    std::vector<std::unordered_map<std::string, infinicore::nn::Parameter>> results;
    // The final size is known, so allocate once instead of growing (and moving
    // whole maps) while appending.
    results.reserve(workers_.size());
    for (auto &worker : workers_) {
        results.push_back(worker->state_dict());
    }
    return results;
}

//------------------------------------------------------
// Input::to_model_input
//------------------------------------------------------
// Converts this engine-level input into a model input for `device`.
//
// Each optional tensor that is present is transferred with ->to(device); absent
// ones stay empty. Two fields are treated differently:
//   * input_ids is passed through untouched,
//   * cache_lengths is transferred only when block_tables is present, otherwise
//     it is passed through as-is.
infinilm::InfinilmModel::Input InferEngine::Input::to_model_input(infinicore::Device device) const {
    // Shared helper: empty in -> empty out, otherwise the tensor moved to `device`.
    const auto to_device = [&device](const auto &maybe_tensor) {
        std::optional<infinicore::Tensor> moved;
        if (maybe_tensor.has_value()) {
            moved = maybe_tensor.value()->to(device);
        }
        return moved;
    };

    std::optional<infinicore::Tensor> position_ids_for_model = to_device(position_ids);

    // @todo: only paged kv cache support device tensor so far
    std::optional<infinicore::Tensor> cache_lengths_for_model;
    if (block_tables.has_value()) {
        cache_lengths_for_model = to_device(cache_lengths);
    } else if (cache_lengths.has_value()) {
        cache_lengths_for_model = cache_lengths.value();
    }

    std::optional<infinicore::Tensor> input_offsets_for_model = to_device(input_offsets);
    std::optional<infinicore::Tensor> block_tables_for_model = to_device(block_tables);
    std::optional<infinicore::Tensor> slot_mapping_for_model = to_device(slot_mapping);

    return {
        input_ids, // @todo: on device in the future
        position_ids_for_model,
        cache_lengths_for_model,
        input_offsets_for_model,
        block_tables_for_model,
        slot_mapping_for_model};
}

//------------------------------------------------------
// forward
//------------------------------------------------------
// Runs one inference pass on every rank worker and returns the output of the
// first worker.
//
// input - handed unchanged to each worker's run().
// Throws std::runtime_error when the engine holds no workers; without this
// check the final workers_[0] access would be undefined behaviour.
InferEngine::Output InferEngine::forward(const InferEngine::Input &input) {
    if (workers_.empty()) {
        throw std::runtime_error("InferEngine::forward: no workers available");
    }

    // Trigger every worker before waiting on any of them.
    // NOTE(review): presumably run() is asynchronous, which is why the wait is a
    // separate pass — confirm against RankWorker.
    for (auto &worker : workers_) {
        worker->run(input);
    }

    // Wait for all workers
    for (auto &worker : workers_) {
        worker->wait();
    }

    return workers_[0]->get_output();
}

//------------------------------------------------------
// Destructor
//------------------------------------------------------
// Calls close() on every rank worker, in the order they are stored in workers_.
InferEngine::~InferEngine() {
    for (auto it = workers_.begin(); it != workers_.end(); ++it) {
        (*it)->close();
    }
}

// Returns the distributed configuration as reported by the communication group.
const distributed::DistConfig &InferEngine::get_dist_config() const {
    const distributed::DistConfig &dist_config = communication_group_.get_dist_config();
    return dist_config;
}

//------------------------------------------------------
// reset_cache (overloaded with CacheConfig)
//------------------------------------------------------
// Asks every rank worker to reset its cache with `new_config`, then waits for
// all of them.
//
// new_config - forwarded unchanged to each worker's reset_cache().
// NOTE(review): the engine's own cache_config_ is not updated here — confirm
// that this is intended.
void InferEngine::reset_cache(const cache::CacheConfig *new_config) {
    // First pass: issue the reset to every worker.
    for (auto it = workers_.begin(); it != workers_.end(); ++it) {
        (*it)->reset_cache(new_config);
    }
    // Second pass: wait on each worker in turn.
    for (auto it = workers_.begin(); it != workers_.end(); ++it) {
        (*it)->wait();
    }
}

} // namespace infinilm::engine