# Copyright (c) 2022-2023, NVIDIA CORPORATION.  All rights reserved.

""" Helpers for defining sharding for optimizer states based on existing sharding
for model parameters.
"""

import logging
from copy import deepcopy
from dataclasses import replace
from typing import Dict, Iterable, Tuple, Union

import torch

from megatron.core.utils import to_local_if_dtensor

from .dict_utils import nested_values
from .mapping import (
    LocalNonpersistentObject,
    ShardedStateDict,
    ShardedTensor,
    ShardedTensorFactory,
    StateDict,
)
from .utils import extract_sharded_tensors_and_factories

logger = logging.getLogger(__name__)


def get_optim_param_to_id_map(optim_params_iter: Iterable[torch.nn.Parameter]) -> Dict[int, int]:
    """Generate mapping from optimizer param to optimizer state id."""
    param_mappings = {}
    for i, param in enumerate(optim_params_iter):
        param = to_local_if_dtensor(param)
        if id(param) not in param_mappings:
            param_mappings[id(param)] = i
    return param_mappings


def get_param_id_to_sharded_param_map(
    model_sharded_state_dict: ShardedStateDict, optim_params_iter: Iterable[torch.nn.Parameter]
) -> Dict[int, Union[ShardedTensor, ShardedTensorFactory]]:
    """Generate mapping from optimizer state ids to model sharded parameters.

    Args:
        model_sharded_state_dict: sharded state dict with all model sharded tensors
            (can have any structure)
        optim_params_iter: iterable which iterates over model parameters tracked by the optimizer.
            The iteration must follow the same order as the optimizer's parameters.

    Returns:
        Dict[int, Union[ShardedTensor, ShardedTensorFactory]]: mapping from optimizer state ids
            to model sharded parameters.
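
    Example:
        A hedged sketch of typical usage (assumes a Megatron-Core `model` exposing
        `sharded_state_dict()` and an `optimizer` wrapping its parameters; both
        names are assumptions of the example)::

            from itertools import chain

            id_to_sharded_param_map = get_param_id_to_sharded_param_map(
                model.sharded_state_dict(),
                chain.from_iterable(g['params'] for g in optimizer.param_groups),
            )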
    """
    model_sharded_state_dict, _ = extract_sharded_tensors_and_factories(model_sharded_state_dict)
    id_to_sharded_param_map = {}
    param_to_id_map = get_optim_param_to_id_map(optim_params_iter)
    # If using PyTorch FSDP2, the values in model_sharded_state_dict would
    # have been converted to local tensors during initialization.
    # See the make_(tp)_sharded_tensor_for_checkpoint functions.
    for ten in nested_values(model_sharded_state_dict):
        if id(ten.data) in param_to_id_map:
            id_to_sharded_param_map[param_to_id_map[id(ten.data)]] = ten
        else:
            logger.debug(f'{ten} is not tracked by the optimizer')

    if not id_to_sharded_param_map:
        logger.warning(
            "Sharded parameters mapping is empty. It means tensors in model state dict"
            " do not correspond to tensors in optimizer parameters map."
            " Make sure to call state_dict with `keep_vars=True`."
        )
    return id_to_sharded_param_map


def make_sharded_optimizer_tensor(
    model_param: Union[ShardedTensor, ShardedTensorFactory], optim_param: torch.Tensor, prefix: str
) -> Union[ShardedTensor, ShardedTensorFactory]:
    """Build a ShardedTensor or ShardedTensorFactory for optimizer param based on model param

    Args:
        model_param (Union[ShardedTensor, ShardedTensorFactory]): model param
        optim_param (torch.Tensor): corresponding optimizer param
        prefix (str): optimizer prefix for the ShardedTensor or ShardedTensorFactory

    Returns:
        Union[ShardedTensor, ShardedTensorFactory]: wrapped optimizer parameter
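
    Example:
        A minimal sketch mirroring how `optim_state_to_sharding_state` below calls
        this helper (`model_param_sh_ten` and `exp_avg` are assumed to be a model
        sharded param and its matching optimizer state tensor)::

            sh_ten = make_sharded_optimizer_tensor(
                model_param_sh_ten, exp_avg, prefix='optimizer.state.exp_avg'
            )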
    """
    optim_param = to_local_if_dtensor(optim_param)
    if isinstance(model_param, ShardedTensorFactory):
        return replace(model_param, key=f'{prefix}.{model_param.key}', data=optim_param)

    assert tuple(optim_param.shape) == model_param.local_shape, (
        f'Optimizer shape ({tuple(optim_param.shape)}) does not match model shape '
        f'({model_param.local_shape})'
    )
    sh_ten = replace(
        model_param, key=f'{prefix}.{model_param.key}', data=optim_param, dtype=optim_param.dtype
    )
    sh_ten.validate_metadata_integrity()
    return sh_ten


def optim_state_to_sharding_state(
    optim_state_dict: StateDict,
    id_to_sharded_param_map: Dict[int, ShardedTensor],
    exclude_keys: Tuple[str, ...] = (),
):
    """Turn optimizer state dict to sharded state dict based on model state dict *in-place*.

    Can be used to add sharding information to most common optimizer state dict.
    Creates separate ShardedTensors for each key in `optim_state_dict['state']`
    (e.g. for torch.optim.Adam there will be separate tensors for `exp_avg` and `exp_avg_sq`)

    Args:
        optim_state_dict (StateDict): optimizer state dict with
            state parameters under `state` key and group hyperparameters under
            `param_groups` -> `params` key.
        id_to_sharded_param_map (Dict[int, ShardedTensor]): mapping from optimizer param ids
            to model sharded tensors. Can be generated with `get_param_id_to_sharded_param_map`
            function.
        exclude_keys (Tuple[str, ...]): optimizer state keys to exclude from the final state dict.

    Returns:
        None: state dict is modified in place
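
    Example:
        A hedged end-to-end sketch combining the helpers in this module
        (`optimizer` and `id_to_sharded_param_map` are assumed, e.g. built with
        `get_param_id_to_sharded_param_map` above; excluding `step` is purely
        illustrative)::

            optim_state_dict = optimizer.state_dict()
            optim_state_to_sharding_state(
                optim_state_dict, id_to_sharded_param_map, exclude_keys=('step',)
            )
            # optim_state_dict['state'] now holds ShardedTensors suitable for
            # dist_checkpointing.save()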
    """
    sharded_state = {}
    for param_id, param_state in optim_state_dict['state'].items():
        sharded_state[param_id] = {}
        for state_key, param in param_state.items():
            if state_key in exclude_keys:
                continue
            if param_id in id_to_sharded_param_map:
                sharded_state[param_id][state_key] = make_sharded_optimizer_tensor(
                    id_to_sharded_param_map[param_id], param, prefix=f'optimizer.state.{state_key}'
                )
            else:
                raise ValueError(f'Param id {param_id} does not match any model sharded param')

    optim_state_dict['param_groups'] = deepcopy(optim_state_dict['param_groups'])
    for group in optim_state_dict['param_groups']:
        group['params'] = LocalNonpersistentObject(group['params'])
    optim_state_dict['state'] = sharded_state