Unverified Commit 6ff49edb authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat: efficient serving of multiple mockers (#3997)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 17139df9
...@@ -168,88 +168,98 @@ trap cleanup SIGINT SIGTERM ...@@ -168,88 +168,98 @@ trap cleanup SIGINT SIGTERM
echo "Starting $NUM_WORKERS $MODE workers..." echo "Starting $NUM_WORKERS $MODE workers..."
for i in $(seq 1 $NUM_WORKERS); do if [ "$USE_MOCKERS" = true ]; then
{ # For mockers, launch a single process with --num-workers
MODE_CAPITALIZED=$(echo "$MODE" | sed 's/\(.\)/\U\1/') # All workers share the same tokio runtime and thread pool
echo "[$MODE_CAPITALIZED Worker-$i] Starting..." MODE_CAPITALIZED=$(echo "$MODE" | sed 's/\(.\)/\U\1/')
echo "[$MODE_CAPITALIZED Mocker] Starting $NUM_WORKERS workers in single process..."
# Calculate GPU indices for this worker (with base offset) MOCKER_ARGS=()
# Each worker needs TP * DP GPUs MOCKER_ARGS+=("--model-path" "$MODEL_PATH")
START_GPU=$(( BASE_GPU_OFFSET + (i - 1) * GPUS_PER_WORKER )) MOCKER_ARGS+=("--num-workers" "$NUM_WORKERS")
END_GPU=$(( START_GPU + GPUS_PER_WORKER - 1 ))
# Build CUDA_VISIBLE_DEVICES string for all GPUs (TP * DP) # Set endpoint based on worker mode
if [ "$GPUS_PER_WORKER" -eq 1 ]; then if [ "$MODE" = "prefill" ]; then
GPU_DEVICES="$START_GPU" MOCKER_ARGS+=("--endpoint" "dyn://test.prefill.generate")
else MOCKER_ARGS+=("--is-prefill-worker")
GPU_DEVICES="" elif [ "$MODE" = "decode" ]; then
for gpu in $(seq $START_GPU $END_GPU); do MOCKER_ARGS+=("--endpoint" "dyn://test.mocker.generate")
if [ -n "$GPU_DEVICES" ]; then MOCKER_ARGS+=("--is-decode-worker")
GPU_DEVICES="${GPU_DEVICES},$gpu" else
else MOCKER_ARGS+=("--endpoint" "dyn://test.mocker.generate")
GPU_DEVICES="$gpu" fi
fi
done
fi
if [ "$USE_MOCKERS" = true ]; then if [ "$DATA_PARALLEL_SIZE" -gt 1 ]; then
# Run mocker engine (no GPU assignment needed) MOCKER_ARGS+=("--data-parallel-size" "$DATA_PARALLEL_SIZE")
MOCKER_ARGS=() fi
MOCKER_ARGS+=("--model-path" "$MODEL_PATH") MOCKER_ARGS+=("${EXTRA_ARGS[@]}")
python -m dynamo.mocker "${MOCKER_ARGS[@]}" &
PIDS+=($!)
echo "Started mocker with $NUM_WORKERS workers (PID: $!)"
else
# For vLLM and TensorRT-LLM, use the original loop to launch separate processes
for i in $(seq 1 $NUM_WORKERS); do
{
MODE_CAPITALIZED=$(echo "$MODE" | sed 's/\(.\)/\U\1/')
echo "[$MODE_CAPITALIZED Worker-$i] Starting..."
# Calculate GPU indices for this worker (with base offset)
# Each worker needs TP * DP GPUs
START_GPU=$(( BASE_GPU_OFFSET + (i - 1) * GPUS_PER_WORKER ))
END_GPU=$(( START_GPU + GPUS_PER_WORKER - 1 ))
# Set endpoint based on worker mode # Build CUDA_VISIBLE_DEVICES string for all GPUs (TP * DP)
if [ "$MODE" = "prefill" ]; then if [ "$GPUS_PER_WORKER" -eq 1 ]; then
MOCKER_ARGS+=("--endpoint" "dyn://test.prefill.generate") GPU_DEVICES="$START_GPU"
MOCKER_ARGS+=("--is-prefill-worker")
elif [ "$MODE" = "decode" ]; then
MOCKER_ARGS+=("--endpoint" "dyn://test.mocker.generate")
MOCKER_ARGS+=("--is-decode-worker")
else else
MOCKER_ARGS+=("--endpoint" "dyn://test.mocker.generate") GPU_DEVICES=""
for gpu in $(seq $START_GPU $END_GPU); do
if [ -n "$GPU_DEVICES" ]; then
GPU_DEVICES="${GPU_DEVICES},$gpu"
else
GPU_DEVICES="$gpu"
fi
done
fi fi
if [ "$DATA_PARALLEL_SIZE" -gt 1 ]; then if [ "$USE_TRTLLM" = true ]; then
MOCKER_ARGS+=("--data-parallel-size" "$DATA_PARALLEL_SIZE") echo "[$MODE_CAPITALIZED Worker-$i] Using GPUs: $GPU_DEVICES"
fi # Run TensorRT-LLM engine with trtllm-llmapi-launch for proper initialization
MOCKER_ARGS+=("${EXTRA_ARGS[@]}") TRTLLM_ARGS=()
TRTLLM_ARGS+=("--model-path" "$MODEL_PATH")
TRTLLM_ARGS+=("--tensor-parallel-size" "$TENSOR_PARALLEL_SIZE")
if [ "$MODE" != "agg" ]; then
TRTLLM_ARGS+=("--disaggregation-mode" "$MODE")
fi
TRTLLM_ARGS+=("${EXTRA_ARGS[@]}")
exec python -m dynamo.mocker "${MOCKER_ARGS[@]}" exec env CUDA_VISIBLE_DEVICES=$GPU_DEVICES trtllm-llmapi-launch python -m dynamo.trtllm \
elif [ "$USE_TRTLLM" = true ]; then "${TRTLLM_ARGS[@]}"
echo "[$MODE_CAPITALIZED Worker-$i] Using GPUs: $GPU_DEVICES" else
# Run TensorRT-LLM engine with trtllm-llmapi-launch for proper initialization echo "[$MODE_CAPITALIZED Worker-$i] Using GPUs: $GPU_DEVICES"
TRTLLM_ARGS=() # Run vLLM engine with PYTHONHASHSEED=0 for deterministic event IDs in KV-aware routing
TRTLLM_ARGS+=("--model-path" "$MODEL_PATH") VLLM_ARGS=()
TRTLLM_ARGS+=("--tensor-parallel-size" "$TENSOR_PARALLEL_SIZE") VLLM_ARGS+=("--model" "$MODEL_PATH")
if [ "$MODE" != "agg" ]; then VLLM_ARGS+=("--tensor-parallel-size" "$TENSOR_PARALLEL_SIZE")
TRTLLM_ARGS+=("--disaggregation-mode" "$MODE") if [ "$DATA_PARALLEL_SIZE" -gt 1 ]; then
fi VLLM_ARGS+=("--data-parallel-size" "$DATA_PARALLEL_SIZE")
TRTLLM_ARGS+=("${EXTRA_ARGS[@]}") fi
if [ "$MODE" = "prefill" ]; then
VLLM_ARGS+=("--is-prefill-worker")
elif [ "$MODE" = "decode" ]; then
VLLM_ARGS+=("--is-decode-worker")
fi
VLLM_ARGS+=("${EXTRA_ARGS[@]}")
exec env CUDA_VISIBLE_DEVICES=$GPU_DEVICES trtllm-llmapi-launch python -m dynamo.trtllm \ exec env PYTHONHASHSEED=0 CUDA_VISIBLE_DEVICES=$GPU_DEVICES python -m dynamo.vllm \
"${TRTLLM_ARGS[@]}" "${VLLM_ARGS[@]}"
else
echo "[$MODE_CAPITALIZED Worker-$i] Using GPUs: $GPU_DEVICES"
# Run vLLM engine with PYTHONHASHSEED=0 for deterministic event IDs in KV-aware routing
VLLM_ARGS=()
VLLM_ARGS+=("--model" "$MODEL_PATH")
VLLM_ARGS+=("--tensor-parallel-size" "$TENSOR_PARALLEL_SIZE")
if [ "$DATA_PARALLEL_SIZE" -gt 1 ]; then
VLLM_ARGS+=("--data-parallel-size" "$DATA_PARALLEL_SIZE")
fi fi
if [ "$MODE" = "prefill" ]; then } &
VLLM_ARGS+=("--is-prefill-worker") PIDS+=($!)
elif [ "$MODE" = "decode" ]; then echo "Started $MODE worker $i (PID: $!)"
VLLM_ARGS+=("--is-decode-worker") done
fi fi
VLLM_ARGS+=("${EXTRA_ARGS[@]}")
exec env PYTHONHASHSEED=0 CUDA_VISIBLE_DEVICES=$GPU_DEVICES python -m dynamo.vllm \
"${VLLM_ARGS[@]}"
fi
} &
PIDS+=($!)
echo "Started $MODE worker $i (PID: $!)"
done
echo "All workers started. Press Ctrl+C to stop." echo "All workers started. Press Ctrl+C to stop."
wait wait
......
...@@ -24,6 +24,7 @@ The mocker engine now supports a vLLM-style CLI interface with individual argume ...@@ -24,6 +24,7 @@ The mocker engine now supports a vLLM-style CLI interface with individual argume
- `--watermark`: KV cache watermark threshold as a fraction (default: 0.01) - `--watermark`: KV cache watermark threshold as a fraction (default: 0.01)
- `--speedup-ratio`: Speed multiplier for token generation (default: 1.0). Higher values make the simulation engines run faster - `--speedup-ratio`: Speed multiplier for token generation (default: 1.0). Higher values make the simulation engines run faster
- `--data-parallel-size`: Number of data parallel workers to simulate (default: 1) - `--data-parallel-size`: Number of data parallel workers to simulate (default: 1)
- `--num-workers`: Number of mocker workers to launch in the same process (default: 1). All workers share the same tokio runtime and thread pool
### Example with individual arguments (vLLM-style): ### Example with individual arguments (vLLM-style):
```bash ```bash
...@@ -34,6 +35,7 @@ python -m dynamo.mocker \ ...@@ -34,6 +35,7 @@ python -m dynamo.mocker \
--block-size 16 \ --block-size 16 \
--speedup-ratio 10.0 \ --speedup-ratio 10.0 \
--max-num-seqs 512 \ --max-num-seqs 512 \
--num-workers 4 \
--enable-prefix-caching --enable-prefix-caching
# Start frontend server # Start frontend server
...@@ -41,4 +43,4 @@ python -m dynamo.frontend --http-port 8000 ...@@ -41,4 +43,4 @@ python -m dynamo.frontend --http-port 8000
``` ```
> [!Note] > [!Note]
> Each mocker instance runs as a single process, and each DP worker (specified by `--data-parallel-size`) is spawned as a lightweight async task within that process. For benchmarking (e.g., router testing), you would much prefer launching one mocker instance with a large `--data-parallel-size` rather than multiple separate mocker instances to reduce overhead. > Each mocker instance runs as a single process, and each DP worker (specified by `--data-parallel-size`) is spawned as a lightweight async task within that process. For benchmarking (e.g., router testing), you can use `--num-workers` to launch multiple mocker engines in the same process, which is more efficient than launching separate processes since they all share the same tokio runtime and thread pool.
\ No newline at end of file \ No newline at end of file
...@@ -175,6 +175,13 @@ def parse_args(): ...@@ -175,6 +175,13 @@ def parse_args():
default=None, default=None,
help="Simulated engine startup time in seconds (default: None)", help="Simulated engine startup time in seconds (default: None)",
) )
parser.add_argument(
"--num-workers",
type=int,
default=1,
help="Number of mocker workers to launch in the same process (default: 1). "
"All workers share the same tokio runtime and thread pool.",
)
# Legacy support - allow direct JSON file specification # Legacy support - allow direct JSON file specification
parser.add_argument( parser.add_argument(
...@@ -201,6 +208,10 @@ def parse_args(): ...@@ -201,6 +208,10 @@ def parse_args():
args = parser.parse_args() args = parser.parse_args()
validate_worker_type_args(args) validate_worker_type_args(args)
# Validate num_workers
if args.num_workers < 1:
raise ValueError(f"--num-workers must be at least 1, got {args.num_workers}")
# Set endpoint default based on worker type if not explicitly provided # Set endpoint default based on worker type if not explicitly provided
if args.endpoint is None: if args.endpoint is None:
if args.is_prefill_worker: if args.is_prefill_worker:
......
...@@ -4,12 +4,16 @@ ...@@ -4,12 +4,16 @@
# Usage: `python -m dynamo.mocker --model-path /data/models/Qwen3-0.6B` # Usage: `python -m dynamo.mocker --model-path /data/models/Qwen3-0.6B`
# Now supports vLLM-style individual arguments for MockEngineArgs # Now supports vLLM-style individual arguments for MockEngineArgs
import asyncio
import logging import logging
import os
import uvloop import uvloop
os.environ.setdefault("DYN_COMPUTE_THREADS", "0")
from dynamo.llm import EngineType, EntrypointArgs, make_engine, run_input from dynamo.llm import EngineType, EntrypointArgs, make_engine, run_input
from dynamo.runtime import DistributedRuntime, dynamo_worker from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging from dynamo.runtime.logging import configure_dynamo_logging
from .args import create_temp_engine_args_file, parse_args from .args import create_temp_engine_args_file, parse_args
...@@ -18,8 +22,12 @@ configure_dynamo_logging() ...@@ -18,8 +22,12 @@ configure_dynamo_logging()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@dynamo_worker(static=False) async def worker():
async def worker(runtime: DistributedRuntime): """Main worker function that launches mocker instances.
Each mocker gets its own DistributedRuntime instance for true isolation,
while still sharing the same event loop and tokio runtime.
"""
args = parse_args() args = parse_args()
# Handle extra_engine_args: either use provided file or create from CLI args # Handle extra_engine_args: either use provided file or create from CLI args
...@@ -33,7 +41,41 @@ async def worker(runtime: DistributedRuntime): ...@@ -33,7 +41,41 @@ async def worker(runtime: DistributedRuntime):
logger.info("Created MockEngineArgs from CLI arguments") logger.info("Created MockEngineArgs from CLI arguments")
try: try:
# Create engine configuration logger.info(
f"Launching {args.num_workers} mocker worker(s) with isolated DistributedRuntime instances"
)
await launch_workers(args, extra_engine_args_path)
finally:
# Clean up temporary file if we created one
if not args.extra_engine_args and extra_engine_args_path.exists():
try:
extra_engine_args_path.unlink()
logger.debug(f"Cleaned up temporary file {extra_engine_args_path}")
except Exception as e:
logger.warning(f"Failed to clean up temporary file: {e}")
async def launch_workers(args, extra_engine_args_path):
"""Launch mocker worker(s) with isolated DistributedRuntime instances.
Each worker gets its own DistributedRuntime, which means:
- Separate etcd/NATS connections
- Separate Component instances (no shared overhead)
- Independent service registration and stats scraping
- But still sharing the same tokio runtime (efficient)
"""
loop = asyncio.get_running_loop()
futures = []
runtimes = []
for worker_id in range(args.num_workers):
logger.info(f"Creating mocker worker {worker_id + 1}/{args.num_workers}")
# Create a separate DistributedRuntime for this worker (on same event loop)
runtime = DistributedRuntime(loop, False)
runtimes.append(runtime)
# Create EntrypointArgs for this worker
entrypoint_args = EntrypointArgs( entrypoint_args = EntrypointArgs(
engine_type=EngineType.Mocker, engine_type=EngineType.Mocker,
model_path=args.model_path, model_path=args.model_path,
...@@ -43,18 +85,23 @@ async def worker(runtime: DistributedRuntime): ...@@ -43,18 +85,23 @@ async def worker(runtime: DistributedRuntime):
is_prefill=args.is_prefill_worker, is_prefill=args.is_prefill_worker,
) )
# Create and run the engine # Create the engine with this worker's isolated runtime
# NOTE: only supports dyn endpoint for now
engine_config = await make_engine(runtime, entrypoint_args) engine_config = await make_engine(runtime, entrypoint_args)
await run_input(runtime, args.endpoint, engine_config)
# run_input returns a Rust Future (not a Python coroutine)
future = run_input(runtime, args.endpoint, engine_config)
futures.append(future)
logger.info(f"All {args.num_workers} mocker worker(s) created and running")
try:
# Wait for all futures to complete
await asyncio.gather(*futures, return_exceptions=True)
finally: finally:
# Clean up temporary file if we created one # Clean up runtimes
if not args.extra_engine_args and extra_engine_args_path.exists(): logger.info("Shutting down DistributedRuntime instances")
try: for runtime in runtimes:
extra_engine_args_path.unlink() runtime.shutdown()
logger.debug(f"Cleaned up temporary file {extra_engine_args_path}")
except Exception as e:
logger.warning(f"Failed to clean up temporary file: {e}")
def main(): def main():
......
...@@ -428,16 +428,25 @@ enum ModelInput { ...@@ -428,16 +428,25 @@ enum ModelInput {
impl DistributedRuntime { impl DistributedRuntime {
#[new] #[new]
fn new(event_loop: PyObject, is_static: bool) -> PyResult<Self> { fn new(event_loop: PyObject, is_static: bool) -> PyResult<Self> {
let worker = rs::Worker::from_settings().map_err(to_pyerr)?; // Try to get existing runtime first, create new Worker only if needed
INIT.get_or_try_init(|| { // This allows multiple DistributedRuntime instances to share the same tokio runtime
let primary = worker.tokio_runtime()?; let runtime = rs::Worker::runtime_from_existing()
pyo3_async_runtimes::tokio::init_with_runtime(primary) .or_else(|_| {
.map_err(|e| rs::error!("failed to initialize pyo3 static runtime: {:?}", e))?; // No existing Worker, create new one
rs::OK(()) let worker = rs::Worker::from_settings()?;
})
.map_err(to_pyerr)?; // Initialize pyo3 bridge (only happens once per process)
INIT.get_or_try_init(|| {
let runtime = worker.runtime().clone(); let primary = worker.tokio_runtime()?;
pyo3_async_runtimes::tokio::init_with_runtime(primary).map_err(|e| {
rs::error!("failed to initialize pyo3 static runtime: {:?}", e)
})?;
rs::OK(())
})?;
rs::OK(worker.runtime().clone())
})
.map_err(to_pyerr)?;
// Initialize logging in context where tokio runtime is available // Initialize logging in context where tokio runtime is available
// otel exporter requires it // otel exporter requires it
......
...@@ -860,13 +860,13 @@ impl WorkerMetricsPublisher { ...@@ -860,13 +860,13 @@ impl WorkerMetricsPublisher {
{ {
tracing::warn!("Failed to publish metrics over NATS: {}", e); tracing::warn!("Failed to publish metrics over NATS: {}", e);
} }
// Reset timer to pending state to avoid tight loop
// It will be reset to 1ms when metrics actually change
publish_timer.as_mut().reset(
tokio::time::Instant::now() + tokio::time::Duration::from_secs(3600)
);
} }
// Reset timer to pending state to avoid tight loop
// It will be reset to 1ms when metrics actually change
publish_timer.as_mut().reset(
tokio::time::Instant::now() + tokio::time::Duration::from_secs(3600)
);
} }
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment