Unverified Commit dd1012fc authored by shangmingc's avatar shangmingc Committed by GitHub
Browse files

[PD] Fix potential perf spike caused by tracker gc and optimize doc (#6764)


Signed-off-by: default avatarShangming Cai <caishangming@linux.alibaba.com>
parent 44aab7f9
......@@ -54,8 +54,8 @@ PD Disaggregation with Mooncake supports the following environment variables for
#### Prefill Server Configuration
| Variable | Description | Default |
|:--------:|:-----------:|:--------:
| **`SGLANG_DISAGGREGATION_THREAD_POOL_SIZE`** | Controls the total number of worker threads for KV transfer operations per TP rank | A dynamic value calculated by `int(0.75 * os.cpu_count()) // 8)`, which is limited to be larger than 4 and less than 12 to ensure efficiency and prevent thread race conditions |
| **`SGLANG_DISAGGREGATION_QUEUE_SIZE`** | Sets the maximum pending tasks in the parallel transfer queue | `4` |
| **`SGLANG_DISAGGREGATION_THREAD_POOL_SIZE`** | Controls the total number of worker threads for KVCache transfer operations per TP rank | A dynamic value calculated by `int(0.75 * os.cpu_count()) // 8)`, which is limited to be larger than 4 and less than 12 to ensure efficiency and prevent thread race conditions |
| **`SGLANG_DISAGGREGATION_QUEUE_SIZE`** | Sets the number of parallel transfer queues. KVCache transfer requests from multiple decode instances will be sharded into these queues so that they can share the threads and the transfer bandwidth at the same time. If it is set to `1`, then we transfer requests one by one according to fcfs strategy | `4` |
| **`SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT`** | Timeout (seconds) for receiving destination KV indices during request initialization | `30` |
#### Decode Server Configuration
......
......@@ -191,7 +191,7 @@ class MooncakeKVManager(BaseKVManager):
self.heartbeat_failures = {}
self.session_pool = defaultdict(requests.Session)
self.session_pool_lock = threading.Lock()
self.addr_to_rooms_tracker = defaultdict(list)
self.addr_to_rooms_tracker = defaultdict(set)
self.connection_lock = threading.Lock()
# Heartbeat interval should be at least 2 seconds
self.heartbeat_interval = max(
......@@ -504,12 +504,14 @@ class MooncakeKVManager(BaseKVManager):
if response.status_code == 200:
self.heartbeat_failures[bootstrap_addr] = 0
for bootstrap_room in self.addr_to_rooms_tracker[
current_rooms = self.addr_to_rooms_tracker[
bootstrap_addr
]:
# Remove KVPoll.Success requests from the map
].copy()
for bootstrap_room in current_rooms:
# Remove KVPoll.Success requests from the tracker
if bootstrap_room not in self.request_status:
self.addr_to_rooms_tracker[bootstrap_addr].remove(
self.addr_to_rooms_tracker[bootstrap_addr].discard(
bootstrap_room
)
else:
......@@ -879,9 +881,7 @@ class MooncakeKVReceiver(BaseKVReceiver):
self.bootstrap_infos = self.kv_mgr.connection_pool[bootstrap_key]
assert len(self.bootstrap_infos) > 0
self.kv_mgr.addr_to_rooms_tracker[self.bootstrap_addr].append(
self.bootstrap_room
)
self.kv_mgr.addr_to_rooms_tracker[self.bootstrap_addr].add(self.bootstrap_room)
self.kv_mgr.update_status(self.bootstrap_room, KVPoll.WaitingForInput)
def _get_bootstrap_info_from_server(self, engine_rank, target_dp_group):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment