graph.py 50.4 KB
Newer Older
1
# Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2
3
4
5
#
# See LICENSE for license information.

"""Functions for CUDA Graphs support in FP8"""
6
from collections.abc import Iterable
7
8
import contextlib
import gc
9
import warnings
10
11
from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar, Union

12
13
14
15
16
import torch
from torch.utils._pytree import tree_flatten as _tree_flatten
from torch.utils._pytree import tree_unflatten as _tree_unflatten
from torch._C import _graph_pool_handle

17
from transformer_engine.common.recipe import DelayedScaling, Recipe
18
from transformer_engine.pytorch.constants import dist_group_type
19
20
from .quantization import (
    autocast,
21
22
23
24
25
    FP8GlobalStateManager,
    get_default_fp8_recipe,
)
from .distributed import get_all_rng_states, graph_safe_rng_available
from .module.base import TransformerEngineBaseModule
26
from .ops.op import BasicOperation
Jan Bielak's avatar
Jan Bielak committed
27
28
from .ops import Sequential
from .ops.fuser import OperationFuser
29
from .utils import make_weak_ref
30
31
32
33
34
35

__all__ = ["make_graphed_callables"]


# Module-level flag: set to True by `set_capture_start`, reset to False by
# `set_capture_end`, and reported by `is_graph_capturing`.
_IS_GRAPH_CAPTURING = False

36
37
38
# Generic alias for "either one value of type _T or a tuple of such values".
_T = TypeVar("_T")
SingleOrTuple = Union[_T, Tuple[_T, ...]]

39
40
41
42
43
44
45
46
47
48
49
50
51

def set_capture_start() -> None:
    """Flag that `make_graphed_callables` has begun.

    Raises the module-level capture flag; `is_graph_capturing` will
    return True until `set_capture_end` is called.
    """
    # Rebind the module global rather than creating a function local.
    global _IS_GRAPH_CAPTURING
    _IS_GRAPH_CAPTURING = True


def set_capture_end() -> None:
    """Flag that `make_graphed_callables` has finished.

    Clears the module-level capture flag; `is_graph_capturing` will
    return False until `set_capture_start` is called again.
    """
    # Rebind the module global rather than creating a function local.
    global _IS_GRAPH_CAPTURING
    _IS_GRAPH_CAPTURING = False


Jan Bielak's avatar
Jan Bielak committed
52
def is_graph_capturing() -> bool:
    """Return whether within `make_graphed_callables`.

    Reports the module-level flag that `set_capture_start` sets and
    `set_capture_end` clears.

    Returns:
        True between a call to `set_capture_start` and the following
        call to `set_capture_end`, False otherwise.
    """
    return _IS_GRAPH_CAPTURING


def graph_pool_handle():
    """Return an opaque token representing the id of a graph memory pool.

    Thin wrapper that delegates to `torch._C._graph_pool_handle`.
    """
    pool_token = _graph_pool_handle()
    return pool_token


64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@contextlib.contextmanager
def _graph_context_wrapper(*args, **kwargs):
    """Wrapper around `torch.cuda.graph`.

    This wrapper is a temporary workaround for a PyTorch bug:
    automatic garbage collection can destroy a graph while another
    graph is being captured, resulting in a CUDA error. See
    https://github.com/pytorch/pytorch/pull/161037.

    All positional and keyword arguments are forwarded unchanged to
    `torch.cuda.graph`. Automatic garbage collection is disabled for
    the duration of the context and restored to its previous state on
    exit, including when entering the capture or the wrapped body
    raises.

    """
    gc_was_enabled = gc.isenabled()
    if gc_was_enabled:
        gc.disable()
    try:
        with torch.cuda.graph(*args, **kwargs):
            yield
    finally:
        # Re-enable GC even on failure; otherwise a single failed capture
        # would leave garbage collection off for the rest of the process.
        if gc_was_enabled:
            gc.enable()


83
def _make_graphed_callables(
84
85
86
87
    callables: SingleOrTuple[Callable],
    sample_args: SingleOrTuple[Tuple[torch.Tensor, ...]],
    num_warmup_iters: int = 3,
    allow_unused_input: bool = False,
88
    cache_quantized_params: bool = False,
89
90
    sample_kwargs: Optional[SingleOrTuple[Dict[str, Any]]] = None,
    _order: Optional[List[int]] = None,
91
    _num_layers_per_chunk: Optional[List[int]] = None,
92
    pool: Optional[Tuple[int, ...]] = None,
93
    retain_graph_in_backward: bool = False,
94
    _reuse_graph_input_output_buffers: bool = False,
95
) -> SingleOrTuple[Callable]:
96
97
98
99
100
101
102
103
104
105
    """
    Helper method for `make_graphed_callables`
    """

    if torch.is_autocast_enabled() and torch.is_autocast_cache_enabled():
        raise RuntimeError(
            "make_graphed_callables does not support the autocast "
            "caching. Please set `cache_enabled=False`."
        )

106
107
108
109
110
111
    # Default is to pass no kwargs to callables
    if sample_kwargs is None:
        if isinstance(callables, tuple):
            sample_kwargs = tuple({} for _ in range(len(sample_args)))
        else:
            sample_kwargs = {}
112

113
114
    # Canonicalize args as tuples
    just_one_callable = False
115
116
117
118
    if not isinstance(callables, tuple):
        just_one_callable = True
        callables = (callables,)
        sample_args = (sample_args,)
119
        sample_kwargs = (sample_kwargs,)
120

121
122
123
124
125
126
127
128
    # Check training/inference
    is_training = all(c.training for c in callables)
    if not is_training and any(c.training for c in callables):
        assert False, (
            "make_graphed_callables only supports when modules are all in training or all in"
            " inference mode."
        )

129
130
131
132
133
134
135
136
137
138
139
    # Check sizes of args
    if _order is None:
        assert len(sample_args) == len(callables)
        assert len(sample_kwargs) == len(callables)
    else:
        # Custom logic for interleaved pipeline parallelism
        # Note: This is tightly coupled with the Megatron-core
        # implementation of interleaved pipeline parallelism at
        # https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/core/pipeline_parallel/schedules.py.
        # Note: The model is assumed to consist of layers
        # (corresponding to callables) that are grouped into
140
141
142
143
144
145
        # model chunks. _num_layers_per_chunk is a list of integers
        # that indicates the number of layers in each model chunk.
        # _order is a list of chunk indices (1-indexed) that
        # indicates the order in which the layers are evaluated.
        # Positive values indicate forward passes and negative
        # values indicate backward passes. Each
146
147
        # entry in sample_args corresponds to one of the forward
        # passes.
148
149
150
        num_model_chunks = max(_order)
        num_microbatches = len(_order) // num_model_chunks // 2
        assert num_model_chunks * num_microbatches * 2 == len(_order)
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174

        # Determine number of layers in each model chunk.
        if _num_layers_per_chunk is None:
            assert len(sample_args) * 2 >= len(_order) and (
                len(sample_args) * 2 % len(_order) == 0
            ), (
                f"{len(sample_args)} * 2 >= {len(_order)} and {len(sample_args)} * 2 %"
                f" {len(_order)} == 0"
            )
            num_layers = len(sample_args) // num_model_chunks // num_microbatches
            _num_layers_per_chunk = [num_layers] * num_model_chunks
        else:
            assert (
                isinstance(_num_layers_per_chunk, int)
                or len(_num_layers_per_chunk) == num_model_chunks
            ), (
                "If _num_layers_per_chunk is provided, it must be an integer or a list of"
                f" {num_model_chunks} integers, but got {_num_layers_per_chunk}."
            )
            if isinstance(_num_layers_per_chunk, int):
                _num_layers_per_chunk = [_num_layers_per_chunk] * num_model_chunks
        total_num_layers = sum(_num_layers_per_chunk)
        assert len(callables) == total_num_layers, (
            f"Callables should have ({total_num_layers}) "
175
176
            + f"entries when order input is provided but got {len(callables)}."
        )
177
178
        assert len(sample_args) == total_num_layers * num_microbatches, (
            f"Expected {total_num_layers * num_microbatches}"
179
180
            + f"args tuple, but got {len(sample_args)}."
        )
181
182
183
184
185
186
187

        # Calculate the starting index of each chunk in callables for future use.
        _prefix_num_layers = [0]
        for m_chunk in range(num_model_chunks):
            num_layers = _num_layers_per_chunk[m_chunk]
            _prefix_num_layers.append(_prefix_num_layers[-1] + num_layers)

188
        assert len(sample_kwargs) == len(sample_args)
189

190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
    # Check reuse graph conditions and reorganize sample_args and sample_kwargs.
    # Note: When capturing a graph, we hold onto the args and kwargs so we have static buffers
    # when the graph is replayed. If two model chunk microbatches have no overlap between their
    # forward and backward, then we can reduce memory usage by reusing the same static buffers.
    if _reuse_graph_input_output_buffers:
        assert (
            _order is not None
        ), "`_order` must be provided when `_reuse_graph_input_output_buffers` is True."
        assert (
            is_training
        ), "`_reuse_graph_input_output_buffers` is only available in training mode."
        assert isinstance(
            sample_args, list
        ), "sample_args must be a list for _reuse_graph_input_output_buffers."

        # Reorganize args and kwargs for input tensor reuse.
206
207
208
209
210
211
212
        # fwd_sample_qs is keyed by model chunk index. The value is a queue of tuples.
        # Each tuple contains the sample key signature and its fwd_idx. When we finish a backward
        # chunk, we pop the corresponding fwd_idx and push to the consumed_sample_q.
        # consumed_sample_q is keyed by the sample key signature. The value is a queue of the
        # fwd_idx whose backward has been called so that we can reuse the same static buffers.
        # In this way, we can reuse the same static input buffers for the non-overlapping samples
        # with the same input signature.
213
        fwd_sample_qs = {}
214
        consumed_sample_q = {}
215
216
217
218
219
220
221
222
223
224
225
        fwd_idx = [0] * num_model_chunks
        for c_id in _order:
            m_chunk = abs(c_id) - 1

            if c_id > 0:
                sample_start_idx = (_prefix_num_layers[m_chunk] * num_microbatches) + (
                    fwd_idx[m_chunk] * _num_layers_per_chunk[m_chunk]
                )
                fwd_sample_idx = [
                    sample_start_idx + i for i in range(_num_layers_per_chunk[m_chunk])
                ]
226
227
                if m_chunk not in fwd_sample_qs:
                    fwd_sample_qs[m_chunk] = []
228
                for per_callable_fwd_idx in fwd_sample_idx:
229
230
231
232
233
234
235
236
237
238
239
240
                    sample_args_keys = tuple(
                        (t.shape, t.dtype, t.layout) for t in sample_args[per_callable_fwd_idx]
                    )
                    sample_kwargs_keys = tuple(
                        (k, v.shape, v.dtype, v.layout)
                        for k, v in sorted(sample_kwargs[per_callable_fwd_idx].items())
                    )
                    sample_keys = sample_args_keys + sample_kwargs_keys

                    fwd_sample_qs[m_chunk].append((sample_keys, per_callable_fwd_idx))
                    if consumed_sample_q.get(sample_keys, []):
                        reuse_fwd_idx = consumed_sample_q[sample_keys].pop(0)
241
242
243
244
245
246
247
                        sample_args[per_callable_fwd_idx] = sample_args[reuse_fwd_idx]
                        sample_kwargs[per_callable_fwd_idx] = sample_kwargs[reuse_fwd_idx]
                fwd_idx[m_chunk] += 1
            else:
                num_consumed_samples = min(
                    len(fwd_sample_qs[m_chunk]), _num_layers_per_chunk[m_chunk]
                )
248
249
250
251
252
253
                for sample_keys, per_callable_fwd_idx in fwd_sample_qs[m_chunk][
                    :num_consumed_samples
                ]:
                    if sample_keys not in consumed_sample_q:
                        consumed_sample_q[sample_keys] = []
                    consumed_sample_q[sample_keys].append(per_callable_fwd_idx)
254
255
                fwd_sample_qs[m_chunk] = fwd_sample_qs[m_chunk][num_consumed_samples:]

256
    if cache_quantized_params:
257
        # Initialize flag that controls FP8 weight updates
258
259
        FP8GlobalStateManager.set_skip_fp8_weight_update_tensor(False)

260
    # Check callables
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
    for c in callables:
        if isinstance(c, torch.nn.Module):
            assert (
                len(c._backward_hooks) == 0
                and len(c._forward_hooks) == 0
                and len(c._forward_pre_hooks) == 0
            ), (
                "Modules must not have hooks registered at the time they are passed. "
                + "However, registering hooks on modules after passing them "
                + "through make_graphed_callables is allowed."
            )
            assert all(b.requires_grad is False for b in c.buffers()), (
                "In any :class:`~torch.nn.Module` passed to "
                + ":func:`~make_graphed_callables`, only parameters may be trainable. "
                + "All buffers must have ``requires_grad=False``."
            )
277
278
279
280
281

    # Flatten callable arguments
    per_callable_kwargs_keys = [list(kwargs.keys()) for kwargs in sample_kwargs]
    flatten_sample_args = []
    for args, kwargs, kwargs_keys in zip(sample_args, sample_kwargs, per_callable_kwargs_keys):
282
        flatten_arg, _ = _tree_flatten(args)
283
284
        flatten_kwarg, _ = _tree_flatten([kwargs[key] for key in kwargs_keys])
        flatten_sample_args.append(tuple(flatten_arg + flatten_kwarg))
285
286
287
288
289
290
291
        assert all(isinstance(arg, torch.Tensor) for arg in flatten_arg), (
            "In the beta API, sample_args "
            + "for each callable must contain only Tensors. Other types are not allowed."
        )

    # If a callable is an nn.Module, its graph's full input surface is the args the user explicitly
    # passes to forward (ie, its sample_args) AND the module's parameter attributes.
292
293
294
295
    # Note: These per_callable_* variables are not actually
    # per-callable, but per-forward-pass (see description of _order).
    # The names are kept for consistency with
    # torch.cuda.make_graphed_callables.
296
297
298
    per_callable_len_user_args = [len(args) for args in flatten_sample_args]
    if _order is None:
        per_callable_module_params = [
299
            tuple(c.parameters()) if isinstance(c, torch.nn.Module) else () for c in callables
300
301
        ]
        per_callable_static_input_surfaces = [
302
            flatten_sample_args[i] + per_callable_module_params[i] for i in range(len(callables))
303
304
305
        ]
    else:
        per_callable_module_params = []
306
307
        for m_chunk in range(num_model_chunks):
            for _ in range(num_microbatches):
308
                for l_no in range(_num_layers_per_chunk[m_chunk]):
309
                    per_callable_module_params.append(
310
311
312
313
314
                        tuple(callables[_prefix_num_layers[m_chunk] + l_no].parameters())
                        if isinstance(
                            callables[_prefix_num_layers[m_chunk] + l_no],
                            torch.nn.Module,
                        )
315
316
                        else ()
                    )
317
318
319
320
321
322
323
324
325
        assert len(per_callable_module_params) == len(flatten_sample_args)
        per_callable_static_input_surfaces = [
            flatten_sample_args[i] + per_callable_module_params[i]
            for i in range(len(flatten_sample_args))
        ]

    fwd_graphs = [torch.cuda.CUDAGraph() for _ in range(len(flatten_sample_args))]
    bwd_graphs = [torch.cuda.CUDAGraph() for _ in range(len(flatten_sample_args))]
    graph_callables = [None for _ in range(len(flatten_sample_args))]
326

327
328
329
330
331
332
333
    # For cases with multiple active RNG states, e.g. TP.
    if graph_safe_rng_available():
        for _, state in get_all_rng_states().items():
            for fwd_graph, bwd_graph in zip(fwd_graphs, bwd_graphs):
                fwd_graph.register_generator_state(state)
                bwd_graph.register_generator_state(state)

334
    mempool = graph_pool_handle() if pool is None else pool
335
336
337
338
339

    # Warmup
    # Hopefully prevents cudnn benchmarking and other lazy-initialization cuda work
    # from ending up in any captures.
    torch.cuda.synchronize()
340
341
342
343
344

    # Get warmup func and func_idx.
    warmup_func_idx = []
    warmup_func = []
    if _order is None:
345
        for func_idx, func in enumerate(callables):
346
347
348
349
350
351
352
            warmup_func_idx.append(func_idx)
            warmup_func.append(func)
    else:
        fwd_idx = [0] * num_model_chunks
        for c_id in _order:
            if c_id > 0:
                m_chunk = c_id - 1
353
354
355
356
                for l_no in range(_num_layers_per_chunk[m_chunk]):
                    func = callables[_prefix_num_layers[m_chunk] + l_no]
                    func_idx = (_prefix_num_layers[m_chunk] * num_microbatches) + (
                        fwd_idx[m_chunk] * _num_layers_per_chunk[m_chunk] + l_no
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
                    )
                    warmup_func_idx.append(func_idx)
                    warmup_func.append(func)
                fwd_idx[m_chunk] += 1
    assert len(warmup_func) == len(
        sample_args
    ), f"Warmup runs {len(warmup_func)} don't match args {len(sample_args)}."
    assert len(warmup_func_idx) == len(
        set(warmup_func_idx)
    ), f"Warmup runs {len(warmup_func)} but only {len(set(warmup_func_idx))} are unique."

    # Filter the TE modules that cudagraph can access.
    visited_te_modules = set()

    def hook_fn(module, inputs, outputs):  # pylint: disable=unused-argument
        if isinstance(module, TransformerEngineBaseModule):
            visited_te_modules.add(module)
Jan Bielak's avatar
Jan Bielak committed
374
375
376
377
378
379
380
381
382
383
        # If forward is called on a BasicOperation directly the hook will run
        elif isinstance(module, BasicOperation):
            visited_te_modules.add(module)
        # If forward is called on a te.ops.Sequential it is not called on its constituent ops
        elif isinstance(module, Sequential):
            assert module._module_groups is not None, "Should have been initialized by warmup"
            for module_group in module._module_groups:
                if isinstance(module_group, OperationFuser):
                    for basic_op in module_group._basic_ops:
                        visited_te_modules.add(basic_op)
384
385
386
387

    # Run warmup and do the above filtering.
    with torch.cuda.stream(torch.cuda.Stream()):
        for func_idx, func in zip(warmup_func_idx, warmup_func):
388
389
390
            args = sample_args[func_idx]
            kwargs = sample_kwargs[func_idx]
            static_input_surface = per_callable_static_input_surfaces[func_idx]
391
            for warmup_iter in range(num_warmup_iters):
392
393
394
395
                hooks = []
                for module in func.modules():
                    hook = module.register_forward_hook(hook_fn)
                    hooks.append(hook)
396
                outputs, _ = _tree_flatten(func(*args, **kwargs))
397
398
                for hook in hooks:
                    hook.remove()
399
400
401
402
403
404
405
406
                if is_training:
                    grad_inputs = torch.autograd.grad(
                        outputs=tuple(o for o in outputs if o.requires_grad),
                        inputs=tuple(i for i in static_input_surface if i.requires_grad),
                        grad_outputs=tuple(torch.empty_like(o) for o in outputs if o.requires_grad),
                        only_inputs=True,
                        allow_unused=allow_unused_input,
                    )
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434

                    # Filter module params that get None grad from grad_inputs and remove them
                    # from static_input_surface. This is to ensure that the backward hooks
                    # registered to these params are not wrongly triggered.
                    num_required_grad_sample_args = sum(
                        arg.requires_grad for arg in flatten_sample_args[func_idx]
                    )
                    required_grad_input_idx = []
                    for i, arg in enumerate(static_input_surface):
                        if arg.requires_grad:
                            required_grad_input_idx.append(i)
                    module_params_with_grad = []
                    for grad_inputs_idx, inputs_idx in enumerate(required_grad_input_idx):
                        if (
                            grad_inputs[grad_inputs_idx] is not None
                            and grad_inputs_idx >= num_required_grad_sample_args
                        ):
                            module_params_with_grad.append(static_input_surface[inputs_idx])
                    if len(module_params_with_grad) != len(per_callable_module_params[func_idx]):
                        assert warmup_iter == 0, (
                            "no-grad params should only be used as inputs in the first warmup"
                            " iteration"
                        )
                        per_callable_module_params[func_idx] = tuple(module_params_with_grad)
                        static_input_surface = flatten_sample_args[func_idx] + tuple(
                            module_params_with_grad
                        )
                        per_callable_static_input_surfaces[func_idx] = static_input_surface
435
436
                else:
                    grad_inputs = None
437
                del outputs, grad_inputs
438
439
440
441
442
            # The following code is added specifically for MCore's special requirements,
            # aimed at preventing warmup from altering the control flow.
            for module in func.modules():
                if hasattr(module, "is_first_microbatch"):
                    module.is_first_microbatch = True
443
444
445
446
447
448
    torch.cuda.synchronize()

    # All captures here share a mempool. To avoid replays corrupting each other's memory,
    # the safest approach is to capture all passes in the same order they'll run:
    # fwd 1, fwd 2, ... fwd N, then bwd N, bwd N-1, ... bwd 1.

449
    if _order is not None:  # pylint: disable=too-many-nested-blocks
450
451
452
453
454
455
        per_callable_static_outputs = [None] * len(flatten_sample_args)
        per_callable_output_unflatten_spec = [None] * len(flatten_sample_args)
        per_callable_static_grad_outputs = [None] * len(flatten_sample_args)
        per_callable_static_grad_inputs = [None] * len(flatten_sample_args)
        fwd_idx = [0] * num_model_chunks
        bwd_idx = [0] * num_model_chunks
456
        static_grad_outputs_dict = {}
457
        previous_chunk_last_callable_bwd_idx = None
458
459
460
        for c_id in _order:
            if c_id > 0:
                # Capture forward graph for model chunk c_id, microbatch fwd_idx[c_id-1]
461
                m_chunk = c_id - 1
462
463
464
465
                for l_no in range(_num_layers_per_chunk[m_chunk]):
                    func = callables[_prefix_num_layers[m_chunk] + l_no]
                    per_callable_fwd_idx = (_prefix_num_layers[m_chunk] * num_microbatches) + (
                        fwd_idx[m_chunk] * _num_layers_per_chunk[m_chunk] + l_no
466
                    )
467
                    args = sample_args[per_callable_fwd_idx]
468
                    kwargs = sample_kwargs[per_callable_fwd_idx]
469
                    fwd_graph = fwd_graphs[per_callable_fwd_idx]
470
                    with _graph_context_wrapper(fwd_graph, pool=mempool):
471
                        outputs = func(*args, **kwargs)
472
473
474
475
476
477
478
                    flatten_outputs, spec = _tree_flatten(outputs)
                    per_callable_static_outputs[per_callable_fwd_idx] = tuple(flatten_outputs)
                    per_callable_output_unflatten_spec[per_callable_fwd_idx] = spec
                    graph_callables[per_callable_fwd_idx] = func
                fwd_idx[m_chunk] += 1
            else:
                # Capture backward graph for model chunk c_id, microbatch bwd_idx[-c_id-1]
479
                m_chunk = -c_id - 1
480
                previous_per_callable_bwd_idx = None
481
482
483
                for l_no in list(reversed(range(_num_layers_per_chunk[m_chunk]))):
                    per_callable_bwd_idx = (_prefix_num_layers[m_chunk] * num_microbatches) + (
                        bwd_idx[m_chunk] * _num_layers_per_chunk[m_chunk] + l_no
484
                    )
485
486
487
488
                    static_input_surface = per_callable_static_input_surfaces[per_callable_bwd_idx]
                    static_outputs = per_callable_static_outputs[per_callable_bwd_idx]
                    bwd_graph = bwd_graphs[per_callable_bwd_idx]
                    # For now, assumes all static_outputs require grad
489
                    if _reuse_graph_input_output_buffers:
490
491
                        # Note for _reuse_graph_input_output_buffers: grad output is only used
                        # within backward, so we can reuse the same static buffers every time.
492
493
494
495
496
497
498
499
500
501
502
503
                        static_grad_outputs_keys = tuple(
                            (o.shape, o.dtype, o.layout) for o in static_outputs if o.requires_grad
                        )
                        if static_grad_outputs_keys in static_grad_outputs_dict:
                            static_grad_outputs = static_grad_outputs_dict[static_grad_outputs_keys]
                        else:
                            static_grad_outputs = tuple(
                                torch.empty_like(o) if o.requires_grad else None
                                for o in static_outputs
                            )
                            static_grad_outputs_dict[static_grad_outputs_keys] = static_grad_outputs
                    else:
504
505
506
                        static_grad_outputs = tuple(
                            torch.empty_like(o) if o.requires_grad else None for o in static_outputs
                        )
507
                    if is_training:
508
                        with _graph_context_wrapper(bwd_graph, pool=mempool):
509
510
511
512
513
514
515
516
                            grad_inputs = torch.autograd.grad(
                                outputs=tuple(o for o in static_outputs if o.requires_grad),
                                inputs=tuple(i for i in static_input_surface if i.requires_grad),
                                grad_outputs=tuple(o for o in static_grad_outputs if o is not None),
                                only_inputs=True,
                                allow_unused=allow_unused_input,
                                retain_graph=retain_graph_in_backward,
                            )
517
518
519
520
521
522
                    # Constructs a tuple suitable for returning from Graphed.backward:
                    # Pads out the actually-needed grads with Nones in gradient slots for inputs
                    # that don't require grad. I couldn't think of a one-liner for this pattern.
                    static_grad_inputs = []
                    grad_idx = 0
                    for arg in static_input_surface:
523
                        if is_training and isinstance(arg, torch.Tensor) and arg.requires_grad:
524
525
526
527
528
529
530
531
                            static_grad_inputs.append(grad_inputs[grad_idx])
                            grad_idx += 1
                        else:
                            static_grad_inputs.append(None)  # type: ignore[arg-type]
                    static_grad_inputs = tuple(static_grad_inputs)  # type: ignore[assignment]

                    per_callable_static_grad_outputs[per_callable_bwd_idx] = static_grad_outputs
                    per_callable_static_grad_inputs[per_callable_bwd_idx] = static_grad_inputs
532
533
534
535
536
537
538
539
540
541
542

                    # Weak ref the static outputs and static grad inputs that are no longer needed
                    # in the following steps. These two type of tensors are both in cudagraph
                    # mempool, so we just deallocate them and let PyTorch's memory allocator
                    # reuse them elsewhere.
                    if _reuse_graph_input_output_buffers:
                        # Weak ref the static outputs of the forward pass of this backward. It's
                        # no longer needed after the corresponding backward graph is built up.
                        per_callable_static_outputs[per_callable_bwd_idx] = make_weak_ref(
                            static_outputs
                        )
543
544
545

                        # Weak ref the static grad inputs of the previous backward pass within the
                        # same chunk.
546
                        if previous_per_callable_bwd_idx is not None:
547
548
549
                            idx = previous_per_callable_bwd_idx
                            per_callable_static_grad_inputs[idx] = make_weak_ref(
                                per_callable_static_grad_inputs[idx]
550
551
552
                            )
                        previous_per_callable_bwd_idx = per_callable_bwd_idx

553
554
555
556
557
558
559
560
561
562
563
564
565
                        # Weak ref the static grad inputs of the previous chunk's last backward
                        # pass.
                        # Note: After a chunk's backward pass, we assume Mcore will send the grad
                        # input to another pipeline parallel rank and that the communication is
                        # finished before the end of the next chunk's backward pass.
                        if l_no == 0:
                            if previous_chunk_last_callable_bwd_idx is not None:
                                idx = previous_chunk_last_callable_bwd_idx
                                per_callable_static_grad_inputs[idx] = make_weak_ref(
                                    per_callable_static_grad_inputs[idx]
                                )
                            previous_chunk_last_callable_bwd_idx = per_callable_bwd_idx

566
567
568
569
570
571
                bwd_idx[m_chunk] += 1
    else:
        # Capture forward graphs
        per_callable_static_outputs = []
        per_callable_output_unflatten_spec = []
        graph_id = 0
572
        for func, args, kwargs, fwd_graph in zip(callables, sample_args, sample_kwargs, fwd_graphs):
573
            with _graph_context_wrapper(fwd_graph, pool=mempool):
574
                outputs = func(*args, **kwargs)
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
            graph_callables[graph_id] = func
            graph_id += 1

            flatten_outputs, spec = _tree_flatten(outputs)
            per_callable_static_outputs.append(tuple(flatten_outputs))
            per_callable_output_unflatten_spec.append(spec)

        # Capture backward graphs in reverse order
        per_callable_static_grad_outputs = []
        per_callable_static_grad_inputs = []
        for static_input_surface, static_outputs, bwd_graph in zip(
            reversed(per_callable_static_input_surfaces),
            reversed(per_callable_static_outputs),
            reversed(bwd_graphs),
        ):
            # For now, assumes all static_outputs require grad
            static_grad_outputs = tuple(
                torch.empty_like(o) if o.requires_grad else None for o in static_outputs
            )
594
            if is_training:
595
                with _graph_context_wrapper(bwd_graph, pool=mempool):
596
597
598
599
600
601
602
603
                    grad_inputs = torch.autograd.grad(
                        outputs=tuple(o for o in static_outputs if o.requires_grad),
                        inputs=tuple(i for i in static_input_surface if i.requires_grad),
                        grad_outputs=tuple(o for o in static_grad_outputs if o is not None),
                        only_inputs=True,
                        allow_unused=allow_unused_input,
                        retain_graph=retain_graph_in_backward,
                    )
604
605
606
607
608
609
            # Constructs a tuple suitable for returning from Graphed.backward:
            # Pads out the actually-needed grads with Nones in gradient slots for inputs that
            # don't require grad. I couldn't think of a slick one-liner for this pattern.
            static_grad_inputs = []
            grad_idx = 0
            for arg in static_input_surface:
610
                if is_training and isinstance(arg, torch.Tensor) and arg.requires_grad:
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
                    static_grad_inputs.append(grad_inputs[grad_idx])
                    grad_idx += 1
                else:
                    static_grad_inputs.append(None)  # type: ignore[arg-type]
            static_grad_inputs = tuple(static_grad_inputs)  # type: ignore[assignment]

            per_callable_static_grad_outputs.append(static_grad_outputs)
            per_callable_static_grad_inputs.append(static_grad_inputs)

        # Reverses the most recent two lists
        per_callable_static_grad_outputs = list(reversed(per_callable_static_grad_outputs))
        per_callable_static_grad_inputs = list(reversed(per_callable_static_grad_inputs))
    # Now for every per_callable list, per_callable_*[i] holds the stuff for the ith callable.

    def make_graphed_autograd_function(
        fwd_graph,
        bwd_graph,
        module_params,
629
        kwargs_keys,
630
631
632
633
634
635
636
637
638
        len_user_args,
        output_unflatten_spec,
        static_input_surface,
        static_outputs,
        static_grad_outputs,
        static_grad_inputs,
    ):
        class Graphed(torch.autograd.Function):
            """Autograd function for graph replay."""
639

640
641
            @staticmethod
            def forward(ctx, skip_fp8_weight_update, *inputs):
642
                # pylint: disable=missing-function-docstring
643
644

                # Set flag for whether to update FP8 weight updates
645
646
647
648
                ctx.is_first_module = FP8GlobalStateManager.is_first_fp8_module()
                if ctx.is_first_module and skip_fp8_weight_update is not None:
                    FP8GlobalStateManager.set_skip_fp8_weight_update_tensor(skip_fp8_weight_update)

649
                # Copy values from new tensors into static tensors
650
                for i in range(len_user_args):
651
652
653
654
                    if (
                        isinstance(static_input_surface[i], torch.Tensor)
                        and static_input_surface[i].data_ptr() != inputs[i].data_ptr()
                    ):
655
                        static_input_surface[i].copy_(inputs[i])
656
657

                # Replay forward graph
658
659
660
661
662
663
664
                fwd_graph.replay()
                assert isinstance(static_outputs, tuple)
                return tuple(o.detach() for o in static_outputs)

            @staticmethod
            @torch.autograd.function.once_differentiable
            def backward(ctx, *grads):
665
                # pylint: disable=missing-function-docstring
666
667

                # Replay backward graph
668
669
670
671
672
673
674
675
676
                assert len(grads) == len(static_grad_outputs)
                for g, grad in zip(static_grad_outputs, grads):
                    if g is not None:
                        # don't copy if autograd gods have been kind and the
                        # incoming grad is already in the right place
                        if g.data_ptr() != grad.data_ptr():
                            g.copy_(grad)
                bwd_graph.replay()

677
                # Update FP8 scale factors if needed
678
679
680
681
682
683
684
685
686
687
                if ctx.is_first_module:
                    FP8GlobalStateManager.reduce_and_update_fp8_tensors(forward=False)

                # Input args that didn't require grad expect a None gradient.
                assert isinstance(static_grad_inputs, tuple)
                return (None,) + tuple(
                    b.detach() if b is not None else b for b in static_grad_inputs
                )

        def functionalized(*user_args, **user_kwargs):
688
689

            # Decide whether to update FP8 weights
690
            skip_fp8_weight_update = None
691
            if cache_quantized_params:
692
693
                assert "is_first_microbatch" in user_kwargs and isinstance(
                    user_kwargs["is_first_microbatch"], bool
694
695
696
697
                ), "`is_first_microbatch` boolean kwarg must be provided for FP8 weight caching."

                skip_fp8_weight_update = not user_kwargs["is_first_microbatch"]

698
699
700
701
702
703
704
705
706
707
708
709
            # Check that required kwargs are provided
            for key in kwargs_keys:
                if key not in user_kwargs:
                    raise TypeError(
                        f"Graphed callable was initialized with kwarg {key} ,"
                        "but it was not provided in graph replay"
                    )

            # Runs the autograd function with inputs == all inputs to
            # the graph that might require grad (explicit user args +
            # module parameters)
            # Assumes module params didn't change since capture.
710
            flatten_user_args, _ = _tree_flatten(user_args)
711
712
713
            flatten_user_kwargs, _ = _tree_flatten([user_kwargs[key] for key in kwargs_keys])
            func_args = tuple(flatten_user_args) + tuple(flatten_user_kwargs) + module_params
            out = Graphed.apply(skip_fp8_weight_update, *func_args)
714
715
716
717
718
719
720
721
722
723
724
            return _tree_unflatten(out, output_unflatten_spec)

        return functionalized

    # Put together the final graphed callables
    ret = []
    for i in range(len(sample_args)):
        graphed = make_graphed_autograd_function(
            fwd_graphs[i],
            bwd_graphs[i],
            per_callable_module_params[i],
725
            per_callable_kwargs_keys[i],
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
            per_callable_len_user_args[i],
            per_callable_output_unflatten_spec[i],
            per_callable_static_input_surfaces[i],
            per_callable_static_outputs[i],
            per_callable_static_grad_outputs[i],
            per_callable_static_grad_inputs[i],
        )

        func = graph_callables[i]
        if isinstance(func, torch.nn.Module):

            def make_graphed_forward(func, graph_training_state, graphed, orig_fwd):
                def new_fwd(*user_args, **user_kwargs):
                    # If the module's training-or-eval state matches what we graphed,
                    # run the graph, otherwise run the original forward method
                    if func.training == graph_training_state:
                        # Set the FP8 group from global amax reduction.
Jan Bielak's avatar
Jan Bielak committed
743
744
745
                        if FP8GlobalStateManager.is_fp8_enabled():
                            fp8_recipe = FP8GlobalStateManager.get_fp8_recipe()
                            for m in func.modules():
746
747
748
                                if m not in visited_te_modules:
                                    # Only Set the FP8 meta for the modules included by forward
                                    continue
Jan Bielak's avatar
Jan Bielak committed
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
                                if isinstance(m, TransformerEngineBaseModule):
                                    from transformer_engine.pytorch.attention.dot_product_attention import (
                                        DotProductAttention,
                                    )

                                    if (
                                        isinstance(m, DotProductAttention)
                                        and not fp8_recipe.fp8_mha
                                        and not fp8_recipe.fp8_dpa
                                    ):
                                        # Don't need to update FP8 meta for non-FP8 DPA
                                        continue
                                    m.fp8_meta["fp8_group"] = FP8GlobalStateManager.get_fp8_group()
                                    m.fp8_meta["recipe"] = FP8GlobalStateManager.get_fp8_recipe()
                                    FP8GlobalStateManager.add_fp8_tensors_to_global_buffer(
                                        m.fp8_meta,
                                    )
                                elif isinstance(m, BasicOperation):
                                    for mode in ("forward", "backward"):
                                        if m.num_quantizers(mode):
769
770
771
772
773
774
                                            m._fp8_metas[mode][
                                                "fp8_group"
                                            ] = FP8GlobalStateManager.get_fp8_group()
                                            m._fp8_metas[mode][
                                                "recipe"
                                            ] = FP8GlobalStateManager.get_fp8_recipe()
Jan Bielak's avatar
Jan Bielak committed
775
776
777
                                            FP8GlobalStateManager.add_fp8_tensors_to_global_buffer(
                                                m._fp8_metas[mode],
                                            )
778
779
                        return graphed(*user_args, **user_kwargs)
                    return orig_fwd(*user_args, **user_kwargs)
780

781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
                return new_fwd

            forward = make_graphed_forward(func, func.training, graphed, func.forward)
            if _order is None:
                func.forward = forward
                ret.append(func)
            else:
                ret.append(forward)
        else:
            ret.append(graphed)

    if just_one_callable:
        return ret[0]

    return tuple(ret)


798
799
def save_fp8_tensors(
    modules: Iterable[torch.nn.Module],
    recipe: Optional[Recipe],
) -> Optional[List[Any]]:
    """
    Snapshot the FP8 tensors of every submodule reachable from `modules`.

    Parameters
    ----------
    modules: iterable of torch.nn.Module
             Root modules; each one is walked with `torch.nn.Module.modules()`.
    recipe: recipe.Recipe, optional
            Quantization recipe. Anything other than a `DelayedScaling`
            instance (including `None`) results in no snapshot.

    Returns
    -------
    list or None
        `None` if `recipe` is not a `DelayedScaling` recipe. Otherwise a
        list with exactly one entry per visited submodule, in traversal
        order. The entry is `None` for submodules that are neither a
        `TransformerEngineBaseModule` nor a `BasicOperation`.
    """

    if not isinstance(recipe, DelayedScaling):
        return None

    def _snapshot(submodule: torch.nn.Module) -> Any:
        """Return the saved FP8 state of a single submodule (or `None`)."""
        if isinstance(submodule, TransformerEngineBaseModule):
            # Amax history is resized to the recipe's length before saving,
            # but only for modules that keep primary weights in FP8.
            if submodule.primary_weights_in_fp8:
                submodule.adjust_amax_history_length(recipe.amax_history_len)
            return submodule.get_fp8_meta_tensors()
        if isinstance(submodule, BasicOperation):
            submodule.reset_recipe_state(recipe=recipe)
            return submodule._save_fp8_metas()
        # Placeholder keeps the list aligned with the traversal order.
        return None

    return [_snapshot(submodule) for root in modules for submodule in root.modules()]


def restore_fp8_tensors(
    modules: Iterable[torch.nn.Module],
    fp8_tensors: Optional[List[Any]],
) -> None:
    """Restore FP8 tensors.

    Walks every submodule of `modules` (via `torch.nn.Module.modules()`)
    and hands the entry of `fp8_tensors` at the same traversal position
    back to each `TransformerEngineBaseModule` or `BasicOperation`.
    Entries belonging to other submodules are skipped.

    Parameters
    ----------
    modules: iterable of torch.nn.Module
             Root modules whose submodules should be restored.
    fp8_tensors: list, optional
                 One entry per visited submodule, in traversal order.
                 If `None`, nothing is restored. The list is only read,
                 never modified.

    Raises
    ------
    IndexError
        If `fp8_tensors` has fewer entries than there are submodules.
    RuntimeError
        If `fp8_tensors` has more entries than there are submodules.
    """

    if fp8_tensors is None:
        return

    # Walk the list with a cursor rather than `fp8_tensors.pop(0)`: popping
    # from the front is O(n) per call and would empty the caller's list.
    num_consumed = 0
    for module in modules:
        for m in module.modules():
            module_tensors = fp8_tensors[num_consumed]
            num_consumed += 1
            if isinstance(m, TransformerEngineBaseModule):
                m.reset_fp8_meta_tensors(module_tensors)
            elif isinstance(m, BasicOperation):
                m._load_fp8_metas(module_tensors)

    num_leftover = len(fp8_tensors) - num_consumed
    if num_leftover != 0:
        raise RuntimeError(
            f"Got FP8 state for {num_leftover} more modules than expected. "
            "There is probably a discrepancy with `save_fp8_tensors`."
        )
846
847
848


def _resolve_deprecated_kwarg(
    old_name: str,
    old_value: Optional[Any],
    new_name: str,
    new_value: Optional[Any],
) -> Optional[Any]:
    """Merge a deprecated `make_graphed_callables` kwarg with its replacement.

    Returns `new_value` untouched when the deprecated kwarg is unset.
    Otherwise emits a `DeprecationWarning` and returns `old_value`.

    Raises
    ------
    ValueError
        If both the deprecated kwarg and its replacement are set.
    """
    if old_value is None:
        return new_value
    if new_value is not None:
        raise ValueError(
            f"make_graphed_callables has deprecated `{old_name}` kwarg "
            f"in favor of `{new_name}`, but both kwargs are set."
        )
    warnings.warn(
        f"make_graphed_callables has deprecated `{old_name}` kwarg in favor of "
        f"`{new_name}`. `{old_name}` will be removed in a future release.",
        category=DeprecationWarning,
        # helper -> make_graphed_callables -> user code
        stacklevel=3,
    )
    return old_value


def make_graphed_callables(
    modules: SingleOrTuple[Callable],
    sample_args: SingleOrTuple[Tuple[torch.Tensor, ...]],
    num_warmup_iters: int = 3,
    allow_unused_input: bool = False,
    sample_kwargs: Optional[SingleOrTuple[Dict[str, Any]]] = None,
    fp8_enabled: Optional[SingleOrTuple[bool]] = None,
    fp8_calibrating: Optional[bool] = None,
    fp8_recipe: Optional[Recipe] = None,
    fp8_group: Optional[dist_group_type] = None,
    fp8_weight_caching: Optional[bool] = None,
    enabled: Optional[SingleOrTuple[bool]] = None,
    calibrating: Optional[bool] = None,
    recipe: Optional[Recipe] = None,
    amax_reduction_group: Optional[dist_group_type] = None,
    cache_quantized_params: Optional[bool] = None,
    _order: Optional[List[int]] = None,
    _num_layers_per_chunk: Optional[List[int]] = None,
    pool: Optional[Tuple[int, ...]] = None,
    retain_graph_in_backward: bool = False,
    _reuse_graph_input_output_buffers: bool = False,
) -> Union[Callable, Tuple[Callable, ...]]:
    """
    Make CUDA graph version of Transformer Engine modules

    A variation of PyTorch's `make_graphed_callables` utility function
    with support for Transformer Engine modules and FP8. Please see
    the
    `original PyTorch implementation <https://pytorch.org/docs/stable/generated/torch.cuda.make_graphed_callables.html>`_
    for more documentation.

    .. warning::

       Arguments 'fp8_enabled', 'fp8_calibrating', 'fp8_recipe', 'fp8_group', and 'fp8_weight_caching' are deprecated.
       Use arguments 'enabled', 'calibrating', 'recipe', 'amax_reduction_group', and 'cache_quantized_params' instead.

    Graphing parameters
    -------------------
    modules: (tuple of) callable
             Callable or callables to graph.
    sample_args: (tuple of) tuple of torch.Tensor
                 Positional arguments to callable(s).
    num_warmup_iters: int, default = 3
                      Number of warmup iterations.
    allow_unused_input: bool, default = `False`
                        Whether to handle case where callable inputs
                        and outputs are disconnected in compute graph.
    sample_kwargs: (tuple of) dict, optional
                   Keyword arguments to callable(s)
    pool: (tuple of) int, default = `None`, optional
          An instance returned from function `torch.cuda.graph_pool_handle` that hints
          this graph may share memory with the indicated pool.
    retain_graph_in_backward: bool, default = `False`
                              Whether to set retain_graph=True in backward graph capture.
    _order: list of int, default = `None`, optional
        Private argument forwarded unchanged to the internal capture routine.
        Presumably the Mcore interleaved pipeline schedule (see
        `_reuse_graph_input_output_buffers`) -- confirm against the caller.
    _num_layers_per_chunk: list of int, default = `None`, optional
        Private argument forwarded unchanged to the internal capture routine.
    _reuse_graph_input_output_buffers: bool, default = `False`
        Reduce memory usage by reusing input/output data buffers between
        graphs. Only supported with Mcore interleaved pipeline parallelism, i.e.
        when `_order` is provided. All callables in `modules` are assumed to have
        inputs and outputs with the same dtype and shape.

    Quantization related parameters
    -------------------------------
    enabled: (tuple of) bool, default = `False`
             whether or not to enable low precision quantization (FP8/FP4).
             If tuple, the length must match the number of modules.
    calibrating: bool, default = `False`
                 calibration mode allows collecting statistics such as amax and scale
                 data of quantized tensors even when executing without quantization enabled.
                 This is useful for saving an inference ready checkpoint while training
                 using a higher precision.
    recipe: recipe.Recipe, default = `None`
            recipe used for low precision quantization.
    amax_reduction_group: torch._C._distributed_c10d.ProcessGroup, default = `None`
                          distributed group over which amaxes for the quantized tensors
                          are reduced at the end of each training step.
    cache_quantized_params: bool, default = `False`
                            Whether or not to cache quantized weights across microbatches. if set to `True`,
                            the `is_first_microbatch` boolean argument must be passed into the forward
                            method for TransformerEngine modules. When storing primary weights in low precision
                            using TE's `quantized_model_init` API and using a quantization-aware optimizer,
                            this arg must be set to `False` if calculating weight transposes' outside TE, e.g.,
                            in the optimizer step.

    Returns
    -------
    callable or tuple of callable
        Graphed counterpart(s) of `modules`, as produced by the internal
        capture routine.

    Raises
    ------
    ValueError
        If a deprecated kwarg and its replacement are both set.
    AssertionError
        If `enabled` is neither a bool nor a tuple matching `modules` in
        length, or if an entry of `modules` is not a `torch.nn.Module`.

    """

    # Handle deprecated args. If old kwargs are set, they are prioritized with warning.
    enabled = _resolve_deprecated_kwarg("fp8_enabled", fp8_enabled, "enabled", enabled)
    if enabled is None:
        enabled = False

    calibrating = _resolve_deprecated_kwarg(
        "fp8_calibrating", fp8_calibrating, "calibrating", calibrating
    )
    if calibrating is None:
        calibrating = False

    recipe = _resolve_deprecated_kwarg("fp8_recipe", fp8_recipe, "recipe", recipe)

    amax_reduction_group = _resolve_deprecated_kwarg(
        "fp8_group", fp8_group, "amax_reduction_group", amax_reduction_group
    )

    cache_quantized_params = _resolve_deprecated_kwarg(
        "fp8_weight_caching", fp8_weight_caching, "cache_quantized_params", cache_quantized_params
    )
    if cache_quantized_params is None:
        cache_quantized_params = False

    # Original `__call__` of every module class that gets wrapped below.
    old_call_funcs = {}

    set_capture_start()
    try:
        # Handle single module.
        just_one_callable = False
        if not isinstance(modules, tuple):
            just_one_callable = True
            modules = (modules,)

        if not isinstance(enabled, tuple):
            assert isinstance(enabled, bool), "enabled must be a bool or a tuple of bools"
            enabled = (enabled,) * len(modules)
        else:
            assert len(enabled) == len(
                modules
            ), f"enabled length ({len(enabled)}) must match modules length ({len(modules)})"
        if any(enabled) and recipe is None:
            recipe = get_default_fp8_recipe()
        elif not any(enabled):
            recipe = None
        module_uses_fp8 = dict(zip((id(m) for m in modules), enabled))

        # Store FP8 tensors to reset later.
        saved_fp8_tensors = save_fp8_tensors(modules, recipe=recipe)

        # FP8 wrapper.
        def wrap_autocast(block):
            block_cls = type(block)
            if block_cls in old_call_funcs:
                return

            old_call_funcs[block_cls] = block_cls.__call__

            # Wrap the original call function of the module class.
            def call_func(self, *args, **kwargs):
                with autocast(
                    enabled=module_uses_fp8.get(id(self), False),
                    calibrating=calibrating,
                    recipe=recipe,
                    amax_reduction_group=amax_reduction_group,
                    _graph=True,
                ):
                    outputs = old_call_funcs[block_cls](self, *args, **kwargs)
                return outputs

            block_cls.__call__ = call_func

        try:
            forward_funcs = []
            for module in modules:
                assert isinstance(
                    module, torch.nn.Module
                ), f"Graphing for {type(module)} is not supported."
                wrap_autocast(module)
                forward_funcs.append(module)

            if just_one_callable:
                forward_funcs = forward_funcs[0]
            else:
                forward_funcs = tuple(forward_funcs)

            # Save RNG state.
            if graph_safe_rng_available():
                generators = [
                    torch.cuda.default_generators[torch.cuda.current_device()],
                    *get_all_rng_states().values(),
                ]
                original_rng_states = [state.get_state() for state in generators]
            else:
                original_rng_states = torch.cuda.get_rng_state()

            graphed_callables = _make_graphed_callables(
                forward_funcs,
                sample_args,
                num_warmup_iters=num_warmup_iters,
                allow_unused_input=allow_unused_input,
                cache_quantized_params=cache_quantized_params,
                sample_kwargs=sample_kwargs,
                _order=_order,
                _num_layers_per_chunk=_num_layers_per_chunk,
                pool=pool,
                retain_graph_in_backward=retain_graph_in_backward,
                _reuse_graph_input_output_buffers=_reuse_graph_input_output_buffers,
            )

            # Ensures warmup does not affect numerics for ops such as dropout.
            if graph_safe_rng_available():
                for gen, state in zip(generators, original_rng_states):
                    gen.set_state(state)
            else:
                torch.cuda.set_rng_state(original_rng_states)
        finally:
            # Remove FP8 wrapper. Done in `finally` so that a failed capture
            # does not leave module classes permanently wrapped in autocast.
            for module_cls, old_call in old_call_funcs.items():
                module_cls.__call__ = old_call

        # Restore FP8 state.
        restore_fp8_tensors(modules, saved_fp8_tensors)
    finally:
        # Always clear the capture flag, even if validation or capture raised.
        set_capture_end()
    return graphed_callables