Unverified Commit fb294b90 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

chore: no need to publish kv events for decode vllm worker (#3819)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 1aa5e92b
......@@ -225,6 +225,8 @@ for i in $(seq 1 $NUM_WORKERS); do
fi
if [ "$MODE" = "prefill" ]; then
VLLM_ARGS+=("--is-prefill-worker")
elif [ "$MODE" = "decode" ]; then
VLLM_ARGS+=("--is-decode-worker")
fi
VLLM_ARGS+=("${EXTRA_ARGS[@]}")
......
......@@ -45,6 +45,7 @@ class Config:
component: str
endpoint: str
is_prefill_worker: bool
is_decode_worker: bool
migration_limit: int = 0
kv_port: Optional[int] = None
port_range: DynamoPortRange
......@@ -85,6 +86,11 @@ def parse_args() -> Config:
action="store_true",
help="Enable prefill functionality for this worker. Uses the provided namespace to construct dyn://namespace.prefill.generate",
)
parser.add_argument(
"--is-decode-worker",
action="store_true",
help="Mark this as a decode worker which does not publish KV events.",
)
parser.add_argument(
"--migration-limit",
type=int,
......@@ -159,6 +165,7 @@ def parse_args() -> Config:
config.endpoint = "generate"
config.engine_args = engine_args
config.is_prefill_worker = args.is_prefill_worker
config.is_decode_worker = args.is_decode_worker
config.migration_limit = args.migration_limit
config.port_range = DynamoPortRange(
min=args.dynamo_port_min, max=args.dynamo_port_max
......
......@@ -118,6 +118,11 @@ def setup_kv_event_publisher(
if not config.engine_args.enable_prefix_caching:
return None
# Skip KV event publishing for decode workers
if config.is_decode_worker:
logger.info("Skipping KV event publisher setup for decode worker")
return None
# Get data_parallel_size to create publishers for all dp_ranks
data_parallel_size = getattr(vllm_config.parallel_config, "data_parallel_size", 1)
kv_publishers = []
......
......@@ -170,23 +170,20 @@ impl PrefillRouter {
bail!("Prefill router returned no output (stream ended)");
};
while prefill_response.next().await.is_some() {}
if let Some(err) = first_output.err() {
while prefill_response.next().await.is_some() {}
bail!("Prefill router returned error in output: {:?}", err);
bail!("Prefill router returned error in output: {err:?}");
}
let Some(output) = &first_output.data else {
while prefill_response.next().await.is_some() {}
bail!("Prefill router output has no data field");
};
let Some(disaggregated_params) = output.disaggregated_params.clone() else {
while prefill_response.next().await.is_some() {}
bail!("Prefill router output missing disaggregated_params");
};
while prefill_response.next().await.is_some() {}
Ok(disaggregated_params)
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment