tokenizer_manager.py 12.8 KB
Newer Older
Lianmin Zheng's avatar
Lianmin Zheng committed
1
2
3
import asyncio
import concurrent.futures
import dataclasses
4
import logging
5
import multiprocessing as mp
Lianmin Zheng's avatar
Lianmin Zheng committed
6
7
8
9
10
11
12
13
import os
from typing import List

import numpy as np
import transformers
import uvloop
import zmq
import zmq.asyncio
Liangsheng Yin's avatar
Liangsheng Yin committed
14

Lianmin Zheng's avatar
Lianmin Zheng committed
15
16
17
18
19
20
21
22
from sglang.srt.hf_transformers_utils import (
    get_config,
    get_context_length,
    get_processor,
    get_tokenizer,
)
from sglang.srt.managers.io_struct import (
    BatchStrOut,
Cody Yu's avatar
Cody Yu committed
23
    DetokenizeReqInput,
24
    FlushCacheReq,
Lianmin Zheng's avatar
Lianmin Zheng committed
25
26
27
    GenerateReqInput,
    TokenizedGenerateReqInput,
)
shiyi.c_98's avatar
shiyi.c_98 committed
28
from sglang.srt.mm_utils import expand2square, process_anyres_image
Lianmin Zheng's avatar
Lianmin Zheng committed
29
30
31
32
33
34
from sglang.srt.sampling_params import SamplingParams
from sglang.srt.server_args import PortArgs, ServerArgs
from sglang.srt.utils import get_exception_traceback, is_multimodal_model, load_image

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

35
36
logger = logging.getLogger(__name__)

Lianmin Zheng's avatar
Lianmin Zheng committed
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

@dataclasses.dataclass
class ReqState:
    """Per-request bookkeeping shared between generate_request and handle_loop."""

    out_list: List  # outputs received since the last yield to the caller
    finished: bool  # True once the detokenizer reports the request finished
    event: asyncio.Event  # set by handle_loop whenever new output arrives


# Readability-only declaration of the per-worker-process HF processor; the
# real value is assigned by init_global_processor() inside each executor
# worker. (A `global` statement at module scope is a no-op.)
global global_processor


def init_global_processor(server_args: ServerArgs):
    """Initializer for ProcessPoolExecutor workers: build the HF processor once.

    Each worker process keeps its own copy in the module-level
    ``global_processor`` so image preprocessing can run without pickling the
    processor per task.
    """
    global global_processor

    # Keep worker stderr clean of HF warning spam.
    transformers.logging.set_verbosity_error()

    processor_kwargs = dict(
        tokenizer_mode=server_args.tokenizer_mode,
        trust_remote_code=server_args.trust_remote_code,
    )
    global_processor = get_processor(server_args.tokenizer_path, **processor_kwargs)


58
59
60
def get_pixel_values(
    image_data, image_aspect_ratio=None, image_grid_pinpoints=None, processor=None
):
    """Load an image and run it through the HF image processor.

    Args:
        image_data: Raw image payload (path / URL / bytes — whatever
            ``load_image`` accepts). Also used as the key for ``image_hash``.
        image_aspect_ratio: Optional preprocessing mode ("pad" or "anyres");
            any other value uses the processor's default behavior.
        image_grid_pinpoints: Grid pinpoints forwarded to anyres processing.
        processor: HF processor to use; falls back to the per-process
            ``global_processor`` installed by ``init_global_processor``.

    Returns:
        A ``(pixel_values, image_hash, image_size)`` tuple, or ``None`` if
        processing raised (the traceback is printed, not re-raised, because
        this runs inside executor worker processes).
    """
    try:
        processor = processor or global_processor
        image, image_size = load_image(image_data)
        if image_size is not None:
            # load_image already determined the size: process every returned
            # frame and stack them into a single float16 array.
            image_hash = hash(image_data)
            pixel_values = processor.image_processor(image)["pixel_values"]
            pixel_values = np.stack(
                [frame.astype(np.float16) for frame in pixel_values], axis=0
            )
            return pixel_values, image_hash, image_size
        else:
            image_hash = hash(image_data)
            if image_aspect_ratio == "pad":
                # Pad to a square using the processor's mean color.
                image = expand2square(
                    image,
                    tuple(int(x * 255) for x in processor.image_processor.image_mean),
                )
                pixel_values = processor.image_processor(image)["pixel_values"][0]
            elif image_aspect_ratio == "anyres":
                pixel_values = process_anyres_image(
                    image, processor.image_processor, image_grid_pinpoints
                )
            else:
                pixel_values = processor.image_processor(image)["pixel_values"][0]
            pixel_values = pixel_values.astype(np.float16)
            return pixel_values, image_hash, image.size
    except Exception:
        print("Exception in TokenizerManager:\n" + get_exception_traceback())


class TokenizerManager:
    """Front-end manager that tokenizes requests and relays them over ZMQ.

    It pushes ``TokenizedGenerateReqInput`` objects to the router, receives
    detokenized ``BatchStrOut`` results back from the detokenizer, and wakes
    the per-request awaiters created in ``generate_request``.
    """

    def __init__(
        self,
        server_args: ServerArgs,
        port_args: PortArgs,
        model_overide_args: dict = None,
    ):
        self.server_args = server_args

        # ZMQ wiring: pull finished outputs from the detokenizer, push
        # tokenized requests to the router.
        context = zmq.asyncio.Context(2)
        self.recv_from_detokenizer = context.socket(zmq.PULL)
        self.recv_from_detokenizer.bind(f"tcp://127.0.0.1:{port_args.tokenizer_port}")

        self.send_to_router = context.socket(zmq.PUSH)
        self.send_to_router.connect(f"tcp://127.0.0.1:{port_args.router_port}")

        self.model_path = server_args.model_path
        self.hf_config = get_config(
            self.model_path,
            trust_remote_code=server_args.trust_remote_code,
            model_overide_args=model_overide_args,
        )
        self.context_len = get_context_length(self.hf_config)

        if is_multimodal_model(self.model_path):
            self.processor = get_processor(
                server_args.tokenizer_path,
                tokenizer_mode=server_args.tokenizer_mode,
                trust_remote_code=server_args.trust_remote_code,
            )
            self.tokenizer = self.processor.tokenizer
            os.environ["TOKENIZERS_PARALLELISM"] = "false"
            # Image preprocessing is CPU-heavy; off-load it to worker
            # processes, each holding its own processor copy.
            self.executor = concurrent.futures.ProcessPoolExecutor(
                initializer=init_global_processor,
                mp_context=mp.get_context("fork"),
                initargs=(server_args,),
            )
        else:
            # Fix: define these attributes on the non-multimodal path too,
            # so get_pixel_values() does not raise AttributeError when it
            # reads self.executor.
            self.processor = None
            self.executor = None
            self.tokenizer = get_tokenizer(
                server_args.tokenizer_path,
                tokenizer_mode=server_args.tokenizer_mode,
                trust_remote_code=server_args.trust_remote_code,
            )

        self.to_create_loop = True  # lazily start handle_loop on first request
        self.rid_to_state = {}  # Dict[str -> ReqState]

    async def get_pixel_values(self, image_data):
        """Preprocess one image, using the process pool when available."""
        aspect_ratio = getattr(self.hf_config, "image_aspect_ratio", None)
        grid_pinpoints = (
            self.hf_config.image_grid_pinpoints if aspect_ratio == "anyres" else None
        )
        if self.executor is not None:
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(
                self.executor,
                get_pixel_values,
                image_data,
                aspect_ratio,
                grid_pinpoints,
            )
        else:
            return get_pixel_values(
                image_data, aspect_ratio, grid_pinpoints, self.processor
            )

    async def generate_request(self, obj: GenerateReqInput):
        """Async generator: tokenize *obj*, forward it, and yield outputs.

        For a single request this yields incrementally (streaming); for a
        batch it yields one final list of outputs. Raises ValueError when the
        prompt exceeds the model's context length.
        """
        if self.to_create_loop:
            await self.create_handle_loop()

        is_single = obj.is_single
        if is_single:
            rid = obj.rid

            if obj.input_ids is None:
                input_ids = self.tokenizer.encode(obj.text)
            else:
                input_ids = obj.input_ids

            if len(input_ids) >= self.context_len:
                raise ValueError(
                    f"The input ({len(input_ids)} tokens) is longer than the "
                    f"model's context length ({self.context_len} tokens)"
                )

            sampling_params = SamplingParams(**obj.sampling_params)
            # max_new_tokens == 0 means prefill-only; skip normalization then.
            if sampling_params.max_new_tokens != 0:
                sampling_params.normalize(self.tokenizer)
                sampling_params.verify()

            # Only the first image of a list is used for a single request.
            if isinstance(obj.image_data, list) and len(obj.image_data) > 0:
                pixel_values, image_hash, image_size = await self.get_pixel_values(
                    obj.image_data[0]
                )
            elif isinstance(obj.image_data, str):
                pixel_values, image_hash, image_size = await self.get_pixel_values(
                    obj.image_data
                )
            else:
                pixel_values, image_hash, image_size = None, None, None
            tokenized_obj = TokenizedGenerateReqInput(
                rid=rid,
                input_text=obj.text,
                input_ids=input_ids,
                pixel_values=pixel_values,
                image_hash=image_hash,
                image_size=image_size,
                sampling_params=sampling_params,
                return_logprob=obj.return_logprob,
                logprob_start_len=obj.logprob_start_len,
                top_logprobs_num=obj.top_logprobs_num,
                stream=obj.stream,
            )
            self.send_to_router.send_pyobj(tokenized_obj)

            event = asyncio.Event()
            state = ReqState([], False, event)
            self.rid_to_state[rid] = state

            # Yield each chunk as handle_loop signals new output.
            while True:
                await event.wait()
                out = self.convert_logprob_style(
                    state.out_list[-1],
                    obj.return_logprob,
                    obj.top_logprobs_num,
                    obj.return_text_in_logprobs,
                )

                if self.server_args.log_requests and state.finished:
                    logger.info(f"in={obj.text}, out={out}")

                yield out
                state.out_list = []
                if state.finished:
                    del self.rid_to_state[rid]
                    break
                event.clear()
        else:
            # Batched requests do not support streaming.
            assert obj.stream is False

            if obj.input_ids is None:
                bs = len(obj.text)
            else:
                bs = len(obj.input_ids)

            # Fan out: tokenize and send every request before waiting.
            for i in range(bs):
                rid = obj.rid[i]

                if obj.input_ids is None:
                    input_text = obj.text[i]
                    input_ids = self.tokenizer.encode(obj.text[i])
                else:
                    input_text = None
                    input_ids = obj.input_ids[i]

                sampling_params = SamplingParams(**obj.sampling_params[i])
                if sampling_params.max_new_tokens != 0:
                    sampling_params.normalize(self.tokenizer)
                    sampling_params.verify()
                if obj.image_data[i] is None:
                    pixel_values, image_hash, image_size = None, None, None
                else:
                    pixel_values, image_hash, image_size = await self.get_pixel_values(
                        obj.image_data[i]
                    )
                tokenized_obj = TokenizedGenerateReqInput(
                    rid=rid,
                    input_text=input_text,
                    input_ids=input_ids,
                    pixel_values=pixel_values,
                    image_hash=image_hash,
                    image_size=image_size,
                    sampling_params=sampling_params,
                    return_logprob=obj.return_logprob[i],
                    logprob_start_len=obj.logprob_start_len[i],
                    top_logprobs_num=obj.top_logprobs_num[i],
                    stream=obj.stream,
                )
                self.send_to_router.send_pyobj(tokenized_obj)

                event = asyncio.Event()
                state = ReqState([], False, event)
                self.rid_to_state[rid] = state

            # Fan in: collect every request's final output, in order.
            output_list = []
            for i in range(bs):
                rid = obj.rid[i]
                state = self.rid_to_state[rid]
                await state.event.wait()
                output_list.append(
                    self.convert_logprob_style(
                        state.out_list[-1],
                        obj.return_logprob[i],
                        obj.top_logprobs_num[i],
                        obj.return_text_in_logprobs,
                    )
                )
                assert state.finished
                del self.rid_to_state[rid]

            yield output_list

    async def flush_cache(self):
        """Ask the router to flush its cache (fire-and-forget)."""
        flush_cache_req = FlushCacheReq()
        self.send_to_router.send_pyobj(flush_cache_req)

    async def create_handle_loop(self):
        """Start the background receive loop exactly once."""
        self.to_create_loop = False
        loop = asyncio.get_event_loop()
        loop.create_task(self.handle_loop())

    async def handle_loop(self):
        """Receive detokenized batches forever and wake the waiting requests."""
        while True:
            recv_obj = await self.recv_from_detokenizer.recv_pyobj()

            if isinstance(recv_obj, BatchStrOut):
                for i, rid in enumerate(recv_obj.rids):
                    recv_obj.meta_info[i]["id"] = rid
                    out_dict = {
                        "text": recv_obj.output_str[i],
                        "meta_info": recv_obj.meta_info[i],
                    }
                    state = self.rid_to_state[rid]
                    state.out_list.append(out_dict)
                    state.finished = recv_obj.finished[i]
                    state.event.set()
            else:
                raise ValueError(f"Invalid object: {recv_obj}")

    def convert_logprob_style(
        self, ret, return_logprob, top_logprobs_num, return_text_in_logprobs
    ):
        """Optionally attach decoded token text to the logprob entries of *ret*."""
        if return_logprob:
            ret["meta_info"]["prefill_token_logprobs"] = self.detokenize_logprob_tokens(
                ret["meta_info"]["prefill_token_logprobs"], return_text_in_logprobs
            )
            ret["meta_info"]["decode_token_logprobs"] = self.detokenize_logprob_tokens(
                ret["meta_info"]["decode_token_logprobs"], return_text_in_logprobs
            )
        if top_logprobs_num > 0:
            ret["meta_info"]["prefill_top_logprobs"] = (
                self.detokenize_top_logprobs_tokens(
                    ret["meta_info"]["prefill_top_logprobs"], return_text_in_logprobs
                )
            )
            ret["meta_info"]["decode_top_logprobs"] = (
                self.detokenize_top_logprobs_tokens(
                    ret["meta_info"]["decode_top_logprobs"], return_text_in_logprobs
                )
            )
        return ret

    def detokenize_logprob_tokens(self, token_logprobs, decode_to_text):
        """Expand (logprob, token_id) pairs to (logprob, token_id, text) triples.

        The text slot is None when *decode_to_text* is falsy.
        """
        if not decode_to_text:
            return [(logprob, token_id, None) for logprob, token_id in token_logprobs]

        token_ids = [tid for _, tid in token_logprobs]
        token_texts = self.tokenizer.batch_decode(token_ids)
        return [
            (logprob, token_id, token_text)
            for (logprob, token_id), token_text in zip(token_logprobs, token_texts)
        ]

    def detokenize_top_logprobs_tokens(self, top_logprobs, decode_to_text):
        """Apply detokenize_logprob_tokens to every non-empty position, in place."""
        for i, t in enumerate(top_logprobs):
            if t:
                top_logprobs[i] = self.detokenize_logprob_tokens(t, decode_to_text)
        return top_logprobs