Unverified commit 96e53dbb, authored by Jiacheng Huang, committed by GitHub

issue/160: Clean up the InferEngine-related interfaces

* Extract `cpp.LlamaForCausalLM` into `infinilm.infer_engine.InferEngine`

* Split the `Config` construction logic out into `AutoConfig`

* Construct `InferEngine` directly in the `examples` scripts

* Move the `random_sample` computation into the model

* Implement `generate` separately for `InferEngine`

* Allow passing `temperature`, `top_k`, and `top_p` through `GenerationConfig` (a usage sketch follows this list)

* Move the `random_sample` handling from `LlamaForCausalLM` into `RankWorker`

* Append `output_id` directly in `InferEngine.generate`

* Fix the distributed deadlock introduced by commit `13aa90c57de369f9985593c0066b6b06a7508b24`

* Align the `InferEngine.forward` interface with the C++-side `InferEngine.Input`

* Add a `_measure_and_log_time` parameter to re-enable the timing that `generate` previously did internally
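
Taken together, the new Python-side flow looks roughly like this (a sketch assembled from the updated example scripts; the checkpoint path is a placeholder and weight loading is elided):

```python
import infinicore
from infinilm.distributed import DistConfig
from infinilm.infer_engine import GenerationConfig, InferEngine

model = InferEngine(
    "/path/to/llama",                    # placeholder checkpoint directory
    device=infinicore.device("cpu", 0),
    distributed_config=DistConfig(1),    # tensor-parallel degree
)
# ... load weights via model.load_state_dict(...) as in the examples ...

output_ids = model.generate(
    input_ids,  # an infinicore tensor of shape [batch, seq_len]
    GenerationConfig(max_new_tokens=100, temperature=1, top_k=1, top_p=0.8),
)
```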
parent 23b1306c
@@ -63,14 +63,14 @@ infinilm::InfinilmModel::Input InferEngine::Input::to_model_input() const {
 InferEngine::Output InferEngine::forward(const InferEngine::Input &input) {
     // Trigger each worker to run inference
     for (auto &worker : workers_) {
-        worker->run(input.to_model_input());
+        worker->run(input);
     }

     // Wait for all workers
     for (auto &worker : workers_) {
         worker->wait();
     }

-    return {workers_[0]->get_output().logits};
+    return workers_[0]->get_output();
 }

 //------------------------------------------------------
...
@@ -13,28 +13,9 @@ namespace infinilm::engine {
 class InferEngine {
 public:
-    struct Input {
-        /// Token IDs tensor of shape `[batch, seq_len]`.
-        std::optional<infinicore::Tensor> input_ids;
-        /// Position IDs tensor of shape `[batch, seq_len]` or `[seq_len]`.
-        std::optional<infinicore::Tensor> position_ids;
-        /// Past lengths of the cached sequence for each request, of shape `[num_requests]`.
-        std::optional<infinicore::Tensor> cache_lengths;
-        /// Input lengths of each request in a continuous-batched sequence, of shape `[num_requests]`.
-        std::optional<infinicore::Tensor> input_lengths;
-        /// Offsets of each request in a continuous-batched sequence, of shape `[num_requests]`.
-        std::optional<infinicore::Tensor> input_offsets;
-        /// Block ids for each request `[batch, max_block_table_length]`. Used for paged cache.
-        std::optional<infinicore::Tensor> block_tables;
-        /// Slot ids for each token `[seq]`. Used for paged cache.
-        std::optional<infinicore::Tensor> slot_mapping;
-
-        infinilm::InfinilmModel::Input to_model_input() const;
-    };
-
-    struct Output {
-        infinicore::Tensor logits;
-    };
+    using Input = RankWorker::Input;
+    using Output = RankWorker::Output;

     // Updated constructor: accept CacheConfig instead of CacheType
     InferEngine(
...
@@ -2,6 +2,8 @@
 #include "../models/model_factory.hpp"

+#include "infinicore/ops.hpp"
+
 #include <iostream>
 #include <spdlog/spdlog.h>
 #include <stdexcept>
@@ -95,7 +97,7 @@ std::unordered_map<std::string, infinicore::nn::Parameter> RankWorker::state_dic
 //------------------------------------------------------
 // run -- asynchronous
 //------------------------------------------------------
-void RankWorker::run(const InfinilmModel::Input &args) {
+void RankWorker::run(const Input &args) {
     std::lock_guard<std::mutex> lock(mutex_);

     if (should_exit_) {
@@ -156,7 +158,7 @@ void RankWorker::close() {
 //------------------------------------------------------
 // get_output (thread safe)
 //------------------------------------------------------
-InfinilmModel::Output RankWorker::get_output() {
+RankWorker::Output RankWorker::get_output() {
     std::lock_guard<std::mutex> lock(mutex_);
     return output_;
 }
@@ -204,7 +206,7 @@ void RankWorker::thread_loop() {
                 local_param_name = pending_param_name_;
                 local_param = pending_param_;
             } else if (local_cmd == Command::RUN) {
-                local_args = pending_args_;
+                local_args = pending_args_.to_model_input();
             } else if (local_cmd == Command::RESET_CACHE) {
                 if (pending_cache_config_ != nullptr) {
                     local_cache_config = pending_cache_config_->unique_copy();
@@ -239,12 +241,40 @@ void RankWorker::thread_loop() {
         } else if (local_cmd == Command::RUN) {
             try {
-                auto out = model_->forward(local_args);
-                infinicore::context::syncStream();
                 {
                     std::lock_guard<std::mutex> lk(mutex_);
-                    output_ = std::move(out);
+                    auto logits{model_->forward(local_args).logits};
+                    if (rank_info_.tp_rank == 0) {
+                        // Perform random sampling.
+                        auto temperature{pending_args_.temperature};
+                        auto top_p{pending_args_.top_p};
+                        auto top_k{pending_args_.top_k};
+                        auto random_val{pending_args_.random_val};
+
+                        const auto &logits_shape{logits->shape()};
+                        const auto &batch_size{logits_shape[0]};
+                        const auto &vocab_size{logits_shape[2]};
+
+                        auto output_ids{infinicore::Tensor::empty({batch_size}, infinicore::DataType::I32, rank_info_.device)};
+
+                        for (auto i{decltype(batch_size)(0)}; i < batch_size; ++i) {
+                            auto score{logits->narrow({{0, i, 1}})->view({vocab_size})};
+                            auto out{output_ids->narrow({{0, i, 1}})->view({})};
+
+                            infinicore::op::random_sample_(
+                                out, score, random_val, top_p, top_k, temperature);
+                        }
+
+                        output_ids = output_ids->to(infinicore::Device::cpu());
+                        infinicore::context::syncStream();
+
+                        auto out{Output{output_ids}};
+                        output_ = std::move(out);
+                    }
                     job_done_ = true;
                 }
                 cv_.notify_all();
...
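Only rank 0 of the tensor-parallel group samples, and the result stored in `output_` is a `[batch]`-shaped I32 tensor of token ids already copied to the CPU. For intuition, here is a rough NumPy sketch of what a temperature/top-k/top-p sampler computes per batch row; this is an illustration only, not the actual `infinicore::op::random_sample_` kernel, whose exact semantics are not shown in this diff:

```python
import numpy as np

def random_sample(logits, random_val, top_p=1.0, top_k=50, temperature=1.0):
    """Illustrative temperature/top-k/top-p sampling over one row of logits."""
    # Temperature-scaled softmax, stabilized by subtracting the max logit.
    probs = np.exp((logits - logits.max()) / temperature)
    probs /= probs.sum()
    order = np.argsort(-probs)  # candidate tokens, most likely first
    sorted_probs = probs[order]
    keep = min(top_k, sorted_probs.size)
    # Nucleus cut: smallest prefix whose cumulative mass reaches top_p.
    nucleus = int(np.searchsorted(np.cumsum(sorted_probs), top_p)) + 1
    keep = min(keep, nucleus)
    # Map the uniform draw in [0, 1) onto the truncated distribution.
    cum = np.cumsum(sorted_probs[:keep])
    return int(order[np.searchsorted(cum, random_val * cum[-1])])
```

As far as this diff shows, `random_val` is not settable from Python, so it stays at the `Input` default of `0.1` rather than being drawn uniformly per step.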
@@ -23,6 +23,37 @@ class RankWorker {
     };

 public:
+    struct Input {
+        /// Token IDs tensor of shape `[batch, seq_len]`.
+        std::optional<infinicore::Tensor> input_ids;
+        /// Position IDs tensor of shape `[batch, seq_len]` or `[seq_len]`.
+        std::optional<infinicore::Tensor> position_ids;
+        /// Past lengths of the cached sequence for each request, of shape `[num_requests]`.
+        std::optional<infinicore::Tensor> cache_lengths;
+        /// Input lengths of each request in a continuous-batched sequence, of shape `[num_requests]`.
+        std::optional<infinicore::Tensor> input_lengths;
+        /// Offsets of each request in a continuous-batched sequence, of shape `[num_requests]`.
+        std::optional<infinicore::Tensor> input_offsets;
+        /// Block ids for each request `[batch, max_block_table_length]`. Used for paged cache.
+        std::optional<infinicore::Tensor> block_tables;
+        /// Slot ids for each token `[seq]`. Used for paged cache.
+        std::optional<infinicore::Tensor> slot_mapping;
+
+        float temperature{1};
+        int top_k{50};
+        float top_p{1};
+        float random_val{0.1};
+
+        infinilm::InfinilmModel::Input to_model_input() const;
+    };
+
+    struct Output {
+        infinicore::Tensor output_ids;
+    };
+
     RankWorker(const InfinilmModel::Config &model_config,
                const distributed::RankInfo &rank_info,
                const cache::CacheConfig *cache_config);
@@ -35,7 +66,7 @@ public:
     std::unordered_map<std::string, infinicore::nn::Parameter> state_dict();

     // Submit a run (forward) job.
-    void run(const InfinilmModel::Input &args);
+    void run(const Input &args);

     // Reset the internal cache with a new configuration
     void reset_cache(const cache::CacheConfig *new_config);
@@ -47,7 +78,7 @@ public:
     void close();

     // Thread-safe accessor for last output produced by RUN.
-    InfinilmModel::Output get_output();
+    Output get_output();

     std::string info() const;
@@ -73,11 +104,11 @@ private:
     // Task payloads (protected by mutex)
     std::string pending_param_name_;
     infinicore::Tensor pending_param_;
-    InfinilmModel::Input pending_args_;
+    Input pending_args_;
     std::unique_ptr<cache::CacheConfig> pending_cache_config_;

     // Output (protected by mutex)
-    InfinilmModel::Output output_;
+    Output output_;

     // Thread sync
     std::thread thread_;
...
@@ -35,7 +35,7 @@ public:
     };

     struct Output {
-        /// Output tensor of shape [batch, seq_len, vocab_size].
+        /// Logits.
         infinicore::Tensor logits;
     };
...
@@ -84,13 +84,28 @@ inline void bind_infer_engine(py::module &m) {
                          std::optional<infinicore::Tensor> input_lengths,
                          std::optional<infinicore::Tensor> input_offsets,
                          std::optional<infinicore::Tensor> block_tables,
-                         std::optional<infinicore::Tensor> slot_mapping) {
-                return InferEngine::Input{
+                         std::optional<infinicore::Tensor> slot_mapping,
+                         py::kwargs kwargs) {
+                auto input{InferEngine::Input{
                     std::move(input_ids),
                     std::move(position_ids),
                     std::move(cache_lengths),
                     std::move(block_tables),
-                    std::move(slot_mapping)};
+                    std::move(slot_mapping)}};
+
+                if (kwargs) {
+                    if (kwargs.contains("temperature")) {
+                        input.temperature = kwargs["temperature"].cast<float>();
+                    }
+                    if (kwargs.contains("top_k")) {
+                        input.top_k = kwargs["top_k"].cast<int>();
+                    }
+                    if (kwargs.contains("top_p")) {
+                        input.top_p = kwargs["top_p"].cast<float>();
+                    }
+                }
+
+                return input;
             }),
             py::arg("input_ids") = std::nullopt,
             py::arg("position_ids") = std::nullopt,
@@ -108,7 +123,7 @@ inline void bind_infer_engine(py::module &m) {
         .def_readwrite("slot_mapping", &InferEngine::Input::slot_mapping);

     py::class_<InferEngine::Output>(infer_engine, "Output")
-        .def_readwrite("logits", &InferEngine::Output::logits, "Output tensor");
+        .def_readwrite("output_ids", &InferEngine::Output::output_ids, "Output tensor");
 }

 } // namespace infinilm::engine
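From Python, the extra `py::kwargs` means the sampling fields can be supplied as keyword arguments when constructing the bound `Input` (a sketch against the raw `_infinilm` extension module; `ids` stands in for an existing `infinicore` tensor):

```python
from infinilm.lib import _infinilm

# Only the tensor parameters are declared py::arg's; the sampling values
# travel through **kwargs and fall back to the C++ defaults
# (temperature=1, top_k=50, top_p=1) when omitted.
inp = _infinilm.InferEngine.Input(
    input_ids=ids._underlying,  # `ids`: an infinicore tensor (assumed)
    temperature=0.8,
    top_k=40,
    top_p=0.95,
)
print(inp.slot_mapping)  # None: optional tensors default to std::nullopt
```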
 import infinicore
 from transformers import AutoTokenizer
 from infinilm.modeling_utils import load_model_state_dict_by_file
-import infinilm
 from infinilm.distributed import DistConfig
+from infinilm.infer_engine import GenerationConfig, InferEngine
 import argparse
 import sys
 import time
 import os
 import json
 from collections import OrderedDict
+import numpy as np
 from tqdm import tqdm

 sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../python"))
@@ -205,10 +206,9 @@ class TestModel:
     # ---------------------------------------------------------------------------- #
     # Create the model.
     # ---------------------------------------------------------------------------- #
-    model = infinilm.AutoLlamaModel.from_pretrained(
+    model = InferEngine(
         model_path,
         device=infini_device,
-        backend="cpp",
         distributed_config=DistConfig(tp),
     )
@@ -257,14 +257,17 @@ class TestModel:
         t1 = time.time()
         print("=================== start generate ====================")
-        self.model.generate(
+        output_ids = self.model.generate(
             input_ids_infini,
-            max_new_tokens=output_len,
-            tokenizer=self.tokenizer,
-            stop_on_eos=False,
+            GenerationConfig(max_new_tokens=output_len, eos_token_id=[]),
         )
         t2 = time.time()
+
+        numpy_output_ids = np.array(
+            [output_id.to_numpy()[0] for output_id in output_ids]
+        )
+        print(self.tokenizer.decode(numpy_output_ids, skip_special_tokens=True))
+
         print(
             f"total_time: {round((t2 - t1) * 1000, 2)} ms",
         )
...
@@ -2,12 +2,13 @@ import infinicore
 from transformers import AutoTokenizer
 from tokenizers import decoders as _dec
 from infinilm.modeling_utils import load_model_state_dict_by_file
-import infinilm
 from infinilm.distributed import DistConfig
+from infinilm.infer_engine import GenerationConfig, InferEngine
 import argparse
 import sys
 import time
 import os
+import numpy as np

 sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../python"))
@@ -90,17 +91,15 @@ def test(
     model_path,
     max_new_tokens=100,
     infini_device=infinicore.device("cpu", 0),
-    backend="python",
     tp=1,
 ):
     model_path = os.path.expanduser(model_path)

     # ---------------------------------------------------------------------------- #
     # Create the model.
     # ---------------------------------------------------------------------------- #
-    model = infinilm.AutoLlamaModel.from_pretrained(
+    model = InferEngine(
         model_path,
         device=infini_device,
-        backend=backend,
         distributed_config=DistConfig(tp),
     )
@@ -165,13 +164,18 @@ def test(
     t1 = time.time()
     print("=================== start generate ====================")
-    model.generate(
+    output_ids = model.generate(
         input_ids_infini,
-        max_new_tokens=max_new_tokens,
-        tokenizer=tokenizer,
+        GenerationConfig(
+            max_new_tokens=max_new_tokens, temperature=1, top_k=1, top_p=0.8
+        ),
+        _measure_and_log_time=True,
     )
     t2 = time.time()
+
+    numpy_output_ids = np.array([output_id.to_numpy()[0] for output_id in output_ids])
+    print(tokenizer.decode(numpy_output_ids, skip_special_tokens=True))
+
     print(
         f"total_time: {round((t2 - t1) * 1000, 2)} ms",
     )
@@ -208,6 +212,9 @@ if __name__ == "__main__":
     backend = args.backend
     tp = args.tp

+    if backend != "cpp":
+        raise ValueError(f"Unsupported backend: {backend}.")
+
     infini_device = infinicore.device(device_str, 0)

     test(
@@ -215,6 +222,5 @@ if __name__ == "__main__":
         model_path,
         max_new_tokens,
         infini_device=infini_device,
-        backend=backend,
         tp=tp,
     )
@@ -78,7 +78,6 @@ def test(
     model_path,
     max_new_tokens=100,
     infini_device=infinicore.device("cpu", 0),
-    backend="python",
 ):
     model_path = os.path.expanduser(model_path)

     # ---------------------------------------------------------------------------- #
@@ -87,7 +86,6 @@ def test(
     model = infinilm.AutoLlamaModel.from_pretrained(
         model_path,
         device=infini_device,
-        backend=backend,
     )

     # ---------------------------------------------------------------------------- #
@@ -192,6 +190,9 @@ if __name__ == "__main__":
     max_new_tokens = args.max_new_tokens
     backend = args.backend

+    if backend != "python":
+        raise ValueError(f"Unsupported backend: {backend}.")
+
     infini_device = infinicore.device(device_str, 0)

     test(
@@ -199,5 +200,4 @@ if __name__ == "__main__":
         model_path,
         max_new_tokens,
         infini_device=infini_device,
-        backend=backend,
     )
import json
import os

from infinilm.models.llama.configuration_llama import LlamaConfig


class AutoConfig:
    def from_pretrained(model_path):
        config_path = os.path.join(model_path, "config.json")

        if not os.path.exists(config_path):
            raise FileNotFoundError(f"`{config_path}` not found")

        with open(config_path) as f:
            config_dict = json.load(f)

        if "model_type" not in config_dict:
            raise ValueError(
                f"`model_type` is not specified in the config file `{config_path}`."
            )

        if config_dict["model_type"] == "llama":
            return LlamaConfig(**config_dict)

        raise ValueError(f"Unsupported model type `{config_dict['model_type']}`.")
import time
from dataclasses import dataclass

import infinicore
from infinilm.auto_config import AutoConfig
from infinilm.cache import StaticKVCacheConfig
from infinilm.distributed import DistConfig
from infinilm.lib import _infinilm


@dataclass
class GenerationConfig:
    max_new_tokens: int | None = None
    temperature: float = 1.0
    top_k: int = 50
    top_p: float = 1.0
    eos_token_id: list[int] | None = None


class InferEngine(_infinilm.InferEngine):
    def __init__(
        self,
        model_path,
        device=None,
        distributed_config=DistConfig(1),
        cache_config=None,
    ):
        self.config = AutoConfig.from_pretrained(model_path)

        if device is None:
            device = infinicore.device()

        super().__init__(
            self.config,
            distributed_config._underlying,
            device._underlying.type,
            cache_config,
        )

        self.use_cache = False

    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)

    def forward(
        self,
        input_ids,
        *,
        position_ids=None,
        cache_lengths=None,
        input_lengths=None,
        input_offsets=None,
        block_tables=None,
        slot_mapping=None,
        temperature=None,
        top_k=None,
        top_p=None,
    ):
        # TODO: Remove `_underlying` and simplify the corresponding code.
        input_ids = input_ids._underlying if input_ids is not None else None
        position_ids = position_ids._underlying if position_ids is not None else None
        cache_lengths = cache_lengths._underlying if cache_lengths is not None else None
        input_lengths = input_lengths._underlying if input_lengths is not None else None
        input_offsets = input_offsets._underlying if input_offsets is not None else None
        block_tables = block_tables._underlying if block_tables is not None else None
        slot_mapping = slot_mapping._underlying if slot_mapping is not None else None

        return infinicore.Tensor(
            super()
            .forward(
                super().Input(
                    input_ids,
                    position_ids=position_ids,
                    cache_lengths=cache_lengths,
                    input_lengths=input_lengths,
                    input_offsets=input_offsets,
                    block_tables=block_tables,
                    slot_mapping=slot_mapping,
                    temperature=temperature,
                    top_k=top_k,
                    top_p=top_p,
                )
            )
            .output_ids
        )

    def generate(self, input_ids, generation_config, *, _measure_and_log_time=False):
        if generation_config.eos_token_id is None:
            eos_token_id = self.config.eos_token_id
        else:
            eos_token_id = generation_config.eos_token_id

        # TODO: Remove the `to_numpy` calls and simplify the corresponding code.
        batch_size, seq_len = input_ids.shape[:2]

        position_ids = infinicore.from_list(
            [list(range(0, seq_len)) for _ in range(batch_size)], dtype=infinicore.int64
        )
        cache_lengths = infinicore.from_list([0], dtype=infinicore.int64)

        output_ids = []

        if batch_size != 1 and generation_config.max_new_tokens is None:
            raise ValueError(
                "When `batch_size > 1`, `max_new_tokens` must be specified."
            )

        if _measure_and_log_time:
            time_measurements = []

        for _ in range(0, generation_config.max_new_tokens):
            if _measure_and_log_time:
                start_time = time.perf_counter()

            output_id = self(
                input_ids,
                position_ids=position_ids,
                cache_lengths=cache_lengths,
                temperature=generation_config.temperature,
                top_k=generation_config.top_k,
                top_p=generation_config.top_p,
            )

            output_ids.append(output_id)

            if (
                generation_config.max_new_tokens is not None
                and output_id.to_numpy()[0] in eos_token_id
            ):
                break

            seq_len = position_ids.shape[-1]

            input_ids = infinicore.from_list(
                [[output_id] for output_id in output_id.to_numpy().tolist()]
            )
            position_ids = infinicore.from_list(
                [1 for _ in range(batch_size)],
                dtype=position_ids.dtype,
                device=position_ids.device,
            ).view((batch_size, 1)) + position_ids.narrow(1, seq_len - 1, 1)
            cache_lengths += infinicore.from_list(
                [seq_len], dtype=cache_lengths.dtype, device=cache_lengths.device
            )

            if _measure_and_log_time:
                end_time = time.perf_counter()
                time_measurements.append(end_time - start_time)

        if _measure_and_log_time:
            print(
                f"\n\n\n Generation completed in {round(sum(time_measurements) * 1000, 2)} ms"
            )
            print(
                f" Batchsize={batch_size} Per_Batch_Input_Len={seq_len} Per_Batch_New_Tokens={len(time_measurements)}\n"
            )
            print(
                f" Prefill TTFT: {round(time_measurements[0], 2)}ms Throughput: {round((batch_size * seq_len) / time_measurements[0], 2)}tok/s\n",
            )
            if len(time_measurements) > 1:
                print(
                    f" Decode Avg ITL: {round(sum(time_measurements[1:]) * 1000 / (len(time_measurements) - 1), 2)}ms Throughput: {round((batch_size * (len(time_measurements) - 1)) / sum(time_measurements[1:]), 2)}tok/s\n",
                )

        return output_ids

    def reset_cache(self, batch_size: int, initial_capacity: int = 1024):
        infinicore.sync_device()
        cache_config = StaticKVCacheConfig(batch_size, initial_capacity)
        super().reset_cache(cache_config)

    def state_dict_keyname(self):
        return super().state_dict()[0].keys()

    def load_state_dict(self, state_dict, strict=None):
        for name, param in state_dict.items():
            super().load_param(name, param._underlying)
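A note on the `GenerationConfig` defaults, as exercised by the updated examples (a sketch; `top_k=1` effectively makes decoding greedy, an explicit empty `eos_token_id` list disables early stopping, and `None` defers to the model config):

```python
from infinilm.infer_engine import GenerationConfig

greedy = GenerationConfig(max_new_tokens=32, top_k=1)          # argmax decoding
no_eos = GenerationConfig(max_new_tokens=32, eos_token_id=[])  # never stops early
print(greedy.temperature, greedy.top_p)  # defaults: 1.0 1.0
```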
@@ -2,6 +2,7 @@ import os
 from typing import Optional, Union

 import infinicore
 import time
+from . import modeling_llama

 __all__ = ["AutoLlamaModel"]
@@ -13,41 +14,21 @@ class AutoLlamaModel:
         model_path: Optional[Union[str, os.PathLike]],
         device: infinicore.device,
         dtype=infinicore.dtype,
-        backend="python",
         **kwargs,
     ):
         t1 = time.time()
-        if backend == "python":
-            from . import modeling_llama
-
-            print("\n***************************************************************")
-            print("\t Loading Llama Model with Python Backend")
-            print(f"\t Device: {device}, DType: {dtype}")
-            print("***************************************************************\n")
-            print(" create model ......")
-
-            instance = modeling_llama.LlamaForCausalLM.from_pretrained(
-                model_path,
-                device=device,
-                **kwargs,
-            )
-        elif backend == "cpp":
-            from .backends import cpp
-
-            print("\n***************************************************************")
-            print("\t Loading Llama Model with C++ Backend")
-            print(f"\t Device: {device}, DType: {dtype}")
-            print("***************************************************************\n")
-            print(" create model ......")
-
-            instance = cpp.LlamaForCausalLM.from_pretrained(
-                model_path,
-                device=device,
-                **kwargs,
-            )
-        else:
-            raise KeyError("invalid backend")
+        print("\n***************************************************************")
+        print("\t Loading Llama Model")
+        print(f"\t Device: {device}, DType: {dtype}")
+        print("***************************************************************\n")
+        print(" create model ......")
+
+        instance = modeling_llama.LlamaForCausalLM.from_pretrained(
+            model_path,
+            device=device,
+            **kwargs,
+        )

         t2 = time.time()
         print(f" create model over! {(t2 - t1) * 1000} ms \n")
...
from ....generation.utils import GenerationMixin

import infinicore

from infinilm.models.llama.configuration_llama import LlamaConfig
from infinilm.lib import _infinilm
from infinilm.cache import StaticKVCacheConfig
from infinilm.distributed import DistConfig

import json
import os
from typing import Optional, Union


class LlamaForCausalLM(GenerationMixin):
    """Llama model for causal language modeling"""

    def __init__(
        self,
        config,
        device=None,
        dtype=None,
        distributed_config=DistConfig(1),
        cache_config=None,
    ):
        """
        Create LlamaForCausalLM

        Args:
            config: LlamaConfig instance or dict
            device: Device instance (defaults to CPU)
            dtype: Optional dtype for model parameters (defaults to None)
        """
        super().__init__()

        # Convert config to LlamaConfig (handles both regular Llama and Jiuge models)
        if isinstance(config, dict):
            config = LlamaConfig(**config)
        elif not isinstance(config, LlamaConfig):
            # Not a dict or LlamaConfig, try to convert
            config = LlamaConfig(config)
        # If already LlamaConfig, use as-is (it will auto-detect jiuge models)

        if device is None:
            device = infinicore.device()

        self.use_cache = False

        # Store the Python wrapper config so it can be accessed later
        # This is needed for DynamicCache which calls config.get_text_config()
        self._config = config
        self._device = device

        # self._model = _infinilm.LlamaForCausalLM(
        #     config._underlying, device._underlying, dtype
        # )
        self._model = _infinilm.InferEngine(
            config,
            distributed_config._underlying,
            device._underlying.type,
            cache_config,
        )

    def reset_cache(self, batch_size: int, initial_capacity: int = 1024):
        """Reset the cache for the model"""
        infinicore.sync_device()
        cache_config = StaticKVCacheConfig(batch_size, initial_capacity)
        self._model.reset_cache(cache_config)

    def state_dict_keyname(self):
        """Get model key name."""
        return self._model.state_dict()[0].keys()

    def load_state_dict(self, state_dict, strict=None):
        """
        Load state dictionary into the model

        Args:
            state_dict: Dictionary mapping parameter names to InfiniCore tensors, numpy arrays, or torch tensors
        """
        # self._model.load_state_dict(state_dict, self._device._underlying)
        for name, param in state_dict.items():
            self._model.load_param(name, param._underlying)

    def load_param(self, name: str, weight: infinicore.Tensor):
        self._model.load_param(name, weight._underlying)

    def get_parameter(self, name):
        """
        Get a parameter tensor by name

        Args:
            name: Parameter name

        Returns:
            InfiniCore tensor
        """
        return self._model.get_parameter(name)

    @property
    def config(self):
        """Get model configuration"""
        # Return the Python wrapper config instead of C++ config
        # This ensures compatibility with code that expects PretrainedConfig methods
        # like get_text_config() used by DynamicCache
        return self._config

    def forward(self, input_ids, position_ids, cache_positions, *args, **kwargs):
        return infinicore.Tensor(
            self._model.forward(
                self._model.Input(
                    input_ids._underlying,
                    position_ids._underlying,
                    cache_positions._underlying,
                )
            ).logits
        )

    def __call__(self, input_ids, position_ids, cache_positions, *args, **kwargs):
        return self.forward(
            input_ids=input_ids,
            position_ids=position_ids,
            cache_positions=cache_positions,
            *args,
            **kwargs,
        )

    @classmethod
    def from_pretrained(
        cls,
        model_path: Union[str, os.PathLike],
        device: Optional[infinicore.device] = None,
        dtype: Optional[infinicore.dtype] = None,
        **kwargs,
    ):
        """
        Load a pretrained LlamaForCausalLM model from a directory.

        Args:
            model_path: Path to the model directory containing config.json
            device: Device instance (defaults to CPU)
            dtype: Optional dtype for model parameters (defaults to None)

        Returns:
            LlamaForCausalLM instance
        """
        config_path = os.path.join(model_path, "config.json")
        if not os.path.exists(config_path):
            raise FileNotFoundError(f"Config file not found: {config_path}")

        with open(config_path, "r") as f:
            config_dict = json.load(f)

        # LlamaConfig automatically detects and handles jiuge models
        config = LlamaConfig(**config_dict)

        return cls(config, device=device, dtype=dtype, **kwargs)
@@ -5,12 +5,13 @@ import time
 import re
 import csv
 from datasets import load_dataset, Dataset
+import numpy as np

 import infinicore
-import infinilm
-from infinilm.models.llama import AutoLlamaModel
 from infinilm.modeling_utils import load_model_state_dict_by_file
 from infinilm.distributed import DistConfig
 from infinilm.cache import StaticKVCacheConfig
+from infinilm.infer_engine import GenerationConfig, InferEngine
+from infinilm.cache import StaticKVCacheConfig

 from abc import ABC, abstractmethod
@@ -112,12 +113,14 @@ class InfiniLMBenchmark(BaseBenchmark):
         eos_token_id = (
             [eos_token_id] if isinstance(eos_token_id, int) else eos_token_id
         )

+        if backend != "cpp":
+            raise ValueError(f"Unsupported backend: {backend}.")
+
         # Create model with cpp backend
         print("Loading model with cpp backend...")
-        self.model = AutoLlamaModel.from_pretrained(
+        self.model = InferEngine(
             model_dir_path,
             device=self.device,
-            backend=backend,
             distributed_config=DistConfig(ndev),
             cache_config=StaticKVCacheConfig(),
         )
@@ -175,22 +178,45 @@ class InfiniLMBenchmark(BaseBenchmark):
         input_ids_list = [tokens]
         input_ids = infinicore.from_list(input_ids_list)

+        start_time = time.perf_counter()
+
         # Use model's built-in generate() method which properly handles KV cache
         # Pass sampling parameters (temperature, topk, topp) via kwargs
-        result = self.model.generate(
+        output_ids = self.model.generate(
             input_ids=input_ids,
-            max_new_tokens=max_steps,
-            tokenizer=self.tokenizer,
-            stop_on_eos=True,
-            temperature=temperature_,
-            topk=topk_,
-            topp=topp_,
+            generation_config=GenerationConfig(
+                max_new_tokens=max_steps,
+                temperature=temperature_,
+                top_k=topk_,
+                top_p=topp_,
+            ),
         )

+        end_time = time.perf_counter()
+
+        # ---- post process ----
+        generated_ids = np.array([output_id.to_numpy()[0] for output_id in output_ids])
+        output_text = self.tokenizer.decode(generated_ids)
+
+        # ---- stats ----
+        input_tokens = len(tokens)
+        new_tokens = generated_ids.size
+        total_tokens = input_tokens + new_tokens
+        total_time = end_time - start_time
+        throughput = total_tokens / total_time if total_time > 0 else 0.0
+
+        print(output_text)
+        print()
+        print(f"Total time: {total_time * 1000:.2f} ms")
+        print(f"Input tokens: {input_tokens}")
+        print(f"New tokens: {new_tokens}")
+        print(f"Total tokens processed: {total_tokens}")
+        print(f"Throughput: {throughput:.2f} tok/s")
+
         global TOTAL_TOKENS, TOTAL_TIME
-        TOTAL_TIME += result["total_latency"]
-        TOTAL_TOKENS += result["total_input_tokens"] + result["total_output_tokens"]
-        return result["output_content"]
+        TOTAL_TOKENS += total_tokens
+        TOTAL_TIME += total_time
+
+        return output_text

     def destroy_model_instance(self):
         # Cleanup if needed
...