deepseek_mtp.py 17.1 KB
Newer Older
1
# SPDX-License-Identifier: Apache-2.0
2
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3
4
import typing
from collections.abc import Callable, Iterable
5
6
7
8
9

import torch
import torch.nn as nn
from transformers import PretrainedConfig

10
from vllm._aiter_ops import rocm_aiter_ops
11
from vllm.compilation.decorators import support_torch_compile
12
from vllm.config import VllmConfig
13
from vllm.logger import init_logger
14
from vllm.model_executor.layers.fused_moe import SharedFusedMoE
15
16
17
18
from vllm.model_executor.layers.layernorm import RMSNorm
from vllm.model_executor.layers.logits_processor import LogitsProcessor
from vllm.model_executor.layers.quantization import QuantizationConfig
from vllm.model_executor.layers.vocab_parallel_embedding import (
19
20
21
    ParallelLMHead,
    VocabParallelEmbedding,
)
22
23
24
25
from vllm.model_executor.model_loader.weight_utils import (
    default_weight_loader,
    maybe_remap_kv_scale_name,
)
26
from vllm.platforms import current_platform
27
28
from vllm.sequence import IntermediateTensors

29
30
from .deepseek_v2 import (
    DeepseekV2DecoderLayer,
31
32
    DeepseekV2MixtureOfExperts,
    DeepseekV2MoE,
33
34
    get_spec_layer_idx_from_weight_name,
)
Jiayi Yao's avatar
Jiayi Yao committed
35
from .interfaces import SupportsPP
36
37
from .utils import maybe_prefix

38
39
logger = init_logger(__name__)

40
41
42
43
44

class SharedHead(nn.Module):
    def __init__(
        self,
        config: PretrainedConfig,
45
        prefix: str,
46
        quant_config: QuantizationConfig | None = None,
47
48
49
    ) -> None:
        super().__init__()
        self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
50
51
52
53
54
55
        self.head = ParallelLMHead(
            config.vocab_size,
            config.hidden_size,
            quant_config=quant_config,
            prefix=maybe_prefix(prefix, "head"),
        )
56
57
58
59
60
61

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        return self.norm(hidden_states)


class DeepSeekMultiTokenPredictorLayer(nn.Module):
62
    def __init__(self, vllm_config: VllmConfig, prefix: str) -> None:
63
        super().__init__()
64

65
66
        config = vllm_config.speculative_config.draft_model_config.hf_config
        self.config = config
67
68
        quant_config = vllm_config.quant_config

69
70
        self.enorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
        self.hnorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
71
        self.eh_proj = nn.Linear(config.hidden_size * 2, config.hidden_size, bias=False)
72

73
74
        self.device = current_platform.device_type

75
76
77
78
79
80
81
        self.is_v32 = hasattr(config, "index_topk")
        if self.is_v32:
            topk_tokens = config.index_topk
            topk_indices_buffer = torch.empty(
                vllm_config.scheduler_config.max_num_batched_tokens,
                topk_tokens,
                dtype=torch.int32,
82
                device=self.device,
83
            )
84
85
        else:
            topk_indices_buffer = None
86

87
88
89
90
        self.shared_head = SharedHead(
            config=config, prefix=prefix, quant_config=quant_config
        )
        self.mtp_block = DeepseekV2DecoderLayer(
91
92
93
94
            vllm_config,
            prefix,
            config=self.config,
            topk_indices_buffer=topk_indices_buffer,
95
        )
96
97
98
99
100
101

    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
        previous_hidden_states: torch.Tensor,
102
        inputs_embeds: torch.Tensor | None = None,
103
104
105
106
        spec_step_index: int = 0,
    ) -> torch.Tensor:
        assert inputs_embeds is not None
        # masking inputs at position 0, as not needed by MTP
107
        inputs_embeds = torch.where(positions.unsqueeze(-1) == 0, 0, inputs_embeds)
108
109
110
111
        inputs_embeds = self.enorm(inputs_embeds)
        previous_hidden_states = self.hnorm(previous_hidden_states)

        hidden_states = self.eh_proj(
112
113
            torch.cat([inputs_embeds, previous_hidden_states], dim=-1)
        )
114

115
116
117
        hidden_states, residual = self.mtp_block(
            positions=positions, hidden_states=hidden_states, residual=None
        )
118
        hidden_states = residual + hidden_states
119
        return hidden_states
120
121
122
123
124
125
126
127
128


class DeepSeekMultiTokenPredictor(nn.Module):
    def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
        super().__init__()
        config = vllm_config.model_config.hf_config
        self.mtp_start_layer_idx = config.num_hidden_layers
        self.num_mtp_layers = config.num_nextn_predict_layers
        # to map the exact layer index from weights
129

130
131
132
133
134
135
136
137
138
139
140
        self.layers = torch.nn.ModuleDict(
            {
                str(idx): DeepSeekMultiTokenPredictorLayer(
                    vllm_config, f"{prefix}.layers.{idx}"
                )
                for idx in range(
                    self.mtp_start_layer_idx,
                    self.mtp_start_layer_idx + self.num_mtp_layers,
                )
            }
        )
141
142
143
144
        self.embed_tokens = VocabParallelEmbedding(
            config.vocab_size,
            config.hidden_size,
        )
145
146
        self.logits_processor = LogitsProcessor(config.vocab_size)

147
    def embed_input_ids(self, input_ids: torch.Tensor) -> torch.Tensor:
148
149
        return self.embed_tokens(input_ids)

150
151
152
153
154
    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
        previous_hidden_states: torch.Tensor,
155
        inputs_embeds: torch.Tensor | None = None,
156
157
        spec_step_idx: int = 0,
    ) -> torch.Tensor:
158
159
        if inputs_embeds is None:
            inputs_embeds = self.embed_tokens(input_ids)
160
        current_step_idx = spec_step_idx % self.num_mtp_layers
161
        return self.layers[str(self.mtp_start_layer_idx + current_step_idx)](
162
163
164
165
            input_ids,
            positions,
            previous_hidden_states,
            inputs_embeds,
166
            current_step_idx,
167
168
169
170
171
172
173
        )

    def compute_logits(
        self,
        hidden_states: torch.Tensor,
        spec_step_idx: int = 0,
    ) -> torch.Tensor:
174
175
176
177
178
        current_step_idx = spec_step_idx % self.num_mtp_layers
        mtp_layer = self.layers[str(self.mtp_start_layer_idx + current_step_idx)]
        logits = self.logits_processor(
            mtp_layer.shared_head.head, mtp_layer.shared_head(hidden_states)
        )
179
180
181
        return logits


182
@support_torch_compile
183
class DeepSeekMTP(nn.Module, SupportsPP, DeepseekV2MixtureOfExperts):
184
185
186
    def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
        super().__init__()
        self.config = vllm_config.model_config.hf_config
187
188
189
        self.model = DeepSeekMultiTokenPredictor(
            vllm_config=vllm_config, prefix=maybe_prefix(prefix, "model")
        )
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
        # Set MoE hyperparameters
        self.set_moe_parameters()

    def set_moe_parameters(self):
        self.expert_weights = []
        self.num_moe_layers = self.config.num_nextn_predict_layers
        self.num_expert_groups = self.config.n_group

        self.moe_layers = []
        self.moe_mlp_layers = []
        example_moe = None
        for layer in self.model.layers.values():
            assert isinstance(layer, DeepSeekMultiTokenPredictorLayer)
            layer = layer.mtp_block
            assert isinstance(layer, DeepseekV2DecoderLayer)
            if isinstance(layer.mlp, DeepseekV2MoE):
                example_moe = layer.mlp
                self.moe_mlp_layers.append(layer.mlp)
                self.moe_layers.append(layer.mlp.experts)
        self.extract_moe_parameters(example_moe)
210

211
212
    def embed_input_ids(self, input_ids: torch.Tensor) -> torch.Tensor:
        return self.model.embed_input_ids(input_ids)
213

214
215
216
217
    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
218
        hidden_states: torch.Tensor,
219
220
        intermediate_tensors: IntermediateTensors | None = None,
        inputs_embeds: torch.Tensor | None = None,
221
222
        spec_step_idx: int = 0,
    ) -> torch.Tensor:
223
224
225
        hidden_states = self.model(
            input_ids, positions, hidden_states, inputs_embeds, spec_step_idx
        )
226
227
228
229
230
231
        return hidden_states

    def compute_logits(
        self,
        hidden_states: torch.Tensor,
        spec_step_idx: int = 0,
232
    ) -> torch.Tensor | None:
233
        return self.model.compute_logits(hidden_states, spec_step_idx)
234

235
    def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]) -> set[str]:
236
237
238
        rocm_aiter_moe_shared_expert_enabled = (
            rocm_aiter_ops.is_fusion_moe_shared_experts_enabled()
        )
239
240
241
        stacked_params_mapping = [
            ("gate_up_proj", "gate_proj", 0),
            ("gate_up_proj", "up_proj", 1),
242
243
            ("fused_qkv_a_proj", "q_a_proj", 0),
            ("fused_qkv_a_proj", "kv_a_proj_with_mqa", 1),
244
245
        ]

246
        expert_params_mapping = SharedFusedMoE.make_expert_params_mapping(
247
248
249
            ckpt_gate_proj_name="gate_proj",
            ckpt_down_proj_name="down_proj",
            ckpt_up_proj_name="up_proj",
250
251
252
253
254
255
            num_experts=self.config.n_routed_experts
            + (
                self.config.n_shared_experts
                if rocm_aiter_moe_shared_expert_enabled
                else 0
            ),
256
        )
257
258

        params_dict = dict(self.named_parameters())
259
        loaded_params: set[str] = set()
260
261
262
263
264
265
        for name, loaded_weight in weights:
            if "rotary_emb.inv_freq" in name:
                continue
            spec_layer = get_spec_layer_idx_from_weight_name(self.config, name)
            if spec_layer is None:
                continue
266
267
268
            is_fusion_moe_shared_experts_layer = (
                rocm_aiter_moe_shared_expert_enabled and ("mlp.shared_experts" in name)
            )
269
            name = self._rewrite_spec_layer_name(spec_layer, name)
270
            for param_name, weight_name, shard_id in stacked_params_mapping:
271
272
273
274
275
276
277
278
279
                # Skip non-stacked layers and experts (experts handled below).
                if weight_name not in name:
                    continue
                # We have mlp.experts[0].gate_proj in the checkpoint.
                # Since we handle the experts below in expert_params_mapping,
                # we need to skip here BEFORE we update the name, otherwise
                # name will be updated to mlp.experts[0].gate_up_proj, which
                # will then be updated below in expert_params_mapping
                # for mlp.experts[0].gate_gate_up_proj, which breaks load.
280
                if ("mlp.experts." in name) and name not in params_dict:
281
                    continue
282
283
                if is_fusion_moe_shared_experts_layer:
                    continue
284
                name_mapped = name.replace(weight_name, param_name)
285
286
287

                # QKV fusion is optional, fall back to normal
                # weight loading if it's not enabled
288
289
290
                if (
                    param_name == "fused_qkv_a_proj"
                ) and name_mapped not in params_dict:
291
                    continue
292
293
                else:
                    name = name_mapped
294

295
296
297
298
299
300
301
302
303
                # Skip loading extra bias for GPTQ models.
                if name.endswith(".bias") and name not in params_dict:
                    continue

                param = params_dict[name]
                weight_loader = param.weight_loader
                weight_loader(param, loaded_weight, shard_id)
                break
            else:
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
                # Special handling: when AITER fusion_shared_experts is enabled,
                # checkpoints may provide a single widened shared_experts tensor
                # without explicit expert indices
                # (e.g. ...mlp.shared_experts.gate_proj.weight).
                # For models with multiple shared experts, split that tensor
                # evenly into per-shared-expert slices and load them into
                # appended expert slots mlp.experts.{n_routed_experts + j}.*
                # accordingly.
                num_chunks = 1
                if is_fusion_moe_shared_experts_layer:
                    num_chunks = getattr(self.config, "n_shared_experts", 1) or 1
                    # Determine split axis based on op type
                    # gate/up: ColumnParallel → split along dim 0
                    # down: RowParallel → split along dim 1
                    split_dim = 1 if "down_proj.weight" in name else 0
                    total = loaded_weight.shape[split_dim]
                    assert total % num_chunks == 0, (
                        f"Shared expert weight dim {total} "
                        f"not divisible by num_chunks {num_chunks}"
323
                    )
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
                    chunk_size = total // num_chunks

                for j in range(num_chunks):
                    chunk_name = name
                    weight_to_load = loaded_weight

                    if is_fusion_moe_shared_experts_layer:
                        if split_dim == 0:
                            weight_to_load = loaded_weight[
                                j * chunk_size : (j + 1) * chunk_size, :
                            ]
                        else:
                            weight_to_load = loaded_weight[
                                :, j * chunk_size : (j + 1) * chunk_size
                            ]
                        # Synthesize an expert-style name so expert mapping
                        # can route it
                        chunk_name = name.replace(
                            "mlp.shared_experts",
                            f"mlp.experts.{self.config.n_routed_experts + j}",
                        )

                    # Use expert_params_mapping to locate the destination
                    # param and delegate to its expert-aware weight_loader
                    # with expert_id.
                    for mapping in expert_params_mapping:
                        param_name, weight_name, expert_id, shard_id = mapping
                        if weight_name not in chunk_name:
                            continue

                        # Do not modify `name` since the loop may continue here
                        # Instead, create a new variable
                        name_mapped = chunk_name.replace(weight_name, param_name)

                        param = params_dict[name_mapped]
                        # We should ask the weight loader to return success or
                        # not here since otherwise we may skip experts with
                        # other available replicas.
                        weight_loader = typing.cast(
                            Callable[..., bool], param.weight_loader
                        )
                        success = weight_loader(
                            param,
                            weight_to_load,
                            name_mapped,
                            shard_id=shard_id,
                            expert_id=expert_id,
                            return_success=True,
                        )
                        if success:
                            if not is_fusion_moe_shared_experts_layer:
                                name = name_mapped
                            else:
                                loaded_params.add(name_mapped)
                            break
                    else:
                        # Skip loading extra bias for GPTQ models.
                        if name.endswith(".bias") and name not in params_dict:
                            continue

                        name = maybe_remap_kv_scale_name(name, params_dict)
                        if name is None:
                            continue

                        # According to DeepSeek-V3 Technical Report, MTP modules
                        # shares embedding layer. We only load the first weights.
                        if (
                            spec_layer != self.model.mtp_start_layer_idx
                            and ".layers" not in name
                        ):
                            continue

                        param = params_dict[name]
                        weight_loader = getattr(
                            param, "weight_loader", default_weight_loader
                        )
                        weight_loader(param, loaded_weight)
            if not is_fusion_moe_shared_experts_layer:
                loaded_params.add(name)
403
404
405
406
407
408
        return loaded_params

    def _rewrite_spec_layer_name(self, spec_layer: int, name: str) -> str:
        """
        Rewrite the weight name to match the format of the original model.
        Add .mtp_block for modules in transformer layer block for spec layer
409
        and rename shared layer weights to be top level.
410
411
        """
        spec_layer_weight_names = [
412
413
414
415
416
            "embed_tokens",
            "enorm",
            "hnorm",
            "eh_proj",
            "shared_head",
417
        ]
418
        shared_weight_names = ["embed_tokens"]
419
        spec_layer_weight = False
420
        shared_weight = False
421
422
423
        for weight_name in spec_layer_weight_names:
            if weight_name in name:
                spec_layer_weight = True
424
425
                if weight_name in shared_weight_names:
                    shared_weight = True
426
427
428
                break
        if not spec_layer_weight:
            # treat rest weights as weights for transformer layer block
429
430
431
            name = name.replace(
                f"model.layers.{spec_layer}.", f"model.layers.{spec_layer}.mtp_block."
            )
432
433
434
        elif shared_weight:
            # treat shared weights as top level weights
            name = name.replace(f"model.layers.{spec_layer}.", "model.")
435
        return name