Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
change
sglang
Commits
ff914748
Unverified
Commit
ff914748
authored
Jun 02, 2025
by
Arthur Cheng
Committed by
GitHub
Jun 02, 2025
Browse files
[Router] Fix k8s Service Discovery (#6766)
Co-authored-by:
Simo Lin
<
linsimo.mark@gmail.com
>
parent
eb38c7d1
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
281 additions
and
59 deletions
+281
-59
sgl-router/Cargo.toml
sgl-router/Cargo.toml
+1
-1
sgl-router/src/server.rs
sgl-router/src/server.rs
+17
-15
sgl-router/src/service_discovery.rs
sgl-router/src/service_discovery.rs
+263
-43
No files found.
sgl-router/Cargo.toml
View file @
ff914748
...
...
@@ -21,7 +21,7 @@ serde_json = "1.0"
pyo3
=
{
version
=
"0.22.5"
,
features
=
["extension-module"]
}
dashmap
=
"6.1.0"
http
=
"1.1.0"
tokio
=
"1.42.0"
tokio
=
{
version
=
"1.42.0"
,
features
=
[
"macros"
,
"rt-multi-thread"
]
}
# Added for enhanced logging system
tracing
=
"0.1"
tracing-subscriber
=
{
version
=
"0.3"
,
features
=
[
"env-filter"
,
"json"
,
"chrono"
]
}
...
...
sgl-router/src/server.rs
View file @
ff914748
...
...
@@ -18,7 +18,7 @@ use tracing::{error, info, warn, Level};
#[derive(Debug)]
pub
struct
AppState
{
router
:
Router
,
router
:
Arc
<
Router
>
,
client
:
Client
,
}
...
...
@@ -29,7 +29,7 @@ impl AppState {
policy_config
:
PolicyConfig
,
)
->
Result
<
Self
,
String
>
{
// Create router based on policy
let
router
=
Router
::
new
(
worker_urls
,
policy_config
)
?
;
let
router
=
Arc
::
new
(
Router
::
new
(
worker_urls
,
policy_config
)
?
)
;
Ok
(
Self
{
router
,
client
})
}
}
...
...
@@ -218,24 +218,23 @@ pub async fn startup(config: ServerConfig) -> std::io::Result<()> {
.build
()
.expect
(
"Failed to create HTTP client"
);
let
app_state
=
web
::
D
at
a
::
new
(
AppState
::
new
(
config
.worker_urls
.clone
(),
client
.clone
(),
// Clone the client here
config
.policy_config
.clone
(),
)
.map_err
(|
e
|
std
::
io
::
Error
::
new
(
std
::
io
::
ErrorKind
::
Other
,
e
))
?
,
);
let
app_state
_init
=
AppSt
at
e
::
new
(
config
.worker_urls
.clone
(),
client
.clone
(),
config
.policy_config
.clone
(),
)
.map_err
(|
e
|
std
::
io
::
Error
::
new
(
std
::
io
::
ErrorKind
::
Other
,
e
))
?
;
let
router_arc
=
Arc
::
clone
(
&
app_state_init
.router
);
let
app_state
=
web
::
Data
::
new
(
app_state_init
);
// Start the service discovery if enabled
if
let
Some
(
service_discovery_config
)
=
config
.service_discovery_config
{
if
service_discovery_config
.enabled
{
let
worker_urls
=
Arc
::
clone
(
&
app_state
.router
.get_worker_urls
()
);
match
start_service_discovery
(
service_discovery_config
,
worker_urls
)
.await
{
info!
(
"🚧 Initializing Kubernetes service discovery"
);
// Pass the Arc<Router> directly
match
start_service_discovery
(
service_discovery_config
,
router_arc
)
.await
{
Ok
(
handle
)
=>
{
info!
(
"✅ Service discovery started successfully"
);
// Spawn a task to handle the service discovery thread
spawn
(
async
move
{
if
let
Err
(
e
)
=
handle
.await
{
...
...
@@ -252,7 +251,10 @@ pub async fn startup(config: ServerConfig) -> std::io::Result<()> {
}
info!
(
"✅ Serving router on {}:{}"
,
config
.host
,
config
.port
);
info!
(
"✅ Serving workers on {:?}"
,
config
.worker_urls
);
info!
(
"✅ Serving workers on {:?}"
,
app_state
.router
.get_worker_urls
()
.read
()
.unwrap
()
);
HttpServer
::
new
(
move
||
{
App
::
new
()
...
...
sgl-router/src/service_discovery.rs
View file @
ff914748
use
crate
::
router
::
Router
;
use
futures
::{
StreamExt
,
TryStreamExt
};
use
k8s_openapi
::
api
::
core
::
v1
::
Pod
;
use
kube
::{
...
...
@@ -7,11 +9,12 @@ use kube::{
Client
,
};
use
std
::
collections
::{
HashMap
,
HashSet
};
use
std
::
sync
::{
Arc
,
Mutex
,
RwLock
};
use
std
::
sync
::{
Arc
,
Mutex
};
use
std
::
time
::
Duration
;
use
tokio
::
task
;
use
tokio
::
time
;
use
tracing
::{
error
,
info
,
warn
};
use
tracing
::{
debug
,
error
,
info
,
warn
};
/// Represents the service discovery configuration
#[derive(Debug,
Clone)]
...
...
@@ -81,7 +84,7 @@ impl PodInfo {
pub
async
fn
start_service_discovery
(
config
:
ServiceDiscoveryConfig
,
worker_urls
:
Arc
<
RwLock
<
Vec
<
String
>>
>
,
router
:
Arc
<
Router
>
,
)
->
Result
<
task
::
JoinHandle
<
()
>
,
kube
::
Error
>
{
// Don't initialize anything if service discovery is disabled
if
!
config
.enabled
{
...
...
@@ -136,7 +139,6 @@ pub async fn start_service_discovery(
// Clone Arcs for the closures
let
selector_clone
=
Arc
::
clone
(
&
selector
);
let
tracked_pods_clone
=
Arc
::
clone
(
&
tracked_pods
);
let
worker_urls_clone
=
Arc
::
clone
(
&
worker_urls
);
// Apply label selector filter separately since we can't do it directly with the watcher anymore
let
filtered_stream
=
watcher_stream
.filter_map
(
move
|
obj_res
|
{
...
...
@@ -164,12 +166,12 @@ pub async fn start_service_discovery(
// Clone again for the next closure
let
tracked_pods_clone2
=
Arc
::
clone
(
&
tracked_pods_clone
);
let
worker_urls
_clone
2
=
Arc
::
clone
(
&
worker_urls_clone
);
let
router
_clone
=
Arc
::
clone
(
&
router
);
match
filtered_stream
.try_for_each
(
move
|
pod
|
{
let
tracked_pods_inner
=
Arc
::
clone
(
&
tracked_pods_clone2
);
let
worker_urls
_inner
=
Arc
::
clone
(
&
worker_urls
_clone
2
);
let
router
_inner
=
Arc
::
clone
(
&
router
_clone
);
async
move
{
if
let
Some
(
pod_info
)
=
PodInfo
::
from_pod
(
&
pod
)
{
...
...
@@ -177,18 +179,13 @@ pub async fn start_service_discovery(
handle_pod_deletion
(
&
pod_info
,
tracked_pods_inner
,
worker_urls
_inner
,
router
_inner
,
port
,
)
.await
;
}
else
{
handle_pod_event
(
&
pod_info
,
tracked_pods_inner
,
worker_urls_inner
,
port
,
)
.await
;
handle_pod_event
(
&
pod_info
,
tracked_pods_inner
,
router_inner
,
port
)
.await
;
}
}
Ok
(())
...
...
@@ -219,7 +216,7 @@ pub async fn start_service_discovery(
async
fn
handle_pod_event
(
pod_info
:
&
PodInfo
,
tracked_pods
:
Arc
<
Mutex
<
HashSet
<
PodInfo
>>>
,
worker_urls
:
Arc
<
RwLock
<
Vec
<
String
>>
>
,
router
:
Arc
<
Router
>
,
port
:
u16
,
)
{
let
worker_url
=
pod_info
.worker_url
(
port
);
...
...
@@ -234,52 +231,275 @@ async fn handle_pod_event(
if
pod_info
.is_healthy
()
{
if
!
already_tracked
{
info!
(
"
Adding h
ealthy pod
{} ({}) as
worker"
,
pod_info
.name
,
pod_info
.ip
"
H
ealthy pod
found: {}. Adding
worker
: {}
"
,
pod_info
.name
,
worker_url
);
// Add URL to worker list
let
mut
urls
=
worker_urls
.write
()
.unwrap
();
if
!
urls
.contains
(
&
worker_url
)
{
urls
.push
(
worker_url
.clone
());
info!
(
"Added new worker URL: {}"
,
worker_url
);
match
router
.add_worker
(
&
worker_url
)
.await
{
Ok
(
msg
)
=>
{
info!
(
"Router add_worker: {}"
,
msg
);
let
mut
tracker
=
tracked_pods
.lock
()
.unwrap
();
tracker
.insert
(
pod_info
.clone
());
}
Err
(
e
)
=>
error!
(
"Failed to add worker {} to router: {}"
,
worker_url
,
e
),
}
// Track this pod
let
mut
tracker
=
tracked_pods
.lock
()
.unwrap
();
tracker
.insert
(
pod_info
.clone
());
}
}
else
if
already_tracked
{
// If pod was healthy before but not anymore, remove it
handle_pod_deletion
(
pod_info
,
tracked_pods
,
worker_urls
,
port
)
.await
;
handle_pod_deletion
(
pod_info
,
tracked_pods
,
router
,
port
)
.await
;
}
}
async
fn
handle_pod_deletion
(
pod_info
:
&
PodInfo
,
tracked_pods
:
Arc
<
Mutex
<
HashSet
<
PodInfo
>>>
,
worker_urls
:
Arc
<
RwLock
<
Vec
<
String
>>
>
,
router
:
Arc
<
Router
>
,
port
:
u16
,
)
{
let
worker_url
=
pod_info
.worker_url
(
port
);
let
mut
tracked
=
tracked_pods
.lock
()
.unwrap
();
// Remove the pod from our tracking
let
was_tracked
=
{
let
mut
tracker
=
tracked_pods
.lock
()
.unwrap
();
tracker
.remove
(
pod_info
)
};
if
was_tracked
{
if
tracked
.remove
(
pod_info
)
{
info!
(
"
Removing pod {} ({}) from
worker
s
"
,
pod_info
.name
,
pod_info
.ip
"
Pod deleted: {}. Removing
worker
: {}
"
,
pod_info
.name
,
worker_url
);
router
.remove_worker
(
&
worker_url
);
}
else
{
// This case might occur if a pod is deleted before it was ever marked healthy and added.
// Or if the event is duplicated. No action needed on the router if it wasn't tracked (and thus not added).
debug!
(
"Pod deletion event for untracked/already removed pod: {}. Worker URL: {}"
,
pod_info
.name
,
worker_url
);
}
}
// Remove URL from worker list
let
mut
urls
=
worker_urls
.write
()
.unwrap
();
if
let
Some
(
idx
)
=
urls
.iter
()
.position
(|
url
|
url
==
&
worker_url
)
{
urls
.remove
(
idx
);
info!
(
"Removed worker URL: {}"
,
worker_url
);
#[cfg(test)]
mod
tests
{
use
super
::
*
;
use
crate
::
router
::
Router
;
use
k8s_openapi
::
api
::
core
::
v1
::{
Pod
,
PodCondition
,
PodSpec
,
PodStatus
};
use
k8s_openapi
::
apimachinery
::
pkg
::
apis
::
meta
::
v1
::
ObjectMeta
;
use
k8s_openapi
::
apimachinery
::
pkg
::
apis
::
meta
::
v1
::
Time
;
use
std
::
sync
::
RwLock
;
// Helper function to create a Pod for testing PodInfo::from_pod
fn
create_k8s_pod
(
name
:
Option
<&
str
>
,
ip
:
Option
<&
str
>
,
phase
:
Option
<&
str
>
,
ready_status
:
Option
<&
str
>
,
deletion_timestamp
:
Option
<
Time
>
,
)
->
Pod
{
let
mut
pod
=
Pod
{
metadata
:
ObjectMeta
{
name
:
name
.map
(
String
::
from
),
deletion_timestamp
,
..
Default
::
default
()
},
spec
:
Some
(
PodSpec
::
default
()),
status
:
None
,
};
if
ip
.is_some
()
||
phase
.is_some
()
||
ready_status
.is_some
()
{
let
mut
pod_status
=
PodStatus
{
pod_ip
:
ip
.map
(
String
::
from
),
phase
:
phase
.map
(
String
::
from
),
conditions
:
None
,
..
Default
::
default
()
};
if
let
Some
(
status_str
)
=
ready_status
{
let
condition
=
PodCondition
{
type_
:
"Ready"
.to_string
(),
status
:
status_str
.to_string
(),
last_probe_time
:
None
,
last_transition_time
:
None
,
message
:
None
,
reason
:
None
,
};
pod_status
.conditions
=
Some
(
vec!
[
condition
]);
}
pod
.status
=
Some
(
pod_status
);
}
pod
}
// Helper to create a Router instance for testing event handlers
fn
create_test_router
()
->
Arc
<
Router
>
{
let
worker_urls
=
Arc
::
new
(
RwLock
::
new
(
Vec
::
new
()));
Arc
::
new
(
Router
::
Random
{
worker_urls
,
timeout_secs
:
5
,
interval_secs
:
1
,
})
}
#[test]
fn
test_service_discovery_config_default
()
{
let
config
=
ServiceDiscoveryConfig
::
default
();
assert
!
(
!
config
.enabled
);
assert
!
(
config
.selector
.is_empty
());
assert_eq!
(
config
.check_interval
,
Duration
::
from_secs
(
60
));
assert_eq!
(
config
.port
,
80
);
assert
!
(
config
.namespace
.is_none
());
}
#[test]
fn
test_pod_info_from_pod_valid
()
{
let
k8s_pod
=
create_k8s_pod
(
Some
(
"test-pod"
),
Some
(
"10.0.0.1"
),
Some
(
"Running"
),
Some
(
"True"
),
None
,
);
let
pod_info
=
PodInfo
::
from_pod
(
&
k8s_pod
)
.unwrap
();
assert_eq!
(
pod_info
.name
,
"test-pod"
);
assert_eq!
(
pod_info
.ip
,
"10.0.0.1"
);
assert_eq!
(
pod_info
.status
,
"Running"
);
assert
!
(
pod_info
.is_ready
);
}
#[test]
fn
test_pod_info_from_pod_not_ready
()
{
let
k8s_pod
=
create_k8s_pod
(
Some
(
"test-pod"
),
Some
(
"10.0.0.1"
),
Some
(
"Running"
),
Some
(
"False"
),
None
,
);
let
pod_info
=
PodInfo
::
from_pod
(
&
k8s_pod
)
.unwrap
();
assert
!
(
!
pod_info
.is_ready
);
}
#[test]
fn
test_pod_info_from_pod_no_conditions
()
{
let
k8s_pod
=
create_k8s_pod
(
Some
(
"test-pod"
),
Some
(
"10.0.0.1"
),
Some
(
"Running"
),
None
,
None
,
);
let
pod_info
=
PodInfo
::
from_pod
(
&
k8s_pod
)
.unwrap
();
assert
!
(
!
pod_info
.is_ready
);
}
#[test]
fn
test_pod_info_from_pod_missing_name
()
{
let
k8s_pod
=
create_k8s_pod
(
None
,
Some
(
"10.0.0.1"
),
Some
(
"Running"
),
Some
(
"True"
),
None
);
assert
!
(
PodInfo
::
from_pod
(
&
k8s_pod
)
.is_none
());
}
#[test]
fn
test_pod_info_from_pod_missing_ip
()
{
let
k8s_pod
=
create_k8s_pod
(
Some
(
"test-pod"
),
None
,
Some
(
"Running"
),
Some
(
"True"
),
None
);
assert
!
(
PodInfo
::
from_pod
(
&
k8s_pod
)
.is_none
());
}
#[test]
fn
test_pod_info_from_pod_missing_status_phase
()
{
let
k8s_pod
=
create_k8s_pod
(
Some
(
"test-pod"
),
Some
(
"10.0.0.1"
),
None
,
Some
(
"True"
),
None
);
let
pod_info
=
PodInfo
::
from_pod
(
&
k8s_pod
)
.unwrap
();
assert_eq!
(
pod_info
.status
,
"Unknown"
);
}
#[test]
fn
test_pod_info_from_pod_no_status_object
()
{
let
mut
k8s_pod
=
create_k8s_pod
(
Some
(
"test-pod"
),
None
,
None
,
None
,
None
);
k8s_pod
.status
=
None
;
assert
!
(
PodInfo
::
from_pod
(
&
k8s_pod
)
.is_none
());
}
#[test]
fn
test_pod_info_is_healthy
()
{
let
healthy_pod
=
PodInfo
{
name
:
"p1"
.into
(),
ip
:
"1.1.1.1"
.into
(),
status
:
"Running"
.into
(),
is_ready
:
true
,
};
assert
!
(
healthy_pod
.is_healthy
());
let
not_ready_pod
=
PodInfo
{
name
:
"p2"
.into
(),
ip
:
"1.1.1.2"
.into
(),
status
:
"Running"
.into
(),
is_ready
:
false
,
};
assert
!
(
!
not_ready_pod
.is_healthy
());
let
not_running_pod
=
PodInfo
{
name
:
"p3"
.into
(),
ip
:
"1.1.1.3"
.into
(),
status
:
"Pending"
.into
(),
is_ready
:
true
,
};
assert
!
(
!
not_running_pod
.is_healthy
());
}
#[test]
fn
test_pod_info_worker_url
()
{
let
pod_info
=
PodInfo
{
name
:
"p1"
.into
(),
ip
:
"1.2.3.4"
.into
(),
status
:
"Running"
.into
(),
is_ready
:
true
,
};
assert_eq!
(
pod_info
.worker_url
(
8080
),
"http://1.2.3.4:8080"
);
}
#[tokio::test]
async
fn
test_handle_pod_event_add_unhealthy_pod
()
{
let
router
=
create_test_router
();
let
tracked_pods
=
Arc
::
new
(
Mutex
::
new
(
HashSet
::
new
()));
let
pod_info
=
PodInfo
{
name
:
"pod1"
.into
(),
ip
:
"1.2.3.4"
.into
(),
status
:
"Pending"
.into
(),
is_ready
:
false
,
};
let
port
=
8080u16
;
handle_pod_event
(
&
pod_info
,
Arc
::
clone
(
&
tracked_pods
),
Arc
::
clone
(
&
router
),
port
,
)
.await
;
assert
!
(
!
tracked_pods
.lock
()
.unwrap
()
.contains
(
&
pod_info
));
assert
!
(
!
router
.get_worker_urls
()
.read
()
.unwrap
()
.contains
(
&
pod_info
.worker_url
(
port
)));
}
#[tokio::test]
async
fn
test_handle_pod_deletion_non_existing_pod
()
{
let
router
=
create_test_router
();
let
tracked_pods
=
Arc
::
new
(
Mutex
::
new
(
HashSet
::
new
()));
let
pod_info
=
PodInfo
{
name
:
"pod1"
.into
(),
ip
:
"1.2.3.4"
.into
(),
status
:
"Running"
.into
(),
is_ready
:
true
,
};
let
port
=
8080u16
;
handle_pod_deletion
(
&
pod_info
,
Arc
::
clone
(
&
tracked_pods
),
Arc
::
clone
(
&
router
),
port
,
)
.await
;
assert
!
(
tracked_pods
.lock
()
.unwrap
()
.is_empty
());
assert
!
(
router
.get_worker_urls
()
.read
()
.unwrap
()
.is_empty
());
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment