Commit f91d5488 authored by ishandhanani's avatar ishandhanani Committed by GitHub
Browse files

fix: clean unused bento pieces from serve.py and serving.py (#532)

parent df54b9cb
......@@ -36,7 +36,6 @@ def create_bentoml_cli() -> click.Command:
from dynamo.sdk.cli.run import run_command
from dynamo.sdk.cli.serve import serve_command
from dynamo.sdk.cli.server import cloud_command
from dynamo.sdk.cli.start import start_command
from dynamo.sdk.cli.utils import DynamoCommandGroup
# from dynamo.sdk.cli.cloud import cloud_command
......@@ -60,7 +59,6 @@ def create_bentoml_cli() -> click.Command:
# Add top-level CLI commands
bentoml_cli.add_command(cloud_command)
bentoml_cli.add_single_command(bento_command, "build")
bentoml_cli.add_subcommands(start_command)
bentoml_cli.add_subcommands(serve_command)
bentoml_cli.add_subcommands(run_command)
# bentoml_cli.add_command(deploy_command)
......
......@@ -148,7 +148,7 @@ def build_deploy_command() -> click.Command:
- a tag to a Bento in local Bento store
- a path to a built Bento
"""
from bentoml._internal.log import configure_server_logging
from dynamo.sdk.lib.logging import configure_server_logging
configure_server_logging()
......
......@@ -40,7 +40,7 @@ from rich.syntax import Syntax
from rich.table import Table
from simple_di import Provide, inject
logger = logging.getLogger("dynamo.deployment")
logger = logging.getLogger(__name__)
if t.TYPE_CHECKING:
from bentoml._internal.cloud import BentoCloudClient
......
# SPDX-FileCopyrightText: Copyright (c) 2020 Atalaya Tech. Inc
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
......@@ -12,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
from __future__ import annotations
......@@ -33,36 +35,6 @@ if t.TYPE_CHECKING:
logger = logging.getLogger(__name__)
DEFAULT_DEV_SERVER_HOST = "127.0.0.1"
def deprecated_option(*param_decls: str, **attrs: t.Any):
"""Marks a given options as deprecated, and omit a warning when it's used"""
deprecated = attrs.pop("deprecated", True)
new_behaviour = attrs.pop("current_behaviour", None)
assert new_behaviour is not None, "current_behaviour is required"
def show_deprecated_callback(
ctx: click.Context, param: click.Parameter, value: t.Any
):
if value is not param.default and deprecated:
name = "'--%(name)s'" if attrs.get("is_flag", False) else "'%(name)s'"
DEPRECATION_WARNING = f"[yellow]DeprecationWarning: The parameter {name} is deprecated and will be removed in the future. (Current behaviour: %(new_behaviour)s)[/]"
rich.print(
DEPRECATION_WARNING
% {"name": param.name, "new_behaviour": new_behaviour},
file=sys.stderr,
)
def decorator(f: F[t.Any]) -> t.Callable[[F[t.Any]], click.Command]: # type: ignore
msg = attrs.pop("help", "")
msg += " (Deprecated)" if msg else "(Deprecated)"
attrs.setdefault("help", msg)
attrs.setdefault("callback", show_deprecated_callback)
return click.option(*param_decls, **attrs)(f)
return decorator
def _parse_service_arg(arg_name: str, arg_value: str) -> tuple[str, str, t.Any]:
"""Parse a single CLI argument into service name, key, and value."""
......@@ -155,9 +127,7 @@ def _parse_service_args(args: list[str]) -> t.Dict[str, t.Any]:
def build_serve_command() -> click.Group:
from bentoml._internal.log import configure_server_logging
from bentoml_cli.env_manager import env_manager
from bentoml_cli.utils import AliasCommand
from dynamo.sdk.lib.logging import configure_server_logging
@click.group(name="serve")
def cli():
......@@ -168,8 +138,6 @@ def build_serve_command() -> click.Group:
ignore_unknown_options=True,
allow_extra_args=True,
),
aliases=["serve-http"],
cls=AliasCommand,
)
@click.argument("bento", type=click.STRING, default=".")
@click.option(
......@@ -193,68 +161,21 @@ def build_serve_command() -> click.Group:
type=click.Path(exists=True),
help="Path to YAML config file for service configuration",
)
@click.option(
"--development",
type=click.BOOL,
help="Run the BentoServer in development mode",
is_flag=True,
default=False,
show_default=True,
)
@deprecated_option(
"--production",
type=click.BOOL,
help="Run BentoServer in production mode",
current_behaviour="This is enabled by default. To run in development mode, use '--development'.",
is_flag=True,
default=True,
show_default=False,
)
@click.option(
"-p",
"--port",
type=click.INT,
help="The port to listen on for the REST api server",
help="The port to listen on for the REST api server if you are not using a dynamo service",
envvar="BENTOML_PORT",
show_envvar=True,
)
@click.option(
"--host",
type=click.STRING,
help="The host to bind for the REST api server",
help="The host to bind for the REST api server if you are not using a dynamo service",
envvar="BENTOML_HOST",
show_envvar=True,
)
@click.option(
"--api-workers",
type=click.INT,
help="Specify the number of API server workers to start. Default to number of available CPU cores in production mode",
envvar="BENTOML_API_WORKERS",
show_envvar=True,
hidden=True,
)
@click.option(
"--timeout",
type=click.INT,
help="Specify the timeout (seconds) for API server and runners",
envvar="BENTOML_TIMEOUT",
hidden=True,
)
@click.option(
"--backlog",
type=click.INT,
help="The maximum number of pending connections.",
show_default=True,
hidden=True,
)
@click.option(
"--reload",
type=click.BOOL,
is_flag=True,
help="Reload Service when code changes detected",
default=False,
show_default=True,
)
@click.option(
"--working-dir",
type=click.Path(),
......@@ -262,69 +183,6 @@ def build_serve_command() -> click.Group:
default=None,
show_default=True,
)
@click.option(
"--ssl-certfile",
type=str,
help="SSL certificate file",
show_default=True,
hidden=True,
)
@click.option(
"--ssl-keyfile",
type=str,
help="SSL key file",
show_default=True,
hidden=True,
)
@click.option(
"--ssl-keyfile-password",
type=str,
help="SSL keyfile password",
show_default=True,
hidden=True,
)
@click.option(
"--ssl-version",
type=int,
help="SSL version to use (see stdlib 'ssl' module)",
show_default=True,
hidden=True,
)
@click.option(
"--ssl-cert-reqs",
type=int,
help="Whether client certificate is required (see stdlib 'ssl' module)",
show_default=True,
hidden=True,
)
@click.option(
"--ssl-ca-certs",
type=str,
help="CA certificates file",
show_default=True,
hidden=True,
)
@click.option(
"--ssl-ciphers",
type=str,
help="Ciphers to use (see stdlib 'ssl' module)",
show_default=True,
hidden=True,
)
@click.option(
"--timeout-keep-alive",
type=int,
help="Close Keep-Alive connections if no new data is received within this timeout.",
hidden=True,
)
@click.option(
"--timeout-graceful-shutdown",
type=int,
default=None,
help="Maximum number of seconds to wait for graceful shutdown. After this timeout, the server will start terminating requests.",
show_default=True,
hidden=True,
)
@click.option(
"--dry-run",
is_flag=True,
......@@ -332,70 +190,20 @@ def build_serve_command() -> click.Group:
default=False,
)
@click.pass_context
@env_manager
def serve(
ctx: click.Context,
bento: str,
service_name: str,
depends: Optional[list[str]],
dry_run: bool,
development: bool,
port: int,
host: str,
file: str | None,
api_workers: int,
timeout: int | None,
backlog: int,
reload: bool,
working_dir: str | None,
ssl_certfile: str | None,
ssl_keyfile: str | None,
ssl_keyfile_password: str | None,
ssl_version: int | None,
ssl_cert_reqs: int | None,
ssl_ca_certs: str | None,
ssl_ciphers: str | None,
timeout_keep_alive: int | None,
timeout_graceful_shutdown: int | None,
**attrs: t.Any,
) -> None:
"""Locally run connected Dynamo services
\b
You can also pass service-specific configuration options using --ServiceName.param=value format.
\b
BENTO is the serving target, it can be the import as:
- the import path of a 'bentoml.Service' instance
- a tag to a Bento in local Bento store
- a folder containing a valid 'bentofile.yaml' build file with a 'service' field, which provides the import path of a 'bentoml.Service' instance
- a path to a built Bento (for internal & debug use only)
e.g.:
\b
Serve from a bentoml.Service instance source code (for development use only):
'bentoml serve fraud_detector.py:svc'
\b
Serve from a Bento built in local store:
'bentoml serve fraud_detector:4tht2icroji6zput3suqi5nl2'
'bentoml serve fraud_detector:latest'
\b
Serve from a Bento directory:
'bentoml serve ./fraud_detector_bento'
\b
If '--reload' is provided, BentoML will detect code and model store changes during development, and restarts the service automatically.
\b
The '--reload' flag will:
- be default, all file changes under '--working-dir' (default to current directory) will trigger a restart
- when specified, respect 'include' and 'exclude' under 'bentofile.yaml' as well as the '.bentoignore' file in '--working-dir', for code and file changes
- all model store changes will also trigger a restart (new model saved or existing model removed)
"""
from bentoml import Service
"""Locally run connected Dynamo services. You can pass service-specific configuration options using --ServiceName.param=value format."""
# WARNING: internal
from bentoml._internal.service.loader import load
from dynamo.sdk.lib.service import LinkedServices
......@@ -450,54 +258,7 @@ def build_serve_command() -> click.Group:
svc = load(bento_identifier=bento, working_dir=working_dir)
LinkedServices.remove_unused_edges()
if isinstance(svc, Service):
# bentoml<1.2
from bentoml.serving import serve_http_production
if development:
serve_http_production(
bento,
working_dir=working_dir,
port=port,
host=DEFAULT_DEV_SERVER_HOST if not host else host,
backlog=backlog,
api_workers=1,
timeout=timeout,
ssl_keyfile=ssl_keyfile,
ssl_certfile=ssl_certfile,
ssl_keyfile_password=ssl_keyfile_password,
ssl_version=ssl_version,
ssl_cert_reqs=ssl_cert_reqs,
ssl_ca_certs=ssl_ca_certs,
ssl_ciphers=ssl_ciphers,
reload=reload,
development_mode=True,
timeout_keep_alive=timeout_keep_alive,
timeout_graceful_shutdown=timeout_graceful_shutdown,
)
else:
serve_http_production(
bento,
working_dir=working_dir,
port=port,
host=host,
api_workers=api_workers,
timeout=timeout,
ssl_keyfile=ssl_keyfile,
ssl_certfile=ssl_certfile,
ssl_keyfile_password=ssl_keyfile_password,
ssl_version=ssl_version,
ssl_cert_reqs=ssl_cert_reqs,
ssl_ca_certs=ssl_ca_certs,
ssl_ciphers=ssl_ciphers,
reload=reload,
development_mode=False,
timeout_keep_alive=timeout_keep_alive,
timeout_graceful_shutdown=timeout_graceful_shutdown,
)
else:
# bentoml>=1.2
# from _bentoml_impl.server import serve_http
from dynamo.sdk.cli.serving import serve_http # type: ignore
svc.inject_config()
......@@ -506,19 +267,6 @@ def build_serve_command() -> click.Group:
working_dir=working_dir,
host=host,
port=port,
backlog=backlog,
timeout=timeout,
ssl_certfile=ssl_certfile,
ssl_keyfile=ssl_keyfile,
ssl_keyfile_password=ssl_keyfile_password,
ssl_version=ssl_version,
ssl_cert_reqs=ssl_cert_reqs,
ssl_ca_certs=ssl_ca_certs,
ssl_ciphers=ssl_ciphers,
development_mode=development,
reload=reload,
timeout_keep_alive=timeout_keep_alive,
timeout_graceful_shutdown=timeout_graceful_shutdown,
dependency_map=runner_map_dict,
service_name=service_name,
)
......
......@@ -22,8 +22,6 @@ import inspect
import json
import logging
import os
import random
import string
import typing as t
from typing import Any
......@@ -34,13 +32,7 @@ from dynamo.runtime import DistributedRuntime, dynamo_endpoint, dynamo_worker
from dynamo.sdk import dynamo_context
from dynamo.sdk.lib.service import LinkedServices
logger = logging.getLogger("dynamo.sdk.serve.dynamo")
logger.setLevel(logging.INFO)
def generate_run_id():
"""Generate a random 6-character run ID"""
return "".join(random.choices(string.ascii_uppercase + string.digits, k=6))
logger = logging.getLogger(__name__)
@click.command()
......@@ -73,9 +65,10 @@ def main(
from _bentoml_impl.loader import import_service
from bentoml._internal.container import BentoMLContainer
from bentoml._internal.context import server_context
from bentoml._internal.log import configure_server_logging
run_id = generate_run_id()
from dynamo.sdk.lib.logging import configure_server_logging
run_id = service_name
dynamo_context["service_name"] = service_name
dynamo_context["runner_map"] = runner_map
dynamo_context["worker_id"] = worker_id
......@@ -136,7 +129,7 @@ def main(
# Set runtime on all dependencies
for dep in service.dependencies.values():
dep.set_runtime(runtime)
logger.info(f"[{run_id}] Set runtime for dependency: {dep}")
logger.debug(f"[{run_id}] Set runtime for dependency: {dep}")
# Then register all Dynamo endpoints
dynamo_endpoints = service.get_dynamo_endpoints()
......@@ -148,7 +141,7 @@ def main(
endpoints = []
for name, endpoint in dynamo_endpoints.items():
td_endpoint = component.endpoint(name)
logger.info(f"[{run_id}] Registering endpoint '{name}'")
logger.debug(f"[{run_id}] Registering endpoint '{name}'")
endpoints.append(td_endpoint)
# Bind an instance of inner to the endpoint
dynamo_context["component"] = component
......@@ -169,12 +162,12 @@ def main(
if callable(member) and getattr(
member, "__bentoml_startup_hook__", False
):
logger.info(f"[{run_id}] Running startup hook: {name}")
logger.debug(f"[{run_id}] Running startup hook: {name}")
result = getattr(class_instance, name)()
if inspect.isawaitable(result):
# await on startup hook async_on_start
await result
logger.info(
logger.debug(
f"[{run_id}] Completed async startup hook: {name}"
)
else:
......
......@@ -30,8 +30,13 @@ import tempfile
import typing as t
from typing import Any, Dict, Optional, Protocol, TypeVar
# WARNING: internal
from _bentoml_sdk import Service
# WARNING: internal
from bentoml._internal.container import BentoMLContainer
# WARNING: internal
from bentoml._internal.utils.circus import Server
from bentoml.exceptions import BentoMLConfigException
from circus.sockets import CircusSocket
......@@ -39,6 +44,7 @@ from circus.watcher import Watcher
from simple_di import Provide, inject
from .allocator import ResourceAllocator
from .utils import path_to_uri, reserve_free_port
# Define a Protocol for services to ensure type safety
......@@ -61,7 +67,7 @@ IS_WSL = "microsoft-standard" in platform.release()
API_SERVER_NAME = "_bento_api_server"
MAX_AF_UNIX_PATH_LENGTH = 103
logger = logging.getLogger("bentoml.serve")
logger = logging.getLogger(__name__)
if POSIX and not IS_WSL:
......@@ -69,15 +75,13 @@ if POSIX and not IS_WSL:
service: ServiceProtocol,
uds_path: str,
port_stack: contextlib.ExitStack,
backlog: int,
) -> tuple[str, CircusSocket]:
from bentoml._internal.utils.uri import path_to_uri
from circus.sockets import CircusSocket
socket_path = os.path.join(uds_path, f"{id(service)}.sock")
assert len(socket_path) < MAX_AF_UNIX_PATH_LENGTH
return path_to_uri(socket_path), CircusSocket(
name=service.name, path=socket_path, backlog=backlog
name=service.name, path=socket_path
)
elif WINDOWS or IS_WSL:
......@@ -86,9 +90,7 @@ elif WINDOWS or IS_WSL:
service: ServiceProtocol,
uds_path: str,
port_stack: contextlib.ExitStack,
backlog: int,
) -> tuple[str, CircusSocket]:
from bentoml._internal.utils import reserve_free_port
from circus.sockets import CircusSocket
runner_port = port_stack.enter_context(reserve_free_port())
......@@ -98,7 +100,6 @@ elif WINDOWS or IS_WSL:
name=service.name,
host=runner_host,
port=runner_port,
backlog=backlog,
)
else:
......@@ -107,13 +108,13 @@ else:
service: ServiceProtocol,
uds_path: str,
port_stack: contextlib.ExitStack,
backlog: int,
) -> tuple[str, CircusSocket]:
from bentoml.exceptions import BentoMLException
raise BentoMLException("Unsupported platform")
# WARNING: internal
_SERVICE_WORKER_SCRIPT = "_bentoml_impl.worker.service"
......@@ -122,7 +123,6 @@ def create_dependency_watcher(
svc: ServiceProtocol,
uds_path: str,
port_stack: contextlib.ExitStack,
backlog: int,
scheduler: ResourceAllocator,
working_dir: Optional[str] = None,
env: Optional[Dict[str, str]] = None,
......@@ -130,7 +130,7 @@ def create_dependency_watcher(
from bentoml.serving import create_watcher
num_workers, resource_envs = scheduler.get_resource_envs(svc)
uri, socket = _get_server_socket(svc, uds_path, port_stack, backlog)
uri, socket = _get_server_socket(svc, uds_path, port_stack)
args = [
"-m",
_SERVICE_WORKER_SCRIPT,
......@@ -161,7 +161,6 @@ def create_dynamo_watcher(
svc: ServiceProtocol,
uds_path: str,
port_stack: contextlib.ExitStack,
backlog: int,
scheduler: ResourceAllocator,
working_dir: Optional[str] = None,
env: Optional[Dict[str, str]] = None,
......@@ -170,7 +169,7 @@ def create_dynamo_watcher(
from bentoml.serving import create_watcher
# Get socket for this service
uri, socket = _get_server_socket(svc, uds_path, port_stack, backlog)
uri, socket = _get_server_socket(svc, uds_path, port_stack)
# Get worker configuration
num_workers, resource_envs = scheduler.get_resource_envs(svc)
......@@ -253,42 +252,21 @@ def serve_http(
working_dir: str | None = None,
host: str = Provide[BentoMLContainer.http.host],
port: int = Provide[BentoMLContainer.http.port],
backlog: int = Provide[BentoMLContainer.api_server_config.backlog],
timeout: int | None = None,
ssl_certfile: str | None = Provide[BentoMLContainer.ssl.certfile],
ssl_keyfile: str | None = Provide[BentoMLContainer.ssl.keyfile],
ssl_keyfile_password: str | None = Provide[BentoMLContainer.ssl.keyfile_password],
ssl_version: int | None = Provide[BentoMLContainer.ssl.version],
ssl_cert_reqs: int | None = Provide[BentoMLContainer.ssl.cert_reqs],
ssl_ca_certs: str | None = Provide[BentoMLContainer.ssl.ca_certs],
ssl_ciphers: str | None = Provide[BentoMLContainer.ssl.ciphers],
bentoml_home: str = Provide[BentoMLContainer.bentoml_home],
development_mode: bool = False,
reload: bool = False,
timeout_keep_alive: int | None = None,
timeout_graceful_shutdown: int | None = None,
dependency_map: dict[str, str] | None = None,
service_name: str = "",
threaded: bool = False,
) -> Server:
from _bentoml_impl.loader import import_service, normalize_identifier
from bentoml._internal.log import SERVER_LOGGING_CONFIG
from bentoml._internal.utils import reserve_free_port
from bentoml._internal.utils.analytics.usage_stats import track_serve
# WARNING: internal
from _bentoml_impl.loader import load
# WARNING: internal
from bentoml._internal.utils.circus import create_standalone_arbiter
from bentoml.serving import (
construct_ssl_args,
construct_timeouts_args,
create_watcher,
ensure_prometheus_dir,
make_reload_plugin,
)
from bentoml.serving import create_watcher
from circus.sockets import CircusSocket
from .allocator import ResourceAllocator
bento_id: str = ""
env = {"PROMETHEUS_MULTIPROC_DIR": ensure_prometheus_dir()}
env: dict[str, Any] = {}
if isinstance(bento_identifier, Service):
svc = bento_identifier
bento_id = svc.import_string
......@@ -298,9 +276,9 @@ def serve_http(
# use cwd
bento_path = pathlib.Path(".")
else:
bento_id, bento_path = normalize_identifier(bento_identifier, working_dir)
svc = import_service(bento_id, bento_path)
svc = load(bento_identifier, working_dir)
bento_id = str(bento_identifier)
bento_path = pathlib.Path(working_dir or ".")
watchers: list[Watcher] = []
sockets: list[CircusSocket] = []
......@@ -311,8 +289,8 @@ def serve_http(
# TODO: Only for testing, this will prevent any other dep services from getting started, relying entirely on configured deps in the runner-map
standalone = False
if service_name:
print("Running in standalone mode")
print(f"service_name: {service_name}")
logger.info("Running in standalone mode")
logger.info(f"service_name: {service_name}")
standalone = True
if service_name and service_name != svc.name:
......@@ -321,7 +299,7 @@ def serve_http(
server_on_deployment(svc)
uds_path = tempfile.mkdtemp(prefix="bentoml-uds-")
try:
if not service_name and not development_mode and not standalone:
if not service_name and not standalone:
with contextlib.ExitStack() as port_stack:
for name, dep_svc in svc.all_services().items():
if name == svc.name:
......@@ -339,7 +317,6 @@ def serve_http(
dep_svc,
uds_path,
port_stack,
backlog,
allocator,
str(bento_path.absolute()),
env=env,
......@@ -351,7 +328,6 @@ def serve_http(
dep_svc,
uds_path,
port_stack,
backlog,
allocator,
str(bento_path.absolute()),
env=env,
......@@ -384,26 +360,8 @@ def serve_http(
host=host,
port=port,
family=family,
backlog=backlog,
)
)
if BentoMLContainer.ssl.enabled.get() and not ssl_certfile:
raise BentoMLConfigException("ssl_certfile is required when ssl is enabled")
ssl_args = construct_ssl_args(
ssl_certfile=ssl_certfile,
ssl_keyfile=ssl_keyfile,
ssl_keyfile_password=ssl_keyfile_password,
ssl_version=ssl_version,
ssl_cert_reqs=ssl_cert_reqs,
ssl_ca_certs=ssl_ca_certs,
ssl_ciphers=ssl_ciphers,
)
timeouts_args = construct_timeouts_args(
timeout_keep_alive=timeout_keep_alive,
timeout_graceful_shutdown=timeout_graceful_shutdown,
)
timeout_args = ["--timeout", str(timeout)] if timeout else []
server_args = [
"-m",
......@@ -413,20 +371,13 @@ def serve_http(
f"$(circus.sockets.{API_SERVER_NAME})",
"--service-name",
svc.name,
"--backlog",
str(backlog),
"--worker-id",
"$(CIRCUS.WID)",
*ssl_args,
*timeouts_args,
*timeout_args,
]
if resource_envs:
server_args.extend(["--worker-env", json.dumps(resource_envs)])
if development_mode:
server_args.append("--development-mode")
scheme = "https" if BentoMLContainer.ssl.enabled.get() else "http"
scheme = "http"
# Check if this is a Dynamo service
if hasattr(svc, "is_dynamo_component") and svc.is_dynamo_component():
......@@ -469,11 +420,10 @@ def serve_http(
args=args,
numprocesses=num_workers,
working_dir=str(bento_path.absolute()),
close_child_stdin=not development_mode,
env=worker_env, # Dependency map will be injected by serve_http
)
watchers.append(watcher)
print(f"dynamo_service_{svc.name} entrypoint created")
logger.info(f"dynamo_service_{svc.name} entrypoint created")
else:
# Create regular BentoML service watcher
watchers.append(
......@@ -482,7 +432,6 @@ def serve_http(
args=server_args,
working_dir=str(bento_path.absolute()),
numprocesses=num_workers,
close_child_stdin=not development_mode,
env=env,
)
)
......@@ -502,21 +451,9 @@ def serve_http(
arbiter_kwargs: dict[str, t.Any] = {
"watchers": watchers,
"sockets": sockets,
"threaded": threaded,
}
if reload:
reload_plugin = make_reload_plugin(str(bento_path.absolute()), bentoml_home)
arbiter_kwargs["plugins"] = [reload_plugin]
if development_mode:
arbiter_kwargs["debug"] = True
arbiter_kwargs["loggerconfig"] = SERVER_LOGGING_CONFIG
arbiter = create_standalone_arbiter(**arbiter_kwargs)
arbiter.exit_stack.enter_context(
track_serve(svc, production=not development_mode)
)
arbiter.exit_stack.callback(shutil.rmtree, uds_path, ignore_errors=True)
arbiter.start(
cb=lambda _: logger.info( # type: ignore
......
# SPDX-FileCopyrightText: Copyright (c) 2020 Atalaya Tech. Inc
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
from __future__ import annotations
import json
import logging
import os
import sys
import typing as t
from typing import Optional
from urllib.parse import urlparse
import click
import rich
import yaml
from dynamo.sdk.cli.serve import _parse_service_args
logger = logging.getLogger(__name__)
def build_start_command() -> click.Group:
from bentoml._internal.utils import add_experimental_docstring
@click.group(name="start")
def cli():
pass
@cli.command(
context_settings=dict(
ignore_unknown_options=True,
allow_extra_args=True,
),
)
@click.argument("bento", type=click.STRING, default=".")
@click.option(
"--service-name",
type=click.STRING,
required=False,
default="",
envvar="BENTOML_SERVE_SERVICE_NAME",
help="specify the runner name to serve",
)
@click.option(
"-f",
"--file",
type=click.Path(exists=True),
help="Path to YAML config file for service configuration",
)
@click.option(
"--depends",
type=click.STRING,
multiple=True,
envvar="BENTOML_SERVE_DEPENDS",
help="list of runners map",
)
@click.option(
"--runner-map",
type=click.STRING,
envvar="BENTOML_SERVE_RUNNER_MAP",
help="[Deprecated] use --depends instead. "
"JSON string of runners map. For backword compatibility for yatai < 1.0.0",
)
@click.option(
"--bind",
type=click.STRING,
help="[Deprecated] use --host and --port instead."
"Bind address for the server. For backword compatibility for yatai < 1.0.0",
required=False,
)
@click.option(
"--port",
type=click.INT,
help="The port to listen on for the REST api server",
envvar="BENTOML_PORT",
show_envvar=True,
)
@click.option(
"--host",
type=click.STRING,
help="The host to bind for the REST api server [defaults: 127.0.0.1(dev), 0.0.0.0(production)]",
show_envvar="BENTOML_HOST",
)
@click.option(
"--backlog",
type=click.INT,
help="The maximum number of pending connections.",
show_envvar=True,
)
@click.option(
"--api-workers",
type=click.INT,
help="Specify the number of API server workers to start. Default to number of available CPU cores in production mode",
envvar="BENTOML_API_WORKERS",
)
@click.option(
"--timeout",
type=click.INT,
help="Specify the timeout (seconds) for API server",
envvar="BENTOML_TIMEOUT",
)
@click.option(
"--working-dir",
type=click.Path(),
help="When loading from source code, specify the directory to find the Service instance",
default=None,
show_default=True,
)
@click.option("--ssl-certfile", type=str, help="SSL certificate file")
@click.option("--ssl-keyfile", type=str, help="SSL key file")
@click.option("--ssl-keyfile-password", type=str, help="SSL keyfile password")
@click.option(
"--ssl-version", type=int, help="SSL version to use (see stdlib 'ssl' module)"
)
@click.option(
"--ssl-cert-reqs",
type=int,
help="Whether client certificate is required (see stdlib 'ssl' module)",
)
@click.option("--ssl-ca-certs", type=str, help="CA certificates file")
@click.option(
"--ssl-ciphers", type=str, help="Ciphers to use (see stdlib 'ssl' module)"
)
@click.option(
"--timeout-keep-alive",
type=int,
help="Close Keep-Alive connections if no new data is received within this timeout.",
)
@click.option(
"--timeout-graceful-shutdown",
type=int,
default=None,
help="Maximum number of seconds to wait for graceful shutdown. After this timeout, the server will start terminating requests.",
)
@click.option(
"--reload",
is_flag=True,
help="Reload Service when code changes detected",
default=False,
)
@click.option(
"--dry-run",
is_flag=True,
help="Print the final service configuration and exit without starting the server",
default=False,
)
@click.pass_context
@add_experimental_docstring
def start(
ctx: click.Context,
bento: str,
service_name: str,
dry_run: bool,
depends: Optional[list[str]],
runner_map: Optional[str],
bind: Optional[str],
port: Optional[int],
host: Optional[str],
file: str | None,
backlog: Optional[int],
working_dir: Optional[str],
api_workers: Optional[int],
timeout: Optional[int],
ssl_certfile: Optional[str],
ssl_keyfile: Optional[str],
ssl_keyfile_password: Optional[str],
ssl_version: Optional[int],
ssl_cert_reqs: Optional[int],
ssl_ca_certs: Optional[str],
ssl_ciphers: Optional[str],
timeout_keep_alive: Optional[int],
timeout_graceful_shutdown: Optional[int],
reload: bool = False,
) -> None:
"""
Start a single Dynamo service. This will be used inside Yatai.
"""
from bentoml import Service
from bentoml._internal.service.loader import load
service_configs: dict[str, dict[str, t.Any]] = {}
# Load file if provided
if file:
with open(file) as f:
yaml_configs = yaml.safe_load(f)
# Initialize service_configs as empty dict if it's None
# Convert nested YAML structure to flat dict with dot notation
for service, configs in yaml_configs.items():
for key, value in configs.items():
if service not in service_configs:
service_configs[service] = {}
service_configs[service][key] = value
# Process service-specific options
cmdline_overrides: t.Dict[str, t.Any] = _parse_service_args(ctx.args)
for service, configs in cmdline_overrides.items():
for key, value in configs.items():
if service not in service_configs:
service_configs[service] = {}
service_configs[service][key] = value
if dry_run:
rich.print("[bold]Service Configuration:[/bold]")
rich.print(json.dumps(service_configs, indent=2))
rich.print("\n[bold]Environment Variable that would be set:[/bold]")
rich.print(f"DYNAMO_SERVICE_CONFIG={json.dumps(service_configs)}")
sys.exit(0)
# Set environment variable with service configuration
if service_configs:
os.environ["DYNAMO_SERVICE_CONFIG"] = json.dumps(service_configs)
if working_dir is None:
if os.path.isdir(os.path.expanduser(bento)):
working_dir = os.path.expanduser(bento)
else:
working_dir = "."
if sys.path[0] != working_dir:
sys.path.insert(0, working_dir)
if depends:
runner_map_dict = dict([s.split("=", maxsplit=2) for s in depends or []])
elif runner_map:
runner_map_dict = json.loads(runner_map)
else:
runner_map_dict = {}
if bind is not None:
parsed = urlparse(bind)
assert parsed.scheme == "tcp"
host = parsed.hostname or host
port = parsed.port or port
svc = load(bento, working_dir=working_dir)
if isinstance(svc, Service):
if reload:
logger.warning("--reload does not work with legacy style services")
# for <1.2 bentos
if not service_name or service_name == svc.name:
from bentoml.start import start_http_server
for dep in depends or []:
rich.print(f"Using remote: {dep}")
start_http_server(
bento,
runner_map=runner_map_dict,
working_dir=working_dir,
port=port,
host=host,
backlog=backlog,
api_workers=api_workers or 1,
timeout=timeout,
ssl_keyfile=ssl_keyfile,
ssl_certfile=ssl_certfile,
ssl_keyfile_password=ssl_keyfile_password,
ssl_version=ssl_version,
ssl_cert_reqs=ssl_cert_reqs,
ssl_ca_certs=ssl_ca_certs,
ssl_ciphers=ssl_ciphers,
timeout_keep_alive=timeout_keep_alive,
timeout_graceful_shutdown=timeout_graceful_shutdown,
)
else:
from bentoml.start import start_runner_server
if bind is not None:
parsed = urlparse(bind)
assert parsed.scheme == "tcp"
host = parsed.hostname or host
port = parsed.port or port
start_runner_server(
bento,
runner_name=service_name,
working_dir=working_dir,
timeout=timeout,
port=port,
host=host,
backlog=backlog,
)
else:
# for >=1.2 bentos
from dynamo.sdk.cli.serving import serve_http
print(f"Starting service {service_name}")
svc.inject_config()
serve_http(
bento,
working_dir=working_dir,
port=port,
host=host,
backlog=backlog,
timeout=timeout,
ssl_keyfile=ssl_keyfile,
ssl_certfile=ssl_certfile,
ssl_keyfile_password=ssl_keyfile_password,
ssl_version=ssl_version,
ssl_cert_reqs=ssl_cert_reqs,
ssl_ca_certs=ssl_ca_certs,
ssl_ciphers=ssl_ciphers,
timeout_keep_alive=timeout_keep_alive,
timeout_graceful_shutdown=timeout_graceful_shutdown,
dependency_map=runner_map_dict,
service_name=service_name,
reload=reload,
)
return cli
start_command = build_start_command()
# SPDX-FileCopyrightText: Copyright (c) 2020 Atalaya Tech. Inc
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# #
# http://www.apache.org/licenses/LICENSE-2.0
#
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES
import contextlib
import os
import pathlib
import random
import socket
import typing as t
import click
import psutil
from click import Command, Context
......@@ -73,3 +81,71 @@ class DynamoCommandGroup(click.Group):
raise ValueError(f"Command '{command_name}' not found in group")
self.add_command(cmd, command_name)
@contextlib.contextmanager
def reserve_free_port(
host: str = "localhost",
port: int | None = None,
prefix: t.Optional[str] = None,
max_retry: int = 50,
enable_so_reuseport: bool = False,
) -> t.Iterator[int]:
"""
detect free port and reserve until exit the context
"""
import psutil
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if enable_so_reuseport:
if psutil.WINDOWS:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
elif psutil.MACOS or psutil.FREEBSD:
sock.setsockopt(socket.SOL_SOCKET, 0x10000, 1) # SO_REUSEPORT_LB
else:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
raise RuntimeError("Failed to set SO_REUSEPORT.") from None
if prefix is not None:
prefix_num = int(prefix) * 10 ** (5 - len(prefix))
suffix_range = min(65535 - prefix_num, 10 ** (5 - len(prefix)))
for _ in range(max_retry):
suffix = random.randint(0, suffix_range)
port = int(f"{prefix_num + suffix}")
try:
sock.bind((host, port))
break
except OSError:
continue
else:
raise RuntimeError(
f"Cannot find free port with prefix {prefix} after {max_retry} retries."
) from None
else:
if port:
sock.bind((host, port))
else:
sock.bind((host, 0))
try:
yield sock.getsockname()[1]
finally:
sock.close()
def path_to_uri(path: str) -> str:
"""
Convert a path to a URI.
Args:
path: Path to convert to URI.
Returns:
URI string. (quoted, absolute)
"""
path = os.path.abspath(path)
if psutil.WINDOWS:
return pathlib.PureWindowsPath(path).as_uri()
if psutil.POSIX:
return pathlib.PurePosixPath(path).as_uri()
raise ValueError("Unsupported OS")
......@@ -18,6 +18,7 @@
import asyncio
from typing import Any, Dict, Optional, TypeVar
# WARNING: internal
from _bentoml_sdk.service import Service
from _bentoml_sdk.service.dependency import Dependency
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import logging.config
import os
from dynamo.runtime.logging import configure_logger
# Create a replacement for BentoML's configure_server_logging
def configure_server_logging():
"""
A single place to configure logging for Dynamo that can be used to replace BentoML's logging configuration.
"""
# First, remove any existing handlers to avoid duplication
root_logger = logging.getLogger()
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# Configure the logger with Dynamo's handler
configure_logger()
# Make sure bentoml's loggers use the same configuration
bentoml_logger = logging.getLogger("bentoml")
# Configure vllm loggers to use our format
os.environ["VLLM_CONFIGURE_LOGGING"] = "0" # Disable VLLM's default configuration
# Get all vllm loggers and configure them
vllm_logger = logging.getLogger("vllm")
vllm_logger.handlers = []
vllm_logger.setLevel(logging.INFO)
vllm_logger.propagate = True
# Also handle any root loggers that VLLM might be using without the vllm prefix
other_loggers = ["__init__", "nixl"]
for logger_name in other_loggers:
logger = logging.getLogger(logger_name)
logger.handlers = []
logger.setLevel(logging.INFO)
logger.propagate = True
# Bento Logger
bentoml_logger.setLevel(logging.ERROR)
bentoml_logger.propagate = True
# Override internal BentoML loggers
aux_bento_loggers = ["tag"]
for logger_name in aux_bento_loggers:
logger = logging.getLogger(logger_name)
logger.setLevel(logging.ERROR)
logger.propagate = True
......@@ -15,11 +15,13 @@
from __future__ import annotations
import json
import logging
import os
from collections import defaultdict
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, Union
# WARNING: internal
from _bentoml_sdk import Service, ServiceConfig
from _bentoml_sdk.images import Image
from _bentoml_sdk.service.config import validate
......@@ -28,6 +30,8 @@ from dynamo.sdk.lib.decorators import DynamoEndpoint
T = TypeVar("T", bound=object)
logger = logging.getLogger(__name__)
class RuntimeLinkedServices:
"""
......@@ -137,14 +141,14 @@ class DynamoService(Service[T]):
# Parse dynamo://namespace/name into (namespace, name)
_, path = address.split("://", 1)
namespace, name = path.split("/", 1)
print(
logger.debug(
f"Resolved Dynamo address from runner map: {namespace}/{name}"
)
return (namespace, name)
except (json.JSONDecodeError, ValueError) as e:
raise ValueError(f"Failed to parse BENTOML_RUNNER_MAP: {str(e)}") from e
print(
logger.debug(
f"Using default Dynamo address: {self._dynamo_config.namespace}/{self._dynamo_config.name}"
)
return (self._dynamo_config.namespace, self._dynamo_config.name)
......
......@@ -13,8 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from vllm.logger import logger as vllm_logger
logger = logging.getLogger(__name__)
class PyDisaggregatedRouter:
......@@ -39,7 +40,7 @@ class PyDisaggregatedRouter:
absolute_prefill_length > self.max_local_prefill_length
and queue_size < self.max_prefill_queue_size
)
vllm_logger.info(
logger.info(
f"Remote prefill: {decision} (prefill length: {absolute_prefill_length}/{prompt_length}, prefill queue size: {queue_size}/{self.max_prefill_queue_size})"
)
return decision
......@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import subprocess
from pathlib import Path
......@@ -25,6 +26,8 @@ from dynamo.sdk import depends, service
from dynamo.sdk.lib.config import ServiceConfig
from dynamo.sdk.lib.image import DYNAMO_IMAGE
logger = logging.getLogger(__name__)
def get_http_binary_path():
sdk_path = Path(sdk.__file__)
......@@ -75,7 +78,7 @@ class Frontend:
]
)
print("Starting HTTP server")
logger.info("Starting HTTP server")
http_binary = get_http_binary_path()
process = subprocess.Popen(
[http_binary, "-p", str(frontend_config.port)], stdout=None, stderr=None
......
......@@ -15,6 +15,7 @@
import argparse
import logging
import random
from argparse import Namespace
from typing import AsyncIterator
......@@ -30,6 +31,8 @@ from dynamo.sdk.lib.config import ServiceConfig
WorkerId = str
logger = logging.getLogger(__name__)
def parse_args(service_name, prefix) -> Namespace:
parser = argparse.ArgumentParser()
......@@ -105,7 +108,7 @@ class Router:
await kv_listener.create_service()
self.indexer = KvIndexer(kv_listener, self.args.block_size)
self.metrics_aggregator = KvMetricsAggregator(kv_listener)
print("KV Router initialized")
logger.info("KV Router initialized")
def _cost_function(
self,
......
......@@ -15,6 +15,7 @@
import asyncio
import logging
import os
import sys
......@@ -30,6 +31,8 @@ from vllm.remote_prefill import RemotePrefillParams, RemotePrefillRequest
from dynamo.sdk import async_on_start, dynamo_context, dynamo_endpoint, service
logger = logging.getLogger(__name__)
class RequestType(BaseModel):
text: str
......@@ -50,23 +53,23 @@ class PrefillWorker:
self._loaded_metadata = set()
self.initialized = False
if self.engine_args.enable_chunked_prefill is not False:
print("Chunked prefill is not supported yet, setting to False")
logger.info("Chunked prefill is not supported yet, setting to False")
self.engine_args.enable_chunked_prefill = False
if self.engine_args.pipeline_parallel_size != 1:
print("Pipeline parallel size is not supported yet, setting to 1")
logger.info("Pipeline parallel size is not supported yet, setting to 1")
self.engine_args.pipeline_parallel_size = 1
if self.engine_args.disable_async_output_proc is not True:
print("Async output processing is not supported yet, setting to True")
logger.info("Async output processing is not supported yet, setting to True")
self.engine_args.disable_async_output_proc = True
if self.engine_args.enforce_eager is not True:
print("Prefill must be done eagerly, setting to True")
logger.info("Prefill must be done eagerly, setting to True")
self.engine_args.enforce_eager = True
if self.engine_args.enable_prefix_caching is not False:
print(
logger.info(
"Prefix caching is not supported yet in prefill worker, setting to False"
)
self.engine_args.enable_prefix_caching = False
......@@ -89,36 +92,40 @@ class PrefillWorker:
def prefill_queue_handler_cb(fut):
try:
fut.result()
print("prefill queue handler exited successfully")
logger.info("prefill queue handler exited successfully")
except Exception as e:
print(f"[ERROR] prefill queue handler failed: {e!r}")
logger.error(f"[ERROR] prefill queue handler failed: {e!r}")
sys.exit(1)
task.add_done_callback(prefill_queue_handler_cb)
print("PrefillWorker initialized")
logger.info("PrefillWorker initialized")
async def prefill_queue_handler(self):
print("[DEBUG] prefill queue handler entered")
logger.info("Prefill queue handler entered")
prefill_queue_nats_server = os.getenv("NATS_SERVER", "nats://localhost:4222")
prefill_queue_stream_name = (
self.engine_args.served_model_name
if self.engine_args.served_model_name is not None
else "vllm"
)
print(f"Prefill queue: {prefill_queue_nats_server}:{prefill_queue_stream_name}")
logger.info(
f"Prefill queue: {prefill_queue_nats_server}:{prefill_queue_stream_name}"
)
self.initialized = True
# TODO: integrate prefill_queue to a dynamo endpoint
async with PrefillQueue.get_instance(
nats_server=prefill_queue_nats_server,
stream_name=prefill_queue_stream_name,
) as prefill_queue:
print("prefill queue handler started")
logger.info("prefill queue handler started")
while True:
# TODO: this might add a small overhead to pull prefill from nats
# need to test and check how much overhead it is
prefill_request = await prefill_queue.dequeue_prefill_request()
if prefill_request is not None:
print(f"Dequeued prefill request: {prefill_request.request_id}")
logger.info(
f"Dequeued prefill request: {prefill_request.request_id}"
)
async for _ in self.generate(prefill_request):
pass
......@@ -139,7 +146,7 @@ class PrefillWorker:
if request.engine_id not in self._loaded_metadata:
remote_metadata = await self._metadata_store.get(request.engine_id)
await self.engine_client.add_remote_nixl_metadata(remote_metadata)
print(
logger.info(
f"Loaded nixl metadata from engine {request.engine_id} into "
f"engine {self.engine_client.nixl_metadata.engine_id}"
)
......
......@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import uuid
from enum import Enum
from typing import AsyncIterator, Tuple, Union
......@@ -26,12 +27,13 @@ from utils.protocol import MyRequestOutput, Tokens, vLLMGenerateRequest
from utils.vllm import parse_vllm_args
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.entrypoints.openai.protocol import ChatCompletionRequest, CompletionRequest
from vllm.logger import logger as vllm_logger
from vllm.outputs import RequestOutput
from vllm.transformers_utils.tokenizer import AnyTokenizer
from dynamo.sdk import async_on_start, depends, dynamo_context, dynamo_endpoint, service
logger = logging.getLogger(__name__)
class RequestType(Enum):
CHAT = "chat"
......@@ -99,7 +101,7 @@ class Processor(ProcessMixIn):
request_type: RequestType,
):
request_id = str(uuid.uuid4())
vllm_logger.debug(f"Got raw request: {raw_request}")
logger.debug(f"Got raw request: {raw_request}")
(
request,
conversation,
......@@ -113,7 +115,7 @@ class Processor(ProcessMixIn):
):
worker_id, prefix_hit_rate = route_response.split("_")
prefix_hit_rate = float(prefix_hit_rate)
vllm_logger.info(
logger.info(
f"Worker ID: {worker_id} with estimated prefix hit rate: {prefix_hit_rate}"
)
break
......
......@@ -15,6 +15,7 @@
import asyncio
import logging
import os
from components.disagg_router import PyDisaggregatedRouter
......@@ -26,13 +27,14 @@ from utils.vllm import parse_vllm_args
from vllm.entrypoints.openai.api_server import (
build_async_engine_client_from_engine_args,
)
from vllm.logger import logger as vllm_logger
from vllm.remote_prefill import RemotePrefillParams, RemotePrefillRequest
from vllm.sampling_params import RequestOutputKind
from dynamo.llm import KvMetricsPublisher
from dynamo.sdk import async_on_start, depends, dynamo_context, dynamo_endpoint, service
logger = logging.getLogger(__name__)
@service(
dynamo={
......@@ -60,21 +62,21 @@ class VllmWorker:
"NATS_SERVER", "nats://localhost:4222"
)
self._prefill_queue_stream_name = self.model_name
vllm_logger.info(
logger.info(
f"Prefill queue: {self._prefill_queue_nats_server}:{self._prefill_queue_stream_name}"
)
if self.engine_args.remote_prefill:
if self.engine_args.enable_chunked_prefill is not False:
print("Chunked prefill is not supported yet, setting to False")
logger.info("Chunked prefill is not supported yet, setting to False")
self.engine_args.enable_chunked_prefill = False
if self.engine_args.preemption_mode != "swap":
print("Preemption mode is not supported yet, setting to swap")
logger.info("Preemption mode is not supported yet, setting to swap")
self.engine_args.preemption_mode = "swap"
if self.engine_args.pipeline_parallel_size != 1:
print("Pipeline parallel size is not supported yet, setting to 1")
logger.info("Pipeline parallel size is not supported yet, setting to 1")
self.engine_args.pipeline_parallel_size = 1
if self.engine_args.router == "kv":
......@@ -82,7 +84,7 @@ class VllmWorker:
os.environ["VLLM_WORKER_ID"] = str(VLLM_WORKER_ID)
os.environ["VLLM_KV_NAMESPACE"] = "dynamo"
os.environ["VLLM_KV_COMPONENT"] = class_name
vllm_logger.info(f"Generate endpoint ID: {VLLM_WORKER_ID}")
logger.info(f"Generate endpoint ID: {VLLM_WORKER_ID}")
self.metrics_publisher = KvMetricsPublisher()
@async_on_start
......@@ -110,7 +112,7 @@ class VllmWorker:
)
task = asyncio.create_task(self.create_metrics_publisher_endpoint())
task.add_done_callback(
lambda _: print("metrics publisher endpoint created")
lambda _: logger.info("metrics publisher endpoint created")
)
runtime = dynamo_context["runtime"]
......@@ -129,7 +131,7 @@ class VllmWorker:
)
else:
self.disaggregated_router = None
print("VllmWorker has been initialized")
logger.info("VllmWorker has been initialized")
async def create_metrics_publisher_endpoint(self):
component = dynamo_context["component"]
......@@ -170,12 +172,12 @@ class VllmWorker:
is_remote_prefill=True,
remote_prefill_request_callback=self.get_remote_prefill_request_callback(),
)
print(
logger.info(
f"Prefilling remotely for request {request.request_id} with length {len(request.engine_prompt['prompt_token_ids'])}"
)
else:
remote_prefill_params = None
print(
logger.info(
f"Prefilling locally for request {request.request_id} with length {len(request.engine_prompt['prompt_token_ids'])}"
)
......
......@@ -14,9 +14,12 @@
# limitations under the License.
import asyncio
import logging
from dynamo._core import Client
logger = logging.getLogger(__name__)
async def check_required_workers(
workers_client: Client, required_workers: int, on_change=True, poll_interval=0.5
......@@ -31,7 +34,7 @@ async def check_required_workers(
new_count = len(worker_ids)
if (not on_change) or new_count != num_workers:
print(
logger.info(
f"Waiting for more workers to be ready.\n"
f" Current: {new_count},"
f" Required: {required_workers}"
......
......@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from contextlib import contextmanager
......@@ -24,6 +24,8 @@ from dynamo.runtime import DistributedRuntime
METADATA_DIR = "/tmp/nixl"
logger = logging.getLogger(__name__)
@contextmanager
def temp_metadata_file(engine_id, metadata: NixlMetadata):
......@@ -31,7 +33,7 @@ def temp_metadata_file(engine_id, metadata: NixlMetadata):
path = f"{METADATA_DIR}/{engine_id}.nixl_meta"
with open(path, "wb") as f:
encoded = msgspec.msgpack.encode(metadata)
print(f"Size of encoded metadata: {len(encoded)}")
logger.info(f"Size of encoded metadata: {len(encoded)}")
f.write(encoded)
try:
yield path
......
......@@ -26,7 +26,7 @@ license-files = ["LICENSE"]
requires-python = ">=3.10"
dependencies = [
"pytest>=8.3.4",
"bentoml==1.4.1",
"bentoml==1.4.7",
"types-psutil==7.0.0.20250218",
"kubernetes==32.0.1",
"ai-dynamo-runtime==0.1.0",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment