# SPDX-License-Identifier: Apache-2.0

from copy import copy
from typing import Callable, Optional, Union

from vllm.outputs import CompletionOutput, RequestOutput
from vllm.pooling_params import PoolingParams
from vllm.sampling_params import SamplingParams


class ParentRequest:
    """Info, state & processing for parallel sampling request.

    Store parent request ID and sampling params.
    Facilitate generating child request sampling params.
    """

    request_id: str
    sampling_params: SamplingParams

    # To aggregate child completions when not streaming
    output_aggregator: Optional[RequestOutput]

    # To efficiently obtain child sampling params
    cached_child_sampling_params: Optional[SamplingParams]

    def __init__(self, request_id: str,
                 sampling_params: SamplingParams) -> None:
        """Record the parent request ID and its sampling params.

        Args:
          request_id: ID of the parent request.
          sampling_params: parent `sampling_params`; child params
            are derived from it on demand.
        """
        self.request_id = request_id
        self.sampling_params = sampling_params

        # Both start empty and are populated lazily.
        self.output_aggregator = None
        self.cached_child_sampling_params = None

    @classmethod
    def from_params(
        cls,
        request_id: str,
        params: Union[SamplingParams, PoolingParams],
    ) -> Optional['ParentRequest']:
        """Build a parent request only when parallel sampling applies.

        Args:
          request_id: ID of the candidate parent request.
          params: sampling or pooling params of the request.

        Returns:
          `None` if `params` is not a `SamplingParams` instance or if
          `params.n == 1`; otherwise a new `ParentRequest`.
        """
        if not isinstance(params, SamplingParams) or params.n == 1:
            return None
        return cls(request_id, params)

    def _get_child_sampling_params(
        self,
        index: int,
    ) -> SamplingParams:
        """Efficiently obtain child `sampling_params`

        If `sampling_params.seed` is not `None` then
        each child request requires a unique clone of
        parent `sampling_params` with a unique seed.

        Args:
          index: index within `n` child requests

        Returns:
          Child `sampling_params` instance.
        """
        cached = self.cached_child_sampling_params
        # Compare against the `None` sentinel explicitly so that a cached
        # object which happens to evaluate falsy is still reused.
        if cached is not None:
            # Reuse child sampling_params data structure
            return cached

        # Build child sampling_params: shallow copy of the parent, n forced
        # to 1; the parent's own params are left untouched.
        child_sampling_params = copy(self.sampling_params)
        child_sampling_params.n = 1

        seed = self.sampling_params.seed
        if seed is None:
            # Unseeded children are interchangeable, so cache one instance
            # for later reuse.
            self.cached_child_sampling_params = child_sampling_params
        else:
            # Each child gets a clone with a unique seed; never cached.
            child_sampling_params.seed = seed + index
        return child_sampling_params

    def get_child_info(self, index: int) -> tuple[str, SamplingParams]:
        """Get child request ID and sampling params.

        Args:
          index: index within `n` child requests.

        Returns:
          (request ID, sampling_params) tuple
        """
        # Child ID is the child index prefixed to the parent request ID.
        child_request_id = f"{index}_{self.request_id}"
        return (child_request_id, self._get_child_sampling_params(index))

    @property
    def n(self) -> int:
        """Value of `n` from the parent `sampling_params`."""
        return self.sampling_params.n

    def make_request_output(
        self,
        final_only: bool,
        completion_output: CompletionOutput,
        new_request_output: Callable[[str], RequestOutput],
    ) -> Optional[RequestOutput]:
        """Fold one child completion into a parent-level request output.

        Args:
          final_only: if `True`, completions are accumulated and nothing
            is returned until `n` completions have been collected.
          completion_output: the child completion to add.
          new_request_output: factory called with the parent request ID
            to create a `RequestOutput` when none is being aggregated.

        Returns:
          `None` while still aggregating; otherwise the request output
          with its `outputs` sorted by completion `index`.
        """
        # Use an existing RequestOutput if we're aggregating
        request_output = self.output_aggregator

        # Make new RequestOutput otherwise
        if request_output is None:
            request_output = new_request_output(self.request_id)

        # Add a new completion
        request_output.outputs.append(completion_output)

        # If not streaming, aggregate until all child requests complete
        if final_only and len(request_output.outputs) != self.n:
            self.output_aggregator = request_output
            return None

        # We're done aggregating
        self.output_aggregator = None

        # Parent completion output list must be sorted by index
        request_output.outputs = sorted(request_output.outputs,
                                        key=lambda x: x.index)
        return request_output