"...layers/git@developer.sourcefind.cn:OpenDAS/detectron2.git" did not exist on "b634945d8ce3fbcbbcf2fc89e62cf7de03b17673"
Unverified commit 2ac36b9a, authored by Ata Fatahi, committed by GitHub
Browse files

Make request payload size configurable (#2444)


Signed-off-by: Ata Fatahi <immrata@gmail.com>
parent 2d60a5ee
...@@ -60,6 +60,7 @@ jobs: ...@@ -60,6 +60,7 @@ jobs:
pip install --force-reinstall dist/*.whl pip install --force-reinstall dist/*.whl
- name: Run e2e test - name: Run e2e test
run: | run: |
bash scripts/killall_sglang.sh
cd rust/py_test cd rust/py_test
python3 run_suite.py python3 run_suite.py
......
...@@ -38,6 +38,7 @@ class RouterArgs: ...@@ -38,6 +38,7 @@ class RouterArgs:
balance_rel_threshold: float = 1.0001 balance_rel_threshold: float = 1.0001
eviction_interval: int = 60 eviction_interval: int = 60
max_tree_size: int = 2**24 max_tree_size: int = 2**24
max_payload_size: int = 4 * 1024 * 1024 # 4MB
verbose: bool = False verbose: bool = False
@staticmethod @staticmethod
...@@ -116,6 +117,12 @@ class RouterArgs: ...@@ -116,6 +117,12 @@ class RouterArgs:
default=RouterArgs.max_tree_size, default=RouterArgs.max_tree_size,
help="Maximum size of the approximation tree for cache-aware routing", help="Maximum size of the approximation tree for cache-aware routing",
) )
parser.add_argument(
f"--{prefix}max-payload-size",
type=int,
default=RouterArgs.max_payload_size,
help="Maximum payload size in bytes",
)
parser.add_argument( parser.add_argument(
f"--{prefix}verbose", f"--{prefix}verbose",
action="store_true", action="store_true",
...@@ -144,6 +151,7 @@ class RouterArgs: ...@@ -144,6 +151,7 @@ class RouterArgs:
balance_rel_threshold=getattr(args, f"{prefix}balance_rel_threshold"), balance_rel_threshold=getattr(args, f"{prefix}balance_rel_threshold"),
eviction_interval=getattr(args, f"{prefix}eviction_interval"), eviction_interval=getattr(args, f"{prefix}eviction_interval"),
max_tree_size=getattr(args, f"{prefix}max_tree_size"), max_tree_size=getattr(args, f"{prefix}max_tree_size"),
max_payload_size=getattr(args, f"{prefix}max_payload_size"),
verbose=getattr(args, f"{prefix}verbose", False), verbose=getattr(args, f"{prefix}verbose", False),
) )
...@@ -187,6 +195,7 @@ def launch_router(args: argparse.Namespace) -> Optional[Router]: ...@@ -187,6 +195,7 @@ def launch_router(args: argparse.Namespace) -> Optional[Router]:
balance_rel_threshold=router_args.balance_rel_threshold, balance_rel_threshold=router_args.balance_rel_threshold,
eviction_interval_secs=router_args.eviction_interval, eviction_interval_secs=router_args.eviction_interval,
max_tree_size=router_args.max_tree_size, max_tree_size=router_args.max_tree_size,
max_payload_size=router_args.max_payload_size,
verbose=router_args.verbose, verbose=router_args.verbose,
) )
...@@ -194,7 +203,7 @@ def launch_router(args: argparse.Namespace) -> Optional[Router]: ...@@ -194,7 +203,7 @@ def launch_router(args: argparse.Namespace) -> Optional[Router]:
return router return router
except Exception as e: except Exception as e:
logger.error(f"Error starting router: {e}", file=sys.stderr) logger.error(f"Error starting router: {e}")
return None return None
......
...@@ -26,6 +26,7 @@ class Router: ...@@ -26,6 +26,7 @@ class Router:
AND max_load > min_load * rel_threshold. Otherwise, use cache aware. Default: 1.0001 AND max_load > min_load * rel_threshold. Otherwise, use cache aware. Default: 1.0001
eviction_interval_secs: Interval in seconds between cache eviction operations in cache-aware eviction_interval_secs: Interval in seconds between cache eviction operations in cache-aware
routing. Default: 60 routing. Default: 60
max_payload_size: Maximum payload size in bytes. Default: 4MB
max_tree_size: Maximum size of the approximation tree for cache-aware routing. Default: 2^24 max_tree_size: Maximum size of the approximation tree for cache-aware routing. Default: 2^24
verbose: Enable verbose logging. Default: False verbose: Enable verbose logging. Default: False
""" """
...@@ -41,6 +42,7 @@ class Router: ...@@ -41,6 +42,7 @@ class Router:
balance_rel_threshold: float = 1.0001, balance_rel_threshold: float = 1.0001,
eviction_interval_secs: int = 60, eviction_interval_secs: int = 60,
max_tree_size: int = 2**24, max_tree_size: int = 2**24,
max_payload_size: int = 4 * 1024 * 1024, # 4MB
verbose: bool = False, verbose: bool = False,
): ):
self._router = _Router( self._router = _Router(
...@@ -53,6 +55,7 @@ class Router: ...@@ -53,6 +55,7 @@ class Router:
balance_rel_threshold=balance_rel_threshold, balance_rel_threshold=balance_rel_threshold,
eviction_interval_secs=eviction_interval_secs, eviction_interval_secs=eviction_interval_secs,
max_tree_size=max_tree_size, max_tree_size=max_tree_size,
max_payload_size=max_payload_size,
verbose=verbose, verbose=verbose,
) )
......
...@@ -35,6 +35,7 @@ class TestLaunchRouter(unittest.TestCase): ...@@ -35,6 +35,7 @@ class TestLaunchRouter(unittest.TestCase):
balance_rel_threshold=1.0001, balance_rel_threshold=1.0001,
eviction_interval=60, eviction_interval=60,
max_tree_size=2**24, max_tree_size=2**24,
max_payload_size=4 * 1024 * 1024, # 4MB
verbose=False, verbose=False,
) )
......
...@@ -21,6 +21,7 @@ def popen_launch_router( ...@@ -21,6 +21,7 @@ def popen_launch_router(
dp_size: int, dp_size: int,
timeout: float, timeout: float,
policy: str = "cache_aware", policy: str = "cache_aware",
max_payload_size: int = None,
): ):
""" """
Launch the router server process. Launch the router server process.
...@@ -31,6 +32,7 @@ def popen_launch_router( ...@@ -31,6 +32,7 @@ def popen_launch_router(
dp_size: Data parallel size dp_size: Data parallel size
timeout: Server launch timeout timeout: Server launch timeout
policy: Router policy, one of "cache_aware", "round_robin", "random" policy: Router policy, one of "cache_aware", "round_robin", "random"
max_payload_size: Maximum payload size in bytes
""" """
_, host, port = base_url.split(":") _, host, port = base_url.split(":")
host = host[2:] host = host[2:]
...@@ -46,13 +48,16 @@ def popen_launch_router( ...@@ -46,13 +48,16 @@ def popen_launch_router(
"--port", "--port",
port, port,
"--dp", "--dp",
str(dp_size), # Convert dp_size to string str(dp_size),
"--router-eviction-interval", "--router-eviction-interval",
"5", # frequent eviction for testing "5",
"--router-policy", "--router-policy",
policy, policy,
] ]
if max_payload_size is not None:
command.extend(["--router-max-payload-size", str(max_payload_size)])
process = subprocess.Popen(command, stdout=None, stderr=None) process = subprocess.Popen(command, stdout=None, stderr=None)
start_time = time.time() start_time = time.time()
...@@ -280,6 +285,54 @@ class TestLaunchServer(unittest.TestCase): ...@@ -280,6 +285,54 @@ class TestLaunchServer(unittest.TestCase):
msg = f"MMLU test {'passed' if passed else 'failed'} with score {score:.3f} (threshold: {THRESHOLD})" msg = f"MMLU test {'passed' if passed else 'failed'} with score {score:.3f} (threshold: {THRESHOLD})"
self.assertGreaterEqual(score, THRESHOLD, msg) self.assertGreaterEqual(score, THRESHOLD, msg)
def test_4_payload_size(self):
# Checks that the router enforces the configured max payload size on /generate:
# a request body under the limit must return 200, one over the limit must return 413.
print("Running test_4_payload_size...")
# Start router with a 1MB limit (single worker, round-robin policy)
self.process = popen_launch_router(
self.model,
self.base_url,
dp_size=1,
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
policy="round_robin",
max_payload_size=1 * 1024 * 1024, # 1MB limit
)
# Test case 1: Payload well under the 1MB limit (0.5MB) should succeed
payload_0_5_mb = {
"text": "x" * int(0.5 * 1024 * 1024), # 0.5MB of text
"temperature": 0.0,
}
with requests.Session() as session:
response = session.post(
f"{self.base_url}/generate",
json=payload_0_5_mb,
headers={"Content-Type": "application/json"},
)
self.assertEqual(
response.status_code,
200,
f"0.5MB payload should succeed but got status {response.status_code}",
)
# Test case 2: Payload over the 1MB limit (1.2MB) should be rejected
payload_1_plus_mb = {
"text": "x" * int((1.2 * 1024 * 1024)), # 1.2MB of text
"temperature": 0.0,
}
with requests.Session() as session:
response = session.post(
f"{self.base_url}/generate",
json=payload_1_plus_mb,
headers={"Content-Type": "application/json"},
)
self.assertEqual(
response.status_code,
413, # Payload Too Large
f"1.2MB payload should fail with 413 but got status {response.status_code}",
)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
...@@ -22,6 +22,7 @@ struct Router { ...@@ -22,6 +22,7 @@ struct Router {
balance_rel_threshold: f32, balance_rel_threshold: f32,
eviction_interval_secs: u64, eviction_interval_secs: u64,
max_tree_size: usize, max_tree_size: usize,
max_payload_size: usize,
verbose: bool, verbose: bool,
} }
...@@ -38,6 +39,7 @@ impl Router { ...@@ -38,6 +39,7 @@ impl Router {
balance_rel_threshold = 1.0001, balance_rel_threshold = 1.0001,
eviction_interval_secs = 60, eviction_interval_secs = 60,
max_tree_size = 2usize.pow(24), max_tree_size = 2usize.pow(24),
max_payload_size = 4 * 1024 * 1024,
verbose = false verbose = false
))] ))]
fn new( fn new(
...@@ -50,6 +52,7 @@ impl Router { ...@@ -50,6 +52,7 @@ impl Router {
balance_rel_threshold: f32, balance_rel_threshold: f32,
eviction_interval_secs: u64, eviction_interval_secs: u64,
max_tree_size: usize, max_tree_size: usize,
max_payload_size: usize,
verbose: bool, verbose: bool,
) -> PyResult<Self> { ) -> PyResult<Self> {
Ok(Router { Ok(Router {
...@@ -62,6 +65,7 @@ impl Router { ...@@ -62,6 +65,7 @@ impl Router {
balance_rel_threshold, balance_rel_threshold,
eviction_interval_secs, eviction_interval_secs,
max_tree_size, max_tree_size,
max_payload_size,
verbose, verbose,
}) })
} }
...@@ -86,6 +90,7 @@ impl Router { ...@@ -86,6 +90,7 @@ impl Router {
worker_urls: self.worker_urls.clone(), worker_urls: self.worker_urls.clone(),
policy_config, policy_config,
verbose: self.verbose, verbose: self.verbose,
max_payload_size: self.max_payload_size,
}) })
.await .await
.unwrap(); .unwrap();
......
...@@ -127,6 +127,7 @@ pub struct ServerConfig { ...@@ -127,6 +127,7 @@ pub struct ServerConfig {
pub worker_urls: Vec<String>, pub worker_urls: Vec<String>,
pub policy_config: PolicyConfig, pub policy_config: PolicyConfig,
pub verbose: bool, pub verbose: bool,
pub max_payload_size: usize,
} }
pub async fn startup(config: ServerConfig) -> std::io::Result<()> { pub async fn startup(config: ServerConfig) -> std::io::Result<()> {
...@@ -164,10 +165,16 @@ pub async fn startup(config: ServerConfig) -> std::io::Result<()> { ...@@ -164,10 +165,16 @@ pub async fn startup(config: ServerConfig) -> std::io::Result<()> {
info!("✅ Starting router on {}:{}", config.host, config.port); info!("✅ Starting router on {}:{}", config.host, config.port);
info!("✅ Serving Worker URLs: {:?}", config.worker_urls); info!("✅ Serving Worker URLs: {:?}", config.worker_urls);
info!("✅ Policy Config: {:?}", config.policy_config); info!("✅ Policy Config: {:?}", config.policy_config);
info!(
"✅ Max payload size: {} MB",
config.max_payload_size / (1024 * 1024)
);
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.app_data(app_state.clone()) .app_data(app_state.clone())
.app_data(web::JsonConfig::default().limit(config.max_payload_size))
.app_data(web::PayloadConfig::default().limit(config.max_payload_size))
.service(generate) .service(generate)
.service(v1_chat_completions) .service(v1_chat_completions)
.service(v1_completions) .service(v1_completions)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment