graph.py 59.2 KB
Newer Older
1
# Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2
3
4
5
#
# See LICENSE for license information.

"""Functions for CUDA Graphs support in FP8"""
6
from collections.abc import Iterable
7
8
import contextlib
import gc
9
import warnings
10
from math import ceil
11
12
from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar, Union

13
14
15
16
17
import torch
from torch.utils._pytree import tree_flatten as _tree_flatten
from torch.utils._pytree import tree_unflatten as _tree_unflatten
from torch._C import _graph_pool_handle

18
from transformer_engine.common.recipe import DelayedScaling, Recipe
19
from transformer_engine.pytorch.constants import dist_group_type
20
21
from .quantization import (
    autocast,
22
23
24
25
26
    FP8GlobalStateManager,
    get_default_fp8_recipe,
)
from .distributed import get_all_rng_states, graph_safe_rng_available
from .module.base import TransformerEngineBaseModule
27
from .ops.op import BasicOperation
Jan Bielak's avatar
Jan Bielak committed
28
29
from .ops import Sequential
from .ops.fuser import OperationFuser
30
from .utils import make_weak_ref
31
32
33
34
35
36

__all__ = ["make_graphed_callables"]


_IS_GRAPH_CAPTURING = False

37
38
39
_T = TypeVar("_T")
SingleOrTuple = Union[_T, Tuple[_T, ...]]

40
41
42
43
44
45
46
47
48
49
50
51
52

def set_capture_start() -> None:
    """Record beginning of `make_graphed_callables`."""
    global _IS_GRAPH_CAPTURING
    _IS_GRAPH_CAPTURING = True


def set_capture_end() -> None:
    """Record end of `make_graphed_callables`."""
    global _IS_GRAPH_CAPTURING
    _IS_GRAPH_CAPTURING = False


Jan Bielak's avatar
Jan Bielak committed
53
def is_graph_capturing() -> bool:
54
55
56
57
58
59
60
61
62
63
64
    """Return whether within `make_graphed_callables`."""
    return _IS_GRAPH_CAPTURING


def graph_pool_handle():
    """
    Returns an opaque token representing the id of a graph memory pool.
    """
    return _graph_pool_handle()


65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@contextlib.contextmanager
def _none_grad_context_wrapper(inputs):
    """
    Wrapper to set the gradients of the inputs to None,
    in case the backward pass makes grad accumulations.
    """
    original_input_grads = []
    for input_tensor in inputs:
        original_input_grads.append(input_tensor.grad)
        input_tensor.grad = None
    yield
    for input_tensor, original_grad in zip(inputs, original_input_grads):
        input_tensor.grad = original_grad


80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@contextlib.contextmanager
def _graph_context_wrapper(*args, **kwargs):
    """Wrapper around `torch.cuda.graph`.

    This wrapper is a temporary workaround for a PyTorch bug:
    automatic garbage collection can destroy a graph while another
    graph is being captured, resulting in a CUDA error. See
    https://github.com/pytorch/pytorch/pull/161037.

    """
    gc_is_enabled = gc.isenabled()
    if gc_is_enabled:
        gc.disable()
    with torch.cuda.graph(*args, **kwargs):
        yield
    if gc_is_enabled:
        gc.enable()


99
def _make_graphed_callables(
100
101
102
103
    callables: SingleOrTuple[Callable],
    sample_args: SingleOrTuple[Tuple[torch.Tensor, ...]],
    num_warmup_iters: int = 3,
    allow_unused_input: bool = False,
104
    cache_quantized_params: bool = False,
105
106
    sample_kwargs: Optional[SingleOrTuple[Dict[str, Any]]] = None,
    _order: Optional[List[int]] = None,
107
    _num_layers_per_chunk: Optional[List[int]] = None,
108
    pool: Optional[Tuple[int, ...]] = None,
109
    retain_graph_in_backward: bool = False,
110
    _reuse_graph_input_output_buffers: bool = False,
111
) -> SingleOrTuple[Callable]:
112
113
114
115
116
117
118
119
120
121
    """
    Helper method for `make_graphed_callables`
    """

    if torch.is_autocast_enabled() and torch.is_autocast_cache_enabled():
        raise RuntimeError(
            "make_graphed_callables does not support the autocast "
            "caching. Please set `cache_enabled=False`."
        )

122
123
124
125
126
127
    # Default is to pass no kwargs to callables
    if sample_kwargs is None:
        if isinstance(callables, tuple):
            sample_kwargs = tuple({} for _ in range(len(sample_args)))
        else:
            sample_kwargs = {}
128

129
130
    # Canonicalize args as tuples
    just_one_callable = False
131
132
133
134
    if not isinstance(callables, tuple):
        just_one_callable = True
        callables = (callables,)
        sample_args = (sample_args,)
135
        sample_kwargs = (sample_kwargs,)
136

137
138
139
140
141
142
143
144
    # Check training/inference
    is_training = all(c.training for c in callables)
    if not is_training and any(c.training for c in callables):
        assert False, (
            "make_graphed_callables only supports when modules are all in training or all in"
            " inference mode."
        )

145
    # Check sizes of args
146
147
    _order_without_wgrad = None
    delay_wgrad_compute = False
148
149
150
151
152
153
154
155
156
157
    if _order is None:
        assert len(sample_args) == len(callables)
        assert len(sample_kwargs) == len(callables)
    else:
        # Custom logic for interleaved pipeline parallelism
        # Note: This is tightly coupled with the Megatron-core
        # implementation of interleaved pipeline parallelism at
        # https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/core/pipeline_parallel/schedules.py.
        # Note: The model is assumed to consist of layers
        # (corresponding to callables) that are grouped into
158
159
160
161
162
163
        # model chunks. _num_layers_per_chunk is a list of integers
        # that indicates the number of layers in each model chunk.
        # _order is a list of chunk indices (1-indexed) that
        # indicates the order in which the layers are evaluated.
        # Positive values indicate forward passes and negative
        # values indicate backward passes. Each
164
165
        # entry in sample_args corresponds to one of the forward
        # passes.
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
        _order_without_wgrad = []
        for c_id in _order:
            if ceil(c_id) != c_id:
                delay_wgrad_compute = True
                continue
            _order_without_wgrad.append(c_id)
        num_model_chunks = max(_order_without_wgrad)
        num_microbatches = len(_order_without_wgrad) // num_model_chunks // 2
        assert num_model_chunks * num_microbatches * 2 == len(_order_without_wgrad)

        # When delay_wgrad_compute is enabled, each layer is treated as a model chunk, which
        # allows for fine-grained graph capture order.
        if delay_wgrad_compute:
            assert (
                _num_layers_per_chunk is not None
            ), "'_num_layers_per_chunk' must be provided when delay_wgrad_compute is True."
            for num_layers in _num_layers_per_chunk:
                assert (
                    num_layers == 1
                ), "Each model chunk must have only one layer when delay_wgrad_compute is True."
186
187
188

        # Determine number of layers in each model chunk.
        if _num_layers_per_chunk is None:
189
190
            assert len(sample_args) * 2 >= len(_order_without_wgrad) and (
                len(sample_args) * 2 % len(_order_without_wgrad) == 0
191
            ), (
192
193
                f"{len(sample_args)} * 2 >= {len(_order_without_wgrad)} and {len(sample_args)} * 2"
                f" % {len(_order_without_wgrad)} == 0"
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
            )
            num_layers = len(sample_args) // num_model_chunks // num_microbatches
            _num_layers_per_chunk = [num_layers] * num_model_chunks
        else:
            assert (
                isinstance(_num_layers_per_chunk, int)
                or len(_num_layers_per_chunk) == num_model_chunks
            ), (
                "If _num_layers_per_chunk is provided, it must be an integer or a list of"
                f" {num_model_chunks} integers, but got {_num_layers_per_chunk}."
            )
            if isinstance(_num_layers_per_chunk, int):
                _num_layers_per_chunk = [_num_layers_per_chunk] * num_model_chunks
        total_num_layers = sum(_num_layers_per_chunk)
        assert len(callables) == total_num_layers, (
            f"Callables should have ({total_num_layers}) "
210
211
            + f"entries when order input is provided but got {len(callables)}."
        )
212
        assert len(sample_args) == total_num_layers * num_microbatches, (
213
            f"Expected {total_num_layers * num_microbatches} "
214
215
            + f"args tuple, but got {len(sample_args)}."
        )
216
217
218
219
220
221
222

        # Calculate the starting index of each chunk in callables for future use.
        _prefix_num_layers = [0]
        for m_chunk in range(num_model_chunks):
            num_layers = _num_layers_per_chunk[m_chunk]
            _prefix_num_layers.append(_prefix_num_layers[-1] + num_layers)

223
        assert len(sample_kwargs) == len(sample_args)
224

225
226
227
228
229
230
231
232
233
234
235
    # Check reuse graph conditions and reorganize sample_args and sample_kwargs.
    # Note: When capturing a graph, we hold onto the args and kwargs so we have static buffers
    # when the graph is replayed. If two model chunk microbatches have no overlap between their
    # forward and backward, then we can reduce memory usage by reusing the same static buffers.
    if _reuse_graph_input_output_buffers:
        assert (
            _order is not None
        ), "`_order` must be provided when `_reuse_graph_input_output_buffers` is True."
        assert (
            is_training
        ), "`_reuse_graph_input_output_buffers` is only available in training mode."
236
237
238
239
        if isinstance(sample_args, tuple):
            sample_args = list(sample_args)
        if isinstance(sample_kwargs, tuple):
            sample_kwargs = list(sample_kwargs)
240
241

        # Reorganize args and kwargs for input tensor reuse.
242
243
244
245
246
247
248
        # fwd_sample_qs is keyed by model chunk index. The value is a queue of tuples.
        # Each tuple contains the sample key signature and its fwd_idx. When we finish a backward
        # chunk, we pop the corresponding fwd_idx and push to the consumed_sample_q.
        # consumed_sample_q is keyed by the sample key signature. The value is a queue of the
        # fwd_idx whose backward has been called so that we can reuse the same static buffers.
        # In this way, we can reuse the same static input buffers for the non-overlapping samples
        # with the same input signature.
249
        fwd_sample_qs = {}
250
        consumed_sample_q = {}
251
252
        fwd_idx = [0] * num_model_chunks
        for c_id in _order:
253
            m_chunk = abs(ceil(c_id)) - 1
254
255
256
257
258
259
260
261

            if c_id > 0:
                sample_start_idx = (_prefix_num_layers[m_chunk] * num_microbatches) + (
                    fwd_idx[m_chunk] * _num_layers_per_chunk[m_chunk]
                )
                fwd_sample_idx = [
                    sample_start_idx + i for i in range(_num_layers_per_chunk[m_chunk])
                ]
262
263
                if m_chunk not in fwd_sample_qs:
                    fwd_sample_qs[m_chunk] = []
264
                for per_callable_fwd_idx in fwd_sample_idx:
265
266
267
268
269
270
271
272
273
274
275
276
                    sample_args_keys = tuple(
                        (t.shape, t.dtype, t.layout) for t in sample_args[per_callable_fwd_idx]
                    )
                    sample_kwargs_keys = tuple(
                        (k, v.shape, v.dtype, v.layout)
                        for k, v in sorted(sample_kwargs[per_callable_fwd_idx].items())
                    )
                    sample_keys = sample_args_keys + sample_kwargs_keys

                    fwd_sample_qs[m_chunk].append((sample_keys, per_callable_fwd_idx))
                    if consumed_sample_q.get(sample_keys, []):
                        reuse_fwd_idx = consumed_sample_q[sample_keys].pop(0)
277
278
279
                        sample_args[per_callable_fwd_idx] = sample_args[reuse_fwd_idx]
                        sample_kwargs[per_callable_fwd_idx] = sample_kwargs[reuse_fwd_idx]
                fwd_idx[m_chunk] += 1
280
281
            elif ceil(c_id) != c_id:
                continue
282
283
284
285
            else:
                num_consumed_samples = min(
                    len(fwd_sample_qs[m_chunk]), _num_layers_per_chunk[m_chunk]
                )
286
287
288
289
290
291
                for sample_keys, per_callable_fwd_idx in fwd_sample_qs[m_chunk][
                    :num_consumed_samples
                ]:
                    if sample_keys not in consumed_sample_q:
                        consumed_sample_q[sample_keys] = []
                    consumed_sample_q[sample_keys].append(per_callable_fwd_idx)
292
293
                fwd_sample_qs[m_chunk] = fwd_sample_qs[m_chunk][num_consumed_samples:]

294
    if cache_quantized_params:
295
        # Initialize flag that controls FP8 weight updates
296
297
        FP8GlobalStateManager.set_skip_fp8_weight_update_tensor(False)

298
    # Check callables
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
    for c in callables:
        if isinstance(c, torch.nn.Module):
            assert (
                len(c._backward_hooks) == 0
                and len(c._forward_hooks) == 0
                and len(c._forward_pre_hooks) == 0
            ), (
                "Modules must not have hooks registered at the time they are passed. "
                + "However, registering hooks on modules after passing them "
                + "through make_graphed_callables is allowed."
            )
            assert all(b.requires_grad is False for b in c.buffers()), (
                "In any :class:`~torch.nn.Module` passed to "
                + ":func:`~make_graphed_callables`, only parameters may be trainable. "
                + "All buffers must have ``requires_grad=False``."
            )
315
316
317
318
319

    # Flatten callable arguments
    per_callable_kwargs_keys = [list(kwargs.keys()) for kwargs in sample_kwargs]
    flatten_sample_args = []
    for args, kwargs, kwargs_keys in zip(sample_args, sample_kwargs, per_callable_kwargs_keys):
320
        flatten_arg, _ = _tree_flatten(args)
321
322
        flatten_kwarg, _ = _tree_flatten([kwargs[key] for key in kwargs_keys])
        flatten_sample_args.append(tuple(flatten_arg + flatten_kwarg))
323
324
325
326
327
328
329
        assert all(isinstance(arg, torch.Tensor) for arg in flatten_arg), (
            "In the beta API, sample_args "
            + "for each callable must contain only Tensors. Other types are not allowed."
        )

    # If a callable is an nn.Module, its graph's full input surface is the args the user explicitly
    # passes to forward (ie, its sample_args) AND the module's parameter attributes.
330
331
332
333
    # Note: These per_callable_* variables are not actually
    # per-callable, but per-forward-pass (see description of _order).
    # The names are kept for consistency with
    # torch.cuda.make_graphed_callables.
334
335
336
    per_callable_len_user_args = [len(args) for args in flatten_sample_args]
    if _order is None:
        per_callable_module_params = [
337
            tuple(c.parameters()) if isinstance(c, torch.nn.Module) else () for c in callables
338
339
        ]
        per_callable_static_input_surfaces = [
340
            flatten_sample_args[i] + per_callable_module_params[i] for i in range(len(callables))
341
342
343
        ]
    else:
        per_callable_module_params = []
344
345
        for m_chunk in range(num_model_chunks):
            for _ in range(num_microbatches):
346
                for l_no in range(_num_layers_per_chunk[m_chunk]):
347
                    per_callable_module_params.append(
348
349
350
351
352
                        tuple(callables[_prefix_num_layers[m_chunk] + l_no].parameters())
                        if isinstance(
                            callables[_prefix_num_layers[m_chunk] + l_no],
                            torch.nn.Module,
                        )
353
354
                        else ()
                    )
355
356
357
358
359
360
361
362
        assert len(per_callable_module_params) == len(flatten_sample_args)
        per_callable_static_input_surfaces = [
            flatten_sample_args[i] + per_callable_module_params[i]
            for i in range(len(flatten_sample_args))
        ]

    fwd_graphs = [torch.cuda.CUDAGraph() for _ in range(len(flatten_sample_args))]
    bwd_graphs = [torch.cuda.CUDAGraph() for _ in range(len(flatten_sample_args))]
363
    bwd_dw_graphs = [torch.cuda.CUDAGraph() for _ in range(len(flatten_sample_args))]
364
    graph_callables = [None for _ in range(len(flatten_sample_args))]
365

366
367
368
    # For cases with multiple active RNG states, e.g. TP.
    if graph_safe_rng_available():
        for _, state in get_all_rng_states().items():
369
            for fwd_graph, bwd_graph, bwd_dw_graph in zip(fwd_graphs, bwd_graphs, bwd_dw_graphs):
370
371
                fwd_graph.register_generator_state(state)
                bwd_graph.register_generator_state(state)
372
                bwd_dw_graph.register_generator_state(state)
373

374
    mempool = graph_pool_handle() if pool is None else pool
375
376
377
378
379

    # Warmup
    # Hopefully prevents cudnn benchmarking and other lazy-initialization cuda work
    # from ending up in any captures.
    torch.cuda.synchronize()
380
381
382
383
384

    # Get warmup func and func_idx.
    warmup_func_idx = []
    warmup_func = []
    if _order is None:
385
        for func_idx, func in enumerate(callables):
386
387
388
389
390
391
392
            warmup_func_idx.append(func_idx)
            warmup_func.append(func)
    else:
        fwd_idx = [0] * num_model_chunks
        for c_id in _order:
            if c_id > 0:
                m_chunk = c_id - 1
393
394
395
396
                for l_no in range(_num_layers_per_chunk[m_chunk]):
                    func = callables[_prefix_num_layers[m_chunk] + l_no]
                    func_idx = (_prefix_num_layers[m_chunk] * num_microbatches) + (
                        fwd_idx[m_chunk] * _num_layers_per_chunk[m_chunk] + l_no
397
398
399
400
401
402
403
404
405
406
407
408
                    )
                    warmup_func_idx.append(func_idx)
                    warmup_func.append(func)
                fwd_idx[m_chunk] += 1
    assert len(warmup_func) == len(
        sample_args
    ), f"Warmup runs {len(warmup_func)} don't match args {len(sample_args)}."
    assert len(warmup_func_idx) == len(
        set(warmup_func_idx)
    ), f"Warmup runs {len(warmup_func)} but only {len(set(warmup_func_idx))} are unique."

    # Filter the TE modules that cudagraph can access.
409
410
    visited_te_modules = {}
    need_bwd_dw_graph = {}
411
412
413
414

    # Run warmup and do the above filtering.
    with torch.cuda.stream(torch.cuda.Stream()):
        for func_idx, func in zip(warmup_func_idx, warmup_func):
415
416
417
            args = sample_args[func_idx]
            kwargs = sample_kwargs[func_idx]
            static_input_surface = per_callable_static_input_surfaces[func_idx]
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442

            def hook_fn(
                module, inputs, outputs, func_idx=func_idx
            ):  # pylint: disable=unused-argument
                modules = set()
                if isinstance(module, TransformerEngineBaseModule):
                    modules.add(module)
                # If forward is called on a BasicOperation directly the hook will run
                elif isinstance(module, BasicOperation):
                    modules.add(module)
                # If forward is called on a te.ops.Sequential it is not called on its constituent ops
                elif isinstance(module, Sequential):
                    assert (
                        module._module_groups is not None
                    ), "Should have been initialized by warmup"
                    for module_group in module._module_groups:
                        if isinstance(module_group, OperationFuser):
                            for basic_op in module_group._basic_ops:
                                modules.add(basic_op)
                if modules:
                    if func_idx not in visited_te_modules:
                        visited_te_modules[func_idx] = modules
                    else:
                        visited_te_modules[func_idx].update(modules)

443
            for warmup_iter in range(num_warmup_iters):
444
445
446
447
                hooks = []
                for module in func.modules():
                    hook = module.register_forward_hook(hook_fn)
                    hooks.append(hook)
448
                outputs, _ = _tree_flatten(func(*args, **kwargs))
449
450
                for hook in hooks:
                    hook.remove()
451
                if is_training:
452
453
                    inputs = tuple(i for i in static_input_surface if i.requires_grad)
                    with _none_grad_context_wrapper(inputs):
454
455
456
                        outputs_requiring_grad = tuple(
                            o for o in outputs if o is not None and o.requires_grad
                        )
457
                        torch.autograd.backward(
458
459
                            outputs_requiring_grad,
                            grad_tensors=tuple(torch.empty_like(o) for o in outputs_requiring_grad),
460
461
                        )
                        grad_inputs = tuple(input.grad for input in inputs)
462
463
464
465
466
467
468
469
470
471
472
473
474
475

                    # Filter module params that get None grad from grad_inputs and remove them
                    # from static_input_surface. This is to ensure that the backward hooks
                    # registered to these params are not wrongly triggered.
                    num_required_grad_sample_args = sum(
                        arg.requires_grad for arg in flatten_sample_args[func_idx]
                    )
                    required_grad_input_idx = []
                    for i, arg in enumerate(static_input_surface):
                        if arg.requires_grad:
                            required_grad_input_idx.append(i)
                    module_params_with_grad = []
                    for grad_inputs_idx, inputs_idx in enumerate(required_grad_input_idx):
                        if (
476
477
478
479
480
481
482
483
                            grad_inputs[grad_inputs_idx] is None
                            and grad_inputs_idx < num_required_grad_sample_args
                        ):
                            assert allow_unused_input, (
                                "The input tensor requires grad, but the grad is None after"
                                " backward pass."
                            )
                        elif (
484
485
486
487
488
489
490
491
492
493
494
495
496
497
                            grad_inputs[grad_inputs_idx] is not None
                            and grad_inputs_idx >= num_required_grad_sample_args
                        ):
                            module_params_with_grad.append(static_input_surface[inputs_idx])
                    if len(module_params_with_grad) != len(per_callable_module_params[func_idx]):
                        assert warmup_iter == 0, (
                            "no-grad params should only be used as inputs in the first warmup"
                            " iteration"
                        )
                        per_callable_module_params[func_idx] = tuple(module_params_with_grad)
                        static_input_surface = flatten_sample_args[func_idx] + tuple(
                            module_params_with_grad
                        )
                        per_callable_static_input_surfaces[func_idx] = static_input_surface
498
499
500
501
502
503
504
505
506

                    # Run wgrad. This is essential for some TE modules when they have
                    # delay_wgrad_compute enabled.
                    need_backward_dw = False
                    for module in visited_te_modules.get(func_idx, set()):
                        if hasattr(module, "need_backward_dw") and module.need_backward_dw():
                            need_backward_dw = True
                            module.backward_dw()
                    need_bwd_dw_graph[func_idx] = need_backward_dw
507
508
                else:
                    grad_inputs = None
509
                del outputs, grad_inputs
510
511
512
513
514
            # The following code is added specifically for MCore's special requirements,
            # aimed at preventing warmup from altering the control flow.
            for module in func.modules():
                if hasattr(module, "is_first_microbatch"):
                    module.is_first_microbatch = True
515
516
517
518
519
520
    torch.cuda.synchronize()

    # All captures here share a mempool. To avoid replays corrupting each other's memory,
    # the safest approach is to capture all passes in the same order they'll run:
    # fwd 1, fwd 2, ... fwd N, then bwd N, bwd N-1, ... bwd 1.

521
    if _order is not None:  # pylint: disable=too-many-nested-blocks
522
523
524
525
526
527
        per_callable_static_outputs = [None] * len(flatten_sample_args)
        per_callable_output_unflatten_spec = [None] * len(flatten_sample_args)
        per_callable_static_grad_outputs = [None] * len(flatten_sample_args)
        per_callable_static_grad_inputs = [None] * len(flatten_sample_args)
        fwd_idx = [0] * num_model_chunks
        bwd_idx = [0] * num_model_chunks
528
        static_grad_outputs_dict = {}
529
        wgrad_validation_list = [None] * len(_order)
530
        previous_chunk_last_callable_bwd_idx = None
531
        for i, c_id in enumerate(_order):
532
            if c_id > 0:
533
                assert isinstance(c_id, int), "Forward order value must be an integer."
534
                # Capture forward graph for model chunk c_id, microbatch fwd_idx[c_id-1]
535
                m_chunk = c_id - 1
536
537
538
539
                for l_no in range(_num_layers_per_chunk[m_chunk]):
                    func = callables[_prefix_num_layers[m_chunk] + l_no]
                    per_callable_fwd_idx = (_prefix_num_layers[m_chunk] * num_microbatches) + (
                        fwd_idx[m_chunk] * _num_layers_per_chunk[m_chunk] + l_no
540
                    )
541
                    args = sample_args[per_callable_fwd_idx]
542
                    kwargs = sample_kwargs[per_callable_fwd_idx]
543
                    fwd_graph = fwd_graphs[per_callable_fwd_idx]
544
                    with _graph_context_wrapper(fwd_graph, pool=mempool):
545
                        outputs = func(*args, **kwargs)
546
547
548
549
550
551
552
                    flatten_outputs, spec = _tree_flatten(outputs)
                    per_callable_static_outputs[per_callable_fwd_idx] = tuple(flatten_outputs)
                    per_callable_output_unflatten_spec[per_callable_fwd_idx] = spec
                    graph_callables[per_callable_fwd_idx] = func
                fwd_idx[m_chunk] += 1
            else:
                # Capture backward graph for model chunk c_id, microbatch bwd_idx[-c_id-1]
553
                m_chunk = -ceil(c_id) - 1
554
                previous_per_callable_bwd_idx = None
555
556
557
                for l_no in list(reversed(range(_num_layers_per_chunk[m_chunk]))):
                    per_callable_bwd_idx = (_prefix_num_layers[m_chunk] * num_microbatches) + (
                        bwd_idx[m_chunk] * _num_layers_per_chunk[m_chunk] + l_no
558
                    )
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
                    if ceil(c_id) == c_id and need_bwd_dw_graph[per_callable_bwd_idx]:
                        # Check if bwd graph has corresponding wgrad graph:
                        # Number of dgrad backward graphs should be equal to number of
                        # wgrad backward graphs.
                        # Note: For MCore, the validation rule is more strict (the next backward
                        # of dgrad graph must be corresponding wgrad graph).
                        if wgrad_validation_list[i] is None:
                            same_bwd_c_id_list = [i]
                            num_wgrad_c_id = 0
                            for idx in range(i + 1, len(_order)):
                                if _order[idx] > 0:
                                    continue
                                if _order[idx] == c_id:
                                    same_bwd_c_id_list.append(idx)
                                if _order[idx] + 0.5 == c_id:
                                    num_wgrad_c_id += 1
                                if len(same_bwd_c_id_list) == num_wgrad_c_id:
                                    for same_c_id_idx in same_bwd_c_id_list:
                                        wgrad_validation_list[same_c_id_idx] = True
                                    break
                                if len(same_bwd_c_id_list) < num_wgrad_c_id:
                                    # It's impossible to have more wgrad than dgrad.
                                    wgrad_validation_list[i] = False
                                    break
                            if wgrad_validation_list[i] is None:
                                wgrad_validation_list[i] = False
                            assert wgrad_validation_list[i], (
                                f"Number of wgrad graph({num_wgrad_c_id}) doesn't match number "
                                f"of dgrad graphs ({len(same_bwd_c_id_list)}) for chunk {c_id}."
                            )
                    elif ceil(c_id) != c_id:
                        per_callable_bwd_idx -= _num_layers_per_chunk[m_chunk]
                        assert is_training, "Only training mode supports backward_dw."
                        # If no one module needs the backward_dw, the bwd_dw_graph will be empty.
                        # So skip capturing it. For backward_dw, the order value is c_id - 0.5 to indicate
                        # the specific order of backward_dw.
                        assert ceil(c_id) - c_id == 0.5, (
                            "The order diff of wgrad and dgrad must be 0.5, "
                            f"get {ceil(c_id) - c_id}."
                        )
                        assert need_bwd_dw_graph[
                            per_callable_bwd_idx
                        ], "No module needs wgrad computation but get float in order"
                        bwd_dw_graph = bwd_dw_graphs[per_callable_bwd_idx]
                        with _graph_context_wrapper(bwd_dw_graph, pool=mempool):
                            for module in visited_te_modules[per_callable_bwd_idx]:
                                if (
                                    hasattr(module, "need_backward_dw")
                                    and module.need_backward_dw()
                                ):
                                    module.backward_dw()
                        continue

612
613
614
615
                    static_input_surface = per_callable_static_input_surfaces[per_callable_bwd_idx]
                    static_outputs = per_callable_static_outputs[per_callable_bwd_idx]
                    bwd_graph = bwd_graphs[per_callable_bwd_idx]
                    # For now, assumes all static_outputs require grad
616
                    if _reuse_graph_input_output_buffers:
617
618
                        # Note for _reuse_graph_input_output_buffers: grad output is only used
                        # within backward, so we can reuse the same static buffers every time.
619
                        static_grad_outputs_keys = tuple(
620
621
622
                            (o.shape, o.dtype, o.layout)
                            for o in static_outputs
                            if o is not None and o.requires_grad
623
624
625
626
627
                        )
                        if static_grad_outputs_keys in static_grad_outputs_dict:
                            static_grad_outputs = static_grad_outputs_dict[static_grad_outputs_keys]
                        else:
                            static_grad_outputs = tuple(
628
                                torch.empty_like(o) if o is not None and o.requires_grad else None
629
630
631
632
                                for o in static_outputs
                            )
                            static_grad_outputs_dict[static_grad_outputs_keys] = static_grad_outputs
                    else:
633
                        static_grad_outputs = tuple(
634
635
                            torch.empty_like(o) if o is not None and o.requires_grad else None
                            for o in static_outputs
636
                        )
637
                    if is_training:
638
639
640
641
642
                        inputs = tuple(i for i in static_input_surface if i.requires_grad)
                        with _none_grad_context_wrapper(inputs), _graph_context_wrapper(
                            bwd_graph, pool=mempool
                        ):
                            torch.autograd.backward(
643
644
645
                                tuple(
                                    o for o in static_outputs if o is not None and o.requires_grad
                                ),
646
                                grad_tensors=tuple(o for o in static_grad_outputs if o is not None),
647
648
                                retain_graph=retain_graph_in_backward,
                            )
649
650
                            grad_inputs = tuple(input.grad for input in inputs)

651
652
653
654
655
656
                    # Constructs a tuple suitable for returning from Graphed.backward:
                    # Pads out the actually-needed grads with Nones in gradient slots for inputs
                    # that don't require grad. I couldn't think of a one-liner for this pattern.
                    static_grad_inputs = []
                    grad_idx = 0
                    for arg in static_input_surface:
657
                        if is_training and isinstance(arg, torch.Tensor) and arg.requires_grad:
658
659
660
661
662
663
664
665
                            static_grad_inputs.append(grad_inputs[grad_idx])
                            grad_idx += 1
                        else:
                            static_grad_inputs.append(None)  # type: ignore[arg-type]
                    static_grad_inputs = tuple(static_grad_inputs)  # type: ignore[assignment]

                    per_callable_static_grad_outputs[per_callable_bwd_idx] = static_grad_outputs
                    per_callable_static_grad_inputs[per_callable_bwd_idx] = static_grad_inputs
666
667
668
669
670
671
672
673
674
675
676

                    # Weak ref the static outputs and static grad inputs that are no longer needed
                    # in the following steps. These two type of tensors are both in cudagraph
                    # mempool, so we just deallocate them and let PyTorch's memory allocator
                    # reuse them elsewhere.
                    if _reuse_graph_input_output_buffers:
                        # Weak ref the static outputs of the forward pass of this backward. It's
                        # no longer needed after the corresponding backward graph is built up.
                        per_callable_static_outputs[per_callable_bwd_idx] = make_weak_ref(
                            static_outputs
                        )
677
678
679

                        # Weak ref the static grad inputs of the previous backward pass within the
                        # same chunk.
680
                        if previous_per_callable_bwd_idx is not None:
681
682
683
                            idx = previous_per_callable_bwd_idx
                            per_callable_static_grad_inputs[idx] = make_weak_ref(
                                per_callable_static_grad_inputs[idx]
684
685
686
                            )
                        previous_per_callable_bwd_idx = per_callable_bwd_idx

687
688
689
690
691
692
693
694
695
696
697
698
                        # Weak ref the static grad inputs of the previous chunk's last backward
                        # pass.
                        # Note: After a chunk's backward pass, we assume Mcore will send the grad
                        # input to another pipeline parallel rank and that the communication is
                        # finished before the end of the next chunk's backward pass.
                        if l_no == 0:
                            if previous_chunk_last_callable_bwd_idx is not None:
                                idx = previous_chunk_last_callable_bwd_idx
                                per_callable_static_grad_inputs[idx] = make_weak_ref(
                                    per_callable_static_grad_inputs[idx]
                                )
                            previous_chunk_last_callable_bwd_idx = per_callable_bwd_idx
699
700
                if ceil(c_id) == c_id:
                    bwd_idx[m_chunk] += 1
701
702
703
704
705
    else:
        # Capture forward graphs
        per_callable_static_outputs = []
        per_callable_output_unflatten_spec = []
        graph_id = 0
706
        for func, args, kwargs, fwd_graph in zip(callables, sample_args, sample_kwargs, fwd_graphs):
707
            with _graph_context_wrapper(fwd_graph, pool=mempool):
708
                outputs = func(*args, **kwargs)
709
710
711
712
713
714
715
716
717
718
            graph_callables[graph_id] = func
            graph_id += 1

            flatten_outputs, spec = _tree_flatten(outputs)
            per_callable_static_outputs.append(tuple(flatten_outputs))
            per_callable_output_unflatten_spec.append(spec)

        # Capture backward graphs in reverse order
        per_callable_static_grad_outputs = []
        per_callable_static_grad_inputs = []
719
        for static_input_surface, static_outputs, bwd_graph, bwd_dw_graph, bwd_idx in zip(
720
721
722
            reversed(per_callable_static_input_surfaces),
            reversed(per_callable_static_outputs),
            reversed(bwd_graphs),
723
724
            reversed(bwd_dw_graphs),
            reversed(range(len(per_callable_static_input_surfaces))),
725
726
727
        ):
            # For now, assumes all static_outputs require grad
            static_grad_outputs = tuple(
728
729
                torch.empty_like(o) if o is not None and o.requires_grad else None
                for o in static_outputs
730
            )
731
            if is_training:
732
733
734
735
736
                inputs = tuple(i for i in static_input_surface if i.requires_grad)
                with _none_grad_context_wrapper(inputs), _graph_context_wrapper(
                    bwd_graph, pool=mempool
                ):
                    torch.autograd.backward(
737
                        tuple(o for o in static_outputs if o is not None and o.requires_grad),
738
                        grad_tensors=tuple(o for o in static_grad_outputs if o is not None),
739
740
                        retain_graph=retain_graph_in_backward,
                    )
741
742
                    grad_inputs = tuple(input.grad for input in inputs)

743
744
745
746
747
                if need_bwd_dw_graph[bwd_idx]:
                    with _graph_context_wrapper(bwd_dw_graph, pool=mempool):
                        for module in visited_te_modules[bwd_idx]:
                            if hasattr(module, "need_backward_dw") and module.need_backward_dw():
                                module.backward_dw()
748
749
750
751
752
753
            # Constructs a tuple suitable for returning from Graphed.backward:
            # Pads out the actually-needed grads with Nones in gradient slots for inputs that
            # don't require grad. I couldn't think of a slick one-liner for this pattern.
            static_grad_inputs = []
            grad_idx = 0
            for arg in static_input_surface:
754
                if is_training and isinstance(arg, torch.Tensor) and arg.requires_grad:
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
                    static_grad_inputs.append(grad_inputs[grad_idx])
                    grad_idx += 1
                else:
                    static_grad_inputs.append(None)  # type: ignore[arg-type]
            static_grad_inputs = tuple(static_grad_inputs)  # type: ignore[assignment]

            per_callable_static_grad_outputs.append(static_grad_outputs)
            per_callable_static_grad_inputs.append(static_grad_inputs)

        # Reverses the most recent two lists
        per_callable_static_grad_outputs = list(reversed(per_callable_static_grad_outputs))
        per_callable_static_grad_inputs = list(reversed(per_callable_static_grad_inputs))
    # Now for every per_callable list, per_callable_*[i] holds the stuff for the ith callable.

    def make_graphed_autograd_function(
        fwd_graph,
        bwd_graph,
        module_params,
773
        kwargs_keys,
774
775
776
777
778
779
780
781
782
        len_user_args,
        output_unflatten_spec,
        static_input_surface,
        static_outputs,
        static_grad_outputs,
        static_grad_inputs,
    ):
        class Graphed(torch.autograd.Function):
            """Autograd function for graph replay."""
783

784
785
            @staticmethod
            def forward(ctx, skip_fp8_weight_update, *inputs):
786
                # pylint: disable=missing-function-docstring
787
788

                # Set flag for whether to update FP8 weight updates
789
790
791
792
                ctx.is_first_module = FP8GlobalStateManager.is_first_fp8_module()
                if ctx.is_first_module and skip_fp8_weight_update is not None:
                    FP8GlobalStateManager.set_skip_fp8_weight_update_tensor(skip_fp8_weight_update)

793
                # Copy values from new tensors into static tensors
794
                for i in range(len_user_args):
795
796
797
798
                    if (
                        isinstance(static_input_surface[i], torch.Tensor)
                        and static_input_surface[i].data_ptr() != inputs[i].data_ptr()
                    ):
799
                        static_input_surface[i].copy_(inputs[i])
800
801

                # Replay forward graph
802
803
                fwd_graph.replay()
                assert isinstance(static_outputs, tuple)
804
                return tuple(o.detach() if o is not None else o for o in static_outputs)
805
806
807
808

            @staticmethod
            @torch.autograd.function.once_differentiable
            def backward(ctx, *grads):
809
                # pylint: disable=missing-function-docstring
810
811

                # Replay backward graph
812
813
814
815
816
817
818
819
820
                assert len(grads) == len(static_grad_outputs)
                for g, grad in zip(static_grad_outputs, grads):
                    if g is not None:
                        # don't copy if autograd gods have been kind and the
                        # incoming grad is already in the right place
                        if g.data_ptr() != grad.data_ptr():
                            g.copy_(grad)
                bwd_graph.replay()

821
                # Update FP8 scale factors if needed
822
823
824
825
826
827
828
829
830
831
                if ctx.is_first_module:
                    FP8GlobalStateManager.reduce_and_update_fp8_tensors(forward=False)

                # Input args that didn't require grad expect a None gradient.
                assert isinstance(static_grad_inputs, tuple)
                return (None,) + tuple(
                    b.detach() if b is not None else b for b in static_grad_inputs
                )

        def functionalized(*user_args, **user_kwargs):
832
833

            # Decide whether to update FP8 weights
834
            skip_fp8_weight_update = None
835
            if cache_quantized_params:
836
837
                assert "is_first_microbatch" in user_kwargs and isinstance(
                    user_kwargs["is_first_microbatch"], bool
838
839
840
841
                ), "`is_first_microbatch` boolean kwarg must be provided for FP8 weight caching."

                skip_fp8_weight_update = not user_kwargs["is_first_microbatch"]

842
843
844
845
846
847
848
849
850
851
852
853
            # Check that required kwargs are provided
            for key in kwargs_keys:
                if key not in user_kwargs:
                    raise TypeError(
                        f"Graphed callable was initialized with kwarg {key} ,"
                        "but it was not provided in graph replay"
                    )

            # Runs the autograd function with inputs == all inputs to
            # the graph that might require grad (explicit user args +
            # module parameters)
            # Assumes module params didn't change since capture.
854
            flatten_user_args, _ = _tree_flatten(user_args)
855
856
857
            flatten_user_kwargs, _ = _tree_flatten([user_kwargs[key] for key in kwargs_keys])
            func_args = tuple(flatten_user_args) + tuple(flatten_user_kwargs) + module_params
            out = Graphed.apply(skip_fp8_weight_update, *func_args)
858
859
860
861
            return _tree_unflatten(out, output_unflatten_spec)

        return functionalized

862
    def make_graphed_attribute_functions(graph_idx):
863
864
        # Get te modules for current graph
        te_modules = visited_te_modules.get(graph_idx, set())
865
866
867
868
869
870

        # Attach backward_dw as an attribute to the graphed callable.
        def backward_dw():
            if need_bwd_dw_graph.get(graph_idx, False):
                bwd_dw_graphs[graph_idx].replay()

871
872
873
874
875
876
877
878
                # Trigger the grad accumulation hook for wgrad graphs.
                for module in te_modules:
                    if (
                        isinstance(module, TransformerEngineBaseModule)
                        and module.need_backward_dw()
                    ):
                        module._trigger_wgrad_accumulation_and_reduce_hooks()

879
880
881
882
883
884
885
886
        # Attach reset as an attribute to the graphed callable.
        def reset():
            fwd_graphs[graph_idx].reset()
            bwd_graphs[graph_idx].reset()
            bwd_dw_graphs[graph_idx].reset()

        return backward_dw, reset

887
888
889
890
891
892
893
    # Put together the final graphed callables
    ret = []
    for i in range(len(sample_args)):
        graphed = make_graphed_autograd_function(
            fwd_graphs[i],
            bwd_graphs[i],
            per_callable_module_params[i],
894
            per_callable_kwargs_keys[i],
895
896
897
898
899
900
901
902
903
            per_callable_len_user_args[i],
            per_callable_output_unflatten_spec[i],
            per_callable_static_input_surfaces[i],
            per_callable_static_outputs[i],
            per_callable_static_grad_outputs[i],
            per_callable_static_grad_inputs[i],
        )

        func = graph_callables[i]
904
        te_modules = visited_te_modules.get(i, set())
905
906
        if isinstance(func, torch.nn.Module):

907
            def make_graphed_forward(func, graph_training_state, graphed, orig_fwd, te_modules):
908
909
910
911
912
                def new_fwd(*user_args, **user_kwargs):
                    # If the module's training-or-eval state matches what we graphed,
                    # run the graph, otherwise run the original forward method
                    if func.training == graph_training_state:
                        # Set the FP8 group from global amax reduction.
Jan Bielak's avatar
Jan Bielak committed
913
914
915
                        if FP8GlobalStateManager.is_fp8_enabled():
                            fp8_recipe = FP8GlobalStateManager.get_fp8_recipe()
                            for m in func.modules():
916
                                if m not in te_modules:
917
918
                                    # Only Set the FP8 meta for the modules included by forward
                                    continue
Jan Bielak's avatar
Jan Bielak committed
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
                                if isinstance(m, TransformerEngineBaseModule):
                                    from transformer_engine.pytorch.attention.dot_product_attention import (
                                        DotProductAttention,
                                    )

                                    if (
                                        isinstance(m, DotProductAttention)
                                        and not fp8_recipe.fp8_mha
                                        and not fp8_recipe.fp8_dpa
                                    ):
                                        # Don't need to update FP8 meta for non-FP8 DPA
                                        continue
                                    m.fp8_meta["fp8_group"] = FP8GlobalStateManager.get_fp8_group()
                                    m.fp8_meta["recipe"] = FP8GlobalStateManager.get_fp8_recipe()
                                    FP8GlobalStateManager.add_fp8_tensors_to_global_buffer(
                                        m.fp8_meta,
                                    )
                                elif isinstance(m, BasicOperation):
                                    for mode in ("forward", "backward"):
                                        if m.num_quantizers(mode):
939
940
941
942
943
944
                                            m._fp8_metas[mode][
                                                "fp8_group"
                                            ] = FP8GlobalStateManager.get_fp8_group()
                                            m._fp8_metas[mode][
                                                "recipe"
                                            ] = FP8GlobalStateManager.get_fp8_recipe()
Jan Bielak's avatar
Jan Bielak committed
945
946
947
                                            FP8GlobalStateManager.add_fp8_tensors_to_global_buffer(
                                                m._fp8_metas[mode],
                                            )
948
949
                        return graphed(*user_args, **user_kwargs)
                    return orig_fwd(*user_args, **user_kwargs)
950

951
952
                return new_fwd

953
            forward = make_graphed_forward(func, func.training, graphed, func.forward, te_modules)
954
955
956
957
958
959
960
961
            if _order is None:
                func.forward = forward
                ret.append(func)
            else:
                ret.append(forward)
        else:
            ret.append(graphed)

962
963
964
        backward_dw_func, reset_func = make_graphed_attribute_functions(i)
        setattr(ret[-1], "backward_dw", backward_dw_func)
        setattr(ret[-1], "reset", reset_func)
965

966
967
968
969
970
971
    if just_one_callable:
        return ret[0]

    return tuple(ret)


972
973
def save_fp8_tensors(
    modules: Iterable[torch.nn.Module],
974
    recipe: Optional[Recipe],
975
) -> Optional[List[Any]]:
976
977
978
979
    """
    Returns the FP8 tensors for all modules
    with adjusted amax history sizes.
    """
980

981
    if not isinstance(recipe, DelayedScaling):
982
983
        return None

984
    fp8_tensors = []
985
986
    for module in modules:
        for m in module.modules():
987
            module_tensors = None
988
989
            if isinstance(m, TransformerEngineBaseModule):
                if m.primary_weights_in_fp8:
990
                    m.adjust_amax_history_length(recipe.amax_history_len)
991
992
                module_tensors = m.get_fp8_meta_tensors()
            elif isinstance(m, BasicOperation):
993
                m.reset_recipe_state(recipe=recipe)
994
995
996
997
998
999
1000
                module_tensors = m._save_fp8_metas()
            fp8_tensors.append(module_tensors)
    return fp8_tensors


def restore_fp8_tensors(
    modules: Iterable[torch.nn.Module],
1001
    fp8_tensors: Optional[List[Any]],
1002
) -> None:
1003
    """Restore FP8 tensors."""
1004
1005
1006
1007

    if fp8_tensors is None:
        return

1008
1009
    for module in modules:
        for m in module.modules():
1010
            module_tensors = fp8_tensors.pop(0)
1011
            if isinstance(m, TransformerEngineBaseModule):
1012
1013
1014
1015
1016
1017
1018
1019
                m.reset_fp8_meta_tensors(module_tensors)
            elif isinstance(m, BasicOperation):
                m._load_fp8_metas(module_tensors)
    if len(fp8_tensors) != 0:
        raise RuntimeError(
            f"Got FP8 state for {len(fp8_tensors)} more modules than expected. "
            "There is probably a discrepancy with `save_fp8_tensors`."
        )
1020
1021
1022


def make_graphed_callables(
1023
1024
1025
1026
1027
    modules: SingleOrTuple[Callable],
    sample_args: SingleOrTuple[Tuple[torch.Tensor, ...]],
    num_warmup_iters: int = 3,
    allow_unused_input: bool = False,
    sample_kwargs: Optional[SingleOrTuple[Dict[str, Any]]] = None,
1028
1029
    fp8_enabled: Optional[SingleOrTuple[bool]] = None,
    fp8_calibrating: Optional[bool] = None,
Jan Bielak's avatar
Jan Bielak committed
1030
    fp8_recipe: Optional[Recipe] = None,
1031
    fp8_group: Optional[dist_group_type] = None,
1032
1033
1034
1035
1036
1037
    fp8_weight_caching: Optional[bool] = None,
    enabled: Optional[SingleOrTuple[bool]] = None,
    calibrating: Optional[bool] = None,
    recipe: Optional[Recipe] = None,
    amax_reduction_group: Optional[dist_group_type] = None,
    cache_quantized_params: Optional[bool] = None,
1038
    _order: Optional[List[int]] = None,
1039
    _num_layers_per_chunk: Optional[List[int]] = None,
1040
    pool: Optional[Tuple[int, ...]] = None,
1041
    retain_graph_in_backward: bool = False,
1042
    _reuse_graph_input_output_buffers: bool = False,
1043
) -> Union[Callable, Tuple[Callable, ...]]:
1044
    """
1045
1046
1047
1048
1049
1050
1051
1052
    Make CUDA graph version of Transformer Engine modules

    A variation of PyTorch's `make_graphed_callables` utility function
    with support for Transformer Engine modules and FP8. Please see
    the
    `original PyTorch implementation <https://pytorch.org/docs/stable/generated/torch.cuda.make_graphed_callables.html>`_
    for more documentation.

1053
1054
1055
1056
1057
    .. warning::

       Arguments 'fp8_enabled', 'fp8_calibrating', 'fp8_recipe', 'fp8_group', and 'fp8_weight_caching' are deprecated.
       Use arguments 'enabled', 'calibrating', 'recipe', 'amax_reduction_group', and 'cache_quantized_params' instead.

1058
1059
1060
1061
1062
1063
1064
1065
    Graphing parameters
    -------------------
    modules: (tuple of) callable
             Callable or callables to graph.
    sample_args: (tuple of) tuple of torch.Tensor
                 Positional arguments to callable(s).
    num_warmup_iters: int, default = 3
                      Number of warmup iterations.
Paweł Gadziński's avatar
Paweł Gadziński committed
1066
    allow_unused_input: bool, default = False
1067
1068
1069
1070
                        Whether to handle case where callable inputs
                        and outputs are disconnected in compute graph.
    sample_kwargs: (tuple of) dict, optional
                   Keyword arguments to callable(s)
Paweł Gadziński's avatar
Paweł Gadziński committed
1071
    pool: (tuple of) int, default = None, optional
1072
1073
          An instance returned from function `torch.cuda.graph_pool_handle` that hints
          this graph may share memory with the indicated pool.
Paweł Gadziński's avatar
Paweł Gadziński committed
1074
    retain_graph_in_backward: bool, default = False
1075
                              Whether to set retain_graph=True in backward graph capture.
Paweł Gadziński's avatar
Paweł Gadziński committed
1076
    _reuse_graph_input_output_buffers: bool, default = False
1077
1078
1079
1080
        Reduce memory usage by reusing input/output data buffers between
        graphs. Only supported with Mcore interleaved pipeline parallelism, i.e.
        when `_order` is provided. All callables in `modules` are assumed to have
        inputs and outputs with the same dtype and shape.
1081

Paweł Gadziński's avatar
Paweł Gadziński committed
1082
1083
1084
    Quantization parameters
    -----------------------
    enabled: (tuple of) bool, default = False
1085
1086
             whether or not to enable low precision quantization (FP8/FP4).
             If tuple, the length must match the number of modules.
Paweł Gadziński's avatar
Paweł Gadziński committed
1087
    calibrating: bool, default = False
1088
1089
1090
1091
                 calibration mode allows collecting statistics such as amax and scale
                 data of quantized tensors even when executing without quantization enabled.
                 This is useful for saving an inference ready checkpoint while training
                 using a higher precision.
Paweł Gadziński's avatar
Paweł Gadziński committed
1092
    recipe: recipe.Recipe, default = None
1093
            recipe used for low precision quantization.
Paweł Gadziński's avatar
Paweł Gadziński committed
1094
    amax_reduction_group: torch._C._distributed_c10d.ProcessGroup, default = None
1095
1096
                          distributed group over which amaxes for the quantized tensors
                          are reduced at the end of each training step.
Paweł Gadziński's avatar
Paweł Gadziński committed
1097
    cache_quantized_params: bool, default = False
1098
1099
1100
1101
1102
1103
                            Whether or not to cache quantized weights across microbatches. if set to `True`,
                            the `is_first_microbatch` boolean argument must be passed into the forward
                            method for TransformerEngine modules. When storing primary weights in low precision
                            using TE's `quantized_model_init` API and using an quantization aware optimizer,
                            this arg must be set to `False` if calculating weight transposes' outside TE, e.g.,
                            in the optimizer step.
1104

1105
    """
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185

    # Handle deprecated args. If old kwargs are set, they are prioritized with warning.
    if fp8_enabled is not None:
        if enabled is not None:
            raise ValueError(
                "make_graphed_callables has deprecated `fp8_enabled` kwarg "
                "in favor of `enabled`, but both kwargs are set."
            )
        warnings.warn(
            "make_graphed_callables has deprecated `fp8_enabled` kwarg in favor of `enabled`. "
            "`fp8_enabled` will be removed in a future release.",
            category=DeprecationWarning,
            stacklevel=2,
        )
        enabled = fp8_enabled
    if enabled is None:
        enabled = False

    if fp8_calibrating is not None:
        if calibrating is not None:
            raise ValueError(
                "make_graphed_callables has deprecated `fp8_calibrating` kwarg "
                "in favor of `calibrating`, but both kwargs are set."
            )
        warnings.warn(
            "make_graphed_callables has deprecated `fp8_calibrating` kwarg in favor of "
            "`calibrating`. `fp8_calibrating` will be removed in a future release.",
            category=DeprecationWarning,
            stacklevel=2,
        )
        calibrating = fp8_calibrating
    if calibrating is None:
        calibrating = False

    if fp8_recipe is not None:
        if recipe is None:
            warnings.warn(
                "make_graphed_callables has deprecated `fp8_recipe` kwarg in favor of "
                "`recipe`. `fp8_recipe` will be removed in a future release.",
                category=DeprecationWarning,
                stacklevel=2,
            )
        else:
            raise ValueError(
                "make_graphed_callables has deprecated `fp8_recipe` kwarg "
                "in favor of `recipe`, but both kwargs are set."
            )
        recipe = fp8_recipe

    if fp8_group is not None:
        if amax_reduction_group is None:
            warnings.warn(
                "make_graphed_callables has deprecated `fp8_group` kwarg in favor of "
                "`amax_reduction_group`. `fp8_group` will be removed in a future release.",
                category=DeprecationWarning,
                stacklevel=2,
            )
        else:
            raise ValueError(
                "make_graphed_callables has deprecated `fp8_group` kwarg "
                "in favor of `amax_reduction_group`, but both kwargs are set."
            )
        amax_reduction_group = fp8_group

    if fp8_weight_caching is not None:
        if cache_quantized_params is not None:
            raise ValueError(
                "make_graphed_callables has deprecated `fp8_weight_caching` kwarg "
                "in favor of `cache_quantized_params`, but both kwargs are set."
            )
        warnings.warn(
            "make_graphed_callables has deprecated `fp8_weight_caching` kwarg in favor of "
            "`cache_quantized_params`. `fp8_weight_caching` will be removed in a future release.",
            category=DeprecationWarning,
            stacklevel=2,
        )
        cache_quantized_params = fp8_weight_caching
    if cache_quantized_params is None:
        cache_quantized_params = False

1186
1187
1188
1189
1190
1191
1192
1193
    set_capture_start()

    # Handle single module.
    just_one_callable = False
    if not isinstance(modules, tuple):
        just_one_callable = True
        modules = (modules,)

1194
1195
1196
    if not isinstance(enabled, tuple):
        assert isinstance(enabled, bool), "enabled must be a bool or a tuple of bools"
        enabled = (enabled,) * len(modules)
1197
    else:
1198
        assert len(enabled) == len(
1199
            modules
1200
1201
1202
1203
1204
1205
        ), f"enabled length ({len(enabled)}) must match modules length ({len(modules)})"
    if any(enabled) and recipe is None:
        recipe = get_default_fp8_recipe()
    elif not any(enabled):
        recipe = None
    module_uses_fp8 = dict(zip((id(m) for m in modules), enabled))
1206

1207
    # Store FP8 tensors to reset later.
1208
    saved_fp8_tensors = save_fp8_tensors(modules, recipe=recipe)
1209
1210

    # FP8 wrapper.
1211
1212
    old_call_funcs = {}

1213
    def wrap_autocast(block):
1214
1215
1216
1217
1218
        block_cls = type(block)
        if block_cls in old_call_funcs:
            return

        old_call_funcs[block_cls] = block_cls.__call__
1219

1220
        # Wrap the original call function of the module class.
1221
        def call_func(self, *args, **kwargs):
1222
            with autocast(
1223
                enabled=module_uses_fp8.get(id(self), False),
1224
1225
1226
                calibrating=calibrating,
                recipe=recipe,
                amax_reduction_group=amax_reduction_group,
1227
                _graph=True,
1228
            ):
1229
                outputs = old_call_funcs[block_cls](self, *args, **kwargs)
1230
            return outputs
1231

1232
        block_cls.__call__ = call_func
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246

    forward_funcs = []
    for module in modules:
        assert isinstance(module, torch.nn.Module), f"Graphing for {type(module)} is not supported."
        wrap_autocast(module)
        forward_funcs.append(module)

    if just_one_callable:
        forward_funcs = forward_funcs[0]
    else:
        forward_funcs = tuple(forward_funcs)

    # Save RNG state.
    if graph_safe_rng_available():
1247
1248
1249
1250
        generators = [
            torch.cuda.default_generators[torch.cuda.current_device()],
            *get_all_rng_states().values(),
        ]
1251
1252
1253
1254
1255
        original_rng_states = [state.get_state() for state in generators]
    else:
        original_rng_states = torch.cuda.get_rng_state()

    graphed_callables = _make_graphed_callables(
1256
1257
1258
        forward_funcs,
        sample_args,
        num_warmup_iters=num_warmup_iters,
1259
        allow_unused_input=allow_unused_input,
1260
        cache_quantized_params=cache_quantized_params,
1261
        sample_kwargs=sample_kwargs,
1262
        _order=_order,
1263
        _num_layers_per_chunk=_num_layers_per_chunk,
1264
        pool=pool,
1265
        retain_graph_in_backward=retain_graph_in_backward,
1266
        _reuse_graph_input_output_buffers=_reuse_graph_input_output_buffers,
1267
    )
1268
1269
1270
1271
1272
1273
1274
1275

    # Ensures warmup does not affect numerics for ops such as dropout.
    if graph_safe_rng_available():
        for gen, state in zip(generators, original_rng_states):
            gen.set_state(state)
    else:
        torch.cuda.set_rng_state(original_rng_states)

1276
1277
1278
1279
    # Remove FP8 wrapper.
    for module_cls, old_call in old_call_funcs.items():
        module_cls.__call__ = old_call

1280
1281
1282
1283
1284
    # Restore FP8 state.
    restore_fp8_tensors(modules, saved_fp8_tensors)

    set_capture_end()
    return graphed_callables