collective_fusion.py 44.8 KB
Newer Older
1
# SPDX-License-Identifier: Apache-2.0
2
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3
from importlib.util import find_spec
4
5
6
7
8
from typing import Optional

import torch
import torch._inductor.pattern_matcher as pm
import torch.fx as fx
9
from torch._higher_order_ops.auto_functionalize import auto_functionalized
10
11
12
from torch._inductor.pattern_matcher import PatternMatcherPass
from torch.distributed._symmetric_memory import enable_symm_mem_for_group

13
import vllm.envs as envs
14
from vllm.config import VllmConfig
15
from vllm.distributed import get_tp_group, tensor_model_parallel_all_reduce
16
from vllm.distributed.parallel_state import (
17
18
19
    get_tensor_model_parallel_rank,
    get_tensor_model_parallel_world_size,
)
20
from vllm.logger import init_logger
21
from vllm.platforms import current_platform
22
from vllm.utils import direct_register_custom_op
23

24
from .inductor_pass import enable_fake_mode
25
from .vllm_inductor_pass import VllmInductorPass, VllmPatternMatcherPass
26

27
28
FP8_DTYPE = current_platform.fp8_dtype()

29
if find_spec("flashinfer"):
30
31
    try:
        import flashinfer.comm as flashinfer_comm
32
33
34
35
36
37

        flashinfer_comm = (
            flashinfer_comm
            if hasattr(flashinfer_comm, "trtllm_allreduce_fusion")
            else None
        )
38
39
    except ImportError:
        flashinfer_comm = None
40
41
42
else:
    flashinfer_comm = None

43
44
logger = init_logger(__name__)

45
46
47
ALLREDUCE_OP = torch.ops.vllm.all_reduce.default
RMS_OP = torch.ops._C.rms_norm.default
RMS_ADD_OP = torch.ops._C.fused_add_rms_norm.default
48
49
STATIC_FP8_QUANT_OP = torch.ops._C.static_scaled_fp8_quant.default
STATIC_FP4_QUANT_OP = torch.ops._C.scaled_fp4_quant.default
50

51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

class BasePattern:
    def __init__(self, dtype: torch.dtype, device: str):
        self.dtype = dtype
        self.device = device
        self.tp = get_tp_group()
        self.tp_size = get_tensor_model_parallel_world_size()


class GEMMReduceScatterPattern(BasePattern):
    def get_inputs(self):
        mul = torch.empty([16, 4], device=self.device, dtype=self.dtype)
        mm_weight = torch.empty([4, 4], device=self.device, dtype=self.dtype)
        return [mul, mm_weight]

    def register(self, pm_pass: PatternMatcherPass):
        def pattern(mul: torch.Tensor, mm_weight: torch.Tensor):
            mm = torch.ops.aten.mm.default(mul, mm_weight)
            reduce_scatter = torch.ops.vllm.reduce_scatter.default(
                mm,
                dim=0,
                world_size=self.tp_size,
73
74
                group_name=self.tp.unique_name,
            )
75
76
77
78
79
80
81
82
83
84
85
86
87
            return reduce_scatter

        def replacement(mul: torch.Tensor, mm_weight: torch.Tensor):
            gemm_rs = torch.ops.symm_mem.fused_matmul_reduce_scatter(
                mul,
                mm_weight,
                "avg",
                scatter_dim=0,
                group_name=self.tp.device_group.group_name,
            )

            return gemm_rs

88
89
90
        pm.register_replacement(
            pattern, replacement, self.get_inputs(), pm.fwd_only, pm_pass
        )
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108


class AllGatherGEMMPattern(BasePattern):
    def get_inputs(self):
        x = torch.empty([4, 4], device=self.device, dtype=self.dtype)
        weight = torch.empty([4, 4], device=self.device, dtype=self.dtype)

        return [x, weight]

    def register(self, pm_pass: PatternMatcherPass):
        def pattern(
            x: torch.Tensor,
            weight: torch.Tensor,
        ) -> tuple[torch.Tensor, torch.Tensor]:
            all_gather = torch.ops.vllm.all_gather.default(
                x,
                dim=0,
                world_size=self.tp_size,
109
110
                group_name=self.tp.unique_name,
            )
111
112
113
114

            return torch.ops.aten.mm.default(all_gather, weight)

        def replacement(
115
116
            x: torch.Tensor, weight: torch.Tensor
        ) -> tuple[torch.Tensor, torch.Tensor]:
117
118
119
120
121
122
123
124
            ag_output, mm_outputs = torch.ops.symm_mem.fused_all_gather_matmul(
                x,
                [weight],
                gather_dim=0,
                group_name=self.tp.device_group.group_name,
            )
            return mm_outputs

125
126
127
        pm.register_replacement(
            pattern, replacement, self.get_inputs(), pm.fwd_only, pm_pass
        )
128
129


130
131
132
class ScaledMMReduceScatterPattern(BasePattern):
    def get_inputs(self):
        input = torch.empty([16, 16], device=self.device, dtype=FP8_DTYPE)
133
134
135
136
137
        mm_weight = (
            torch.empty([16, 16], device=self.device, dtype=FP8_DTYPE)
            .contiguous()
            .transpose(0, 1)
        )
138
139
140
141
142
        scale_a = torch.empty([16, 1], device=self.device, dtype=torch.float32)
        scale_b = torch.empty([1, 16], device=self.device, dtype=torch.float32)
        return [input, mm_weight, scale_a, scale_b]

    def register(self, pm_pass: PatternMatcherPass):
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
        def pattern(
            input: torch.Tensor,
            mat2: torch.Tensor,
            scale_a: torch.Tensor,
            scale_b: torch.Tensor,
        ) -> torch.Tensor:
            scaled_mm = torch.ops.aten._scaled_mm.default(
                input,
                mat2=mat2,
                scale_a=scale_a,
                scale_b=scale_b,
                bias=None,
                scale_result=None,
                out_dtype=self.dtype,
            )
158
159
160
161
            reduce_scatter = torch.ops.vllm.reduce_scatter.default(
                scaled_mm,
                dim=0,
                world_size=self.tp_size,
162
163
                group_name=self.tp.unique_name,
            )
164
165
            return reduce_scatter

166
167
168
169
170
171
        def replacement(
            input: torch.Tensor,
            mat2: torch.Tensor,
            scale_a: torch.Tensor,
            scale_b: torch.Tensor,
        ) -> torch.Tensor:
172
173
174
            # Calculate output shape: input @ mat2 with scatter_dim reduced
            output_shape = [*input.shape[:-1], mat2.shape[1]]
            scatter_dim = 0
175
            gemm_rs = torch.ops.vllm.patched_fused_scaled_matmul_reduce_scatter(
176
177
178
179
180
                input,
                mat2,
                scale_a,
                scale_b,
                "avg",
181
182
183
184
185
186
187
188
                scatter_dim,  # orig_scatter_dim
                scatter_dim,  # scatter_dim_after_maybe_reshape
                self.tp.device_group.group_name,
                output_shape,
                None,  # bias
                None,  # result_scale
                self.dtype,  # out_dtype
                False,  # use_fast_accum
189
190
191
192
            )

            return gemm_rs

193
194
195
        pm.register_replacement(
            pattern, replacement, self.get_inputs(), pm.fwd_only, pm_pass
        )
196
197
198
199
200


class AllGatherScaledMMPattern(BasePattern):
    def get_inputs(self):
        x = torch.empty([8, 16], device=self.device, dtype=FP8_DTYPE)
201
202
203
204
205
        weight = (
            torch.empty([16, 16], device=self.device, dtype=FP8_DTYPE)
            .contiguous()
            .transpose(0, 1)
        )
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221

        s1 = x.shape[0] * self.tp_size

        scale_a = torch.empty([s1, 1], device=self.device, dtype=torch.float32)
        scale_b = torch.empty([1, 16], device=self.device, dtype=torch.float32)

        return [x, weight, scale_a, scale_b]

    def register(self, pm_pass: PatternMatcherPass):
        def pattern(
            x: torch.Tensor,
            weight: torch.Tensor,
            scale_a: torch.Tensor,
            scale_b: torch.Tensor,
        ) -> torch.Tensor:
            all_gather = torch.ops.vllm.all_gather.default(
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
                x, dim=0, world_size=self.tp_size, group_name=self.tp.unique_name
            )

            return torch.ops.aten._scaled_mm.default(
                all_gather,
                mat2=weight,
                scale_a=scale_a,
                scale_b=scale_b,
                bias=None,
                scale_result=None,
                out_dtype=self.dtype,
            )

        def replacement(
            x: torch.Tensor,
            weight: torch.Tensor,
            scale_a: torch.Tensor,
            scale_b: torch.Tensor,
        ) -> torch.Tensor:
241
242
243
244
245
246
247
248
249
250
251
252
253
254
            ag_output, mm_outputs = torch.ops.symm_mem.fused_all_gather_scaled_matmul(  # noqa
                x,
                [weight],
                scale_a,
                [scale_b],
                gather_dim=0,
                biases=[None],
                result_scales=[None],
                out_dtypes=[self.dtype],
                use_fast_accum=[False],
                group_name=self.tp.device_group.group_name,
            )
            return mm_outputs

255
256
257
        pm.register_replacement(
            pattern, replacement, self.get_inputs(), pm.fwd_only, pm_pass
        )
258
259
260
261
262


class CutlassScaledMMReduceScatterPattern(BasePattern):
    def get_inputs(self):
        input = torch.empty([16, 16], device=self.device, dtype=FP8_DTYPE)
263
264
265
266
267
        mm_weight = (
            torch.empty([16, 16], device=self.device, dtype=FP8_DTYPE)
            .contiguous()
            .transpose(0, 1)
        )
268
269
270
        scale_a = torch.empty([16, 1], device=self.device, dtype=torch.float32)
        scale_b = torch.empty([1, 16], device=self.device, dtype=torch.float32)

271
        cutlass_mm_output = torch.empty([16, 16], device=self.device, dtype=self.dtype)
272
273
274
        return [input, mm_weight, scale_a, scale_b, cutlass_mm_output]

    def register(self, pm_pass: PatternMatcherPass):
275
276
277
278
279
280
281
        def pattern(
            input: torch.Tensor,
            weight: torch.Tensor,
            scale_a: torch.Tensor,
            scale_b: torch.Tensor,
            cutlass_mm_output: torch.Tensor,
        ) -> torch.Tensor:
282
283
284
285
286
287
288
            cutlass_scaled_mm = torch.ops.higher_order.auto_functionalized(
                torch.ops._C.cutlass_scaled_mm.default,
                out=cutlass_mm_output,
                a=input,
                b=weight,
                a_scales=scale_a,
                b_scales=scale_b,
289
290
                bias=None,
            )
291
292
293
294
295

            reduce_scatter = torch.ops.vllm.reduce_scatter.default(
                cutlass_scaled_mm[1],
                dim=0,
                world_size=self.tp_size,
296
297
                group_name=self.tp.unique_name,
            )
298
299
            return reduce_scatter

300
301
302
303
304
305
306
        def replacement(
            input: torch.Tensor,
            mat2: torch.Tensor,
            scale_a: torch.Tensor,
            scale_b: torch.Tensor,
            cutlass_mm_output: torch.Tensor,
        ) -> torch.Tensor:
307
308
309
            # Calculate output shape: input @ mat2 with scatter_dim reduced
            output_shape = [*input.shape[:-1], mat2.shape[1]]
            scatter_dim = 0
310
            gemm_rs = torch.ops.vllm.patched_fused_scaled_matmul_reduce_scatter(
311
312
313
314
315
                input,
                mat2,
                scale_a,
                scale_b,
                "avg",
316
317
318
319
320
321
322
323
                scatter_dim,  # orig_scatter_dim
                scatter_dim,  # scatter_dim_after_maybe_reshape
                self.tp.device_group.group_name,
                output_shape,
                None,  # bias
                None,  # result_scale
                self.dtype,  # out_dtype
                False,  # use_fast_accum
324
325
326
327
            )

            return gemm_rs

328
329
330
        pm.register_replacement(
            pattern, replacement, self.get_inputs(), pm.fwd_only, pm_pass
        )
331
332
333
334
335


class AllGatherCutlassScaledMMPattern(BasePattern):
    def get_inputs(self):
        x = torch.empty([8, 16], device=self.device, dtype=FP8_DTYPE)
336
337
338
339
340
        weight = (
            torch.empty([16, 16], device=self.device, dtype=FP8_DTYPE)
            .contiguous()
            .transpose(0, 1)
        )
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360

        s1 = x.shape[0] * self.tp_size

        scale_a = torch.empty([s1, 1], device=self.device, dtype=torch.float32)
        scale_b = torch.empty([1, 16], device=self.device, dtype=torch.float32)

        s2 = weight.shape[1]
        output = torch.empty([s1, s2], device=self.device, dtype=self.dtype)

        return [x, weight, scale_a, scale_b, output]

    def register(self, pm_pass: PatternMatcherPass):
        def pattern(
            x: torch.Tensor,
            weight: torch.Tensor,
            scale_a: torch.Tensor,
            scale_b: torch.Tensor,
            output: torch.Tensor,
        ) -> torch.Tensor:
            all_gather = torch.ops.vllm.all_gather.default(
361
362
                x, dim=0, world_size=self.tp_size, group_name=self.tp.unique_name
            )
363
364
365
366
367
368
369
370

            cutlass_scaled_mm = torch.ops.higher_order.auto_functionalized(
                torch.ops._C.cutlass_scaled_mm.default,
                out=output,
                a=all_gather,
                b=weight,
                a_scales=scale_a,
                b_scales=scale_b,
371
372
                bias=None,
            )
373
374
            return cutlass_scaled_mm[1]

375
376
377
378
379
380
381
        def replacement(
            x: torch.Tensor,
            weight: torch.Tensor,
            scale_a: torch.Tensor,
            scale_b: torch.Tensor,
            output: torch.Tensor,
        ) -> torch.Tensor:
382
383
384
385
386
387
388
389
390
391
392
393
394
395
            ag_output, mm_outputs = torch.ops.symm_mem.fused_all_gather_scaled_matmul(  # noqa
                x,
                [weight],
                scale_a,
                [scale_b],
                gather_dim=0,
                biases=[None],
                result_scales=[None],
                out_dtypes=[self.dtype],
                use_fast_accum=[False],
                group_name=self.tp.device_group.group_name,
            )
            return mm_outputs

396
397
398
        pm.register_replacement(
            pattern, replacement, self.get_inputs(), pm.fwd_only, pm_pass
        )
399
400


401
class AsyncTPPass(VllmPatternMatcherPass):
402
    @enable_fake_mode
403
404
405
406
407
408
    def __init__(self, config: VllmConfig):
        super().__init__(config)

        # Enable symmetric memory for the TP process group
        enable_symm_mem_for_group(get_tp_group().device_group.group_name)
        self.patterns: PatternMatcherPass = PatternMatcherPass(
409
410
411
            pass_name="async_tp_pass"
        )
        GEMMReduceScatterPattern(self.model_dtype, self.device).register(self.patterns)
412

413
        AllGatherGEMMPattern(self.model_dtype, self.device).register(self.patterns)
414

415
416
417
418
        # These fusions are enabled only for bfloat16 models because
        # `scaled_mm` or `cutlass_scaled_mm` with per-token (row-wise) scaling
        # only supports bfloat16 as the output dtype.
        if self.model_dtype == torch.bfloat16:
419
420
421
422
423
424
            ScaledMMReduceScatterPattern(self.model_dtype, self.device).register(
                self.patterns
            )
            AllGatherScaledMMPattern(self.model_dtype, self.device).register(
                self.patterns
            )
425

426
427
428
429
430
431
            CutlassScaledMMReduceScatterPattern(self.model_dtype, self.device).register(
                self.patterns
            )
            AllGatherCutlassScaledMMPattern(self.model_dtype, self.device).register(
                self.patterns
            )
432

433
434
        self.dump_patterns(config, self.patterns)

435
436
437
438
439
    def is_applicable_for_shape(self, shape: Optional[int]) -> bool:
        # only do replace for specific shapes
        tp_size = get_tensor_model_parallel_world_size()
        return shape is not None and shape % tp_size == 0

440
    @VllmInductorPass.time_and_log
441
    def __call__(self, graph: fx.Graph):
442
443
        self.matched_count = self.patterns.apply(graph)
        logger.debug("Replaced %s patterns", self.matched_count)
444
445
446
447
448
449
450
451
452


if flashinfer_comm is not None:
    _FI_WORKSPACE_TENSOR = None

    MiB = 1024 * 1024
    # Max size of the input tensor per world size
    # to use flashinfer fused allreduce
    _FI_MAX_SIZES = {
453
        2: 64 * MiB,  # 64MB
454
455
456
457
        4: MiB,  # 1MB
        6: MiB // 2,  # 512KB
        8: MiB // 2,  # 512KB
    }
458
459

    try:
460
461
462
463
464
465
        _FI_MAX_SIZES.update(
            {
                int(k): int(float(v) * MiB)
                for k, v in envs.VLLM_FLASHINFER_ALLREDUCE_FUSION_THRESHOLDS_MB.items()
            }
        )
466
467
    except Exception as e:
        raise ValueError(
468
469
            "Failed to parse VLLM_FLASHINFER_ALLREDUCE_FUSION_THRESHOLDS_MB: " + str(e)
        ) from e
470

471
472
473
    # opt for a more conservative default value
    # when world size is not in _FI_MAX_SIZES
    _DEFAULT_FI_MAX_SIZE = MiB // 2
474
475
476
477
478
479
480
481
482
483
484
485

    def call_trtllm_fused_allreduce_norm(
        allreduce_in: torch.Tensor,
        residual: torch.Tensor,
        rms_gamma: torch.Tensor,
        rms_eps: float,
        world_rank: int,
        world_size: int,
        launch_with_pdl: bool,
        trigger_completion_at_end: bool,
        fp32_acc: bool,
        max_token_num: int,
486
487
        pattern_code: int,
        fuse_rms_quant: bool,
488
        norm_out: Optional[torch.Tensor] = None,
489
490
491
        quant_out: Optional[torch.Tensor] = None,
        scale_out: Optional[torch.Tensor] = None,
        scale_factor: Optional[torch.Tensor] = None,
492
    ) -> None:
493
494
495
496
497
498
499
500
        num_tokens, hidden_size = allreduce_in.shape
        element_size = allreduce_in.element_size()
        current_tensor_size = num_tokens * hidden_size * element_size
        max_fusion_size = max_token_num * hidden_size * element_size
        use_flashinfer = current_tensor_size <= min(
            _FI_MAX_SIZES.get(world_size, _DEFAULT_FI_MAX_SIZE),
            max_fusion_size,
        )
501
        if use_flashinfer:
502
503
504
            assert _FI_WORKSPACE_TENSOR is not None, (
                "Flashinfer must be enabled when using flashinfer"
            )
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
            if norm_out is None:
                norm_out = allreduce_in
                residual_out = residual
            else:
                # return residual_out as allreduce_out with zeroed residual_in
                # as flashinfer does not support rms_norm
                # and allreduce_out together
                residual_out = allreduce_in
            # For the sizes that are smaller than the max size,
            # we only use flashinfer one shot allreduce
            flashinfer_comm.trtllm_allreduce_fusion(
                allreduce_in=allreduce_in,
                token_num=allreduce_in.shape[0],
                residual_in=residual,
                residual_out=residual_out,
                norm_out=norm_out,
                rms_gamma=rms_gamma,
                rms_eps=rms_eps,
                world_rank=world_rank,
                world_size=world_size,
                hidden_dim=allreduce_in.shape[-1],
                workspace_ptrs=_FI_WORKSPACE_TENSOR,
                launch_with_pdl=launch_with_pdl,
                use_oneshot=True,
                trigger_completion_at_end=trigger_completion_at_end,
                fp32_acc=fp32_acc,
531
                pattern_code=pattern_code,
532
                allreduce_out=None,
533
534
535
                quant_out=quant_out,
                scale_out=scale_out,
                # in vllm we only support swizzled layout
536
                layout_code=flashinfer_comm.QuantizationSFLayout.SWIZZLED_128x4,
537
                scale_factor=scale_factor,
538
539
540
            )
        else:
            allreduce_out = tensor_model_parallel_all_reduce(allreduce_in)
541
            if scale_factor is not None and scale_out is None and fuse_rms_quant:
542
543
544
                # Do fused rms norm static fp8 quant fused op
                if norm_out is None:
                    torch.ops._C.fused_add_rms_norm_static_fp8_quant(
545
546
547
548
549
550
551
                        quant_out,
                        allreduce_out,
                        residual,
                        rms_gamma,
                        scale_factor,
                        rms_eps,
                    )
552
553
                else:
                    torch.ops._C.rms_norm_static_fp8_quant(
554
555
                        quant_out, allreduce_out, rms_gamma, scale_factor, rms_eps
                    )
556
            else:
557
                if norm_out is None:
558
559
560
                    torch.ops._C.fused_add_rms_norm(
                        allreduce_out, residual, rms_gamma, rms_eps
                    )
561
562
                    norm_out = allreduce_out
                else:
563
                    torch.ops._C.rms_norm(norm_out, allreduce_out, rms_gamma, rms_eps)
564
565
                if scale_factor is not None:
                    if scale_out is not None:
566
567
568
                        torch.ops._C.scaled_fp4_quant(
                            quant_out, norm_out, scale_out, scale_factor
                        )
569
570
                    else:
                        torch.ops._C.static_scaled_fp8_quant(
571
572
                            quant_out, norm_out, scale_factor
                        )
573
            if scale_factor is None or norm_out is not None:
574
                # we need to return allreduce output
575
576
577
                # in cases of non quant fused AR + RMS norm
                # and fused AR + RMS norm + quant without fused add
                allreduce_in.copy_(allreduce_out)
578
579

    def call_trtllm_fused_allreduce_norm_fake(
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
        allreduce_in: torch.Tensor,
        residual: torch.Tensor,
        rms_gamma: torch.Tensor,
        rms_eps: float,
        world_rank: int,
        world_size: int,
        launch_with_pdl: bool,
        trigger_completion_at_end: bool,
        fp32_acc: bool,
        max_token_num: int,
        pattern_code: int,
        fuse_rms_quant: bool,
        norm_out: Optional[torch.Tensor] = None,
        quant_out: Optional[torch.Tensor] = None,
        scale_out: Optional[torch.Tensor] = None,
        scale_factor: Optional[torch.Tensor] = None,
    ) -> None:
597
598
599
600
601
602
603
604
605
        pass

    direct_register_custom_op(
        op_name="flashinfer_trtllm_fused_allreduce_norm",
        op_func=call_trtllm_fused_allreduce_norm,
        mutates_args=[
            "allreduce_in",
            "residual",
            "norm_out",
606
607
            "quant_out",
            "scale_out",
608
609
610
611
        ],
        fake_impl=call_trtllm_fused_allreduce_norm_fake,
    )
    flashinfer_trtllm_fused_allreduce_norm = (
612
613
        torch.ops.vllm.flashinfer_trtllm_fused_allreduce_norm.default
    )
614
615
616
617
618
619
620
621
622
623
624


class FlashInferFusedAllReduceParams:
    """Parameters for FlashInfer fused allreduce operations."""

    def __init__(
        self,
        rank: int,
        world_size: int,
        use_fp32_lamport: bool = False,
        max_token_num: int = 1024,
625
        fuse_rms_quant: bool = False,
626
627
628
629
630
631
632
633
634
    ):
        self.rank = rank
        self.world_size = world_size
        self.use_fp32_lamport = use_fp32_lamport
        self.trigger_completion_at_end = True
        self.launch_with_pdl = True
        self.fp32_acc = True
        self.use_oneshot = False
        self.max_token_num = max_token_num
635
        self.fuse_rms_quant = fuse_rms_quant
636
637
638
639
640
641
642
643
644

    def get_trtllm_fused_allreduce_kwargs(self):
        return {
            "world_rank": self.rank,
            "world_size": self.world_size,
            "launch_with_pdl": self.launch_with_pdl,
            "trigger_completion_at_end": self.trigger_completion_at_end,
            "fp32_acc": self.fp32_acc,
            "max_token_num": self.max_token_num,
645
            "fuse_rms_quant": self.fuse_rms_quant,
646
647
648
        }


649
650
class AllReduceRMSNormPattern(BasePattern):
    """
651
    This pattern replaces the allreduce + rms norm (without residual)
652
653
654
    with fused flashinfer implementation.
    Applies to allreduce + rmsnorm before attn in the first Transformer block.
    """
655
656
657
658
659
660
661
662
663
664
665
666
667
668

    def __init__(
        self,
        epsilon: float,
        dtype: torch.dtype,
        device: str,
        allreduce_params: FlashInferFusedAllReduceParams,
    ):
        super().__init__(dtype, device)
        self.epsilon = epsilon
        self.allreduce_params = allreduce_params

    def get_inputs(self):
        input = torch.empty([1, 8, 4], device=self.device, dtype=self.dtype)
669
        rms_result = torch.empty([1, 8, 4], device=self.device, dtype=self.dtype)
670
671
672
673
674
        weight = torch.empty([4], device=self.device, dtype=self.dtype)

        return [input, rms_result, weight]

    def register(self, pm_pass: PatternMatcherPass):
675
676
677
        def pattern(
            input: torch.Tensor, rms_result: torch.Tensor, weight: torch.Tensor
        ):
678
            allreduce_output = tensor_model_parallel_all_reduce(input)
679
680
681
            rms = auto_functionalized(
                RMS_OP,
                result=rms_result,
682
                input=allreduce_output,
683
684
685
                weight=weight,
                epsilon=self.epsilon,
            )
686
687
            # rms_result, allreduce_output
            return rms[1], allreduce_output
688

689
690
691
        def replacement(
            input: torch.Tensor, rms_result: torch.Tensor, weight: torch.Tensor
        ):
692
693
            residual = torch.zeros_like(input)
            allreduce = auto_functionalized(
694
                flashinfer_trtllm_fused_allreduce_norm,
695
696
697
                allreduce_in=input,
                residual=residual,
                norm_out=rms_result,
698
699
                quant_out=None,
                scale_out=None,
700
701
                rms_gamma=weight,
                rms_eps=self.epsilon,
702
                pattern_code=flashinfer_comm.AllReduceFusionPattern.kARResidualRMSNorm,
703
704
                **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
            )
705
            # rms_result, allreduce_in
706
707
            return allreduce[3], allreduce[1]

708
709
710
        pm.register_replacement(
            pattern, replacement, self.get_inputs(), pm.fwd_only, pm_pass
        )
711
712
713


class AllReduceFusedAddRMSNormPattern(BasePattern):
714
    """
715
    This pattern replaces the allreduce + rms norm (with residual)
716
717
718
    with fused flashinfer implementation.
    Applies to o_proj + rmsnorm after attn and mlp + rmsnorm before attn.
    """
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741

    def __init__(
        self,
        epsilon: float,
        dtype: torch.dtype,
        device: str,
        allreduce_params: FlashInferFusedAllReduceParams,
    ):
        super().__init__(dtype, device)
        self.epsilon = epsilon
        self.allreduce_params = allreduce_params

    def get_inputs(self):
        input = torch.empty([4, 4], device=self.device, dtype=self.dtype)
        residual = torch.empty([4, 4], device=self.device, dtype=self.dtype)
        weight = torch.empty([4, 4], device=self.device, dtype=self.dtype)
        return [
            residual,
            input,
            weight,
        ]

    def register(self, pm_pass: PatternMatcherPass):
742
        def pattern(residual: torch.Tensor, input: torch.Tensor, weight: torch.Tensor):
743
            allreduce_output = tensor_model_parallel_all_reduce(input)
744
745
            rms = auto_functionalized(
                RMS_ADD_OP,
746
                input=allreduce_output,
747
748
749
750
                residual=residual,
                weight=weight,
                epsilon=self.epsilon,
            )
751
            # input, residual
752
753
            return rms[1], rms[2]

754
755
756
        def replacement(
            residual: torch.Tensor, input: torch.Tensor, weight: torch.Tensor
        ):
757
            allreduce = auto_functionalized(
758
                flashinfer_trtllm_fused_allreduce_norm,
759
760
                allreduce_in=input,
                residual=residual,
761
762
763
                norm_out=None,
                quant_out=None,
                scale_out=None,
764
765
                rms_gamma=weight,
                rms_eps=self.epsilon,
766
                pattern_code=flashinfer_comm.AllReduceFusionPattern.kARResidualRMSNorm,
767
768
                **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
            )
769
            # allreduce_in, residual
770
771
            return allreduce[1], allreduce[2]

772
773
774
        pm.register_replacement(
            pattern, replacement, self.get_inputs(), pm.fwd_only, pm_pass
        )
775
776


777
778
class AllReduceFusedRMSNormStaticQuantFP8Pattern(BasePattern):
    """
779
    This pattern replaces the allreduce + rms norm (without residual)
780
    + static fp8 quant with fused flashinfer implementation.
781
    Applies to allreduce + rmsnorm + quant before attn
782
783
784
    in the first Transformer block.
    """

785
786
787
788
789
790
791
    def __init__(
        self,
        epsilon: float,
        dtype: torch.dtype,
        device: str,
        allreduce_params: FlashInferFusedAllReduceParams,
    ):
792
793
794
795
796
797
798
        super().__init__(dtype, device)
        self.epsilon = epsilon
        self.allreduce_params = allreduce_params
        self.quant_dtype = torch.float8_e4m3fn

    def register(self, pm_pass: PatternMatcherPass):
        def get_inputs():
799
800
801
802
803
804
805
            input = torch.zeros([1, 8, 4], device=self.device, dtype=self.dtype)
            rmsnorm_result = torch.empty(
                [1, 8, 4], device=self.device, dtype=self.dtype
            )
            quant_result = torch.empty(
                [1, 8, 4], device=self.device, dtype=self.quant_dtype
            )
806
807
808
809
810
811
812
813
814
815
816
817
            weight = torch.empty([4], device=self.device, dtype=self.dtype)
            scale = torch.tensor(1.0, device=self.device, dtype=torch.float32)
            return [input, rmsnorm_result, quant_result, weight, scale]

        def pattern(
            input: torch.Tensor,
            rmsnorm_result: torch.Tensor,
            quant_result: torch.Tensor,
            weight: torch.Tensor,
            scale: torch.Tensor,
        ):
            all_reduce = tensor_model_parallel_all_reduce(input)
818
819
820
821
822
823
824
            rmsnorm_out_tuple = auto_functionalized(
                RMS_OP,
                result=rmsnorm_result,
                input=all_reduce,
                weight=weight,
                epsilon=self.epsilon,
            )
825

826
827
828
829
830
831
            quant_out_tuple = auto_functionalized(
                STATIC_FP8_QUANT_OP,
                result=quant_result,
                input=rmsnorm_out_tuple[1],
                scale=scale,
            )
832
833
834
835

            # quant_out, allreduce_output
            return quant_out_tuple[1], all_reduce

836
837
838
839
840
841
842
        def replacement(
            input: torch.Tensor,
            result_rms: torch.Tensor,
            quant_result: torch.Tensor,
            weight: torch.Tensor,
            scale: torch.Tensor,
        ):
843
844
845
846
847
848
849
850
851
852
            residual = torch.zeros_like(input)
            allreduce = auto_functionalized(
                flashinfer_trtllm_fused_allreduce_norm,
                allreduce_in=input,
                residual=residual,
                norm_out=result_rms,
                quant_out=quant_result,
                scale_out=None,
                rms_gamma=weight,
                rms_eps=self.epsilon,
853
854
855
856
                # We don't use norm_out afterwards
                pattern_code=(
                    flashinfer_comm.AllReduceFusionPattern.kARResidualRMSNormFP8Quant
                ),
857
858
859
860
861
862
863
                scale_factor=scale,
                **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
            )

            # quant_out, allreduce_output
            return allreduce[4], allreduce[1]

864
865
866
        pm.register_replacement(
            pattern, replacement, get_inputs(), pm.fwd_only, pm_pass
        )
867
868
869
870
871
872


class AllReduceFusedAddRMSNormStaticQuantFP8Pattern(BasePattern):
    """
    This pattern replaces the allreduce + rms norm (with residual)
    + static fp8 quant with fused flashinfer implementation.
873
    Applies to o_proj + rmsnorm after attn + quant and
874
875
876
    mlp + rmsnorm + quant before attn.
    """

877
878
879
880
881
882
883
    def __init__(
        self,
        epsilon: float,
        dtype: torch.dtype,
        device: str,
        allreduce_params: FlashInferFusedAllReduceParams,
    ):
884
885
886
887
888
889
890
891
892
        super().__init__(dtype, device)
        self.epsilon = epsilon
        self.allreduce_params = allreduce_params
        self.quant_dtype = torch.float8_e4m3fn

    def register(self, pm_pass: PatternMatcherPass):
        def get_inputs():
            input = torch.empty([4, 4], device=self.device, dtype=self.dtype)

893
            residual = torch.empty([4, 4], device=self.device, dtype=self.dtype)
894
            weight = torch.empty([4, 4], device=self.device, dtype=self.dtype)
895
896
897
898
            quant_result = torch.empty(
                [4, 4], device=self.device, dtype=self.quant_dtype
            )
            scale = torch.empty([1, 1], device=self.device, dtype=torch.float32)
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916

            return [
                quant_result,
                residual,
                input,
                weight,
                scale,
            ]

        def pattern(
            quant_result: torch.Tensor,
            residual: torch.Tensor,
            input: torch.Tensor,
            weight: torch.Tensor,
            scale: torch.Tensor,
        ):
            allreduce_output = tensor_model_parallel_all_reduce(input)

917
            fused_add_rmsnorm_out_tuple = auto_functionalized(
918
919
920
921
                RMS_ADD_OP,
                input=allreduce_output,
                residual=residual,
                weight=weight,
922
923
                epsilon=self.epsilon,
            )
924
925
926
927
            quant_out_tuple = auto_functionalized(
                STATIC_FP8_QUANT_OP,
                result=quant_result,
                input=fused_add_rmsnorm_out_tuple[1],
928
929
                scale=scale,
            )
930
931
932
933

            # quant_out, allreduce_output
            return quant_out_tuple[1], fused_add_rmsnorm_out_tuple[2]

934
935
936
937
938
939
940
        def replacement(
            quant_result: torch.Tensor,
            residual: torch.Tensor,
            input: torch.Tensor,
            weight: torch.Tensor,
            scale: torch.Tensor,
        ):
941
942
943
944
945
946
947
948
949
            allreduce = auto_functionalized(
                flashinfer_trtllm_fused_allreduce_norm,
                allreduce_in=input,
                residual=residual,
                norm_out=None,
                quant_out=quant_result,
                scale_out=None,
                rms_gamma=weight,
                rms_eps=self.epsilon,
950
951
952
953
                # We don't use norm_out afterwards
                pattern_code=(
                    flashinfer_comm.AllReduceFusionPattern.kARResidualRMSNormFP8Quant
                ),
954
955
956
                scale_factor=scale,
                **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
            )
957
            # quant_out, rms_norm_residual
958
959
            return allreduce[4], allreduce[2]

960
961
962
        pm.register_replacement(
            pattern, replacement, get_inputs(), pm.fwd_only, pm_pass
        )
963
964
965
966


class AllReduceFusedRMSNormStaticQuantNVFP4Pattern(BasePattern):
    """
967
    This pattern replaces the allreduce + rms norm (without residual)
968
    + static nvfp4 quant with fused flashinfer implementation.
969
    Applies to allreduce + rmsnorm + quant before attn
970
971
972
    in the first Transformer block.
    """

973
974
975
976
977
978
979
    def __init__(
        self,
        epsilon: float,
        dtype: torch.dtype,
        device: str,
        allreduce_params: FlashInferFusedAllReduceParams,
    ):
980
981
982
983
984
985
        super().__init__(dtype, device)
        self.epsilon = epsilon
        self.allreduce_params = allreduce_params

    def register(self, pm_pass: PatternMatcherPass):
        def get_inputs():
986
987
988
989
990
991
992
993
994
            input = torch.empty([1, 16, 16], device=self.device, dtype=self.dtype)

            rmsnorm_result = torch.empty(
                [1, 16, 16], device=self.device, dtype=self.dtype
            )
            quant_result = torch.empty((16, 8), device=self.device, dtype=torch.uint8)
            input_global_scale = torch.empty(
                [1, 1], device=self.device, dtype=torch.float32
            )
995
            weight = torch.empty([16], device=self.device, dtype=self.dtype)
996
            output_scale = torch.empty([128, 4], device=self.device, dtype=torch.int32)
997
998

            return [
999
1000
1001
1002
1003
1004
                input,
                rmsnorm_result,
                quant_result,
                weight,
                input_global_scale,
                output_scale,
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
            ]

        def pattern(
            input: torch.Tensor,
            rmsnorm_result: torch.Tensor,
            quant_result: torch.Tensor,
            weight: torch.Tensor,
            input_global_scale: torch.Tensor,
            output_scale: torch.Tensor,
        ):
            all_reduce = tensor_model_parallel_all_reduce(input)
1016
1017
1018
1019
1020
1021
1022
            rmsnorm_out_tuple = auto_functionalized(
                RMS_OP,
                result=rmsnorm_result,
                input=all_reduce,
                weight=weight,
                epsilon=self.epsilon,
            )
1023
1024
1025
1026
1027
1028

            quant_out_tuple = auto_functionalized(
                STATIC_FP4_QUANT_OP,
                output=quant_result,
                input=rmsnorm_out_tuple[1],
                output_scale=output_scale,
1029
1030
                input_scale=input_global_scale,
            )
1031
1032
1033
1034

            # quant_out, allreduce_output, output_scale
            return quant_out_tuple[1], all_reduce, quant_out_tuple[2]

1035
1036
1037
1038
1039
1040
1041
1042
        def replacement(
            input: torch.Tensor,
            result_rms: torch.Tensor,
            quant_result: torch.Tensor,
            weight: torch.Tensor,
            input_global_scale: torch.Tensor,
            output_scale: torch.Tensor,
        ):
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
            residual = torch.zeros_like(input)
            allreduce = auto_functionalized(
                flashinfer_trtllm_fused_allreduce_norm,
                allreduce_in=input,
                residual=residual,
                norm_out=result_rms,
                quant_out=quant_result,
                scale_out=output_scale,
                rms_gamma=weight,
                rms_eps=self.epsilon,
1053
1054
1055
1056
                # We don't use norm_out afterwards
                pattern_code=(
                    flashinfer_comm.AllReduceFusionPattern.kARResidualRMSNormFP4Quant
                ),
1057
1058
1059
1060
1061
1062
1063
                scale_factor=input_global_scale,
                **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
            )

            # quant_out, allreduce_output, output_scale
            return allreduce[4], allreduce[1], allreduce[5]

1064
1065
1066
        pm.register_replacement(
            pattern, replacement, get_inputs(), pm.fwd_only, pm_pass
        )
1067
1068
1069
1070
1071
1072


class AllReduceFusedAddRMSNormStaticQuantNVFP4Pattern(BasePattern):
    """
    This pattern replaces the allreduce + rms norm (with residual)
    + static nvfp4 quant with fused flashinfer implementation.
1073
    Applies to o_proj + rmsnorm after attn + quant and
1074
1075
1076
    mlp + rmsnorm + quant before attn.
    """

1077
1078
1079
1080
1081
1082
1083
    def __init__(
        self,
        epsilon: float,
        dtype: torch.dtype,
        device: str,
        allreduce_params: FlashInferFusedAllReduceParams,
    ):
1084
1085
1086
1087
1088
1089
1090
1091
        super().__init__(dtype, device)
        self.epsilon = epsilon
        self.allreduce_params = allreduce_params

    def register(self, pm_pass: PatternMatcherPass):
        def get_inputs():
            input = torch.empty([16, 16], device=self.device, dtype=self.dtype)

1092
1093
1094
1095
1096
1097
1098
            residual = torch.empty([16, 16], device=self.device, dtype=self.dtype)
            weight = torch.empty([16, 16], device=self.device, dtype=self.dtype)
            quant_result = torch.empty((16, 8), device=self.device, dtype=torch.uint8)
            input_global_scale = torch.empty(
                [1, 1], device=self.device, dtype=torch.float32
            )
            output_scale = torch.empty([128, 4], device=self.device, dtype=torch.int32)
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108

            return [
                quant_result,
                residual,
                input,
                output_scale,
                weight,
                input_global_scale,
            ]

1109
1110
1111
1112
1113
1114
1115
1116
        def pattern(
            quant_result: torch.Tensor,
            residual: torch.Tensor,
            input: torch.Tensor,
            output_scale: torch.Tensor,
            weight: torch.Tensor,
            input_global_scale: torch.Tensor,
        ):
1117
1118
            allreduce_output = tensor_model_parallel_all_reduce(input)

1119
            fused_add_rmsnorm_out_tuple = auto_functionalized(
1120
1121
1122
1123
                RMS_ADD_OP,
                input=allreduce_output,
                residual=residual,
                weight=weight,
1124
1125
                epsilon=self.epsilon,
            )
1126
1127
1128
1129
1130
            quant_out_tuple = auto_functionalized(
                STATIC_FP4_QUANT_OP,
                output=quant_result,
                input=fused_add_rmsnorm_out_tuple[1],
                output_scale=output_scale,
1131
1132
                input_scale=input_global_scale,
            )
1133
1134

            # quant_out, allreduce_output, output_scale
1135
1136
1137
1138
1139
            return (
                quant_out_tuple[1],
                fused_add_rmsnorm_out_tuple[2],
                quant_out_tuple[2],
            )
1140

1141
1142
1143
1144
1145
1146
1147
1148
        def replacement(
            quant_result: torch.Tensor,
            residual: torch.Tensor,
            input: torch.Tensor,
            output_scale: torch.Tensor,
            weight: torch.Tensor,
            input_global_scale: torch.Tensor,
        ):
1149
1150
1151
1152
1153
1154
1155
1156
1157
            allreduce = auto_functionalized(
                flashinfer_trtllm_fused_allreduce_norm,
                allreduce_in=input,
                residual=residual,
                norm_out=None,
                quant_out=quant_result,
                scale_out=output_scale,
                rms_gamma=weight,
                rms_eps=self.epsilon,
1158
1159
1160
1161
                # We don't use norm_out afterwards
                pattern_code=(
                    flashinfer_comm.AllReduceFusionPattern.kARResidualRMSNormFP4Quant
                ),
1162
1163
1164
1165
1166
1167
                scale_factor=input_global_scale,
                **self.allreduce_params.get_trtllm_fused_allreduce_kwargs(),
            )
            # quant_out, rms_norm_residual, output_scale
            return allreduce[4], allreduce[2], allreduce[5]

1168
1169
1170
        pm.register_replacement(
            pattern, replacement, get_inputs(), pm.fwd_only, pm_pass
        )
1171
1172


1173
class AllReduceFusionPass(VllmPatternMatcherPass):
1174
    def __init__(self, config: VllmConfig):
1175
1176
1177
1178
1179
1180
        super().__init__(config)
        self.disabled = True
        self.tp_size = get_tensor_model_parallel_world_size()
        if self.tp_size <= 1:
            return
        self.patterns: PatternMatcherPass = PatternMatcherPass(
1181
1182
            pass_name="all_reduce_fusion_pass"
        )
1183
1184
1185
1186
1187
1188
1189
1190
        if config.model_config is None:
            return
        self.hidden_dim = config.model_config.get_hidden_size()
        self.group = get_tp_group().device_group
        rank = get_tensor_model_parallel_rank()
        use_fp32_lamport = self.model_dtype == torch.float32
        if flashinfer_comm is None:
            logger.warning(
1191
                "Flashinfer is not installed or comm module not found, "
1192
1193
                "skipping allreduce fusion pass"
            )
1194
1195
1196
1197
            return
        # Check if the world size is supported
        if self.tp_size not in _FI_MAX_SIZES:
            logger.warning(
1198
                "Flashinfer allreduce fusion is not supported for world size %s",
1199
1200
1201
                self.tp_size,
            )
            return
1202
        max_num_token = min(
1203
1204
1205
1206
            _FI_MAX_SIZES.get(self.tp_size, _DEFAULT_FI_MAX_SIZE)
            // (self.hidden_dim * self.tp_size * (4 if use_fp32_lamport else 2)),
            config.compilation_config.pass_config.fi_allreduce_fusion_max_token_num,
        )
1207
1208
1209
1210
        self.ipc_handles, workspace_tensor = (
            flashinfer_comm.trtllm_create_ipc_workspace_for_all_reduce_fusion(
                tp_rank=rank,
                tp_size=self.tp_size,
1211
                max_token_num=max_num_token,
1212
1213
1214
                hidden_dim=self.hidden_dim,
                group=self.group,
                use_fp32_lamport=use_fp32_lamport,
1215
1216
            )
        )
1217
1218
1219
1220
1221
1222
1223

        global _FI_WORKSPACE_TENSOR
        _FI_WORKSPACE_TENSOR = workspace_tensor
        self.allreduce_params = FlashInferFusedAllReduceParams(
            rank=rank,
            world_size=self.tp_size,
            use_fp32_lamport=use_fp32_lamport,
1224
1225
1226
            max_token_num=max_num_token,
            # fuse rms norm static fp8 quant fused op
            # in fallback path, when we don't use flashinfer
1227
1228
            fuse_rms_quant=config.compilation_config.pass_config.enable_fusion,
        )
1229

1230
        self.register_patterns()
1231
        self.dump_patterns(config, self.patterns)
1232
1233
1234

    @enable_fake_mode
    def register_patterns(self):
1235
        for epsilon in [1e-5, 1e-6]:
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
            AllReduceFusedRMSNormStaticQuantFP8Pattern(
                epsilon,
                self.model_dtype,
                self.device,
                self.allreduce_params,
            ).register(self.patterns)
            AllReduceFusedAddRMSNormStaticQuantFP8Pattern(
                epsilon,
                self.model_dtype,
                self.device,
                self.allreduce_params,
            ).register(self.patterns)
            if current_platform.has_device_capability(100):
                AllReduceFusedRMSNormStaticQuantNVFP4Pattern(
                    epsilon,
                    self.model_dtype,
                    self.device,
                    self.allreduce_params,
                ).register(self.patterns)
                AllReduceFusedAddRMSNormStaticQuantNVFP4Pattern(
                    epsilon,
                    self.model_dtype,
                    self.device,
                    self.allreduce_params,
                ).register(self.patterns)
            AllReduceRMSNormPattern(
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
                epsilon,
                self.model_dtype,
                self.device,
                self.allreduce_params,
            ).register(self.patterns)
            AllReduceFusedAddRMSNormPattern(
                epsilon,
                self.model_dtype,
                self.device,
                self.allreduce_params,
            ).register(self.patterns)

1274
1275
1276
1277
            # WARNING: This is a hack to clear the pattern matcher cache
            # and allow multiple values of epsilon.
            torch._inductor.pattern_matcher._seen_patterns.clear()

1278
1279
        self.disabled = False

1280
    @VllmInductorPass.time_and_log
1281
1282
    def __call__(self, graph: fx.Graph):
        if self.disabled:
1283
            logger.debug("AllReduceFusionPass disabled")
1284
            return
1285
1286
1287

        self.matched_count = self.patterns.apply(graph)
        logger.debug("Replaced %s patterns", self.matched_count)
1288
1289

    def __del__(self):
1290
        if getattr(self, "disabled", True):
1291
1292
            return
        if flashinfer_comm is not None:
1293
            flashinfer_comm.trtllm_destroy_ipc_workspace_for_all_reduce(
1294
1295
                self.ipc_handles, self.group
            )