deployment.py 5.81 KB
Newer Older
Neelay Shah's avatar
Neelay Shah committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
Neelay Shah's avatar
Neelay Shah committed
16
17
from pprint import pformat
from typing import Optional, Type
Neelay Shah's avatar
Neelay Shah committed
18

Neelay Shah's avatar
Neelay Shah committed
19
20
21
22
23
24
25
from triton_distributed.icp import (
    DataPlane,
    NatsRequestPlane,
    NatsServer,
    RequestPlane,
    UcpDataPlane,
)
26
27
from triton_distributed.runtime.logger import get_logger
from triton_distributed.runtime.worker import Worker, WorkerConfig
Neelay Shah's avatar
Neelay Shah committed
28
29

LOGGER_NAME = __name__
Neelay Shah's avatar
Neelay Shah committed
30
31
32


class Deployment:
Neelay Shah's avatar
Neelay Shah committed
33
34
35
36
37
38
39
40
41
42
43
    def __init__(
        self,
        worker_configs: list[WorkerConfig | tuple[WorkerConfig, int]],
        log_level=3,
        initialize_request_plane=False,
        initialize_data_plane=False,
        request_plane_args: Optional[tuple[list, dict]] = None,
        request_plane: Optional[Type[RequestPlane]] = NatsRequestPlane,
        data_plane: Optional[Type[DataPlane]] = UcpDataPlane,
        data_plane_args: Optional[tuple[list, dict]] = None,
        log_dir="logs",
44
        consolidate_logs=False,
Neelay Shah's avatar
Neelay Shah committed
45
46
        starting_metrics_port=0,
    ):
Neelay Shah's avatar
Neelay Shah committed
47
48
49
        self._process_context = multiprocessing.get_context("spawn")
        self._worker_configs = worker_configs
        self._workers: list[multiprocessing.context.SpawnProcess] = []
50
        self._logger = get_logger(log_level, LOGGER_NAME)
Neelay Shah's avatar
Neelay Shah committed
51
52
53
54
55
56
57
58
59
        self._default_request_plane = request_plane
        self._default_request_plane_args = request_plane_args
        self._default_data_plane = data_plane
        self._default_data_plane_args = data_plane_args
        self._initialize_request_plane = initialize_request_plane
        self._initialize_data_plane = initialize_data_plane
        self.request_plane_server: NatsServer = None
        self._default_log_dir = log_dir
        self._default_log_level = log_level
60
        self._consolidate_logs = consolidate_logs
Neelay Shah's avatar
Neelay Shah committed
61
        self._starting_metrics_port = starting_metrics_port
Neelay Shah's avatar
Neelay Shah committed
62
63
64
65
66
67

    @staticmethod
    def _start_worker(worker_config):
        Worker(worker_config).start()

    def start(self):
Neelay Shah's avatar
Neelay Shah committed
68
69
70
71
        if self._initialize_request_plane:
            if self._default_request_plane == NatsRequestPlane:
                self.request_plane_server = NatsServer(log_dir=self._default_log_dir)
            else:
72
                raise ValueError(
Neelay Shah's avatar
Neelay Shah committed
73
74
75
                    f"Unknown Request Plane Type, can not initialize {self._default_request_plane}"
                )

Neelay Shah's avatar
Neelay Shah committed
76
        for worker_config in self._worker_configs:
Neelay Shah's avatar
Neelay Shah committed
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
            worker_instances = 1
            if isinstance(worker_config, tuple):
                worker_instances = worker_config[1]
                worker_config = worker_config[0]

            base_name = worker_config.name
            base_port = worker_config.metrics_port

            if not base_port and self._starting_metrics_port:
                base_port = self._starting_metrics_port
                self._starting_metrics_port += worker_instances

            request_plane_args, request_plane_kwargs = worker_config.request_plane_args

            if not request_plane_args and not request_plane_kwargs:
                if self._default_request_plane_args:
                    worker_config.request_plane_args = self._default_request_plane_args
                elif self.request_plane_server:
                    worker_config.request_plane_args = (
                        [self.request_plane_server.url],
                        {},
                    )

            if not worker_config.log_dir:
                worker_config.log_dir = self._default_log_dir

            if not worker_config.log_level:
                worker_config.log_level = self._default_log_level

106
107
108
            if self._consolidate_logs:
                worker_config.consolidate_logs = True

Neelay Shah's avatar
Neelay Shah committed
109
110
111
112
113
114
115
116
117
118
119
120
121
122
            for index in range(worker_instances):
                worker_config.name = f"{base_name}.{index}"
                worker_config.metrics_port = base_port + index
                self._workers.append(
                    self._process_context.Process(
                        target=Deployment._start_worker,
                        name=worker_config.name,
                        args=[worker_config],
                    )
                )
                self._logger.info(
                    "\n\nStarting Worker:\n\n\tConfig:\n\t%s\n\t%s\n",
                    pformat(worker_config),
                    self._workers[-1],
Neelay Shah's avatar
Neelay Shah committed
123
                )
Neelay Shah's avatar
Neelay Shah committed
124
125
126
127
                self._workers[-1].start()

    def stop(self):
        return self.shutdown()
Neelay Shah's avatar
Neelay Shah committed
128
129

    def shutdown(self, join=True, timeout=10):
Neelay Shah's avatar
Neelay Shah committed
130
        exit_code = 0
Neelay Shah's avatar
Neelay Shah committed
131
        for worker in self._workers:
Neelay Shah's avatar
Neelay Shah committed
132
            self._logger.info("\n\nStopping Worker:\n\n\n\t%s\n", worker)
Neelay Shah's avatar
Neelay Shah committed
133
134
135
136
137
138
139
            worker.terminate()
        if join:
            for worker in self._workers:
                worker.join(timeout)
            for worker in self._workers:
                if worker.is_alive():
                    worker.kill()
Neelay Shah's avatar
Neelay Shah committed
140
141
142
143
144
145
146
147
148
149
150
                worker.join(timeout)
                self._logger.info("\n\nWorker Stopped:\n\n\n\t%s\n", worker)
                if worker.exitcode is not None:
                    # Note we accumulate exit codes
                    # assumption being no error is exit_code==0
                    # anything else represents an error
                    #
                    # this is to catch some obvious errors but not all

                    exit_code += worker.exitcode
        return exit_code