scheduler.py 21.1 KB
Newer Older
1
from collections import deque
2
3
import enum
import time
4
from typing import Deque, Dict, Iterable, List, Optional, Tuple, Union, Set
Woosuk Kwon's avatar
Woosuk Kwon committed
5

6
from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig
7
from vllm.core.block_manager import AllocStatus, BlockSpaceManager
Woosuk Kwon's avatar
Woosuk Kwon committed
8
from vllm.core.policy import PolicyFactory
9
from vllm.lora.request import LoRARequest
Woosuk Kwon's avatar
Woosuk Kwon committed
10
11
from vllm.logger import init_logger
from vllm.sequence import (Sequence, SequenceData, SequenceGroup,
12
                           SequenceGroupMetadata, SequenceStatus)
Woosuk Kwon's avatar
Woosuk Kwon committed
13

Woosuk Kwon's avatar
Woosuk Kwon committed
14
# Module-level logger, obtained from init_logger under this module's name.
logger = init_logger(__name__)
15

Woosuk Kwon's avatar
Woosuk Kwon committed
16

17
18
19
20
21
22
23
24
25
26
27
28
29
@enum.unique
class PreemptionMode(enum.Enum):
    """Strategies for evicting a running sequence group.

    SWAP: the blocks of the preempted sequences are moved out to CPU
    memory and moved back in once the sequences are resumed.

    RECOMPUTE: the blocks of the preempted sequences are dropped, and
    they are computed again when the sequences are resumed, as if the
    sequences were new prompts.
    """
    SWAP = 1
    RECOMPUTE = 2


30
31
32
33
class SchedulerOutputs:
    """Result of a single scheduling step.

    Bundles the sequence groups chosen for the next model execution with
    the block operations (swap in, swap out, copy) that have to be carried
    out before it, plus the sequence groups that were ignored.
    """

    def __init__(
        self,
        scheduled_seq_groups: Iterable[SequenceGroup],
        prompt_run: bool,
        num_batched_tokens: int,
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
        ignored_seq_groups: List[SequenceGroup],
    ) -> None:
        """Store the scheduling decision.

        Args:
            scheduled_seq_groups: Sequence groups selected for this step.
                Replaced by a list sorted by (lora_int_id, request_id)
                when at least one of them carries a LoRA request.
            prompt_run: Whether this step is a prompt run.
            num_batched_tokens: Number of tokens batched in this step.
            blocks_to_swap_in: Block mapping for swap-in operations.
            blocks_to_swap_out: Block mapping for swap-out operations.
            blocks_to_copy: Source block -> destination blocks to copy.
            ignored_seq_groups: Sequence groups that were ignored.
        """
        self.scheduled_seq_groups = scheduled_seq_groups
        self.prompt_run = prompt_run
        self.num_batched_tokens = num_batched_tokens
        self.blocks_to_swap_in = blocks_to_swap_in
        self.blocks_to_swap_out = blocks_to_swap_out
        self.blocks_to_copy = blocks_to_copy
        # Swap in and swap out should never happen at the same time.
        assert not (blocks_to_swap_in and blocks_to_swap_out)
        self.ignored_seq_groups = ignored_seq_groups

        self.num_loras = len(self.lora_requests)
        if self.num_loras > 0:
            self._sort_by_lora_ids()

    def is_empty(self) -> bool:
        """Return True when nothing is scheduled and no block op is pending."""
        # NOTE: We do not consider the ignored sequence groups.
        return (not self.scheduled_seq_groups and not self.blocks_to_swap_in
                and not self.blocks_to_swap_out and not self.blocks_to_copy)

    def _sort_by_lora_ids(self) -> None:
        """Reorder the scheduled groups by (lora_int_id, request_id).

        The result is stored back on ``scheduled_seq_groups`` as a list.
        """
        self.scheduled_seq_groups = sorted(self.scheduled_seq_groups,
                                           key=lambda g:
                                           (g.lora_int_id, g.request_id))

    @property
    def lora_requests(self) -> Set[LoRARequest]:
        """Distinct LoRA requests among the scheduled sequence groups.

        Groups whose ``lora_request`` is None are skipped, so that a batch
        without any LoRA yields an empty set (and ``num_loras == 0``)
        instead of ``{None}``.
        """
        return {
            g.lora_request
            for g in self.scheduled_seq_groups
            if g.lora_request is not None
        }

