"components/vscode:/vscode.git/clone" did not exist on "14eceb43df78d11407df03059f0e857d88c991ea"
Unverified Commit 50afb811 authored by hhzhang16's avatar hhzhang16 Committed by GitHub
Browse files

feat: add build --push command (#1485)

parent e621f249
......@@ -38,12 +38,15 @@ from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from dynamo.sdk import DYNAMO_IMAGE
from dynamo.sdk.core.protocol.deployment import Service
from dynamo.sdk.core.protocol.interface import (
DynamoConfig,
DynamoTransport,
LinkedServices,
ServiceInterface,
)
from dynamo.sdk.core.runner import TargetEnum
from dynamo.sdk.lib.utils import upload_graph
logger = logging.getLogger(__name__)
console = Console()
......@@ -104,7 +107,7 @@ class ServiceConfig(BaseModel):
resources: t.Dict[str, t.Any] = Field(default_factory=dict)
workers: t.Optional[int] = None
image: str = "dynamo:latest"
dynamo: t.Dict[str, t.Any] = Field(default_factory=dict)
dynamo: DynamoConfig = Field(default_factory=DynamoConfig)
http_exposed: bool = False
api_endpoints: t.List[str] = Field(default_factory=list)
......@@ -141,7 +144,7 @@ class ServiceInfo(BaseModel):
resources=service.config.resources.model_dump(),
workers=service.config.workers,
image=image,
dynamo=service.config.dynamo.model_dump(),
dynamo=DynamoConfig(**service.config.dynamo.model_dump()),
http_exposed=len(api_endpoints) > 0,
api_endpoints=api_endpoints,
)
......@@ -155,7 +158,7 @@ class ServiceInfo(BaseModel):
class BuildConfig(BaseModel):
"""Configuration for building a Dynamo pipeline."""
"""Configuration for building a Dynamo graph."""
service: str
name: t.Optional[str] = None
......@@ -277,7 +280,7 @@ class Package:
cls,
build_config: BuildConfig,
build_ctx: t.Optional[str] = None,
) -> t.Any:
) -> ServiceInterface:
"""Get a dynamo service from config."""
build_ctx = (
os.getcwd()
......@@ -367,6 +370,26 @@ class Package:
with open(os.path.join(self.path, "dynamo.yaml"), "w") as f:
yaml.dump(manifest_dict, f, default_flow_style=False)
def get_entry_service(self) -> Service:
"""Get the entry service."""
for service in self.info.services:
if service.name == self.info.entry_service:
entry_service = service
break
else:
raise ValueError(
f"Entry service {self.info.entry_service} not found in services"
)
return Service(
service_name=self.info.service,
name=self.info.entry_service,
namespace=entry_service.config.dynamo.namespace,
version=self.info.tag.version,
path=self.path,
envs=self.info.envs,
)
@staticmethod
def load_service(service_path: str, working_dir: str) -> t.Any:
"""Load a service from a path."""
......@@ -481,6 +504,9 @@ def build(
service: str = typer.Argument(
..., help="Service specification in the format module:ServiceClass"
),
endpoint: t.Optional[str] = typer.Option(
None, "--endpoint", "-e", help="Dynamo Cloud endpoint", envvar="DYNAMO_CLOUD"
),
output_dir: t.Optional[str] = typer.Option(
None, "--output-dir", "-o", help="Output directory for the build"
),
......@@ -490,13 +516,25 @@ def build(
containerize: bool = typer.Option(
False,
"--containerize",
help="Containerize the dynamo pipeline after building.",
help="Containerize the dynamo graph after building.",
),
push: bool = typer.Option(
False,
"--push",
help="Push the built dynamo graph to the Dynamo cloud remote API store.",
),
) -> None:
"""Packages Dynamo service for deployment. Optionally builds a docker container."""
"""Packages Dynamo service for deployment. Optionally builds and/or pushes a docker container."""
from dynamo.sdk.cli.utils import configure_target_environment
configure_target_environment(TargetEnum.DYNAMO)
if push:
containerize = True
if endpoint is None:
console.print(
"[bold red]Error: --push requires --endpoint, -e, or DYNAMO_CLOUD environment variable to be set.[/]"
)
raise typer.Exit(1)
# Determine output directory
if output_dir is None:
......@@ -558,9 +596,12 @@ def build(
next_steps = []
if not containerize:
next_steps.append(
"\n\n* Containerize your Dynamo pipeline with "
"\n\n* Containerize your Dynamo graph with "
"`dynamo build --containerize <service_name>`:\n"
f" $ dynamo build --containerize {service}"
"\n\n* Push your Dynamo graph to the Dynamo cloud with "
"`dynamo build --push <service_name>`:\n"
f" $ dynamo build --push {service}"
)
if next_steps:
......@@ -597,13 +638,29 @@ def build(
check=True,
)
console.print(f"[green]Successfully built Docker image {image_name}.")
if push:
# Upload the graph to the Dynamo cloud remote API store
with Progress(
SpinnerColumn(),
TextColumn(
f"[bold green]Pushing graph {image_name} to Dynamo cloud..."
),
transient=True,
) as progress:
progress.add_task("push", total=None)
entry_service = package.get_entry_service()
upload_graph(endpoint, image_name, entry_service)
console.print(
f"[green]Successfully pushed graph {image_name} to Dynamo cloud."
)
except Exception as e:
console.print(f"[red]Error building package: {str(e)}")
console.print(f"[red]Error with build: {str(e)}")
raise
def generate_random_tag() -> str:
"""Generate a random tag for the Dynamo pipeline."""
"""Generate a random tag for the Dynamo graph."""
return f"{uuid.uuid4().hex[:8]}"
......
......@@ -146,7 +146,7 @@ def _handle_deploy_create(
# TODO: hardcoding this is a hack to get the services for the deployment
# we should find a better way to do this once build is finished/generic
configure_target_environment(TargetEnum.DYNAMO)
entry_service = load_entry_service(config.pipeline)
entry_service = load_entry_service(config.graph)
deployment_manager = get_deployment_manager(config.target, config.endpoint)
env_dicts = _build_env_dicts(
......@@ -157,10 +157,9 @@ def _handle_deploy_create(
env_secrets_name=config.env_secrets_name,
)
deployment = Deployment(
name=config.name
or (config.pipeline if config.pipeline else "unnamed-deployment"),
name=config.name or (config.graph if config.graph else "unnamed-deployment"),
namespace="default",
pipeline=config.pipeline,
graph=config.graph,
entry_service=entry_service,
envs=env_dicts,
)
......@@ -231,7 +230,7 @@ def _handle_deploy_create(
@app.command()
def create(
ctx: typer.Context,
pipeline: str = typer.Argument(..., help="Dynamo pipeline to deploy"),
graph: str = typer.Argument(..., help="Dynamo graph to deploy"),
name: t.Optional[str] = typer.Option(None, "--name", "-n", help="Deployment name"),
config_file: t.Optional[typer.FileText] = typer.Option(
None, "--config-file", "-f", help="Configuration file path"
......@@ -248,7 +247,7 @@ def create(
envs: t.Optional[t.List[str]] = typer.Option(
None,
"--env",
help="Environment variable(s) to set (format: KEY=VALUE). Note: These environment variables will be set on ALL services in your Dynamo pipeline.",
help="Environment variable(s) to set (format: KEY=VALUE). Note: These environment variables will be set on ALL services in your Dynamo graph.",
),
envs_from_secret: t.Optional[t.List[str]] = typer.Option(
None,
......@@ -271,7 +270,7 @@ def create(
) -> DeploymentResponse:
"""Create a deployment on Dynamo Cloud."""
config = DeploymentConfig(
pipeline=pipeline,
graph=graph,
endpoint=endpoint,
name=name,
config_file=config_file,
......@@ -391,7 +390,7 @@ def update(
envs: t.Optional[t.List[str]] = typer.Option(
None,
"--env",
help="Environment variable(s) to set (format: KEY=VALUE). Note: These environment variables will be set on ALL services in your Dynamo pipeline.",
help="Environment variable(s) to set (format: KEY=VALUE). Note: These environment variables will be set on ALL services in your Dynamo graph.",
),
envs_from_secret: t.Optional[t.List[str]] = typer.Option(
None,
......@@ -507,7 +506,7 @@ def delete(
def deploy(
ctx: typer.Context,
pipeline: str = typer.Argument(..., help="Dynamo pipeline to deploy"),
graph: str = typer.Argument(..., help="Dynamo graph to deploy"),
name: t.Optional[str] = typer.Option(None, "--name", "-n", help="Deployment name"),
config_file: t.Optional[typer.FileText] = typer.Option(
None, "--config-file", "-f", help="Configuration file path"
......@@ -524,7 +523,7 @@ def deploy(
envs: t.Optional[t.List[str]] = typer.Option(
None,
"--env",
help="Environment variable(s) to set (format: KEY=VALUE). Note: These environment variables will be set on ALL services in your Dynamo pipeline.",
help="Environment variable(s) to set (format: KEY=VALUE). Note: These environment variables will be set on ALL services in your Dynamo graph.",
),
envs_from_secret: t.Optional[t.List[str]] = typer.Option(
None,
......@@ -545,9 +544,9 @@ def deploy(
envvar="DYNAMO_ENV_SECRETS",
),
) -> DeploymentResponse:
"""Deploy a Dynamo pipeline (same as deployment create)."""
"""Deploy a Dynamo graph (same as deployment create)."""
config = DeploymentConfig(
pipeline=pipeline,
graph=graph,
endpoint=endpoint,
name=name,
config_file=config_file,
......
......@@ -46,9 +46,7 @@ console = Console()
def serve(
ctx: typer.Context,
dynamo_pipeline: str = typer.Argument(
..., help="The path to the Dynamo pipeline to serve"
),
graph: str = typer.Argument(..., help="The path to the Dynamo graph to serve"),
service_name: str = typer.Option(
"",
help="Only serve the specified service. Don't serve any dependencies of this service.",
......@@ -113,9 +111,9 @@ def serve(
case_sensitive=False,
),
):
"""Locally serve a Dynamo pipeline.
"""Locally serve a Dynamo graph.
Starts a local server for the specified Dynamo pipeline.
Starts a local server for the specified Dynamo graph.
"""
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sdk.cli.utils import configure_target_environment
......@@ -152,8 +150,8 @@ def serve(
os.environ["DYNAMO_SERVICE_CONFIG"] = json.dumps(service_configs)
if working_dir is None:
if os.path.isdir(os.path.expanduser(dynamo_pipeline)):
working_dir = Path(os.path.expanduser(dynamo_pipeline))
if os.path.isdir(os.path.expanduser(graph)):
working_dir = Path(os.path.expanduser(graph))
else:
working_dir = Path(".")
......@@ -163,7 +161,7 @@ def serve(
if sys.path[0] != working_dir_str:
sys.path.insert(0, working_dir_str)
svc = find_and_load_service(dynamo_pipeline, working_dir=working_dir)
svc = find_and_load_service(graph, working_dir=working_dir)
logger.debug(f"Loaded service: {svc.name}")
logger.debug("Dependencies: %s", [dep.on.name for dep in svc.dependencies.values()])
LinkedServices.remove_unused_edges()
......@@ -181,13 +179,13 @@ def serve(
# Start the service
console.print(
Panel.fit(
f"[bold]Starting Dynamo service:[/bold] [cyan]{dynamo_pipeline}[/cyan]",
f"[bold]Starting Dynamo service:[/bold] [cyan]{graph}[/cyan]",
title="[bold green]Dynamo Serve[/bold green]",
border_style="green",
)
)
serve_dynamo_graph(
dynamo_pipeline,
graph,
working_dir=working_dir_str,
# host=host,
# port=port,
......
......@@ -150,7 +150,7 @@ def clear_namespace(namespace: str) -> None:
def serve_dynamo_graph(
dynamo_pipeline: str,
graph: str,
working_dir: str | None = None,
dependency_map: dict[str, str] | None = None,
service_name: str = "",
......@@ -171,7 +171,7 @@ def serve_dynamo_graph(
namespace: str = ""
env: dict[str, Any] = {}
svc = find_and_load_service(dynamo_pipeline, working_dir)
svc = find_and_load_service(graph, working_dir)
dynamo_path = pathlib.Path(working_dir or ".")
watchers: list[Watcher] = []
......@@ -236,7 +236,7 @@ def serve_dynamo_graph(
f"Service {dep_svc.name} is not servable. Please use link to override with a concrete implementation."
)
new_watcher, new_socket, uri = create_dynamo_watcher(
dynamo_pipeline,
graph,
dep_svc,
uds_path,
allocator,
......@@ -254,7 +254,7 @@ def serve_dynamo_graph(
dynamo_args = [
"-m",
_DYNAMO_WORKER_SCRIPT,
dynamo_pipeline,
graph,
"--service-name",
svc.name,
"--worker-id",
......@@ -410,7 +410,7 @@ def serve_dynamo_graph(
hasattr(svc, "is_dynamo_component")
and svc.is_dynamo_component()
)
else (dynamo_pipeline,)
else (graph,)
),
),
)
......
......@@ -13,12 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import os
import tarfile
import time
import typing as t
from datetime import datetime
import requests
......@@ -27,8 +23,8 @@ from dynamo.sdk.core.protocol.deployment import (
DeploymentManager,
DeploymentResponse,
DeploymentStatus,
Service,
)
from dynamo.sdk.lib.utils import upload_graph
class KubernetesDeploymentManager(DeploymentManager):
......@@ -42,84 +38,19 @@ class KubernetesDeploymentManager(DeploymentManager):
def __init__(self, endpoint: str):
self.endpoint = endpoint.rstrip("/")
self.session = requests.Session()
self.namespace = "default"
def _upload_pipeline(self, pipeline: str, entry_service: Service, **kwargs) -> None:
"""Upload the entire pipeline as a single component/version, with a manifest of all services."""
session = self.session
endpoint = self.endpoint
pipeline_name, pipeline_version = pipeline.split(":")
# Check if component exists before POST
comp_url = f"{endpoint}/api/v1/dynamo_components"
comp_get_url = f"{endpoint}/api/v1/dynamo_components/{pipeline_name}"
comp_exists = False
comp_resp = session.get(comp_get_url)
if comp_resp.status_code == 200:
comp_exists = True
if not comp_exists:
comp_payload = {
"name": pipeline_name,
"description": "Registered by Dynamo's KubernetesDeploymentManager",
}
resp = session.post(comp_url, json=comp_payload)
if resp.status_code not in (200, 201, 409):
print(resp.status_code)
raise RuntimeError(f"Failed to create component: {resp.text}")
# Check if version exists before POST
ver_url = f"{endpoint}/api/v1/dynamo_components/{pipeline_name}/versions"
ver_get_url = f"{endpoint}/api/v1/dynamo_components/{pipeline_name}/versions/{pipeline_version}"
ver_exists = False
ver_resp = session.get(ver_get_url)
if ver_resp.status_code == 200:
ver_exists = True
if not ver_exists:
build_at = kwargs.get("build_at")
if not build_at:
build_at = datetime.utcnow()
if isinstance(build_at, str):
try:
build_at = datetime.fromisoformat(build_at)
except Exception:
build_at = datetime.utcnow()
manifest = {
"service": entry_service.service_name,
"apis": entry_service.apis,
"size_bytes": entry_service.size_bytes,
}
ver_payload = {
"name": entry_service.name,
"description": f"Auto-registered version for {pipeline}",
"resource_type": "dynamo_component_version",
"version": entry_service.version,
"manifest": manifest,
"build_at": build_at.isoformat(),
}
resp = session.post(ver_url, json=ver_payload)
if resp.status_code not in (200, 201, 409):
raise RuntimeError(f"Failed to create component version: {resp.text}")
# Upload the graph
build_dir = entry_service.path
if not build_dir or not os.path.isdir(build_dir):
raise FileNotFoundError(f"Built pipeline directory not found: {build_dir}")
tar_stream = io.BytesIO()
with tarfile.open(fileobj=tar_stream, mode="w") as tar:
tar.add(build_dir, arcname=".")
tar_stream.seek(0)
upload_url = f"{endpoint}/api/v1/dynamo_components/{pipeline_name}/versions/{pipeline_version}/upload"
upload_headers = {"Content-Type": "application/x-tar"}
resp = session.put(upload_url, data=tar_stream, headers=upload_headers)
if resp.status_code not in (200, 201, 204):
raise RuntimeError(f"Failed to upload pipeline artifact: {resp.text}")
def create_deployment(self, deployment: Deployment, **kwargs) -> DeploymentResponse:
"""Create a new deployment. Ensures all components and versions are registered/uploaded before creating the deployment."""
# For each service/component in the deployment, upload it to the API store
self._upload_pipeline(
pipeline=deployment.pipeline or deployment.namespace,
if not deployment.graph:
raise ValueError(
"Deployment graph must be provided in the format <name>:<version>"
)
upload_graph(
endpoint=self.endpoint,
graph=deployment.graph,
entry_service=deployment.entry_service,
session=self.session,
**kwargs,
)
......@@ -127,7 +58,7 @@ class KubernetesDeploymentManager(DeploymentManager):
dev = kwargs.get("dev", False)
payload = {
"name": deployment.name,
"component": deployment.pipeline or deployment.namespace,
"component": deployment.graph or deployment.namespace,
"dev": dev,
"envs": deployment.envs,
}
......@@ -151,7 +82,6 @@ class KubernetesDeploymentManager(DeploymentManager):
access_authorization = kwargs.get("access_authorization", False)
payload = {
"name": deployment.name,
"component": deployment.pipeline or deployment.namespace,
"envs": deployment.envs,
"services": deployment.services,
"access_authorization": access_authorization,
......
......@@ -137,7 +137,7 @@ class Deployment:
name: str
namespace: str
pipeline: t.Optional[str] = None
graph: t.Optional[str] = None
entry_service: t.Optional[Service] = None
envs: t.Optional[t.List[t.Dict[str, t.Any]]] = None
......@@ -150,12 +150,12 @@ DeploymentResponse = t.Dict[str, t.Any]
class DeploymentConfig:
"""Configuration object for deployment operations.
Consolidates all deployment parameters including pipeline configuration,
Consolidates all deployment parameters including graph configuration,
environment variables, and deployment settings.
"""
# Core deployment settings
pipeline: str
graph: str
endpoint: str
name: t.Optional[str] = None
target: str = "kubernetes"
......
......@@ -208,23 +208,21 @@ def _get_dir_size(path: str) -> int:
def load_entry_service(
pipeline_tag: str, build_dir: str = "~/.dynamo/packages"
) -> ServiceInterface:
graph_tag: str, build_dir: str = "~/.dynamo/packages"
) -> Service:
"""
Given a built pipeline tag (e.g. frontend:2uk2fwzvqsswvs7t), load the entry service as a deployment Service instance.
Given a built graph tag (e.g. frontend:2uk2fwzvqsswvs7t), load the entry service as a deployment Service instance.
"""
if ":" not in pipeline_tag:
raise ValueError("pipeline_tag must be in the form name:version")
name, version = pipeline_tag.split(":", 1)
if ":" not in graph_tag:
raise ValueError("graph_tag must be in the form name:version")
name, version = graph_tag.split(":", 1)
graph_dir = os.path.expanduser(f"{build_dir}/{name}/{version}")
if not os.path.isdir(graph_dir):
raise FileNotFoundError(f"Pipeline directory not found: {graph_dir}")
raise FileNotFoundError(f"Graph directory not found: {graph_dir}")
config_path = os.path.join(graph_dir, "dynamo.yaml")
if not os.path.isfile(config_path):
raise FileNotFoundError(
f"Pipeline config (dynamo.yaml) not found in {graph_dir}"
)
raise FileNotFoundError(f"Graph config (dynamo.yaml) not found in {graph_dir}")
with open(config_path, encoding="utf-8") as f:
graph_cfg = yaml.safe_load(f)
......@@ -244,7 +242,7 @@ def load_entry_service(
entry_service = Service(
service_name=service_name,
name=svc_name,
namespace="default",
namespace=svc.get("dynamo", {}).get("namespace", "default"),
version=version,
path=graph_dir,
envs=graph_cfg.get("envs", []),
......@@ -252,4 +250,4 @@ def load_entry_service(
size_bytes=size_bytes,
)
return entry_service
raise ValueError("No entry service found in the pipeline")
raise ValueError("No entry service found in the graph")
......@@ -13,7 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import os
import tarfile
from datetime import datetime
from typing import Optional
import requests
from dynamo.sdk.core.protocol.deployment import Service
REQUEST_TIMEOUT = 20
def get_host_port():
......@@ -28,3 +38,97 @@ def get_system_app_host_port():
port = int(os.environ.get("DYNAMO_SYSTEM_APP_PORT", 0))
host = os.environ.get("DYNAMO_SYSTEM_APP_HOST", "0.0.0.0")
return host, port
def upload_graph(
endpoint: str,
graph: str,
entry_service: Service,
session: Optional[requests.Session] = None,
**kwargs,
) -> None:
"""Upload the entire graph as a single component/version, with a manifest of all services."""
session = session or requests.Session()
parts = graph.split(":")
if len(parts) != 2:
raise ValueError(
f"`graph` must be in '<name>:<version>' format, got '{graph}'."
)
graph_name, graph_version = parts
# Check if component exists before POST
comp_url = f"{endpoint}/api/v1/dynamo_components"
comp_get_url = f"{endpoint}/api/v1/dynamo_components/{graph_name}"
comp_exists = False
comp_resp = session.get(comp_get_url, timeout=REQUEST_TIMEOUT)
if comp_resp.status_code == 200:
comp_exists = True
elif comp_resp.status_code == 404:
comp_exists = False
else:
raise RuntimeError(
f"Failed to verify component '{graph_name}': "
f"{comp_resp.status_code}: {comp_resp.text}"
)
if not comp_exists:
comp_payload = {
"name": graph_name,
"description": "Registered by Dynamo's KubernetesDeploymentManager",
}
resp = session.post(comp_url, json=comp_payload, timeout=REQUEST_TIMEOUT)
if resp.status_code not in (200, 201, 409):
raise RuntimeError(f"Failed to create component: {resp.text}")
# Check if version exists before POST
ver_url = f"{endpoint}/api/v1/dynamo_components/{graph_name}/versions"
ver_get_url = (
f"{endpoint}/api/v1/dynamo_components/{graph_name}/versions/{graph_version}"
)
ver_exists = False
ver_resp = session.get(ver_get_url, timeout=REQUEST_TIMEOUT)
if ver_resp.status_code == 200:
ver_exists = True
if not ver_exists:
build_at = kwargs.get("build_at")
if not build_at:
build_at = datetime.utcnow()
if isinstance(build_at, str):
try:
build_at = datetime.fromisoformat(build_at)
except Exception:
build_at = datetime.utcnow()
manifest = {
"service": entry_service.service_name,
"apis": entry_service.apis,
"size_bytes": entry_service.size_bytes,
}
ver_payload = {
"name": entry_service.name,
"description": f"Auto-registered version for {graph}",
"resource_type": "dynamo_component_version",
"version": graph_version,
"manifest": manifest,
"build_at": build_at.isoformat(),
}
resp = session.post(ver_url, json=ver_payload, timeout=REQUEST_TIMEOUT)
if resp.status_code not in (200, 201, 409):
raise RuntimeError(f"Failed to create component version: {resp.text}")
# Upload the graph
build_dir = entry_service.path
if not build_dir or not os.path.isdir(build_dir):
raise FileNotFoundError(f"Built graph directory not found: {build_dir}")
tar_stream = io.BytesIO()
with tarfile.open(fileobj=tar_stream, mode="w") as tar:
tar.add(build_dir, arcname=".")
tar_stream.seek(0)
upload_url = f"{endpoint}/api/v1/dynamo_components/{graph_name}/versions/{graph_version}/upload"
upload_headers = {"Content-Type": "application/x-tar"}
resp = session.put(
upload_url,
data=tar_stream,
headers=upload_headers,
timeout=REQUEST_TIMEOUT,
)
if resp.status_code not in (200, 201, 204):
raise RuntimeError(f"Failed to upload graph artifact: {resp.text}")
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment