Unverified Commit 6cb00c63 authored by shangmingc's avatar shangmingc Committed by GitHub
Browse files

[PD] Optimize time out logic and add env var doc for mooncake (#6761)


Signed-off-by: default avatarShangming Cai <caishangming@linux.alibaba.com>
parent 62cac2c4
...@@ -47,6 +47,23 @@ $ python -m sglang.launch_server --model-path deepseek-ai/DeepSeek-V3-0324 --dis ...@@ -47,6 +47,23 @@ $ python -m sglang.launch_server --model-path deepseek-ai/DeepSeek-V3-0324 --dis
# decode 1 # decode 1
$ python -m sglang.launch_server --model-path deepseek-ai/DeepSeek-V3-0324 --disaggregation-ib-device ${device_name} --disaggregation-mode decode --host ${local_ip} --port 30001 --trust-remote-code --dist-init-addr ${decode_master_ip}:5000 --nnodes 2 --node-rank 1 --tp-size 16 --dp-size 8 --enable-dp-attention --enable-deepep-moe --deepep-mode low_latency --mem-fraction-static 0.8 --max-running-requests 128 $ python -m sglang.launch_server --model-path deepseek-ai/DeepSeek-V3-0324 --disaggregation-ib-device ${device_name} --disaggregation-mode decode --host ${local_ip} --port 30001 --trust-remote-code --dist-init-addr ${decode_master_ip}:5000 --nnodes 2 --node-rank 1 --tp-size 16 --dp-size 8 --enable-dp-attention --enable-deepep-moe --deepep-mode low_latency --mem-fraction-static 0.8 --max-running-requests 128
``` ```
### Advanced Configuration
PD Disaggregation with Mooncake supports the following environment variables for fine-grained control over system behavior.
#### Prefill Server Configuration
| Variable | Description | Default |
|:--------:|:-----------:|:--------:
| **`SGLANG_DISAGGREGATION_THREAD_POOL_SIZE`** | Controls the total number of worker threads for KV transfer operations per TP rank | A dynamic value calculated by `int(0.75 * os.cpu_count()) // 8)`, which is limited to be larger than 4 and less than 12 to ensure efficiency and prevent thread race conditions |
| **`SGLANG_DISAGGREGATION_QUEUE_SIZE`** | Sets the maximum pending tasks in the parallel transfer queue | `4` |
| **`SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT`** | Timeout (seconds) for receiving destination KV indices during request initialization | `30` |
#### Decode Server Configuration
| Variable | Description | Default |
|:--------:|:-----------:|:--------:
| **`SGLANG_DISAGGREGATION_HEARTBEAT_INTERVAL`** | Interval (seconds) between health checks to prefill bootstrap servers | `5.0` |
| **`SGLANG_DISAGGREGATION_HEARTBEAT_MAX_FAILURE`** | Consecutive heartbeat failures before marking prefill server offline | `2` |
## NIXL ## NIXL
### Requirements ### Requirements
......
...@@ -677,14 +677,15 @@ class MooncakeKVSender(BaseKVSender): ...@@ -677,14 +677,15 @@ class MooncakeKVSender(BaseKVSender):
self.kv_mgr.update_status(bootstrap_room, KVPoll.Bootstrapping) self.kv_mgr.update_status(bootstrap_room, KVPoll.Bootstrapping)
self.aux_index = None self.aux_index = None
self.bootstrap_server_url = bootstrap_addr self.bootstrap_server_url = bootstrap_addr
self.init_time = time.time()
self.conclude_state = None self.conclude_state = None
self.init_time = None
# inner state # inner state
self.curr_idx = 0 self.curr_idx = 0
def init(self, num_kv_indices: int, aux_index: Optional[int] = None): def init(self, num_kv_indices: int, aux_index: Optional[int] = None):
self.num_kv_indices = num_kv_indices self.num_kv_indices = num_kv_indices
self.aux_index = aux_index self.aux_index = aux_index
self.init_time = time.time()
def send( def send(
self, self,
...@@ -713,15 +714,16 @@ class MooncakeKVSender(BaseKVSender): ...@@ -713,15 +714,16 @@ class MooncakeKVSender(BaseKVSender):
if status in (KVPoll.Success, KVPoll.Failed): if status in (KVPoll.Success, KVPoll.Failed):
self.conclude_state = status self.conclude_state = status
elif status == KVPoll.Bootstrapping: elif status == KVPoll.Bootstrapping:
now = time.time() if self.init_time is not None:
elapsed = now - self.init_time now = time.time()
if elapsed >= self.kv_mgr.bootstrap_time_out: elapsed = now - self.init_time
self.kv_mgr.record_failure( if elapsed >= self.kv_mgr.bootstrap_time_out:
self.bootstrap_room, self.kv_mgr.record_failure(
f"Request {self.bootstrap_room} timed out after {elapsed:.1f}s in KVPoll.Bootstrapping", self.bootstrap_room,
) f"Request {self.bootstrap_room} timed out after {elapsed:.1f}s in KVPoll.Bootstrapping",
self.conclude_state = KVPoll.Failed )
return KVPoll.Failed self.conclude_state = KVPoll.Failed
return KVPoll.Failed
return status return status
else: else:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment