Unverified commit 1458312e authored by Zhenhua Han, committed by GitHub

[Retiarii] Bugfix: wrong device placement and invalid CUDA ordinal when using CGO engine (#4086)

parent 6a6bdeed
@@ -2,6 +2,8 @@
 # Licensed under the MIT license.

 from dataclasses import dataclass
+from abc import ABC, abstractmethod

 try:
     from typing import Literal
 except ImportError:
@@ -9,23 +11,54 @@ except ImportError:
 @dataclass
-class GPUDevice:
+class Device(ABC):
     node_id: str
-    gpu_id: int
     status: Literal['idle', 'busy', 'unknown'] = 'idle'

     def __eq__(self, o) -> bool:
+        if isinstance(self, type(o)):
+            return self.node_id == o.node_id
+        else:
+            return False
+
+    def __lt__(self, o) -> bool:
+        return self.node_id < o.node_id
+
+    def set_status(self, status):
+        self.status = status
+
+    def __repr__(self) -> str:
+        return "{Abstract Device %s, Status %s}" % (self.node_id, self.status)
+
+    @abstractmethod
+    def device_repr(self) -> str:
+        pass
+
+
+@dataclass
+class GPUDevice(Device):
+    gpu_id: int = -1
+
+    def __init__(self, node_id, gpu_id, status='idle'):
+        self.node_id = node_id
+        self.gpu_id = gpu_id
+        self.status = status
+
+    def __eq__(self, o: Device) -> bool:
         if isinstance(o, GPUDevice):
             return self.node_id == o.node_id and self.gpu_id == o.gpu_id
         return False

-    def __lt__(self, o) -> bool:
+    def __lt__(self, o: Device) -> bool:
         if self.node_id < o.node_id:
             return True
         elif self.node_id > o.node_id:
             return False
         else:
-            return self.gpu_id < o.gpu_id
+            if isinstance(o, GPUDevice):
+                return self.gpu_id < o.gpu_id
+            else:
+                return True

     def __repr__(self) -> str:
         return "{Environment %s, GPU %d, Status %s}" % (self.node_id, self.gpu_id, self.status)
@@ -33,8 +66,18 @@ class GPUDevice:
     def __hash__(self) -> int:
         return hash(self.node_id + '_' + str(self.gpu_id))

-    def set_status(self, status):
-        self.status = status
-
     def device_repr(self):
         return f"cuda:{self.gpu_id}"
+
+
+@dataclass
+class CPUDevice(Device):
+    def __init__(self, node_id):
+        self.node_id = node_id
+        self.device = 'cpu'
+
+    def __repr__(self) -> str:
+        return "{CPU Device, NodeID %s, Status %s}" % (self.node_id, self.status)
+
+    def device_repr(self):
+        return "cpu"
@@ -2,7 +2,10 @@
 # Licensed under the MIT license.

 import logging
-from typing import List, Tuple, Any
+from typing import Dict, List, Tuple, Any
+
+from nni.retiarii.operation_def.torch_op_def import ToDevice
+from nni.common.device import Device, GPUDevice

 from ..graph import IllegalGraphError, Edge, Graph, Node, Model
@@ -70,7 +73,7 @@ def _format_inputs(node: Node) -> Tuple[List[str], List[Any]]:
             # when the input comes from a single-output operator
             inputs.append('{}'.format(edge.head.name))
             if edge.head.operation.type in ('prim::Constant', 'prim::GetAttr') and \
                     'value' in edge.head.operation.parameters:
                 inputs_value.append(edge.head.operation.parameters['value'])
             else:
                 inputs_value.append(None)
@@ -98,6 +101,24 @@ def _remove_prefix(names, graph_name):
     return names[len(graph_name):] if names.startswith(graph_name) else names


+def generate_cuda_mapping(placement: Dict[Node, Device]) -> Dict[Device, int]:
+    '''
+    Since CUDA_VISIBLE_DEVICES is set to the list of physical GPU IDs,
+    the GPU IDs must be remapped to CUDA ordinals when generating code.
+    For example, when CUDA_VISIBLE_DEVICES="0,3", the generated code must use "cuda:0" and "cuda:1".
+    '''
+    unique_devices = sorted(set(e for e in placement.values() if isinstance(e, GPUDevice)))
+    node_gpu_cnt = {}
+    cuda_remapped_id = {}
+    for d in unique_devices:
+        if d.node_id not in node_gpu_cnt:
+            node_gpu_cnt[d.node_id] = 0
+        node_gpu_cnt[d.node_id] += 1
+        cuda_remapped_id[d] = node_gpu_cnt[d.node_id] - 1
+    return cuda_remapped_id
+
+
 def graph_to_pytorch_model(graph_name: str, graph: Graph, placement=None) -> str:
     nodes = graph.topo_sort()
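To make the remapping concrete, a hedged walk-through (node_a/node_b/node_c and the host name are hypothetical placeholders): GPUs are sorted per host and renumbered from zero, which matches what the trial sees once CUDA_VISIBLE_DEVICES is set to the physical IDs.

    # Hypothetical placement pinning two nodes to physical GPUs 1 and 3 of one host.
    placement = {
        node_a: GPUDevice('serverA', 1),
        node_b: GPUDevice('serverA', 3),
        node_c: CPUDevice('serverA'),  # CPU devices are ignored by the mapping
    }
    mapping = generate_cuda_mapping(placement)
    # With CUDA_VISIBLE_DEVICES="1,3" the trial sees exactly two devices,
    # so physical GPU 1 -> cuda:0 and physical GPU 3 -> cuda:1:
    assert mapping == {GPUDevice('serverA', 1): 0, GPUDevice('serverA', 3): 1}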
@@ -105,8 +126,14 @@ def graph_to_pytorch_model(graph_name: str, graph: Graph, placement=None) -> str
     # only need to generate code for module here
     import_pkgs = set()
     node_codes = []
+    cuda_remapped_id = None
+    if placement:
+        cuda_remapped_id = generate_cuda_mapping(placement)
     for node in nodes:
         if node.operation:
+            if placement and isinstance(node.operation, ToDevice):
+                node.operation.override_device_repr("cuda:%d" % cuda_remapped_id[node.operation.device])
             if node.operation.type == 'shared':
                 continue
             pkg_name = node.operation.get_import_pkg()
@@ -115,7 +142,11 @@ def graph_to_pytorch_model(graph_name: str, graph: Graph, placement=None) -> str
             node_code = node.operation.to_init_code(_remove_prefix(node.name, graph_name))
             if node_code is not None:
                 if placement and node in placement and len(node_code) > 0:
-                    node_codes.append(f"{node_code}.to('{placement[node].device_repr()}')")
+                    if isinstance(placement[node], GPUDevice):
+                        device_repr = "cuda:%d" % cuda_remapped_id[placement[node]]
+                    else:
+                        device_repr = placement[node].device_repr()
+                    node_codes.append(f"{node_code}.to('{device_repr}')")
                 else:
                     node_codes.append(node_code)
@@ -9,7 +9,7 @@ import time
 import threading
 from typing import Iterable, List, Dict, Tuple

-from nni.common.device import GPUDevice
+from nni.common.device import GPUDevice, Device

 from .interface import AbstractExecutionEngine, AbstractGraphListener, WorkerInfo
 from .. import codegen, utils
 from ..graph import Model, ModelStatus, MetricData, Node
@@ -33,9 +33,8 @@ class CGOExecutionEngine(AbstractExecutionEngine):
     Parameters
     ----------
-    devices : List[str] or List[GPUDevice]
+    devices : List[Device]
         Available devices for execution.
-        If a list of str is provided, it will build a list of GPUDevice in a server named ``single_server``
     max_concurrency : int
         The maximum number of trials to run concurrently.
     batch_waiting_time: int
@@ -43,14 +42,14 @@ class CGOExecutionEngine(AbstractExecutionEngine):
         The trials within one batch could apply cross-graph optimization.
     """

-    def __init__(self, devices: List[GPUDevice] = None,
+    def __init__(self, devices: List[Device] = None,
                  max_concurrency: int = None,
                  batch_waiting_time: int = 60,
                  ) -> None:
         self._listeners: List[AbstractGraphListener] = []
         self._running_models: Dict[int, Model] = dict()
         self.logical_plan_counter = 0
-        self.available_devices: List[GPUDevice] = []
+        self.available_devices: List[Device] = []
         self.max_concurrency: int = max_concurrency

         for device in devices:
             self.available_devices.append(device)
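With the constructor now typed against Device, a single engine can schedule across GPU and CPU workers. A hedged construction sketch (host names hypothetical, other arguments as in the signature above):

    from nni.common.device import GPUDevice, CPUDevice

    devices = [GPUDevice('serverA', 0), GPUDevice('serverA', 1), CPUDevice('serverB')]
    engine = CGOExecutionEngine(devices=devices, max_concurrency=2, batch_waiting_time=60)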
@@ -61,7 +60,7 @@ class CGOExecutionEngine(AbstractExecutionEngine):
         self._original_models = {}
         self._original_model_to_multi_model = {}
         self._trial_to_original_models = {}
-        self._trial_used_devices: Dict[int, List[GPUDevice]] = {}
+        self._trial_used_devices: Dict[int, List[Device]] = {}

         self._history: List[Model] = []
@@ -110,6 +109,15 @@ class CGOExecutionEngine(AbstractExecutionEngine):
             self._queue_lock.release()
             time.sleep(1)

+    def _extract_placement_constraint(self, placement_mapping: Dict[Node, Device]):
+        unique_gpus = sorted(set(e for e in placement_mapping.values() if isinstance(e, GPUDevice)))
+        placement_constraint = None
+        if len(unique_gpus) > 0:
+            placement_constraint = {}
+            placement_constraint['type'] = 'Device'
+            placement_constraint['gpus'] = [(e.node_id, e.gpu_id) for e in unique_gpus]
+        return placement_constraint
+
     def _submit_models_in_batch(self, *models: List[Model]) -> None:
         _logger.info('%d models are submitted in batch', len(models))
         logical = self._build_logical(models)
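The helper above condenses a node-to-device placement into the constraint dict later passed to send_trial; a hedged sketch of its output (node and host names hypothetical):

    placement = {node_a: GPUDevice('serverA', 1), node_b: GPUDevice('serverA', 3)}
    constraint = engine._extract_placement_constraint(placement)
    assert constraint == {'type': 'Device', 'gpus': [('serverA', 1), ('serverA', 3)]}
    # A CPU-only placement yields None, i.e. no device constraint is attached to the trial.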
@@ -120,9 +128,10 @@ class CGOExecutionEngine(AbstractExecutionEngine):
         phy_models_and_placements = self._assemble(logical)
         for model, placement, grouped_models in phy_models_and_placements:
             data = BaseGraphData(codegen.model_to_pytorch_script(model, placement=placement), model.evaluator)
-            trial_id = send_trial(data.dump())
+            placement_constraint = self._extract_placement_constraint(placement)
+            trial_id = send_trial(data.dump(), placement_constraint=placement_constraint)
             # unique non-cpu devices used by the trial
-            self._trial_used_devices[trial_id] = list([_ for _ in set(placement.values()) if isinstance(_, GPUDevice)])
+            self._trial_used_devices[trial_id] = list(set([_ for _ in placement.values() if isinstance(_, GPUDevice)]))

             # currently, it is impossible for the search strategy to submit more models than the number of available devices
             for used_device in self._trial_used_devices[trial_id]:
@@ -139,14 +148,18 @@ class CGOExecutionEngine(AbstractExecutionEngine):
     def list_models(self) -> Iterable[Model]:
         return self._history

-    def _assemble(self, logical_plan: LogicalPlan) -> List[Tuple[Model, Dict[Node, GPUDevice], List[Model]]]:
+    def _assemble(self, logical_plan: LogicalPlan) -> List[Tuple[Model, Dict[Node, Device], List[Model]]]:
+        """
+        Return the assembled models as a list of tuples.
+        Each tuple contains the assembled model, the device placement of graph nodes, and the original models.
+        """
         # try to use the available_devices first so that it can be launched as early as possible
         # if free devices are not enough to assemble all models in one trial, try all devices
         if len(self.available_devices) > 0:
-            grouped_models: List[Dict[Model, GPUDevice]] = AssemblePolicy().group(logical_plan, self.available_devices)
+            grouped_models: List[Dict[Model, Device]] = AssemblePolicy().group(logical_plan, self.available_devices)
         if len(self.available_devices) == 0 or len(grouped_models) > 1:
-            grouped_models: List[Dict[Model, GPUDevice]] = AssemblePolicy().group(logical_plan, self.all_devices)
+            grouped_models: List[Dict[Model, Device]] = AssemblePolicy().group(logical_plan, self.all_devices)

         phy_models_and_placements = []
         for multi_model in grouped_models:
@@ -256,17 +269,7 @@ class CGOExecutionEngine(AbstractExecutionEngine):
             os.remove(file_name)

-
-def _remap_cuda_device(group_model: Dict[Model, GPUDevice]):
-    used_devices = {}
-    for m in group_model:
-        if group_model[m].node_id not in used_devices:
-            used_devices[group_model[m].node_id] = {}
-        if isinstance(group_model[m], GPUDevice):
-            if group_model[m].gpu_id not in used_devices[group_model[m].node_id]:
-                n_used_gpu_in_server = len(used_devices[group_model[m].node_id])
-                used_devices[group_model[m].node_id][group_model[m].gpu_id] = n_used_gpu_in_server
-            group_model[m].gpu_id = used_devices[group_model[m].node_id][group_model[m].gpu_id]
-    return group_model
-

 class AssemblePolicy:
@@ -282,7 +285,7 @@ class AssemblePolicy:
     @staticmethod
     def _check_graph_connectivity(model: Model,
-                                  group_model: Dict[Model, GPUDevice],
+                                  group_model: Dict[Model, Device],
                                   logical_plan: LogicalPlan) -> bool:
         for edge in logical_plan.logical_graph.edges:
             if AssemblePolicy._is_related_node(model, edge.head) or \
@@ -294,7 +297,7 @@ class AssemblePolicy:
         return False

     @staticmethod
-    def _check_evaluator(new_model: Model, group_model: Dict[Model, GPUDevice]) -> bool:
+    def _check_evaluator(new_model: Model, group_model: Dict[Model, Device]) -> bool:
         if not (isinstance(new_model.evaluator, Lightning)
                 and isinstance(new_model.evaluator.module, MultiModelSupervisedLearningModule)):
             return False
@@ -318,11 +321,11 @@ class AssemblePolicy:
             if len(group_model) > 0 and \
                 (AssemblePolicy._check_graph_connectivity(m, group_model, logical_plan) == False or
                     AssemblePolicy._check_evaluator(m, group_model) == False):
-                all_grouped_models.append(_remap_cuda_device(group_model))
+                all_grouped_models.append(group_model)
                 group_model = {}
             group_model[m] = available_devices[idx % len(available_devices)]
             if len(group_model) == len(available_devices) or \
                     idx == len(logical_plan.models) - 1:
-                all_grouped_models.append(_remap_cuda_device(group_model))
+                all_grouped_models.append(group_model)
                 group_model = {}
         return all_grouped_models
@@ -2,30 +2,39 @@
 # Licensed under the MIT license.

 import copy
-from typing import Dict, Tuple, Any, Union
+from typing import Dict, Tuple, Any

 from nni.retiarii.utils import uid
-from nni.common.device import GPUDevice
+from nni.common.device import Device, CPUDevice

 from ...graph import Cell, Edge, Graph, Model, Node
 from ...operation import Operation, _IOPseudoOperation

-
-class CPUDevice:
-    def __init__(self, node_id):
-        self.node_id = node_id
-        self.device = 'cpu'
-
-    def device_repr(self):
-        return "cpu"
-
 class AbstractLogicalNode(Node):
     def __init__(self, graph, node_id, name, operation, _internal=False):
         super().__init__(graph, node_id, name, operation, _internal=_internal)
         self.related_models = []

-    def assemble(self, multi_model_placement: Dict[Model, GPUDevice]) -> Tuple[Node, GPUDevice]:
+    def assemble(self, multi_model_placement: Dict[Model, Device]) -> Tuple[Node, Device]:
+        """
+        Given a set of models to be assembled into one physical model and their device placement,
+        this function replaces the logical node with an executable physical node for the physical model.
+
+        Parameters
+        ----------
+        multi_model_placement : dict
+            a dict mapping models to device placement.
+            These models will be assembled into the same physical model to run.
+
+        Returns
+        -------
+        node : Node
+            the physical node to replace the logical node in the physical model
+        placement : Device
+            the device placement of the returned physical node
+        """
         raise NotImplementedError

     def _fork_to(self, graph: Graph):
@@ -85,6 +94,11 @@ class LogicalGraph(Graph):

 class OriginNode(AbstractLogicalNode):
+    """
+    This is a logical node representing the original node without any modification.
+    In assemble, it just returns the original node along with the physical placement given by multi_model_placement.
+    """
+
     def __init__(self, logical_graph: LogicalGraph,
                  original_graph: Graph, original_node: Node,
                  name: str, operation, _internal=False):
@@ -92,7 +106,7 @@ class OriginNode(AbstractLogicalNode):
         self.original_graph = original_graph
         self.original_node = original_node
-    def assemble(self, multi_model_placement: Dict[Model, GPUDevice]) -> Tuple[Node, GPUDevice]:
+    def assemble(self, multi_model_placement: Dict[Model, Device]) -> Tuple[Node, Device]:
         model_id = self.original_node.graph.model.model_id
         new_node = Node(self.original_node.graph, self.original_node.id,
                         f"M_{model_id}_" +
@@ -138,8 +152,27 @@ class LogicalPlan:
                 new_tail = id_to_new_node[edge.tail.id]
                 Edge((new_head, edge.head_slot), (new_tail, edge.tail_slot), _internal=True)._register()

-    def assemble(self, multi_model_placement: Dict[Model, GPUDevice]) \
-        -> Tuple[Model, Dict[Node, Union[GPUDevice, CPUDevice]]]:
+    def assemble(self, multi_model_placement: Dict[Model, Device]) \
+        -> Tuple[Model, Dict[Node, Device]]:
+        """
+        Given a set of models to be assembled into one physical model and their device placement,
+        this function replaces all the logical nodes in this LogicalPlan with executable physical nodes
+        for the physical model.
+
+        Parameters
+        ----------
+        multi_model_placement : dict
+            a dict mapping models to device placement.
+            These models will be assembled into the same physical model to run.
+
+        Returns
+        -------
+        phy_model : Model
+            the physical model formed by the models in `multi_model_placement`,
+            with all logical nodes replaced by physical nodes
+        node_placements : dict
+            the device placement of the nodes in `phy_model`
+        """
         phy_model = Model(_internal=True)
         phy_graph = self.lp_model.root_graph._fork_to(phy_model)
         phy_graph._rename_graph(phy_graph.name, "_model")
@@ -222,9 +255,10 @@ class LogicalPlan:
                 node.remove()

         # If two nodes are placed on different devices, use a ToDevice op to copy the node
+        # TODO: when copying one node to multiple devices, broadcast is more efficient than P2P communication
         existing_edges = phy_graph.edges.copy()
         # Avoid copying a node multiple times to the same device
-        copied_op: Dict[Tuple(Node, Union[GPUDevice, CPUDevice]), Node] = {}
+        copied_op: Dict[Tuple[Node, Device], Node] = {}
         for edge in existing_edges:
             head_placement = node_placements[edge.head]
             tail_placement = node_placements[edge.tail]
@@ -238,11 +272,12 @@ class LogicalPlan:
                     dst_name = edge.head.name + "_to_" + edge.tail.name
                     to_operation = Operation.new(
                         'ToDevice', {
-                            "device": tail_placement.device_repr(), "src": (
+                            "device": tail_placement, "src": (
                                 edge.head.name, edge.head_slot), "dst": dst_name})
                     to_node = Node(phy_graph, uid(), dst_name, to_operation)._register()
                     Edge((edge.head, edge.head_slot), (to_node, None), _internal=True)._register()
                     copied_op[(edge.head, tail_placement)] = to_node
+                    node_placements[to_node] = head_placement
                 edge.head = to_node
                 edge.head_slot = None
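The net effect on the generated forward code: when an edge crosses devices, the spliced-in ToDevice node copies the tensor to the consumer's device. A hedged sketch of the resulting lines (layer and device names hypothetical):

    # head produces conv1 on cuda:0; tail consumes it on cuda:1
    conv1_to_fc1 = conv1.to("cuda:1")  # emitted by the inserted ToDevice node
    fc1 = self.fc1(conv1_to_fc1)       # the tail now reads a tensor on its own device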
@@ -17,6 +17,13 @@ _supported_evaluators = [MultiModelSupervisedLearningModule]

 class DedupInputNode(AbstractLogicalNode):
+    """
+    This is a logical node used for deduplication.
+    When multiple models are assembled, `assemble` returns a single copy of the original node,
+    so these models share the result of one computation.
+    """
+
     def __init__(self, logical_graph: LogicalGraph, node_id: int,
                  nodes_to_dedup: List[Node], _internal=False):
         super().__init__(logical_graph, node_id,
@@ -79,8 +79,12 @@ class RetiariiAdvisor(MsgDispatcherBase):
             raise ValueError('placement_constraint.type must be either `None`, `GPUNumber` or `Device`')
         if placement_constraint['type'] == 'None' and len(placement_constraint['gpus']) > 0:
             raise ValueError('placement_constraint.gpus must be an empty list when type == None')
-        if placement_constraint['type'] == 'Device' and len(placement_constraint['gpus']) != 1:
-            raise ValueError('placement_constraint.gpus must be a list of number (currently only support one host)')
+        if placement_constraint['type'] == 'GPUNumber':
+            if len(placement_constraint['gpus']) != 1:
+                raise ValueError('placement_constraint.gpus currently only supports one host when type == GPUNumber')
+            for e in placement_constraint['gpus']:
+                if not isinstance(e, int):
+                    raise ValueError('placement_constraint.gpus must be a list of numbers when type == GPUNumber')
         if placement_constraint['type'] == 'Device':
             for e in placement_constraint['gpus']:
                 if not isinstance(e, tuple):
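For reference, hedged examples of constraint dicts that pass the updated validation (host name hypothetical):

    gpu_number_constraint = {'type': 'GPUNumber', 'gpus': [2]}                        # one host, two GPUs
    device_constraint = {'type': 'Device', 'gpus': [('serverA', 1), ('serverA', 3)]}  # explicit (host, gpu) pairs
    no_constraint = {'type': 'None', 'gpus': []}                                      # no device requirement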
@@ -69,7 +69,7 @@ class PrimConstant(PyTorchOperation):
         # TODO: deal with all the types
         if self.parameters['type'] in ['None', 'NoneType']:
             return f'{output} = None'
         elif self.parameters['type'] in ('int', 'float', 'bool', 'int[]'):  # 'Long()' ???
             return f'{output} = {self.parameters["value"]}'
         elif self.parameters['type'] == 'str':
             str_val = self.parameters["value"]
@@ -494,17 +494,36 @@ class ToDevice(PyTorchOperation):
     def __init__(self, type_name: str, parameters: Dict[str, Any], _internal: bool = False):
         self.type = "ToDevice"
         self.device = parameters['device']
+        self.overridden_device_repr = None
         self.src = parameters['src']
         self.dst = parameters['dst']

+    def override_device_repr(self, device_repr):
+        # CUDA_VISIBLE_DEVICES remaps physical GPU IDs to CUDA ordinals, so the repr used in the
+        # generated code may differ from GPUDevice.device_repr(). override_device_repr is called in
+        # pytorch.graph_to_pytorch_model to replace the repr with the correct CUDA ordinal.
+        # E.g., when a job uses physical GPU 1 and 2, self.device.device_repr() returns "cuda:1" and
+        # "cuda:2", but the overridden reprs should be "cuda:0" and "cuda:1".
+        self.overridden_device_repr = device_repr
+
     def __repr__(self):
-        return f'to("{self.device}")'
+        if self.overridden_device_repr is None:
+            return f'to("{self.device.device_repr()}")'
+        else:
+            return f'to("{self.overridden_device_repr}")'

     def to_forward_code(self, field: str, output: str, inputs: List[str], inputs_value: List[Any]) -> str:
-        return f'{output} = {inputs[0]}.to("{self.device}")'
+        if self.overridden_device_repr is None:
+            forward_code = f'{output} = {inputs[0]}.to("{self.device.device_repr()}")'
+        else:
+            forward_code = f'{output} = {inputs[0]}.to("{self.overridden_device_repr}")'
+        return forward_code

 class AtenDet(PyTorchOperation):
     # for torch 1.9
     # NOTE: it is not included in the above aten ops, maybe because torch.det is an alias for torch.linalg.det
     _ori_type_name = ['aten::linalg_det']

     def to_forward_code(self, field: str, output: str, inputs: List[str], inputs_value: List[Any] = None) -> str:
         return f'{output} = torch.det({inputs[0]})'
@@ -115,8 +115,8 @@ export class GpuScheduler {
         const gpus = constraint.gpus as Array<[string, number]>;
         const selectedHost = gpus[0][0];
-        const hostsOfConstraint: Array<[string, number]> = gpus.filter((gpuTuple: [string, number]) => gpuTuple[0] === selectedHost);
-        if (hostsOfConstraint.length > 1) {
+        const differentHosts: Array<[string, number]> = gpus.filter((gpuTuple: [string, number]) => gpuTuple[0] !== selectedHost);
+        if (differentHosts.length >= 1) {
             // TODO: remove this constraint when supporting multi-host placement
             throw new Error("Device constraint does not support using multiple hosts")
         }