"vscode:/vscode.git/clone" did not exist on "d7c79b39e1a96dd63937f054bd7a9de7ae3e0610"
Commit 7c63d1a4 authored by dongcl

add feature manager to support feature selection

parent 8d5bae2a
from .pipeline_parallel.pipeline_feature import PipelineFeature

ADAPTOR_FEATURES = [
    PipelineFeature()
]
import argparse


class AbstractFeature:
    """Base class for adaptor features: argument registration, validation and patch hooks."""

    def __init__(self, feature_name: str, optimization_level: int = 2):
        # '--my-feature' style names are normalized to attribute form, e.g. 'my_feature'.
        self.feature_name = feature_name.strip().replace('-', '_')
        self.optimization_level = optimization_level
        self.default_patches = self.optimization_level == 0

    def register_args(self, parser):
        pass

    def pre_validate_args(self, args):
        pass

    def validate_args(self, args):
        pass

    def post_validate_args(self, args):
        pass

    def register_patches(self, patch_manager, args):
        ...

    def incompatible_check(self, global_args, check_args):
        if getattr(global_args, self.feature_name, None) and getattr(global_args, check_args, None):
            raise AssertionError('{} and {} are incompatible.'.format(self.feature_name, check_args))

    def dependency_check(self, global_args, check_args):
        if getattr(global_args, self.feature_name, None) and not getattr(global_args, check_args, None):
            raise AssertionError('{} requires {}.'.format(self.feature_name, check_args))

    @staticmethod
    def add_parser_argument_choices_value(parser, argument_name, new_choice):
        # Extend the choices of an already-registered argument with an extra value.
        for action in parser._actions:
            exist_arg = isinstance(action, argparse.Action) and argument_name in action.option_strings
            if exist_arg and action.choices is not None and new_choice not in action.choices:
                action.choices.append(new_choice)
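As a usage sketch of the base class above, a minimal hypothetical feature (the name and flag below are made up for illustration and are not part of this commit) would register its own arguments, validate them, and then be listed in ADAPTOR_FEATURES:

class DemoFeature(AbstractFeature):
    """Hypothetical feature, shown only to illustrate the intended subclassing pattern."""

    def __init__(self):
        # stored internally as 'demo_switch'
        super().__init__('demo-switch')

    def register_args(self, parser):
        group = parser.add_argument_group(title=self.feature_name)
        group.add_argument('--demo-switch', action='store_true',
                           help='Hypothetical flag used only in this sketch.')

    def validate_args(self, args):
        # e.g. refuse to run together with --split-bw
        self.incompatible_check(args, 'split_bw')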
from argparse import ArgumentParser

from megatron.core.utils import is_te_min_version

from ..feature import AbstractFeature


class PipelineFeature(AbstractFeature):
    def __init__(self):
        super().__init__('schedule-method')

    def register_args(self, parser: ArgumentParser):
        group = parser.add_argument_group(title=self.feature_name)
        group.add_argument('--schedule-method', type=str,
                           default=None,
                           choices=['dualpipev',
                                    'interleaved_1f1b'])
        group.add_argument('--combined-1f1b', action='store_true',
                           help='Batch-level overlapping in 1f1b stage.')
        group.add_argument('--combined-1f1b-recipe', type=str,
                           choices=['ep_a2a', 'golden'],
                           default='golden',
                           help='Options are "ep_a2a" and "golden".')
        group.add_argument('--split-bw', action='store_true',
                           help='Split dgrad and wgrad for batch-level overlapping')
    def validate_args(self, args):
        if args.schedule_method == "dualpipev":
            if args.num_layers_per_virtual_pipeline_stage is not None:
                raise AssertionError(
                    "dualpipev and the virtual pipeline are incompatible.")
            if args.num_layers < args.pipeline_model_parallel_size * 2:
                raise AssertionError(
                    'number of layers must be at least 2 * pipeline_model_parallel_size in dualpipev')
            num_micro_batch = args.global_batch_size // args.micro_batch_size // args.data_parallel_size
            if num_micro_batch < args.pipeline_model_parallel_size * 2 - 1:
                raise AssertionError(
                    "num_micro_batch must be at least pipeline_model_parallel_size * 2 - 1")
    def register_patches(self, patch_manager, args):
        if args.schedule_method == "dualpipev":
            from megatron.training.utils import print_rank_0
            from dcu_megatron.core.pipeline_parallel.dualpipev.dualpipev_schedules import forward_backward_pipelining_with_cutinhalf
            from dcu_megatron.core.pipeline_parallel.dualpipev.dualpipev_chunks import (
                get_model,
                dualpipev_fp16forward,
                get_num_layers_to_build,
                train_step,
                _allreduce_embedding_grads_wrapper
            )

            patch_manager.register_patch(
                'megatron.training.training.get_model', get_model)
            patch_manager.register_patch(
                'megatron.training.training.train_step', train_step)
            patch_manager.register_patch(
                'megatron.core.pipeline_parallel.schedules.forward_backward_pipelining_without_interleaving',
                forward_backward_pipelining_with_cutinhalf)
            patch_manager.register_patch(
                'megatron.legacy.model.module.Float16Module.forward', dualpipev_fp16forward)
            patch_manager.register_patch(
                'megatron.core.transformer.transformer_block.get_num_layers_to_build', get_num_layers_to_build)
            patch_manager.register_patch(
                'megatron.training.utils.print_rank_last', print_rank_0)
            patch_manager.register_patch(
                'megatron.core.distributed.finalize_model_grads._allreduce_embedding_grads',
                _allreduce_embedding_grads_wrapper)
        if (
            args.schedule_method == "interleaved_1f1b"
            and args.combined_1f1b
        ):
            from megatron.core.extensions.transformer_engine import TEColumnParallelLinear, TERowParallelLinear
            from dcu_megatron.core.transformer.moe.token_dispatcher import MoEAlltoAllTokenDispatcher
            from dcu_megatron.core.transformer.transformer_layer import TransformerLayer
            from dcu_megatron.core.models.gpt.gpt_model import GPTModel
            from dcu_megatron.core.pipeline_parallel.schedules import get_pp_rank_microbatches, forward_backward_pipelining_with_interleaving
            from dcu_megatron.core.extensions.transformer_engine import (
                _get_extra_te_kwargs_wrapper,
                TELinear,
                TELayerNormColumnParallelLinear,
            )
            from dcu_megatron.core.transformer.multi_latent_attention import MLASelfAttention
            from dcu_megatron.core.transformer.mlp import MLP
            from dcu_megatron.core.transformer.moe.experts import TEGroupedMLP
            from dcu_megatron.core.transformer.moe.moe_layer import MoELayer

            # num_warmup_microbatches + 1
            patch_manager.register_patch('megatron.core.pipeline_parallel.schedules.get_pp_rank_microbatches',
                                         get_pp_rank_microbatches)

            # a2a_overlap
            patch_manager.register_patch('megatron.core.pipeline_parallel.schedules.forward_backward_pipelining_with_interleaving',
                                         forward_backward_pipelining_with_interleaving)
            patch_manager.register_patch('megatron.core.transformer.moe.token_dispatcher.MoEAlltoAllTokenDispatcher',
                                         MoEAlltoAllTokenDispatcher)
            patch_manager.register_patch('megatron.core.transformer.transformer_layer.TransformerLayer',
                                         TransformerLayer)
            patch_manager.register_patch('megatron.core.models.gpt.gpt_model.GPTModel.build_schedule_plan',
                                         GPTModel.build_schedule_plan,
                                         create_dummy=True)

            # backward_dw
            patch_manager.register_patch('megatron.core.extensions.transformer_engine._get_extra_te_kwargs',
                                         _get_extra_te_kwargs_wrapper,
                                         apply_wrapper=True)
            patch_manager.register_patch('megatron.core.extensions.transformer_engine.TELinear',
                                         TELinear)
            patch_manager.register_patch('megatron.core.extensions.transformer_engine.TELayerNormColumnParallelLinear',
                                         TELayerNormColumnParallelLinear)
            TEColumnParallelLinear.__bases__ = (TELinear,)
            TERowParallelLinear.__bases__ = (TELinear,)

            if is_te_min_version("1.9.0.dev0"):
                from megatron.core.extensions.transformer_engine import TEColumnParallelGroupedLinear, TERowParallelGroupedLinear
                from ..core.extensions.transformer_engine import TEGroupedLinear

                patch_manager.register_patch('megatron.core.extensions.transformer_engine.TEGroupedLinear',
                                             TEGroupedLinear)
                TEColumnParallelGroupedLinear.__bases__ = (TEGroupedLinear,)
                TERowParallelGroupedLinear.__bases__ = (TEGroupedLinear,)

            patch_manager.register_patch('megatron.core.transformer.multi_latent_attention.MLASelfAttention.backward_dw',
                                         MLASelfAttention.backward_dw,
                                         create_dummy=True)
            patch_manager.register_patch('megatron.core.transformer.mlp.MLP.backward_dw',
                                         MLP.backward_dw,
                                         create_dummy=True)
            patch_manager.register_patch('megatron.core.transformer.moe.experts.TEGroupedMLP.backward_dw',
                                         TEGroupedMLP.backward_dw,
                                         create_dummy=True)
            patch_manager.register_patch('megatron.core.transformer.moe.moe_layer.MoELayer.backward_dw',
                                         MoELayer.backward_dw,
                                         create_dummy=True)
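To make the dualpipev constraints in validate_args above concrete, a small worked example with illustrative sizes (the numbers are not from this commit):

# Illustrative sizes only: 4 pipeline stages under dualpipev.
pipeline_model_parallel_size = 4
num_layers = 8                                   # must be >= 2 * pp = 8
global_batch_size, micro_batch_size, data_parallel_size = 56, 1, 8
num_micro_batch = global_batch_size // micro_batch_size // data_parallel_size   # 56 // 1 // 8 = 7
assert num_layers >= 2 * pipeline_model_parallel_size
assert num_micro_batch >= 2 * pipeline_model_parallel_size - 1                  # 7 >= 7, the smallest allowed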
@@ -7,6 +7,45 @@ import torch
from megatron.core.utils import is_te_min_version
from .features_manager import ADAPTOR_FEATURES
from .patch_utils import MegatronPatchesManager


def add_args(args, key, value):
    if key is not None:
        # strip the leading '--' and normalize to attribute form
        key = key[2:].replace('-', '_')
        if value is None:
            value = True
        elif len(value) == 1:
            value = value[0]
        setattr(args, key, value)


def parser_unknown_args(args, unknown):
    # Fold the tokens left over by parse_known_args() into the args namespace.
    i = 0
    key = value = None
    while i < len(unknown):
        if unknown[i].startswith("--"):
            add_args(args, key, value)
            key = unknown[i]
            value = None
        else:
            if value is None:
                value = [unknown[i]]
            else:
                value.append(unknown[i])
        i += 1
    add_args(args, key, value)
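For clarity, a short usage sketch of the two helpers above (the flag values are illustrative, not part of the commit):

import argparse

_demo = argparse.Namespace()
# a bare flag becomes True; a single trailing value collapses to a scalar
parser_unknown_args(_demo, ['--split-bw', '--combined-1f1b-recipe', 'ep_a2a'])
assert _demo.split_bw is True
assert _demo.combined_1f1b_recipe == 'ep_a2a'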
def get_adaptor_args():
    # Cached parse of the adaptor arguments. Relies on a module-level _ARGS
    # sentinel and a process_args() helper defined elsewhere in this module.
    global _ARGS
    if _ARGS is None:
        parser = argparse.ArgumentParser(description='Adaptor Arguments', allow_abbrev=False)
        _ARGS, unknown = process_args(parser).parse_known_args()
        parser_unknown_args(_ARGS, unknown)
    return _ARGS
class MegatronAdaptation:
    """
@@ -25,11 +64,7 @@ class MegatronAdaptation:
        MegatronAdaptation.apply()

        # apply features
        from .patch_utils import MegatronPatchesManager
        from .features_manager import a2a_overlap_adaptation
        a2a_overlap_adaptation(MegatronPatchesManager)
        MegatronPatchesManager.apply_patches()
        feature_adaptation()
@classmethod
def register(cls, orig_func_name, new_func=None, force_patch=False, create_dummy=False, apply_wrapper=False, remove_origin_wrappers=False):
......@@ -69,6 +104,24 @@ class MegatronAdaptation:
pass
def feature_adaptation():
    adaptor_args = get_adaptor_args()

    # Advanced acceleration algorithm
    adaptation_l2(MegatronPatchesManager, adaptor_args)

    MegatronPatchesManager.apply_patches()


def adaptation_l2(patches_manager, adaptor_args):
    """
    Advanced acceleration algorithm
    """
    for feature in ADAPTOR_FEATURES:
        if getattr(adaptor_args, feature.feature_name, None) and feature.optimization_level == 2:
            feature.register_patches(patches_manager, adaptor_args)
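To spell out the selection above for the one feature currently registered (a descriptive walk-through, not executable setup):

# ADAPTOR_FEATURES currently contains PipelineFeature('schedule-method'), so
# feature.feature_name == 'schedule_method' and feature.optimization_level == 2.
#   --schedule-method not given                     -> getattr(...) is falsy, the feature is skipped
#   --schedule-method dualpipev                     -> register_patches() installs the dualpipev
#                                                      schedule, model-chunk and grad-sync patches
#   --schedule-method interleaved_1f1b --combined-1f1b
#                                                   -> register_patches() installs the 1f1b
#                                                      overlap / backward_dw patches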
class MegatronAdaptationABC:
    """
    Abstract class for adaptation.
......
@@ -4,6 +4,8 @@ import argparse
from typing import Union
from megatron.training.arguments import add_megatron_arguments
from dcu_megatron.adaptor.features_manager import ADAPTOR_FEATURES
def remove_original_params(parser, param_names: Union[list, str]):
    if isinstance(param_names, str):
@@ -17,18 +19,18 @@ def remove_original_params(parser, param_names: Union[list, str]):
            del parser._option_string_actions[option_string]
def add_megatron_arguments_patch(parser: argparse.ArgumentParser):
    parser = add_megatron_arguments(parser)


def process_adaptor_args(parser):
    # add extra arguments
    parser = _add_extra_network_size_args(parser)
    parser = _add_extra_training_args(parser)
    parser = _add_extra_initialization_args(parser)
    parser = _add_extra_distributed_args(parser)
    parser = _add_extra_tokenizer_args(parser)
    parser = _add_extra_moe_args(parser)
    parser = _add_flux_args(parser)

    for feature in ADAPTOR_FEATURES:
        feature.register_args(parser)

    return parser
@@ -37,12 +39,15 @@ def parse_args(extra_args_provider=None, ignore_unknown_args=False):
    parser = argparse.ArgumentParser(description='Megatron-LM Arguments',
                                     allow_abbrev=False)
    parser = add_megatron_arguments_patch(parser)
    parser = add_megatron_arguments(parser)

    # Custom arguments.
    if extra_args_provider is not None:
        parser = extra_args_provider(parser)

    # add adaptor args
    parser = process_adaptor_args(parser)

    # Parse.
    if ignore_unknown_args:
        args, _ = parser.parse_known_args()
@@ -136,20 +141,6 @@ def _add_extra_tokenizer_args(parser):
    return parser


def _add_extra_moe_args(parser):
    group = parser.add_argument_group(title="extra moe args")
    group.add_argument('--combined-1f1b', action='store_true',
                       help='Batch-level overlapping in 1f1b stage.')
    group.add_argument('--combined-1f1b-recipe', type=str,
                       choices=['ep_a2a', 'golden'],
                       default='golden',
                       help='Options are "ep_a2a" and "golden".')
    group.add_argument('--split-bw', action='store_true',
                       help='Split dgrad and wgrad for batch-level overlapping')
    return parser
def _add_flux_args(parser):
    group = parser.add_argument_group(title='flux args')
    group.add_argument('--flux-transpose-weight', action='store_true', default=False,
......