Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
dynamo
Commits
af0ff07c
Unverified
Commit
af0ff07c
authored
Apr 14, 2026
by
Biswa Panda
Committed by
GitHub
Apr 14, 2026
Browse files
feat: remove enable_nats usage (#7265)
parent
f9839161
Changes
15
Hide whitespace changes
Inline
Side-by-side
Showing
15 changed files
with
327 additions
and
82 deletions
+327
-82
components/src/dynamo/common/recv_forward_pass_metrics.py
components/src/dynamo/common/recv_forward_pass_metrics.py
+1
-5
components/src/dynamo/common/utils/runtime.py
components/src/dynamo/common/utils/runtime.py
+16
-9
components/src/dynamo/frontend/main.py
components/src/dynamo/frontend/main.py
+1
-19
components/src/dynamo/mocker/main.py
components/src/dynamo/mocker/main.py
+0
-1
components/src/dynamo/sglang/main.py
components/src/dynamo/sglang/main.py
+0
-1
components/src/dynamo/trtllm/main.py
components/src/dynamo/trtllm/main.py
+0
-1
components/src/dynamo/vllm/main.py
components/src/dynamo/vllm/main.py
+0
-1
components/src/dynamo/vllm/omni/main.py
components/src/dynamo/vllm/omni/main.py
+0
-1
examples/diffusers/worker.py
examples/diffusers/worker.py
+1
-1
lib/bindings/python/rust/lib.rs
lib/bindings/python/rust/lib.rs
+23
-10
lib/bindings/python/src/dynamo/_core.pyi
lib/bindings/python/src/dynamo/_core.pyi
+0
-4
lib/bindings/python/src/dynamo/runtime/__init__.py
lib/bindings/python/src/dynamo/runtime/__init__.py
+15
-23
lib/bindings/python/tests/test_deprecated_enable_nats.py
lib/bindings/python/tests/test_deprecated_enable_nats.py
+252
-0
lib/runtime/src/distributed.rs
lib/runtime/src/distributed.rs
+17
-3
tests/frontend/vllm_prepost_worker.py
tests/frontend/vllm_prepost_worker.py
+1
-3
No files found.
components/src/dynamo/common/recv_forward_pass_metrics.py
View file @
af0ff07c
...
@@ -137,11 +137,7 @@ async def run(args: argparse.Namespace) -> None:
...
@@ -137,11 +137,7 @@ async def run(args: argparse.Namespace) -> None:
from
dynamo.llm
import
FpmEventSubscriber
from
dynamo.llm
import
FpmEventSubscriber
loop
=
asyncio
.
get_running_loop
()
loop
=
asyncio
.
get_running_loop
()
event_plane
=
os
.
environ
.
get
(
"DYN_EVENT_PLANE"
,
"nats"
)
runtime
=
DistributedRuntime
(
loop
,
args
.
discovery_backend
,
args
.
request_plane
)
enable_nats
=
args
.
request_plane
==
"nats"
or
event_plane
==
"nats"
runtime
=
DistributedRuntime
(
loop
,
args
.
discovery_backend
,
args
.
request_plane
,
enable_nats
)
endpoint
=
runtime
.
endpoint
(
f
"
{
args
.
namespace
}
.
{
args
.
component
}
.
{
args
.
endpoint
}
"
)
endpoint
=
runtime
.
endpoint
(
f
"
{
args
.
namespace
}
.
{
args
.
component
}
.
{
args
.
endpoint
}
"
)
subscriber
=
FpmEventSubscriber
(
endpoint
)
subscriber
=
FpmEventSubscriber
(
endpoint
)
...
...
components/src/dynamo/common/utils/runtime.py
View file @
af0ff07c
...
@@ -13,7 +13,8 @@ Provides:
...
@@ -13,7 +13,8 @@ Provides:
import
asyncio
import
asyncio
import
os
import
os
from
typing
import
Tuple
import
warnings
from
typing
import
Optional
,
Tuple
from
dynamo.runtime
import
DistributedRuntime
from
dynamo.runtime
import
DistributedRuntime
...
@@ -46,29 +47,35 @@ def create_runtime(
...
@@ -46,29 +47,35 @@ def create_runtime(
discovery_backend
:
str
,
discovery_backend
:
str
,
request_plane
:
str
,
request_plane
:
str
,
event_plane
:
str
,
event_plane
:
str
,
use_kv_events
:
bool
,
use_kv_events
:
Optional
[
bool
]
=
None
,
)
->
Tuple
[
DistributedRuntime
,
asyncio
.
AbstractEventLoop
]:
)
->
Tuple
[
DistributedRuntime
,
asyncio
.
AbstractEventLoop
]:
"""Create a DistributedRuntime.
"""Create a DistributedRuntime.
Sets DYN_EVENT_PLANE in the environment, computes whether NATS is needed,
and creates the runtime.
Args:
Args:
discovery_backend: Discovery backend type (kubernetes, etcd, file, mem).
discovery_backend: Discovery backend type (kubernetes, etcd, file, mem).
request_plane: Request distribution method (nats, http, tcp).
request_plane: Request distribution method (nats, http, tcp).
event_plane: Event publishing method (nats, zmq).
event_plane: Event publishing method (nats, zmq).
use_kv_events: Whether KV events are enabled.
use_kv_events: Deprecated. NATS enablement is now determined automatically
from the event-plane configuration. This parameter is accepted for
backwards compatibility but will be removed in a future release.
Returns:
Returns:
Tuple of (runtime, event_loop).
Tuple of (runtime, event_loop).
"""
"""
if
use_kv_events
is
not
None
:
warnings
.
warn
(
"The 'use_kv_events' parameter is deprecated and will be removed in a "
"future release. NATS enablement is now determined automatically from "
"the event-plane configuration."
,
DeprecationWarning
,
stacklevel
=
2
,
)
loop
=
asyncio
.
get_running_loop
()
loop
=
asyncio
.
get_running_loop
()
os
.
environ
[
"DYN_EVENT_PLANE"
]
=
event_plane
os
.
environ
[
"DYN_EVENT_PLANE"
]
=
event_plane
enable_nats
=
request_plane
==
"nats"
or
(
event_plane
==
"nats"
and
use_kv_events
)
runtime
=
DistributedRuntime
(
loop
,
discovery_backend
,
request_plane
)
runtime
=
DistributedRuntime
(
loop
,
discovery_backend
,
request_plane
,
enable_nats
)
return
runtime
,
loop
return
runtime
,
loop
...
...
components/src/dynamo/frontend/main.py
View file @
af0ff07c
...
@@ -209,26 +209,8 @@ async def async_main():
...
@@ -209,26 +209,8 @@ async def async_main():
if
prefix
:
if
prefix
:
os
.
environ
[
"DYN_METRICS_PREFIX"
]
=
config
.
metrics_prefix
os
.
environ
[
"DYN_METRICS_PREFIX"
]
=
config
.
metrics_prefix
# NATS is needed when:
# 1. Request plane is NATS, OR
# 2. Durable KV events (JetStream) is explicitly requested, OR
# 3. Event plane is NATS AND KV router mode AND (KV events OR replica sync enabled)
# Note: NATS Core (without JetStream) is the default for KV events when durable_kv_events=False
enable_nats
=
config
.
request_plane
==
"nats"
or
(
config
.
router_mode
==
"kv"
and
(
config
.
durable_kv_events
or
(
config
.
event_plane
==
"nats"
and
(
config
.
use_kv_events
or
config
.
router_replica_sync
)
)
)
)
loop
=
asyncio
.
get_running_loop
()
loop
=
asyncio
.
get_running_loop
()
runtime
=
DistributedRuntime
(
runtime
=
DistributedRuntime
(
loop
,
config
.
discovery_backend
,
config
.
request_plane
)
loop
,
config
.
discovery_backend
,
config
.
request_plane
,
enable_nats
)
def
signal_handler
():
def
signal_handler
():
asyncio
.
create_task
(
graceful_shutdown
(
runtime
))
asyncio
.
create_task
(
graceful_shutdown
(
runtime
))
...
...
components/src/dynamo/mocker/main.py
View file @
af0ff07c
...
@@ -165,7 +165,6 @@ async def launch_workers(args: argparse.Namespace, base_engine_args):
...
@@ -165,7 +165,6 @@ async def launch_workers(args: argparse.Namespace, base_engine_args):
args
.
discovery_backend
,
args
.
discovery_backend
,
args
.
request_plane
,
args
.
request_plane
,
args
.
event_plane
,
args
.
event_plane
,
True
,
# statically set to True, just determines to enable_nats if event_plane is nats
)
)
runtimes
.
append
(
runtime
)
runtimes
.
append
(
runtime
)
...
...
components/src/dynamo/sglang/main.py
View file @
af0ff07c
...
@@ -61,7 +61,6 @@ async def worker():
...
@@ -61,7 +61,6 @@ async def worker():
discovery_backend
=
dynamo_args
.
discovery_backend
,
discovery_backend
=
dynamo_args
.
discovery_backend
,
request_plane
=
dynamo_args
.
request_plane
,
request_plane
=
dynamo_args
.
request_plane
,
event_plane
=
dynamo_args
.
event_plane
,
event_plane
=
dynamo_args
.
event_plane
,
use_kv_events
=
dynamo_args
.
use_kv_events
,
)
)
run_deferred_handlers
=
install_graceful_shutdown
(
run_deferred_handlers
=
install_graceful_shutdown
(
...
...
components/src/dynamo/trtllm/main.py
View file @
af0ff07c
...
@@ -24,7 +24,6 @@ async def worker():
...
@@ -24,7 +24,6 @@ async def worker():
discovery_backend
=
config
.
discovery_backend
,
discovery_backend
=
config
.
discovery_backend
,
request_plane
=
config
.
request_plane
,
request_plane
=
config
.
request_plane
,
event_plane
=
config
.
event_plane
,
event_plane
=
config
.
event_plane
,
use_kv_events
=
config
.
use_kv_events
,
)
)
install_signal_handlers
(
loop
,
runtime
,
shutdown_endpoints
,
shutdown_event
)
install_signal_handlers
(
loop
,
runtime
,
shutdown_endpoints
,
shutdown_event
)
...
...
components/src/dynamo/vllm/main.py
View file @
af0ff07c
...
@@ -162,7 +162,6 @@ async def worker() -> None:
...
@@ -162,7 +162,6 @@ async def worker() -> None:
discovery_backend
=
config
.
discovery_backend
,
discovery_backend
=
config
.
discovery_backend
,
request_plane
=
config
.
request_plane
,
request_plane
=
config
.
request_plane
,
event_plane
=
config
.
event_plane
,
event_plane
=
config
.
event_plane
,
use_kv_events
=
config
.
use_kv_events
,
)
)
# [gluo FIXME] should be after init() below? 'shutdown_endpoints' are populated
# [gluo FIXME] should be after init() below? 'shutdown_endpoints' are populated
...
...
components/src/dynamo/vllm/omni/main.py
View file @
af0ff07c
...
@@ -142,7 +142,6 @@ async def worker():
...
@@ -142,7 +142,6 @@ async def worker():
discovery_backend
=
config
.
discovery_backend
,
discovery_backend
=
config
.
discovery_backend
,
request_plane
=
config
.
request_plane
,
request_plane
=
config
.
request_plane
,
event_plane
=
config
.
event_plane
,
event_plane
=
config
.
event_plane
,
use_kv_events
=
False
,
)
)
install_signal_handlers
(
loop
,
runtime
,
shutdown_endpoints
,
shutdown_event
)
install_signal_handlers
(
loop
,
runtime
,
shutdown_endpoints
,
shutdown_event
)
...
...
examples/diffusers/worker.py
View file @
af0ff07c
...
@@ -468,7 +468,7 @@ async def main(args: argparse.Namespace) -> None:
...
@@ -468,7 +468,7 @@ async def main(args: argparse.Namespace) -> None:
)
)
logger
.
info
(
"Using discovery backend: %s"
,
discovery_backend
)
logger
.
info
(
"Using discovery backend: %s"
,
discovery_backend
)
logger
.
info
(
"Resolved worker namespace: %s"
,
_get_worker_namespace
())
logger
.
info
(
"Resolved worker namespace: %s"
,
_get_worker_namespace
())
runtime
=
DistributedRuntime
(
loop
,
discovery_backend
,
"tcp"
,
False
)
runtime
=
DistributedRuntime
(
loop
,
discovery_backend
,
"tcp"
)
await
backend_worker
(
runtime
,
args
)
await
backend_worker
(
runtime
,
args
)
...
...
lib/bindings/python/rust/lib.rs
View file @
af0ff07c
...
@@ -592,6 +592,20 @@ impl DistributedRuntime {
...
@@ -592,6 +592,20 @@ impl DistributedRuntime {
request_plane
:
String
,
request_plane
:
String
,
enable_nats
:
Option
<
bool
>
,
enable_nats
:
Option
<
bool
>
,
)
->
PyResult
<
Self
>
{
)
->
PyResult
<
Self
>
{
if
enable_nats
.is_some
()
{
Python
::
with_gil
(|
py
|
{
let
warnings
=
py
.import
(
"warnings"
)
?
;
warnings
.call_method1
(
"warn"
,
(
"The 'enable_nats' parameter is deprecated and will be removed in a future release. NATS enablement is now determined automatically from the event-plane configuration."
,
py
.import
(
"builtins"
)
?
.getattr
(
"DeprecationWarning"
)
?
,
2i32
,
// stacklevel
),
)
?
;
Ok
::
<
(),
PyErr
>
(())
})
?
;
}
let
discovery_backend_config
=
match
discovery_backend
.as_str
()
{
let
discovery_backend_config
=
match
discovery_backend
.as_str
()
{
"kubernetes"
=>
DiscoveryBackend
::
Kubernetes
,
"kubernetes"
=>
DiscoveryBackend
::
Kubernetes
,
other
=>
{
other
=>
{
...
@@ -629,19 +643,18 @@ impl DistributedRuntime {
...
@@ -629,19 +643,18 @@ impl DistributedRuntime {
});
});
}
}
// NATS is used for more than just the NATS request-plane:
let
event_plane_is_nats
=
// - KV router events (JetStream or NATS core + local indexer)
std
::
env
::
var
(
config
::
environment_names
::
event_plane
::
DYN_EVENT_PLANE
)
// - inter-router replica sync (NATS core)
.map
(|
v
|
v
.eq_ignore_ascii_case
(
"nats"
))
//
.unwrap_or
(
true
);
// NATS initialization logic:
// 1. If request_plane is NATS, always enable NATS
let
nats_enabled
=
request_plane
.is_nats
()
// 2. Otherwise, use enable_nats parameter (defaults to true for backward compat)
||
std
::
env
::
var
(
config
::
environment_names
::
nats
::
NATS_SERVER
)
.is_ok
()
// Pass false to disable NATS (e.g., for approximate KV routing mode)
||
event_plane_is_nats
;
let
enable_nats
=
enable_nats
.unwrap_or
(
true
);
// Default to true
let
runtime_config
=
DistributedConfig
{
let
runtime_config
=
DistributedConfig
{
discovery_backend
:
discovery_backend_config
,
discovery_backend
:
discovery_backend_config
,
nats_config
:
if
request_plane
.is_nats
()
||
enable
_nats
{
nats_config
:
if
nats_
enable
d
{
Some
(
dynamo_runtime
::
transports
::
nats
::
ClientOptions
::
default
())
Some
(
dynamo_runtime
::
transports
::
nats
::
ClientOptions
::
default
())
}
else
{
}
else
{
None
None
...
...
lib/bindings/python/src/dynamo/_core.pyi
View file @
af0ff07c
...
@@ -51,7 +51,6 @@ class DistributedRuntime:
...
@@ -51,7 +51,6 @@ class DistributedRuntime:
event_loop: Any,
event_loop: Any,
discovery_backend: str,
discovery_backend: str,
request_plane: str,
request_plane: str,
enable_nats: Optional[bool] = None,
) -> "DistributedRuntime":
) -> "DistributedRuntime":
"""
"""
Create a new DistributedRuntime.
Create a new DistributedRuntime.
...
@@ -60,9 +59,6 @@ class DistributedRuntime:
...
@@ -60,9 +59,6 @@ class DistributedRuntime:
event_loop: The asyncio event loop
event_loop: The asyncio event loop
discovery_backend: Discovery backend ("kubernetes", "etcd", "file", or "mem")
discovery_backend: Discovery backend ("kubernetes", "etcd", "file", or "mem")
request_plane: Request plane transport ("tcp", "http", or "nats")
request_plane: Request plane transport ("tcp", "http", or "nats")
enable_nats: Whether to enable NATS for KV events. Defaults to True.
If request_plane is "nats", NATS is always enabled.
Pass False to disable NATS initialization (e.g., for approximate routing).
"""
"""
...
...
...
...
lib/bindings/python/src/dynamo/runtime/__init__.py
View file @
af0ff07c
...
@@ -3,8 +3,9 @@
...
@@ -3,8 +3,9 @@
import
asyncio
import
asyncio
import
os
import
os
import
warnings
from
functools
import
wraps
from
functools
import
wraps
from
typing
import
Any
,
AsyncGenerator
,
Callable
,
Type
,
Union
from
typing
import
Any
,
AsyncGenerator
,
Callable
,
Optional
,
Type
,
Union
from
pydantic
import
BaseModel
,
ValidationError
from
pydantic
import
BaseModel
,
ValidationError
...
@@ -16,15 +17,23 @@ from dynamo._core import DistributedRuntime as DistributedRuntime
...
@@ -16,15 +17,23 @@ from dynamo._core import DistributedRuntime as DistributedRuntime
from
dynamo._core
import
Endpoint
as
Endpoint
from
dynamo._core
import
Endpoint
as
Endpoint
def
dynamo_worker
(
enable_nats
:
bool
=
Tru
e
):
def
dynamo_worker
(
enable_nats
:
Optional
[
bool
]
=
Non
e
):
"""
"""
Decorator that creates a DistributedRuntime and passes it to the worker function.
Decorator that creates a DistributedRuntime and passes it to the worker function.
Args:
Args:
enable_nats:
Whether to enable NATS for KV events. Defaults to True.
enable_nats:
Deprecated. NATS enablement is now determined automatically
If request_plane is "nats", NATS is always enabled.
from the event-plane configuration. This parameter is accepted for
Pass False (via --no-kv-events flag) to disable NATS initialization
.
backwards compatibility but will be removed in a future release
.
"""
"""
if
enable_nats
is
not
None
:
warnings
.
warn
(
"The 'enable_nats' parameter is deprecated and will be removed in a "
"future release. NATS enablement is now determined automatically from "
"the event-plane configuration."
,
DeprecationWarning
,
stacklevel
=
2
,
)
def
decorator
(
func
):
def
decorator
(
func
):
@
wraps
(
func
)
@
wraps
(
func
)
...
@@ -32,27 +41,10 @@ def dynamo_worker(enable_nats: bool = True):
...
@@ -32,27 +41,10 @@ def dynamo_worker(enable_nats: bool = True):
loop
=
asyncio
.
get_running_loop
()
loop
=
asyncio
.
get_running_loop
()
request_plane
=
os
.
environ
.
get
(
"DYN_REQUEST_PLANE"
,
"tcp"
)
request_plane
=
os
.
environ
.
get
(
"DYN_REQUEST_PLANE"
,
"tcp"
)
discovery_backend
=
os
.
environ
.
get
(
"DYN_DISCOVERY_BACKEND"
,
"etcd"
)
discovery_backend
=
os
.
environ
.
get
(
"DYN_DISCOVERY_BACKEND"
,
"etcd"
)
runtime
=
DistributedRuntime
(
runtime
=
DistributedRuntime
(
loop
,
discovery_backend
,
request_plane
)
loop
,
discovery_backend
,
request_plane
,
enable_nats
)
await
func
(
runtime
,
*
args
,
**
kwargs
)
await
func
(
runtime
,
*
args
,
**
kwargs
)
# # wait for one of
# # 1. the task to complete
# # 2. the task to be cancelled
# done, pending = await asyncio.wait({task, cancelled}, return_when=asyncio.FIRST_COMPLETED)
# # i want to catch a SIGINT or SIGTERM or a cancellation event here
# try:
# # Call the actual function
# return await func(runtime, *args, **kwargs)
# finally:
# print("Decorator: Cleaning up runtime resources")
# # Perform cleanup actions here
return
wrapper
return
wrapper
return
decorator
return
decorator
...
...
lib/bindings/python/tests/test_deprecated_enable_nats.py
0 → 100644
View file @
af0ff07c
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Test that deprecated parameters are still accepted and emit DeprecationWarnings:
- DistributedRuntime(enable_nats=...)
- @dynamo_worker(enable_nats=...)
- create_runtime(use_kv_events=...)
Downstream components may still pass these kwargs. We must keep accepting
them for N+1 backwards compatibility while steering callers toward the
new auto-detection behaviour.
"""
import
asyncio
import
inspect
import
warnings
from
unittest.mock
import
MagicMock
,
patch
import
pytest
from
dynamo._core
import
DistributedRuntime
from
dynamo.common.utils.runtime
import
create_runtime
from
dynamo.runtime
import
dynamo_worker
pytestmark
=
[
pytest
.
mark
.
gpu_0
,
pytest
.
mark
.
parallel
,
pytest
.
mark
.
pre_merge
,
pytest
.
mark
.
unit
,
]
# ---------------------------------------------------------------------------
# DistributedRuntime tests
# ---------------------------------------------------------------------------
def
test_enable_nats_parameter_in_signature
():
"""DistributedRuntime.__init__ should still accept enable_nats as an optional kwarg."""
sig
=
inspect
.
signature
(
DistributedRuntime
)
assert
"enable_nats"
in
sig
.
parameters
param
=
sig
.
parameters
[
"enable_nats"
]
assert
param
.
default
is
None
@
pytest
.
mark
.
forked
def
test_enable_nats_emits_deprecation_warning
(
discovery_backend
,
request_plane
):
"""Passing enable_nats should emit a DeprecationWarning but otherwise work.
Uses asyncio.run() instead of @pytest.mark.asyncio to avoid the
pytest-asyncio event-loop fixture being set up in the parent process before
the fork. That combination leaves dangling finalizers that break the next
test module's setup (pytest "previous item was not torn down properly").
"""
async
def
_run
():
loop
=
asyncio
.
get_running_loop
()
with
warnings
.
catch_warnings
(
record
=
True
)
as
caught
:
warnings
.
simplefilter
(
"always"
)
runtime
=
DistributedRuntime
(
loop
,
discovery_backend
,
request_plane
,
enable_nats
=
True
)
try
:
deprecation_warnings
=
[
w
for
w
in
caught
if
issubclass
(
w
.
category
,
DeprecationWarning
)
]
assert
len
(
deprecation_warnings
)
==
1
assert
"enable_nats"
in
str
(
deprecation_warnings
[
0
].
message
)
finally
:
runtime
.
shutdown
()
asyncio
.
run
(
_run
())
@
pytest
.
mark
.
forked
def
test_no_warning_without_enable_nats
(
discovery_backend
,
request_plane
):
"""Omitting enable_nats should not emit a DeprecationWarning.
Uses asyncio.run() — see test_enable_nats_emits_deprecation_warning docstring.
"""
async
def
_run
():
loop
=
asyncio
.
get_running_loop
()
with
warnings
.
catch_warnings
(
record
=
True
)
as
caught
:
warnings
.
simplefilter
(
"always"
)
runtime
=
DistributedRuntime
(
loop
,
discovery_backend
,
request_plane
)
try
:
deprecation_warnings
=
[
w
for
w
in
caught
if
issubclass
(
w
.
category
,
DeprecationWarning
)
]
assert
len
(
deprecation_warnings
)
==
0
finally
:
runtime
.
shutdown
()
asyncio
.
run
(
_run
())
# ---------------------------------------------------------------------------
# dynamo_worker() decorator tests
# ---------------------------------------------------------------------------
def
test_dynamo_worker_accepts_enable_nats_kwarg
():
"""dynamo_worker() should accept enable_nats as an optional keyword argument."""
sig
=
inspect
.
signature
(
dynamo_worker
)
assert
"enable_nats"
in
sig
.
parameters
param
=
sig
.
parameters
[
"enable_nats"
]
assert
param
.
default
is
None
def
test_dynamo_worker_enable_nats_true_emits_warning
():
"""@dynamo_worker(enable_nats=True) should emit a DeprecationWarning."""
with
warnings
.
catch_warnings
(
record
=
True
)
as
caught
:
warnings
.
simplefilter
(
"always"
)
decorator
=
dynamo_worker
(
enable_nats
=
True
)
deprecation_warnings
=
[
w
for
w
in
caught
if
issubclass
(
w
.
category
,
DeprecationWarning
)
]
assert
len
(
deprecation_warnings
)
==
1
assert
"enable_nats"
in
str
(
deprecation_warnings
[
0
].
message
)
assert
callable
(
decorator
)
def
test_dynamo_worker_enable_nats_false_emits_warning
():
"""@dynamo_worker(enable_nats=False) should also emit a DeprecationWarning."""
with
warnings
.
catch_warnings
(
record
=
True
)
as
caught
:
warnings
.
simplefilter
(
"always"
)
decorator
=
dynamo_worker
(
enable_nats
=
False
)
deprecation_warnings
=
[
w
for
w
in
caught
if
issubclass
(
w
.
category
,
DeprecationWarning
)
]
assert
len
(
deprecation_warnings
)
==
1
assert
"enable_nats"
in
str
(
deprecation_warnings
[
0
].
message
)
assert
callable
(
decorator
)
def
test_dynamo_worker_no_args_no_warning
():
"""@dynamo_worker() without enable_nats should not emit a DeprecationWarning."""
with
warnings
.
catch_warnings
(
record
=
True
)
as
caught
:
warnings
.
simplefilter
(
"always"
)
decorator
=
dynamo_worker
()
deprecation_warnings
=
[
w
for
w
in
caught
if
issubclass
(
w
.
category
,
DeprecationWarning
)
]
assert
len
(
deprecation_warnings
)
==
0
assert
callable
(
decorator
)
def
test_dynamo_worker_returns_working_decorator
():
"""The decorator returned by dynamo_worker(enable_nats=True) should wrap a function."""
with
warnings
.
catch_warnings
(
record
=
True
):
warnings
.
simplefilter
(
"always"
)
@
dynamo_worker
(
enable_nats
=
True
)
async
def
_sample_worker
(
runtime
,
*
args
,
**
kwargs
):
pass
assert
asyncio
.
iscoroutinefunction
(
_sample_worker
)
# ---------------------------------------------------------------------------
# create_runtime() tests (use_kv_events deprecation)
# ---------------------------------------------------------------------------
def
test_create_runtime_accepts_use_kv_events_kwarg
():
"""create_runtime() should accept use_kv_events as an optional keyword argument."""
sig
=
inspect
.
signature
(
create_runtime
)
assert
"use_kv_events"
in
sig
.
parameters
param
=
sig
.
parameters
[
"use_kv_events"
]
assert
param
.
default
is
None
@
patch
(
"dynamo.common.utils.runtime.DistributedRuntime"
)
def
test_create_runtime_use_kv_events_true_emits_warning
(
mock_runtime_cls
):
"""create_runtime(use_kv_events=True) should emit a DeprecationWarning.
Mocks DistributedRuntime to avoid spawning a real Tokio runtime. The
use_kv_events warning is emitted in pure Python before the constructor
runs, so this is safe and avoids @pytest.mark.forked which leaves stale
pytest SetupState entries that break the next test module.
"""
mock_runtime_cls
.
return_value
=
MagicMock
()
async
def
_run
():
with
warnings
.
catch_warnings
(
record
=
True
)
as
caught
:
warnings
.
simplefilter
(
"always"
)
runtime
,
loop
=
create_runtime
(
discovery_backend
=
"file"
,
request_plane
=
"tcp"
,
event_plane
=
"nats"
,
use_kv_events
=
True
,
)
deprecation_warnings
=
[
w
for
w
in
caught
if
issubclass
(
w
.
category
,
DeprecationWarning
)
]
assert
len
(
deprecation_warnings
)
==
1
assert
"use_kv_events"
in
str
(
deprecation_warnings
[
0
].
message
)
asyncio
.
run
(
_run
())
@
patch
(
"dynamo.common.utils.runtime.DistributedRuntime"
)
def
test_create_runtime_use_kv_events_false_emits_warning
(
mock_runtime_cls
):
"""create_runtime(use_kv_events=False) should also emit a DeprecationWarning.
See test_create_runtime_use_kv_events_true_emits_warning docstring.
"""
mock_runtime_cls
.
return_value
=
MagicMock
()
async
def
_run
():
with
warnings
.
catch_warnings
(
record
=
True
)
as
caught
:
warnings
.
simplefilter
(
"always"
)
runtime
,
loop
=
create_runtime
(
discovery_backend
=
"file"
,
request_plane
=
"tcp"
,
event_plane
=
"nats"
,
use_kv_events
=
False
,
)
deprecation_warnings
=
[
w
for
w
in
caught
if
issubclass
(
w
.
category
,
DeprecationWarning
)
]
assert
len
(
deprecation_warnings
)
==
1
assert
"use_kv_events"
in
str
(
deprecation_warnings
[
0
].
message
)
asyncio
.
run
(
_run
())
@
patch
(
"dynamo.common.utils.runtime.DistributedRuntime"
)
def
test_create_runtime_no_use_kv_events_no_warning
(
mock_runtime_cls
):
"""Omitting use_kv_events should not emit a DeprecationWarning.
See test_create_runtime_use_kv_events_true_emits_warning docstring.
"""
mock_runtime_cls
.
return_value
=
MagicMock
()
async
def
_run
():
with
warnings
.
catch_warnings
(
record
=
True
)
as
caught
:
warnings
.
simplefilter
(
"always"
)
runtime
,
loop
=
create_runtime
(
discovery_backend
=
"file"
,
request_plane
=
"tcp"
,
event_plane
=
"nats"
,
)
deprecation_warnings
=
[
w
for
w
in
caught
if
issubclass
(
w
.
category
,
DeprecationWarning
)
]
assert
len
(
deprecation_warnings
)
==
0
asyncio
.
run
(
_run
())
lib/runtime/src/distributed.rs
View file @
af0ff07c
...
@@ -581,9 +581,18 @@ impl DistributedConfig {
...
@@ -581,9 +581,18 @@ impl DistributedConfig {
//
//
// Historically we only connected to NATS when the request plane was NATS, which made
// Historically we only connected to NATS when the request plane was NATS, which made
// `DYN_REQUEST_PLANE=tcp|http` incompatible with KV routing modes that rely on NATS.
// `DYN_REQUEST_PLANE=tcp|http` incompatible with KV routing modes that rely on NATS.
// If a NATS server is configured via env, enable the client regardless of request plane.
// Enable the NATS client when any of these hold:
// 1. Request plane is NATS
// 2. NATS_SERVER is explicitly configured
// 3. Event plane is NATS (the default)
let
event_plane_is_nats
=
std
::
env
::
var
(
crate
::
config
::
environment_names
::
event_plane
::
DYN_EVENT_PLANE
)
.map
(|
v
|
v
.eq_ignore_ascii_case
(
"nats"
))
.unwrap_or
(
true
);
let
nats_enabled
=
request_plane
.is_nats
()
let
nats_enabled
=
request_plane
.is_nats
()
||
std
::
env
::
var
(
crate
::
config
::
environment_names
::
nats
::
NATS_SERVER
)
.is_ok
();
||
std
::
env
::
var
(
crate
::
config
::
environment_names
::
nats
::
NATS_SERVER
)
.is_ok
()
||
event_plane_is_nats
;
// DYN_DISCOVERY_BACKEND selects the discovery mechanism
// DYN_DISCOVERY_BACKEND selects the discovery mechanism
// Valid values: "kubernetes", "etcd" (default), "file", "mem"
// Valid values: "kubernetes", "etcd" (default), "file", "mem"
...
@@ -623,8 +632,13 @@ impl DistributedConfig {
...
@@ -623,8 +632,13 @@ impl DistributedConfig {
..
Default
::
default
()
..
Default
::
default
()
};
};
let
request_plane
=
RequestPlaneMode
::
from_env
();
let
request_plane
=
RequestPlaneMode
::
from_env
();
let
event_plane_is_nats
=
std
::
env
::
var
(
crate
::
config
::
environment_names
::
event_plane
::
DYN_EVENT_PLANE
)
.map
(|
v
|
v
.eq_ignore_ascii_case
(
"nats"
))
.unwrap_or
(
true
);
let
nats_enabled
=
request_plane
.is_nats
()
let
nats_enabled
=
request_plane
.is_nats
()
||
std
::
env
::
var
(
crate
::
config
::
environment_names
::
nats
::
NATS_SERVER
)
.is_ok
();
||
std
::
env
::
var
(
crate
::
config
::
environment_names
::
nats
::
NATS_SERVER
)
.is_ok
()
||
event_plane_is_nats
;
DistributedConfig
{
DistributedConfig
{
discovery_backend
:
DiscoveryBackend
::
KvStore
(
kv
::
Selector
::
Etcd
(
Box
::
new
(
etcd_config
))),
discovery_backend
:
DiscoveryBackend
::
KvStore
(
kv
::
Selector
::
Etcd
(
Box
::
new
(
etcd_config
))),
nats_config
:
if
nats_enabled
{
nats_config
:
if
nats_enabled
{
...
...
tests/frontend/vllm_prepost_worker.py
View file @
af0ff07c
...
@@ -67,9 +67,7 @@ class VllmPrepostTestHandler:
...
@@ -67,9 +67,7 @@ class VllmPrepostTestHandler:
async
def
main
():
async
def
main
():
"""Register a token-based chat model and stream deterministic responses."""
"""Register a token-based chat model and stream deterministic responses."""
runtime
=
DistributedRuntime
(
runtime
=
DistributedRuntime
(
asyncio
.
get_running_loop
(),
"etcd"
,
"tcp"
)
asyncio
.
get_running_loop
(),
"etcd"
,
"tcp"
,
enable_nats
=
False
)
endpoint
=
runtime
.
endpoint
(
"test.vllm-prepost.generate"
)
endpoint
=
runtime
.
endpoint
(
"test.vllm-prepost.generate"
)
await
register_model
(
await
register_model
(
ModelInput
.
Tokens
,
ModelInput
.
Tokens
,
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment