Unverified commit e165a9fc, authored by Lianmin Zheng, committed by GitHub
Browse files

Make detokenizer_manager.py not asyncio (#1532)

parent 4e4459b9
...@@ -15,13 +15,10 @@ limitations under the License. ...@@ -15,13 +15,10 @@ limitations under the License.
"""DetokenizerManager is a process that detokenizes the token ids.""" """DetokenizerManager is a process that detokenizes the token ids."""
import asyncio
import dataclasses import dataclasses
from typing import List from typing import List
import uvloop
import zmq import zmq
import zmq.asyncio
from sglang.srt.hf_transformers_utils import get_tokenizer from sglang.srt.hf_transformers_utils import get_tokenizer
from sglang.srt.managers.io_struct import ( from sglang.srt.managers.io_struct import (
...@@ -34,8 +31,6 @@ from sglang.srt.managers.schedule_batch import FINISH_MATCHED_STR ...@@ -34,8 +31,6 @@ from sglang.srt.managers.schedule_batch import FINISH_MATCHED_STR
from sglang.srt.server_args import PortArgs, ServerArgs from sglang.srt.server_args import PortArgs, ServerArgs
from sglang.utils import find_printable_text, get_exception_traceback from sglang.utils import find_printable_text, get_exception_traceback
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
@dataclasses.dataclass @dataclasses.dataclass
class DecodeStatus: class DecodeStatus:
...@@ -57,7 +52,7 @@ class DetokenizerManager: ...@@ -57,7 +52,7 @@ class DetokenizerManager:
port_args: PortArgs, port_args: PortArgs,
): ):
# Init inter-process communication # Init inter-process communication
context = zmq.asyncio.Context(2) context = zmq.Context(2)
self.recv_from_router = context.socket(zmq.PULL) self.recv_from_router = context.socket(zmq.PULL)
self.recv_from_router.bind(f"tcp://127.0.0.1:{port_args.detokenizer_port}") self.recv_from_router.bind(f"tcp://127.0.0.1:{port_args.detokenizer_port}")
...@@ -75,11 +70,11 @@ class DetokenizerManager: ...@@ -75,11 +70,11 @@ class DetokenizerManager:
self.decode_status = {} self.decode_status = {}
async def handle_loop(self): def handle_loop(self):
"""The event loop that handles requests""" """The event loop that handles requests"""
while True: while True:
recv_obj = await self.recv_from_router.recv_pyobj() recv_obj = self.recv_from_router.recv_pyobj()
if isinstance(recv_obj, BatchEmbeddingOut): if isinstance(recv_obj, BatchEmbeddingOut):
# If it is embedding model, no detokenization is needed. # If it is embedding model, no detokenization is needed.
...@@ -181,5 +176,4 @@ def start_detokenizer_process( ...@@ -181,5 +176,4 @@ def start_detokenizer_process(
pipe_writer.send(get_exception_traceback()) pipe_writer.send(get_exception_traceback())
raise raise
pipe_writer.send("init ok") pipe_writer.send("init ok")
loop = asyncio.get_event_loop() manager.handle_loop()
loop.run_until_complete(manager.handle_loop())
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment