Unverified commit f7655790, authored by Liangsheng Yin and committed by GitHub

Fix typo: infight -> inflight (#5357)

parent f58b929a
@@ -10,9 +10,9 @@ Life cycle of a request in the prefill server
 2. Waiting Queue
     a. Use PrefillAdder to pop requests
     b. Run forward
-    c. Add the request to Infight Queue
-3. Infight Queue
+    c. Add the request to Inflight Queue
+3. Inflight Queue
     a. Poll (non-blocking) the sender of the request
     b. Once the transfer has finished, return the request
 """
@@ -162,7 +162,7 @@ class SchedulerDisaggregationPrefillMixin:
         self: Scheduler, batch: ScheduleBatch, result: GenerationBatchResult
     ) -> None:
         """
-        Transfer kv for prefill completed requests and add it into disagg_prefill_infight_queue
+        Transfer kv for prefill completed requests and add it into disagg_prefill_inflight_queue
         Adapted from process_batch_result_prefill
         """
@@ -175,7 +175,7 @@ class SchedulerDisaggregationPrefillMixin:
                 req.output_ids.append(next_token_id)
                 self.tree_cache.cache_unfinished_req(req)  # update the tree and lock
                 self.send_kv_chunk(req, token_id=next_token_id)
-                self.disagg_prefill_infight_queue.append(req)
+                self.disagg_prefill_inflight_queue.append(req)
             else:
                 # being chunked reqs' prefill is not finished
                 req.is_chunked -= 1
@@ -187,22 +187,22 @@ class SchedulerDisaggregationPrefillMixin:
             self.current_stream.synchronize()
             batch.next_batch_sampling_info.sampling_info_done.set()

-    def process_disagg_prefill_infight_queue(self: Scheduler) -> None:
+    def process_disagg_prefill_inflight_queue(self: Scheduler) -> None:
         """
         Poll the requests in the middle of transfer. If done, return the request.
         """
-        assert len(self.disagg_prefill_infight_queue) > 0
+        assert len(self.disagg_prefill_inflight_queue) > 0

         done_reqs = []
         polls = poll_and_all_reduce(
-            [req.disagg_kv_sender for req in self.disagg_prefill_infight_queue],
+            [req.disagg_kv_sender for req in self.disagg_prefill_inflight_queue],
             self.tp_worker.get_tp_cpu_group(),
         )

         undone_reqs: List[Req] = []
-        # Check .poll() for the reqs in disagg_prefill_infight_queue. If Success, respond to the client and remove it from the queue
-        for req, poll in zip(self.disagg_prefill_infight_queue, polls):
+        # Check .poll() for the reqs in disagg_prefill_inflight_queue. If Success, respond to the client and remove it from the queue
+        for req, poll in zip(self.disagg_prefill_inflight_queue, polls):
             if poll in [KVPoll.WaitingForInput, KVPoll.Transferring]:
                 undone_reqs.append(req)
             elif poll == KVPoll.Success:  # transfer done
@@ -215,7 +215,7 @@ class SchedulerDisaggregationPrefillMixin:
         # Stream requests which have finished transfer
         self.stream_output(done_reqs, False, None)
-        self.disagg_prefill_infight_queue = undone_reqs
+        self.disagg_prefill_inflight_queue = undone_reqs

     def process_prefill_chunk(self: Scheduler) -> None:
         if self.last_batch and self.last_batch.forward_mode.is_extend():
...
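The `poll_and_all_reduce` call in the hunk above checks each in-flight KV sender without blocking and reduces the per-request poll states across the tensor-parallel CPU group, presumably so that every rank makes the same keep-or-remove decision for the inflight queue. The snippet below is only a minimal sketch of that idea, not the actual sglang helper; it assumes `KVPoll` states are integers where less-advanced states compare lower, so a MIN reduce keeps a request in flight until all ranks report success.

```python
# Minimal sketch (not the real poll_and_all_reduce): poll every KV sender
# locally, then MIN-all-reduce the states over the TP CPU (gloo) group so
# all ranks see the same, least-advanced transfer state per request.
# The integer ordering of KVPoll states is an assumption for illustration.
from typing import List

import torch
import torch.distributed as dist


def poll_and_all_reduce_sketch(kv_senders, tp_cpu_group) -> List[int]:
    local_states = torch.tensor(
        [int(sender.poll()) for sender in kv_senders], dtype=torch.int64
    )
    dist.all_reduce(local_states, op=dist.ReduceOp.MIN, group=tp_cpu_group)
    return local_states.tolist()
```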
@@ -594,7 +594,7 @@ class Scheduler(
                 gloo_group=self.tp_worker.get_attention_tp_cpu_group(),
             )

         # The prefill requests that are in the middle of kv sending
-        self.disagg_prefill_infight_queue: List[Req] = []
+        self.disagg_prefill_inflight_queue: List[Req] = []

     @DynamicGradMode()
     def event_loop_normal(self):
@@ -674,10 +674,10 @@ class Scheduler(
             result = self.run_batch(batch)
             self.process_batch_result_disagg_prefill(batch, result)

-            if len(self.disagg_prefill_infight_queue) > 0:
-                self.process_disagg_prefill_infight_queue()
+            if len(self.disagg_prefill_inflight_queue) > 0:
+                self.process_disagg_prefill_inflight_queue()

-            if batch is None and len(self.disagg_prefill_infight_queue) == 0:
+            if batch is None and len(self.disagg_prefill_inflight_queue) == 0:
                 self.check_memory()
                 self.new_token_ratio = self.init_new_token_ratio
...
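Taken together, the lifecycle docstring at the top of this diff and the event-loop hunk above amount to roughly the loop sketched below. The sketch reuses the method names visible in the diff (`run_batch`, `process_batch_result_disagg_prefill`, `process_disagg_prefill_inflight_queue`), while `move_bootstrapped_to_waiting` and `pop_prefill_batch` are hypothetical placeholders for steps not shown here.

```python
# Illustrative sketch of the prefill-side request lifecycle described in the
# docstring above; the two pop/bootstrap helpers are hypothetical, not the
# actual Scheduler API.
def prefill_event_loop_sketch(scheduler) -> None:
    while True:
        # 1. Bootstrap Queue: requests whose KV handshake has finished move on.
        scheduler.move_bootstrapped_to_waiting()

        # 2. Waiting Queue: pop requests (e.g. via PrefillAdder), run forward,
        #    start the KV transfer, and park them in the inflight queue.
        batch = scheduler.pop_prefill_batch()
        if batch is not None:
            result = scheduler.run_batch(batch)
            scheduler.process_batch_result_disagg_prefill(batch, result)

        # 3. Inflight Queue: non-blocking poll; finished transfers are streamed
        #    back and removed, the rest stay queued for the next iteration.
        if len(scheduler.disagg_prefill_inflight_queue) > 0:
            scheduler.process_disagg_prefill_inflight_queue()
```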