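"""TokenizerManager: tokenizes incoming generation requests, forwards them to
the router over ZeroMQ, and relays detokenized outputs back to waiting
callers."""
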
import asyncio
import concurrent.futures
import dataclasses
import logging
import multiprocessing as mp
import os
from typing import List

import numpy as np
import transformers
import uvloop
import zmq
import zmq.asyncio

from sglang.srt.hf_transformers_utils import (
    get_config,
    get_context_length,
    get_processor,
    get_tokenizer,
)
from sglang.srt.managers.io_struct import (
    BatchStrOut,
    DetokenizeReqInput,
    FlushCacheReq,
    GenerateReqInput,
    TokenizedGenerateReqInput,
)
from sglang.srt.mm_utils import expand2square, process_anyres_image
from sglang.srt.sampling_params import SamplingParams
from sglang.srt.server_args import PortArgs, ServerArgs
from sglang.srt.utils import get_exception_traceback, is_multimodal_model, load_image

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

logger = logging.getLogger(__name__)


@dataclasses.dataclass
class ReqState:
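    """Per-request state: handle_loop appends outputs to `out_list`, marks
    `finished`, and sets `event` to wake the coroutine awaiting this request."""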
    out_list: List
    finished: bool
    event: asyncio.Event


global_processor = None  # set in each worker process by init_global_processor


def init_global_processor(server_args: ServerArgs):
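    """Initialize the HF processor once per ProcessPoolExecutor worker."""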
    global global_processor
    transformers.logging.set_verbosity_error()
    global_processor = get_processor(
        server_args.tokenizer_path,
        tokenizer_mode=server_args.tokenizer_mode,
        trust_remote_code=server_args.trust_remote_code,
    )


def get_pixel_values(
    image_data, image_aspect_ratio=None, image_grid_pinpoints=None, processor=None
):
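    """Load an image and run it through the image processor, honoring the
    model's aspect-ratio strategy ("pad", "anyres", or default)."""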
    try:
        processor = processor or global_processor
        image = load_image(image_data)
        image_hash = hash(image_data)
        if image_aspect_ratio == "pad":
            image = expand2square(
                image, tuple(int(x * 255) for x in processor.image_processor.image_mean)
            )
            pixel_values = processor.image_processor(image)["pixel_values"][0]
        elif image_aspect_ratio == "anyres":
            pixel_values = process_anyres_image(
                image, processor.image_processor, image_grid_pinpoints
            )
        else:
            pixel_values = processor.image_processor(image)["pixel_values"][0]
        pixel_values = pixel_values.astype(np.float16)
        return pixel_values, image_hash, image.size
    except Exception:
        print("Exception in TokenizerManager:\n" + get_exception_traceback())


class TokenizerManager:
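    """Bridges the HTTP layer and the model router: tokenizes requests,
    pushes them to the router, and receives detokenized results."""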
    def __init__(
        self,
        server_args: ServerArgs,
        port_args: PortArgs,
    ):
        self.server_args = server_args

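        # ZeroMQ wiring: PULL detokenized results, PUSH tokenized requests.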
        context = zmq.asyncio.Context(2)
        self.recv_from_detokenizer = context.socket(zmq.PULL)
        self.recv_from_detokenizer.bind(f"tcp://127.0.0.1:{port_args.tokenizer_port}")

        self.send_to_router = context.socket(zmq.PUSH)
        self.send_to_router.connect(f"tcp://127.0.0.1:{port_args.router_port}")

        self.model_path = server_args.model_path
        self.hf_config = get_config(
            self.model_path, trust_remote_code=server_args.trust_remote_code
        )

        self.context_len = get_context_length(self.hf_config)

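        # Multimodal models need the full HF processor; heavy image
        # preprocessing is offloaded to a fork-based process pool to keep it
        # off the asyncio event loop.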
        if is_multimodal_model(self.model_path):
            self.processor = get_processor(
                server_args.tokenizer_path,
                tokenizer_mode=server_args.tokenizer_mode,
                trust_remote_code=server_args.trust_remote_code,
            )
            self.tokenizer = self.processor.tokenizer
            os.environ["TOKENIZERS_PARALLELISM"] = "false"
            self.executor = concurrent.futures.ProcessPoolExecutor(
                initializer=init_global_processor,
                mp_context=mp.get_context("fork"),
                initargs=(server_args,),
            )
        else:
            # Text-only models need neither an image processor nor a worker
            # pool; define both so get_pixel_values can test for them safely.
            self.processor = None
            self.executor = None
            self.tokenizer = get_tokenizer(
                server_args.tokenizer_path,
                tokenizer_mode=server_args.tokenizer_mode,
                trust_remote_code=server_args.trust_remote_code,
            )

        self.to_create_loop = True
        self.rid_to_state = {}  # Dict[str -> ReqState]

    async def get_pixel_values(self, image_data):
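        """Preprocess image data, offloading to the process pool when available."""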
        aspect_ratio = getattr(self.hf_config, "image_aspect_ratio", None)
        grid_pinpoints = (
            self.hf_config.image_grid_pinpoints if aspect_ratio == "anyres" else None
        )
        if self.executor is not None:
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(
                self.executor,
                get_pixel_values,
                image_data,
                aspect_ratio,
                grid_pinpoints,
            )
        else:
            return get_pixel_values(
                image_data, aspect_ratio, grid_pinpoints, self.processor
            )

    async def generate_request(self, obj: GenerateReqInput):
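        """Tokenize a request, send it to the router, and yield outputs as
        they arrive; handles either a single request or a batch."""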
        if self.to_create_loop:
            await self.create_handle_loop()

        is_single = isinstance(obj.text, str)

        if is_single:
            rid = obj.rid
            input_ids = self.tokenizer.encode(obj.text)
            sampling_params = SamplingParams(**obj.sampling_params)
            if sampling_params.max_new_tokens != 0:
                sampling_params.normalize(self.tokenizer)
                sampling_params.verify()

            if isinstance(obj.image_data, list) and len(obj.image_data) > 0:
                pixel_values, image_hash, image_size = await self.get_pixel_values(
                    obj.image_data[0]
                )
            elif isinstance(obj.image_data, str):
                pixel_values, image_hash, image_size = await self.get_pixel_values(
                    obj.image_data
                )
            else:
                pixel_values, image_hash, image_size = None, None, None
            tokenized_obj = TokenizedGenerateReqInput(
                rid=rid,
                input_text=obj.text,
                input_ids=input_ids,
                pixel_values=pixel_values,
                image_hash=image_hash,
                image_size=image_size,
                sampling_params=sampling_params,
                return_logprob=obj.return_logprob,
                logprob_start_len=obj.logprob_start_len,
                top_logprobs_num=obj.top_logprobs_num,
                stream=obj.stream,
            )
            self.send_to_router.send_pyobj(tokenized_obj)

            event = asyncio.Event()
            state = ReqState([], False, event)
            self.rid_to_state[rid] = state

            while True:
                await event.wait()
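                # A new (possibly partial) output has arrived; attach logprob
                # detail if requested and yield it to the caller.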
                out = self.convert_logprob_style(
                    state.out_list[-1],
                    obj.return_logprob,
                    obj.top_logprobs_num,
                    obj.return_text_in_logprobs,
                )

                if self.server_args.log_requests and state.finished:
                    logger.info(f"in={obj.text}, out={out}")

                yield out
                state.out_list = []
                if state.finished:
                    del self.rid_to_state[rid]
                    break
                event.clear()
        else:
            assert obj.stream is False
            bs = len(obj.text)
            for i in range(bs):
                rid = obj.rid[i]
                input_ids = self.tokenizer.encode(obj.text[i])
                sampling_params = SamplingParams(**obj.sampling_params[i])
                if sampling_params.max_new_tokens != 0:
                    sampling_params.normalize(self.tokenizer)
                    sampling_params.verify()
                if obj.image_data[i] is None:
                    pixel_values, image_hash, image_size = None, None, None
                else:
                    pixel_values, image_hash, image_size = await self.get_pixel_values(
                        obj.image_data[i]
                    )
                tokenized_obj = TokenizedGenerateReqInput(
                    rid=rid,
                    input_text=obj.text[i],
                    input_ids=input_ids,
                    pixel_values=pixel_values,
                    image_hash=image_hash,
                    image_size=image_size,
                    sampling_params=sampling_params,
                    return_logprob=obj.return_logprob[i],
                    logprob_start_len=obj.logprob_start_len[i],
                    top_logprobs_num=obj.top_logprobs_num[i],
                    stream=obj.stream,
                )
                self.send_to_router.send_pyobj(tokenized_obj)

                event = asyncio.Event()
                state = ReqState([], False, event)
                self.rid_to_state[rid] = state

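            # Gather results, waiting on each request in submission order.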
            output_list = []
            for i in range(bs):
                rid = obj.rid[i]
                state = self.rid_to_state[rid]
                await state.event.wait()
                output_list.append(
                    self.convert_logprob_style(
                        state.out_list[-1],
                        obj.return_logprob[i],
                        obj.top_logprobs_num[i],
                        obj.return_text_in_logprobs,
                    )
                )
                assert state.finished
                del self.rid_to_state[rid]

            yield output_list

    async def flush_cache(self):
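        """Ask the router to flush its cache."""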
        flush_cache_req = FlushCacheReq()
        self.send_to_router.send_pyobj(flush_cache_req)

    async def create_handle_loop(self):
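        """Lazily start the background receive loop on the running event loop."""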
        self.to_create_loop = False
        loop = asyncio.get_event_loop()
        loop.create_task(self.handle_loop())

    async def handle_loop(self):
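        """Background task: receive detokenized batches and dispatch each
        output to its request's state."""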
        while True:
            recv_obj = await self.recv_from_detokenizer.recv_pyobj()

            if isinstance(recv_obj, BatchStrOut):
                for i, rid in enumerate(recv_obj.rids):
                    recv_obj.meta_info[i]["id"] = rid
                    out_dict = {
                        "text": recv_obj.output_str[i],
                        "meta_info": recv_obj.meta_info[i],
                    }
                    state = self.rid_to_state[rid]
                    state.out_list.append(out_dict)
                    state.finished = recv_obj.finished[i]
                    state.event.set()
            else:
                raise ValueError(f"Invalid object: {recv_obj}")

    def convert_logprob_style(
        self, ret, return_logprob, top_logprobs_num, return_text_in_logprobs
    ):
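        """Optionally attach decoded token text to the logprob entries in
        ret["meta_info"]."""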
        if return_logprob:
            ret["meta_info"]["prefill_token_logprobs"] = self.detokenize_logprob_tokens(
                ret["meta_info"]["prefill_token_logprobs"], return_text_in_logprobs
            )
            ret["meta_info"]["decode_token_logprobs"] = self.detokenize_logprob_tokens(
                ret["meta_info"]["decode_token_logprobs"], return_text_in_logprobs
            )
        if top_logprobs_num > 0:
            ret["meta_info"]["prefill_top_logprobs"] = self.detokenize_top_logprobs_tokens(
                ret["meta_info"]["prefill_top_logprobs"], return_text_in_logprobs
            )
            ret["meta_info"]["decode_top_logprobs"] = self.detokenize_top_logprobs_tokens(
                ret["meta_info"]["decode_top_logprobs"], return_text_in_logprobs
            )
        return ret

    def detokenize_logprob_tokens(self, token_logprobs, decode_to_text):
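        """Expand (logprob, token_id) pairs into (logprob, token_id, text)
        triples, decoding token text only when requested."""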
        if not decode_to_text:
            return [(logprob, token_id, None) for logprob, token_id in token_logprobs]

        token_ids = [tid for _, tid in token_logprobs]
        token_texts = self.tokenizer.batch_decode(token_ids)
        return [
            (logprob, token_id, token_text)
            for (logprob, token_id), token_text in zip(token_logprobs, token_texts)
        ]

    def detokenize_top_logprobs_tokens(self, top_logprobs, decode_to_text):
        for i, t in enumerate(top_logprobs):
            if t:
                top_logprobs[i] = self.detokenize_logprob_tokens(t, decode_to_text)
        return top_logprobs