"vscode:/vscode.git/clone" did not exist on "7443197a63de26350fb6785377f20a0cbf491118"
import pytest
import torch

from lmdeploy.pytorch.config import CacheConfig, SchedulerConfig
from lmdeploy.pytorch.messages import MessageStatus
from lmdeploy.pytorch.paging.scheduler import Scheduler


class TestScheduler:
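    """Tests for the paging Scheduler.

    All tests share a small cache (block_size=16, 4 GPU blocks, 4 CPU
    blocks) and a SchedulerConfig with eviction_type='copy', so the swap
    tests below expect evicted blocks to be copied between GPU and CPU.
    """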

    @pytest.fixture
    def block_size(self):
        yield 16

    @pytest.fixture
    def num_cpu_blocks(self):
        yield 4

    @pytest.fixture
    def num_gpu_blocks(self):
        yield 4

    @pytest.fixture
    def cache_config(self, block_size, num_cpu_blocks, num_gpu_blocks):
        yield CacheConfig(block_size=block_size,
                          num_cpu_blocks=num_cpu_blocks,
                          num_gpu_blocks=num_gpu_blocks)

    @pytest.fixture
    def scheduler_config(self):
        yield SchedulerConfig(max_batches=4,
                              max_session_len=128,
                              max_request_output_len=64,
                              eviction_type='copy')

    @pytest.fixture
    def scheduler(self, cache_config, scheduler_config):
        yield Scheduler(scheduler_config=scheduler_config,
                        cache_config=cache_config)

    def test_schedule_base(self, scheduler, block_size, num_gpu_blocks):
        block_manager = scheduler.block_manager
        session_id = 0
        session = scheduler.add_session(session_id)
        assert session_id in scheduler.sessions
        assert scheduler.sessions[session_id] == session

        num_blocks = 2
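        # the prompt spans exactly num_blocks full blocks, so the prefill
        # schedule below should allocate num_blocks of the GPU blocks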
        token_ids = torch.tensor([0] * block_size * num_blocks)
        seq = session.add_sequence(token_ids)
        scheduler.add_sequence(seq)

        assert seq.status == MessageStatus.WAITING
        assert seq in scheduler.waiting

        output = scheduler.schedule(is_prefill=True)
        block_tables = scheduler.get_block_tables(output.running)

        assert seq.status == MessageStatus.RUNNING
        assert seq in output.running
        assert len(block_tables) == 1
        assert len(block_tables[0]) == num_blocks
        assert (block_manager.get_num_free_gpu_blocks() ==
                num_gpu_blocks - num_blocks)

        assert scheduler.has_unfinished()

    def test_update(self, scheduler, block_size, num_gpu_blocks):
        block_manager = scheduler.block_manager
        session_id1 = 0
        session1 = scheduler.add_session(session_id1)
        token_ids1 = torch.tensor([0] * block_size * 1)
        seq1 = session1.add_sequence(token_ids1)
        scheduler.add_sequence(seq1)

        session_id2 = 1
        session2 = scheduler.add_session(session_id2)
        token_ids2 = torch.tensor([0] * block_size * 2)
        seq2 = session2.add_sequence(token_ids2)
        scheduler.add_sequence(seq2)
        token_ids3 = torch.tensor([0] * block_size * 3)
        seq3 = session2.add_sequence(token_ids3)
        scheduler.add_sequence(seq3)

        scheduler.schedule(is_prefill=True)
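        # only 4 GPU blocks are available: prefill admits seq1 (1 block) and
        # seq2 (2 blocks); seq3 needs 3 blocks and has to keep waiting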
        assert seq1.status == MessageStatus.RUNNING
        assert seq2.status == MessageStatus.RUNNING
        assert seq3.status == MessageStatus.WAITING

        # stop seq
        seq1.status = MessageStatus.STOPPED
        scheduler.update()
        assert len(scheduler.running) == 1
        assert seq1 in scheduler.hanging

        # end seq
        seq1.status = MessageStatus.ENDED
        scheduler.update()
        assert session_id1 in scheduler.sessions
        assert seq1 not in scheduler.running
        assert seq1 not in scheduler.hanging
        assert block_manager.get_num_free_gpu_blocks() == num_gpu_blocks - 2

        # stop session
        scheduler.stop_session(session_id2)
        scheduler.update()
        assert len(scheduler.running) == 0
        assert len(scheduler.waiting) == 0
        assert len(scheduler.hanging) == 2

        # end session
        scheduler.end_session(session_id2)
        scheduler.update()
        assert seq2.status == MessageStatus.ENDED
        assert seq3.status == MessageStatus.ENDED
        assert session_id2 not in scheduler.sessions
        assert len(scheduler.hanging) == 0
        assert block_manager.get_num_free_gpu_blocks() == num_gpu_blocks

    def test_swap(self, scheduler, block_size, num_gpu_blocks, num_cpu_blocks):
        block_manager = scheduler.block_manager
        session_id = 0
        session = scheduler.add_session(session_id)

        # test: add 3 seqs
        token_ids1 = torch.tensor([0] * block_size * 1)
        seq1 = session.add_sequence(token_ids1)
        scheduler.add_sequence(seq1)
        token_ids2 = torch.tensor([0] * block_size * 2)
        seq2 = session.add_sequence(token_ids2)
        scheduler.add_sequence(seq2)
        token_ids3 = torch.tensor([0] * block_size * 3)
        seq3 = session.add_sequence(token_ids3)
        scheduler.add_sequence(seq3)
        scheduler.schedule(is_prefill=True)
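        # as in test_update, the 4 GPU blocks only fit seq1 (1 block) and
        # seq2 (2 blocks); seq3 (3 blocks) stays waiting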
        # seq1: 1 block, running on gpu
        # seq2: 2 blocks, running on gpu
        # seq3: 3 blocks, waiting, no allocation
        assert seq1.status == MessageStatus.RUNNING
        assert seq2.status == MessageStatus.RUNNING
        assert seq3.status == MessageStatus.WAITING
        assert block_manager.get_num_free_gpu_blocks() == num_gpu_blocks - 3

        # test: waiting alloc
        seq2.status = MessageStatus.STOPPED
        scheduler.update()
        assert len(scheduler.running) == 1
        assert len(scheduler.waiting) == 1
        assert len(scheduler.hanging) == 1

        output = scheduler.schedule(is_prefill=True)
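        # to make room for seq3's 3 blocks, the hanging seq2 is evicted and
        # its 2 blocks are copied out to CPU ('copy' eviction)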
        # seq1: 1 block, running on gpu
        # seq2: 2 blocks, hanging (stopped), swapped out to cpu
        # seq3: 3 blocks, running on gpu
        assert seq1.status == MessageStatus.RUNNING
        assert seq2.status == MessageStatus.STOPPED
        assert seq3.status == MessageStatus.RUNNING
        assert block_manager.get_num_free_gpu_blocks() == 0
        assert block_manager.get_num_free_cpu_blocks() == num_cpu_blocks - 2
        assert len(output.swap_out_map) == 2

        # test: waiting append token
        seq2.status = MessageStatus.WAITING
        seq3.status = MessageStatus.ENDED
        seq2.update_token_ids(torch.tensor([1] * block_size))
        scheduler.update()
        assert len(scheduler.running) == 1
        assert len(scheduler.waiting) == 1
        assert len(scheduler.hanging) == 0

        output = scheduler.schedule(is_prefill=True)
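        # seq3 has ended, freeing its GPU blocks; seq2 now needs 3 blocks, so
        # its 2 CPU blocks are swapped back in and a third GPU block is
        # allocated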
        # seq1: 1 block, running on gpu
        # seq2: 3 blocks, running on gpu (swapped back in)
        # seq3: ended, blocks freed
        assert seq1.status == MessageStatus.RUNNING
        assert seq2.status == MessageStatus.RUNNING
        assert block_manager.get_num_free_gpu_blocks() == 0
        assert block_manager.get_num_free_cpu_blocks() == num_cpu_blocks
        assert len(output.swap_in_map) == 2

        # test: running append
        seq1.update_token_ids(torch.tensor([1] * block_size))
        seq2.update_token_ids(torch.tensor([1] * block_size))
        scheduler.update()
        assert len(scheduler.running) == 2
        output = scheduler.schedule(is_prefill=False)
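        # decode step: seq1 would need 2 blocks and seq2 needs 4, but only 4
        # GPU blocks exist; seq1 is preempted and its block is swapped out to
        # CPU so seq2 can keep running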
        # seq1: 1 block, waiting, swapped out to cpu
        # seq2: 4 blocks, running on gpu
        # seq3: ended, blocks freed
        assert seq1.status == MessageStatus.WAITING
        assert seq2.status == MessageStatus.RUNNING
        assert block_manager.get_num_free_gpu_blocks() == 0
        assert block_manager.get_num_free_cpu_blocks() == num_cpu_blocks - 1
        assert len(output.swap_out_map) == 1