// Copyright (c) OpenMMLab. All rights reserved.

#pragma once

#include "src/turbomind/utils/Tensor.h"
#include <condition_variable>
#include <cstdint>
#include <future>
#include <limits>
#include <queue>
#include <unordered_map>

namespace turbomind {

/// A single unit of work handed to `RequestQueue`.
///
/// All scalar members carry default initializers so that a default-initialized
/// instance (`Request r;`, `new Request`) never exposes indeterminate values;
/// in particular `stop_flag` is read by `RequestQueue::enqueue` to pick a queue.
struct Request {
    uint64_t id{0};
    uint64_t priority{0};

    bool start_flag{false};
    bool end_flag{false};
    // When true, RequestQueue::enqueue places the request on its stop queue
    // instead of the inference queue.
    bool stop_flag{false};

    // per rank inputs/outputs
    std::vector<TensorMap> inputs;
    std::vector<TensorMap> outputs;

    /// Callback taking a pointer to a name -> Tensor map.
    /// NOTE(review): presumably invoked to stream intermediate outputs — confirm with the caller.
    using Callback = std::function<void(std::unordered_map<std::string, Tensor>*)>;
    Callback stream_cb;

    /// Named integer codes.
    /// NOTE(review): presumably the non-success values delivered through `signal`;
    /// confirm against the code that fulfils the promise.
    enum
    {
        kInvalid  = 1,
        kConflict = 2,
        kBusy     = 3,
        kInactive = 4,
        kFail     = 5,
        kTooLong  = 6
    };

    /// Promise whose future is handed back to the producer by `RequestQueue::enqueue`.
    std::promise<int> signal;
};

class RequestQueue {
public:
    std::vector<std::future<int>> enqueue(std::vector<std::shared_ptr<Request>> requests)
    {
        std::vector<std::future<int>> futures;
        futures.reserve(requests.size());
        {
            std::lock_guard<std::mutex> lock(mutex_);
50
51
52
53
54

            if (closed_) {
                throw std::runtime_error("Queue is closed");
            }

Li Zhang's avatar
Li Zhang committed
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
            for (auto& r : requests) {
                futures.push_back(r->signal.get_future());
                if (r->stop_flag) {
                    stop_queue_.push(std::move(r));
                }
                else {
                    infer_queue_.push(std::move(r));
                }
            }
        }
        cv_.notify_one();
        return futures;
    }

    void dequeue(std::vector<std::shared_ptr<Request>>& stop_requests,
                 std::vector<std::shared_ptr<Request>>& infer_requests,
                 unsigned                               max_infer_count,
Li Zhang's avatar
Li Zhang committed
72
73
                 bool                                   blocking,
                 bool&                                  abort)
Li Zhang's avatar
Li Zhang committed
74
75
76
    {
        std::unique_lock<std::mutex> lock(mutex_);
        if (blocking) {
Li Zhang's avatar
Li Zhang committed
77
78
79
80
81
            cv_.wait(lock, [this] { return !(stop_queue_.empty() && infer_queue_.empty()) || closed_; });
            if (closed_) {
                abort = true;
                return;
            }
Li Zhang's avatar
Li Zhang committed
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
        }

        stop_requests.clear();
        while (!stop_queue_.empty()) {
            stop_requests.push_back(std::move(stop_queue_.front()));
            stop_queue_.pop();
        }

        infer_requests.clear();
        while (!infer_queue_.empty() && infer_requests.size() < max_infer_count) {
            infer_requests.push_back(std::move(infer_queue_.front()));
            infer_queue_.pop();
        }
    }

97
98
    void close()
    {
Li Zhang's avatar
Li Zhang committed
99
100
101
102
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
103
104
105
        cv_.notify_all();
    }

Li Zhang's avatar
Li Zhang committed
106
107
108
109
110
private:
    std::queue<std::shared_ptr<Request>> stop_queue_;
    std::queue<std::shared_ptr<Request>> infer_queue_;
    std::mutex                           mutex_;
    std::condition_variable              cv_;
Li Zhang's avatar
Li Zhang committed
111
    bool                                 closed_{false};
Li Zhang's avatar
Li Zhang committed
112
113
};

}  // namespace turbomind