70

Woosuk Kwon's avatar
Woosuk Kwon committed
71
72
class Scheduler:
    """Decides which sequence groups run in each step.

    Sequence groups are kept in three queues (``waiting``, ``running`` and
    ``swapped``). Each call to :meth:`schedule` moves groups between these
    queues, reserves or releases blocks through the block space manager,
    and reports the outcome as a :class:`SchedulerOutputs`.
    """

    def __init__(
        self,
        scheduler_config: SchedulerConfig,
        cache_config: CacheConfig,
        lora_config: Optional[LoRAConfig],
    ) -> None:
        """Set up the policy, the block space manager and the queues.

        Args:
            scheduler_config: Scheduling limits (max_model_len,
                max_num_batched_tokens, max_num_seqs, max_paddings).
            cache_config: Cache settings forwarded to the block manager.
            lora_config: LoRA settings, or None when LoRA is disabled.
        """
        self.scheduler_config = scheduler_config
        self.cache_config = cache_config
        # Note for LoRA scheduling: the current policy is extremely
        # simple and NOT fair. It can lead to starvation of some
        # LoRAs. This should be improved in the future.
        self.lora_config = lora_config

        # Prompts longer than this are ignored instead of scheduled.
        self.prompt_limit = min(self.scheduler_config.max_model_len,
                                self.scheduler_config.max_num_batched_tokens)

        # Instantiate the scheduling policy.
        self.policy = PolicyFactory.get_policy(policy_name="fcfs")
        # Create the block space manager.
        self.block_manager = BlockSpaceManager(
            block_size=self.cache_config.block_size,
            num_gpu_blocks=self.cache_config.num_gpu_blocks,
            num_cpu_blocks=self.cache_config.num_cpu_blocks,
            sliding_window=self.cache_config.sliding_window,
            enable_caching=self.cache_config.enable_prefix_caching)

        # Sequence groups in the WAITING state.
        self.waiting: Deque[SequenceGroup] = deque()
        # Sequence groups in the RUNNING state.
        self.running: Deque[SequenceGroup] = deque()
        # Sequence groups in the SWAPPED state.
        self.swapped: Deque[SequenceGroup] = deque()

    @property
    def lora_enabled(self) -> bool:
        """Whether a (truthy) LoRA config was supplied."""
        return bool(self.lora_config)

    def add_seq_group(self, seq_group: SequenceGroup) -> None:
        """Append a sequence group to the back of the waiting queue."""
        # Add sequence groups to the waiting queue.
        self.waiting.append(seq_group)

    def abort_seq_group(self, request_id: Union[str, Iterable[str]]) -> None:
        """Aborts a sequence group with the given ID.

        Check if the sequence group with the given ID
            is present in any of the state queue.
        If present, remove the sequence group from the state queue.
            Also, if any of the sequences in the sequence group is not finished,
                free the sequence with status `FINISHED_ABORTED`.
        Otherwise, do nothing.

        Args:
            request_id: The ID(s) of the sequence group to abort.
        """
        if isinstance(request_id, str):
            request_id = (request_id, )
        request_ids = set(request_id)
        for state_queue in [self.waiting, self.running, self.swapped]:
            # Matches are collected first and removed afterwards so that the
            # queue is never mutated while it is being iterated.
            aborted_groups: List[SequenceGroup] = []
            for seq_group in state_queue:
                if not request_ids:
                    # Using 'break' here may add two extra iterations,
                    # but is acceptable to reduce complexity .
                    break
                if seq_group.request_id in request_ids:
                    # Appending aborted group into pending list.
                    aborted_groups.append(seq_group)
                    request_ids.remove(seq_group.request_id)
            for aborted_group in aborted_groups:
                # Remove the sequence group from the state queue.
                state_queue.remove(aborted_group)
                for seq in aborted_group.get_seqs():
                    if seq.is_finished():
                        continue
                    seq.status = SequenceStatus.FINISHED_ABORTED
                    self.free_seq(seq)

    def has_unfinished_seqs(self) -> bool:
        """Return True if any of the three queues is non-empty."""
        # Wrap in bool() so callers get a real boolean rather than one of
        # the deques themselves.
        return bool(self.waiting or self.running or self.swapped)

    def get_num_unfinished_seq_groups(self) -> int:
        """Return the total number of groups across all three queues."""
        return len(self.waiting) + len(self.running) + len(self.swapped)

    def _schedule(self) -> SchedulerOutputs:
        """Run one scheduling step and update the internal queues.

        When nothing is swapped out, waiting prompts are admitted first;
        if any prompt is scheduled or ignored, a prompt-run output is
        returned right away. Otherwise a generation step is prepared:
        slots are reserved for the running groups (preempting from the
        back of the priority-sorted queue when blocks run out) and, if
        nothing was preempted, swapped groups are brought back in.

        Returns:
            The SchedulerOutputs describing this step.
        """
        # Blocks that need to be swapped or copied before model execution.
        blocks_to_swap_in: Dict[int, int] = {}
        blocks_to_swap_out: Dict[int, int] = {}
        blocks_to_copy: Dict[int, List[int]] = {}

        # Fix the current time.
        now = time.monotonic()

        # Join waiting sequences if possible.
        if not self.swapped:
            ignored_seq_groups: List[SequenceGroup] = []
            scheduled: List[SequenceGroup] = []
            # The total number of sequences on the fly, including the
            # requests in the generation phase.
            num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
                                for seq_group in self.running)
            curr_loras = set(
                seq_group.lora_int_id
                for seq_group in self.running) if self.lora_enabled else None
            seq_lens: List[int] = []

            # Optimization: We do not sort the waiting queue since the preempted
            # sequence groups are added to the front and the new sequence groups
            # are added to the back.
            leftover_waiting_sequences: Deque[SequenceGroup] = deque()
            while self.waiting:
                seq_group = self.waiting[0]
                waiting_seqs = seq_group.get_seqs(
                    status=SequenceStatus.WAITING)
                assert len(waiting_seqs) == 1, (
                    "Waiting sequence group should have only one prompt "
                    "sequence.")
                num_prompt_tokens = waiting_seqs[0].get_len()
                if num_prompt_tokens > self.prompt_limit:
                    logger.warning(
                        f"Input prompt ({num_prompt_tokens} tokens) is too long"
                        f" and exceeds limit of {self.prompt_limit}")
                    for seq in waiting_seqs:
                        seq.status = SequenceStatus.FINISHED_IGNORED
                    ignored_seq_groups.append(seq_group)
                    self.waiting.popleft()
                    continue

                # If the sequence group cannot be allocated, stop.
                can_allocate = self.block_manager.can_allocate(seq_group)
                if can_allocate == AllocStatus.LATER:
                    break
                elif can_allocate == AllocStatus.NEVER:
                    logger.warning(
                        f"Input prompt ({num_prompt_tokens} tokens) is too long"
                        f" and exceeds the capacity of block_manager")
                    for seq in waiting_seqs:
                        seq.status = SequenceStatus.FINISHED_IGNORED
                    ignored_seq_groups.append(seq_group)
                    self.waiting.popleft()
                    continue

                lora_int_id = 0
                if self.lora_enabled:
                    lora_int_id = seq_group.lora_int_id
                    if (lora_int_id > 0 and lora_int_id not in curr_loras
                            and len(curr_loras) >= self.lora_config.max_loras):
                        # We don't have a space for another LoRA, so
                        # we ignore this request for now.
                        leftover_waiting_sequences.appendleft(seq_group)
                        self.waiting.popleft()
                        continue

                # If the number of batched tokens exceeds the limit, stop.
                # The batch is counted as padded to its longest prompt.
                new_seq_lens = seq_lens + [num_prompt_tokens]
                num_batched_tokens = len(new_seq_lens) * max(new_seq_lens)
                if (num_batched_tokens >
                        self.scheduler_config.max_num_batched_tokens):
                    break

                # The total number of sequences in the RUNNING state should not
                # exceed the maximum number of sequences.
                num_new_seqs = seq_group.get_max_num_running_seqs()
                if (num_curr_seqs + num_new_seqs >
                        self.scheduler_config.max_num_seqs):
                    break

                num_paddings = num_batched_tokens - sum(new_seq_lens)
                if num_paddings > self.scheduler_config.max_paddings:
                    break
                seq_lens = new_seq_lens

                if lora_int_id > 0:
                    curr_loras.add(lora_int_id)
                self.waiting.popleft()
                self._allocate(seq_group)
                self.running.append(seq_group)
                num_curr_seqs += num_new_seqs
                scheduled.append(seq_group)

            # appendleft above and extendleft here each reverse the order,
            # so the deferred groups return to the front of the waiting
            # queue in their original relative order.
            self.waiting.extendleft(leftover_waiting_sequences)

            if scheduled or ignored_seq_groups:
                scheduler_outputs = SchedulerOutputs(
                    scheduled_seq_groups=scheduled,
                    prompt_run=True,
                    num_batched_tokens=len(seq_lens) *
                    max(seq_lens) if seq_lens else 0,
                    blocks_to_swap_in=blocks_to_swap_in,
                    blocks_to_swap_out=blocks_to_swap_out,
                    blocks_to_copy=blocks_to_copy,
                    ignored_seq_groups=ignored_seq_groups,
                )
                return scheduler_outputs

        # NOTE(woosuk): Preemption happens only when there is no available slot
        # to keep all the sequence groups in the RUNNING state.
        # In this case, the policy is responsible for deciding which sequence
        # groups to preempt.
        self.running = self.policy.sort_by_priority(now, self.running)

        # Reserve new token slots for the running sequence groups.
        running: Deque[SequenceGroup] = deque()
        preempted: List[SequenceGroup] = []
        while self.running:
            seq_group = self.running.popleft()
            while not self.block_manager.can_append_slot(seq_group):
                if self.running:
                    # Preempt the lowest-priority sequence groups.
                    victim_seq_group = self.running.pop()
                    self._preempt(victim_seq_group, blocks_to_swap_out)
                    preempted.append(victim_seq_group)
                else:
                    # No other sequence groups can be preempted.
                    # Preempt the current sequence group.
                    self._preempt(seq_group, blocks_to_swap_out)
                    preempted.append(seq_group)
                    break
            else:
                # Reached only when the inner loop ended without `break`,
                # i.e. the current group itself was not preempted.
                # Append new slots to the sequence group.
                self._append_slot(seq_group, blocks_to_copy)
                running.append(seq_group)
        self.running = running

        # Swap in the sequence groups in the SWAPPED state if possible.
        self.swapped = self.policy.sort_by_priority(now, self.swapped)
        if not preempted:
            num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
                                for seq_group in self.running)
            curr_loras = set(
                seq_group.lora_int_id
                for seq_group in self.running) if self.lora_enabled else None

            leftover_swapped: Deque[SequenceGroup] = deque()

            while self.swapped:
                seq_group = self.swapped[0]
                lora_int_id = 0
                if self.lora_enabled:
                    lora_int_id = seq_group.lora_int_id
                    if (lora_int_id > 0 and lora_int_id not in curr_loras
                            and len(curr_loras) >= self.lora_config.max_loras):
                        # We don't have a space for another LoRA, so
                        # we ignore this request for now.
                        leftover_swapped.appendleft(seq_group)
                        self.swapped.popleft()
                        continue

                # If the sequence group cannot be swapped in, stop.
                if not self.block_manager.can_swap_in(seq_group):
                    break

                # The total number of sequences in the RUNNING state should not
                # exceed the maximum number of sequences.
                num_new_seqs = seq_group.get_max_num_running_seqs()
                if (num_curr_seqs + num_new_seqs >
                        self.scheduler_config.max_num_seqs):
                    break

                if lora_int_id > 0:
                    curr_loras.add(lora_int_id)
                self.swapped.popleft()
                self._swap_in(seq_group, blocks_to_swap_in)
                self._append_slot(seq_group, blocks_to_copy)
                num_curr_seqs += num_new_seqs
                self.running.append(seq_group)

            # Same double reversal as for the waiting queue: the deferred
            # groups go back to the front in their original order.
            self.swapped.extendleft(leftover_swapped)

        # Each sequence in the generation phase only takes one token slot.
        # Therefore, the number of batched tokens is equal to the number of
        # sequences in the RUNNING state.
        num_batched_tokens = sum(
            seq_group.num_seqs(status=SequenceStatus.RUNNING)
            for seq_group in self.running)

        scheduler_outputs = SchedulerOutputs(
            scheduled_seq_groups=self.running,
            prompt_run=False,
            num_batched_tokens=num_batched_tokens,
            blocks_to_swap_in=blocks_to_swap_in,
            blocks_to_swap_out=blocks_to_swap_out,
            blocks_to_copy=blocks_to_copy,
            ignored_seq_groups=[],
        )
        return scheduler_outputs

    def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
        """Schedule one step and build the per-group metadata for it.

        Returns:
            A tuple of (metadata for every scheduled sequence group,
            the SchedulerOutputs produced by the scheduling step).
        """
        # Schedule sequence groups.
        # This function call changes the internal states of the scheduler
        # such as self.running, self.swapped, and self.waiting.
        scheduler_outputs = self._schedule()
        # NOTE(review): _schedule() takes its timestamp from
        # time.monotonic() while this one comes from time.time(); confirm
        # that each consumer expects the clock it is given.
        now = time.time()

        # Create input data structures.
        seq_group_metadata_list: List[SequenceGroupMetadata] = []
        for seq_group in scheduler_outputs.scheduled_seq_groups:
            seq_group.maybe_set_first_scheduled_time(now)

            seq_data: Dict[int, SequenceData] = {}
            block_tables: Dict[int, List[int]] = {}

            for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
                seq_id = seq.seq_id
                seq_data[seq_id] = seq.data
                block_tables[seq_id] = self.block_manager.get_block_table(seq)
                self.block_manager.access_all_blocks_in_seq(seq, now)

            seq_group_metadata = SequenceGroupMetadata(
                request_id=seq_group.request_id,
                is_prompt=scheduler_outputs.prompt_run,
                seq_data=seq_data,
                sampling_params=seq_group.sampling_params,
                block_tables=block_tables,
                lora_request=seq_group.lora_request,
                computed_block_nums=self.block_manager.
                get_common_computed_block_ids(seq_group),
                state=seq_group.state,
            )
            seq_group_metadata_list.append(seq_group_metadata)
        return seq_group_metadata_list, scheduler_outputs

    def fork_seq(self, parent_seq: Sequence, child_seq: Sequence) -> None:
        """Delegate forking of `parent_seq` into `child_seq` to the block manager."""
        self.block_manager.fork(parent_seq, child_seq)

    def free_seq(self, seq: Sequence) -> None:
        """Ask the block manager to free `seq`."""
        self.block_manager.free(seq)

    def free_finished_seq_groups(self) -> None:
        """Drop finished sequence groups from the running queue."""
        self.running = deque(seq_group for seq_group in self.running
                             if not seq_group.is_finished())

    def _allocate(self, seq_group: SequenceGroup) -> None:
        """Allocate blocks for the group and mark its waiting seqs RUNNING."""
        self.block_manager.allocate(seq_group)
        for seq in seq_group.get_seqs(status=SequenceStatus.WAITING):
            seq.status = SequenceStatus.RUNNING

    def _append_slot(
        self,
        seq_group: SequenceGroup,
        blocks_to_copy: Dict[int, List[int]],
    ) -> None:
        """Append a slot for every running sequence of the group.

        Whenever the block manager returns a (source, destination) block
        pair, it is recorded in `blocks_to_copy`, which is updated in place.
        """
        for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
            ret = self.block_manager.append_slot(seq)
            if ret is not None:
                src_block, dst_block = ret
                blocks_to_copy.setdefault(src_block, []).append(dst_block)

    def _preempt(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
        preemption_mode: Optional[PreemptionMode] = None,
    ) -> None:
        """Preempt a sequence group by recomputation or by swapping.

        Args:
            seq_group: The sequence group to preempt.
            blocks_to_swap_out: Updated in place when swapping is used.
            preemption_mode: Mode to use; chosen automatically when None.

        Raises:
            AssertionError: If `preemption_mode` is not a known mode.
        """
        # If preemption mode is not specified, we determine the mode as follows:
        # We use recomputation by default since it incurs lower overhead than
        # swapping. However, when the sequence group has multiple sequences
        # (e.g., beam search), recomputation is not currently supported. In
        # such a case, we use swapping instead.
        # FIXME(woosuk): This makes our scheduling policy a bit bizarre.
        # As swapped sequences are prioritized over waiting sequences,
        # sequence groups with multiple sequences are implicitly prioritized
        # over sequence groups with a single sequence.
        # TODO(woosuk): Support recomputation for sequence groups with multiple
        # sequences. This may require a more sophisticated CUDA kernel.
        if preemption_mode is None:
            if seq_group.get_max_num_running_seqs() == 1:
                preemption_mode = PreemptionMode.RECOMPUTE
            else:
                preemption_mode = PreemptionMode.SWAP
        if preemption_mode == PreemptionMode.RECOMPUTE:
            self._preempt_by_recompute(seq_group)
        elif preemption_mode == PreemptionMode.SWAP:
            self._preempt_by_swap(seq_group, blocks_to_swap_out)
        else:
            raise AssertionError("Invalid preemption mode.")

    def _preempt_by_recompute(
        self,
        seq_group: SequenceGroup,
    ) -> None:
        """Free the group's single running sequence and requeue the group."""
        seqs = seq_group.get_seqs(status=SequenceStatus.RUNNING)
        assert len(seqs) == 1
        for seq in seqs:
            seq.status = SequenceStatus.WAITING
            self.block_manager.free(seq)
        # NOTE: For FCFS, we insert the preempted sequence group to the front
        # of the waiting queue.
        self.waiting.appendleft(seq_group)

    def _preempt_by_swap(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
    ) -> None:
        """Swap the group out and append it to the swapped queue."""
        self._swap_out(seq_group, blocks_to_swap_out)
        self.swapped.append(seq_group)

    def _swap_in(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_in: Dict[int, int],
    ) -> None:
        """Swap the group in and mark its swapped sequences RUNNING.

        The mapping returned by the block manager is merged into
        `blocks_to_swap_in` in place.
        """
        mapping = self.block_manager.swap_in(seq_group)
        blocks_to_swap_in.update(mapping)
        for seq in seq_group.get_seqs(status=SequenceStatus.SWAPPED):
            seq.status = SequenceStatus.RUNNING

    def _swap_out(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
    ) -> None:
        """Swap the group out and mark its running sequences SWAPPED.

        The mapping returned by the block manager is merged into
        `blocks_to_swap_out` in place.

        Raises:
            RuntimeError: If the block manager reports that the group
                cannot be swapped out.
        """
        if not self.block_manager.can_swap_out(seq_group):
            # FIXME(woosuk): Abort the sequence group instead of aborting the
            # entire engine.
            raise RuntimeError(
                "Aborted due to the lack of CPU swap space. Please increase "
                "the swap space to avoid this error.")
        mapping = self.block_manager.swap_out(seq_group)
        blocks_to_swap_out.update(mapping)
        for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
            seq.status = SequenceStatus.SWAPPED

    def mark_blocks_as_computed(self, seq_group: SequenceGroup):
        """Forward the request to the block manager for this group."""
        self.block_manager.mark_blocks_as_computed(seq_group)