Unverified Commit efbc116a authored by Lianmin Zheng's avatar Lianmin Zheng Committed by GitHub
Browse files

Do not use longest prefix matching when #queue-req is large (#1896)

parent 6aed0445
...@@ -45,9 +45,15 @@ class SchedulePolicy: ...@@ -45,9 +45,15 @@ class SchedulePolicy:
self.tree_cache = tree_cache self.tree_cache = tree_cache
def calc_priority(self, waiting_queue: List[Req]): def calc_priority(self, waiting_queue: List[Req]):
if len(waiting_queue) > 128 and self.policy == "lpm":
# Turn off the expensive prefix matching and sorting when the #queue is large.
policy = "fcfs"
else:
policy = self.policy
# Compute matched prefix length # Compute matched prefix length
prefix_computed = False prefix_computed = False
if self.policy == "lpm" or self.policy == "dfs-weight": if policy == "lpm" or policy == "dfs-weight":
for r in waiting_queue: for r in waiting_queue:
# NOTE: the prefix_indices must always be aligned with last_node # NOTE: the prefix_indices must always be aligned with last_node
r.prefix_indices, r.last_node = self.tree_cache.match_prefix( r.prefix_indices, r.last_node = self.tree_cache.match_prefix(
...@@ -56,18 +62,18 @@ class SchedulePolicy: ...@@ -56,18 +62,18 @@ class SchedulePolicy:
prefix_computed = True prefix_computed = True
if self.policy == "lpm": if policy == "lpm":
# Longest Prefix Match # Longest Prefix Match
waiting_queue.sort(key=lambda x: -len(x.prefix_indices)) waiting_queue.sort(key=lambda x: -len(x.prefix_indices))
elif self.policy == "fcfs": elif policy == "fcfs":
# first come first serve # first come first serve
pass pass
elif self.policy == "lof": elif policy == "lof":
# longest output first # longest output first
waiting_queue.sort(key=lambda x: -x.sampling_params.max_new_tokens) waiting_queue.sort(key=lambda x: -x.sampling_params.max_new_tokens)
elif self.policy == "random": elif policy == "random":
random.shuffle(waiting_queue) random.shuffle(waiting_queue)
elif self.policy == "dfs-weight": elif policy == "dfs-weight":
last_node_to_reqs = defaultdict(list) last_node_to_reqs = defaultdict(list)
for req in waiting_queue: for req in waiting_queue:
last_node_to_reqs[req.last_node].append(req) last_node_to_reqs[req.last_node].append(req)
...@@ -85,7 +91,7 @@ class SchedulePolicy: ...@@ -85,7 +91,7 @@ class SchedulePolicy:
waiting_queue, waiting_queue,
) )
else: else:
raise ValueError(f"Unknown schedule_policy: {self.policy}") raise ValueError(f"Unknown schedule_policy: {policy=}")
return prefix_computed return prefix_computed
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment