dcuai / dlexamples

Commit 316d3f90
Authored Jul 14, 2022 by Pan,Huiwen

Add DeepSpeed framework test models

Parent: aebde649
Changes: 227 files. Showing 20 changed files with 3547 additions and 0 deletions (+3547 -0).
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/model/gpt2_model.py  +228  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/model/language_model.py  +384  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/model/multiple_choice.py  +110  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/model/realm_model.py  +204  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/model/transformer.py  +631  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/model/utils.py  +83  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/module.py  +31  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/__init__.py  +58  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/cross_entropy.py  +110  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/data.py  +116  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/grads.py  +127  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/initialize.py  +252  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/layers.py  +363  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/mappings.py  +157  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/random.py  +319  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/tests/__init__.py  +0  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/tests/commons.py  +83  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/tests/test_cross_entropy.py  +108  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/tests/test_data.py  +88  -0
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/tests/test_initialize.py  +95  -0
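The files below add the Megatron-LM v1.1.5 model code adapted for DeepSpeed 3D parallelism; GPT2ModelPipe in gpt2_model.py is the pipeline-parallel entry point. As orientation only, a minimal sketch of how such a model is typically handed to DeepSpeed is shown here; the training driver is not part of this commit, and the deepspeed.initialize call and topology setup below are assumptions, not code from this diff (constructor arguments follow the definitions added in gpt2_model.py and megatron/mpu/__init__.py).

    # Illustrative sketch only -- not part of this commit. Assumes Megatron's
    # argument parsing and mpu initialization have already run.
    import deepspeed
    from megatron import get_args, mpu
    from megatron.model.gpt2_model import GPT2ModelPipe

    args = get_args()

    # Build the pipeline-parallel GPT-2 on the process topology created by mpu.
    model = GPT2ModelPipe(num_tokentypes=0,
                          parallel_output=True,
                          topology=mpu.get_topology())

    # Hand the PipelineModule to DeepSpeed; the engine handles stage partitioning,
    # scheduling, and the CrossEntropy loss_fn wired up inside GPT2ModelPipe.
    engine, optimizer, _, _ = deepspeed.initialize(
        args=args,
        model=model,
        model_parameters=[p for p in model.parameters() if p.requires_grad])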
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/model/gpt2_model.py (new file, mode 100644)
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""GPT-2 model."""

import torch

from megatron import get_args
from megatron import mpu
from megatron.module import MegatronModule
from .language_model import parallel_lm_logits
from .language_model import get_language_model
from .utils import init_method_normal
from .utils import scaled_init_method_normal

# Pipeline parallelism
from megatron import mpu
import torch.nn.functional as F
import torch.nn.functional as F

from apex.normalization.fused_layer_norm import FusedLayerNorm as LayerNorm
import megatron.fp16 as fp16
from megatron.model.transformer import ParallelTransformerLayerPipe
from .language_model import EmbeddingPipe

from deepspeed.pipe import PipelineModule, LayerSpec, TiedLayerSpec


def gpt2_attention_mask_func(attention_scores, ltor_mask):
    attention_scores.masked_fill_(ltor_mask, -10000.0)
    return attention_scores


def CrossEntropy(output, labels):
    """ From pretrain_gpt2:forward_step() """
    labels, loss_mask = labels[0], labels[1]

    losses = mpu.vocab_parallel_cross_entropy(output.contiguous().float(), labels)
    loss_mask = loss_mask.view(-1)
    loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()
    return loss


class GPT2Model(MegatronModule):
    """GPT-2 Language model."""

    def __init__(self, num_tokentypes=0, parallel_output=True):
        super(GPT2Model, self).__init__()
        args = get_args()

        self.parallel_output = parallel_output
        self.fp16_lm_cross_entropy = args.fp16_lm_cross_entropy

        self.language_model, self._language_model_key = get_language_model(
            attention_mask_func=gpt2_attention_mask_func,
            num_tokentypes=num_tokentypes,
            add_pooler=False,
            init_method=init_method_normal(args.init_method_std),
            scaled_init_method=scaled_init_method_normal(args.init_method_std,
                                                         args.num_layers))

    def forward(self, input_ids, position_ids, attention_mask, labels=None,
                tokentype_ids=None, layer_past=None, get_key_value=False,
                forward_method_parallel_output=None):

        # Language model.
        lm_output = self.language_model(input_ids,
                                        position_ids,
                                        attention_mask,
                                        tokentype_ids=tokentype_ids,
                                        layer_past=layer_past,
                                        get_key_value=get_key_value)

        if get_key_value:
            lm_output, presents = lm_output

        # Output.
        parallel_output = self.parallel_output
        if forward_method_parallel_output is not None:
            parallel_output = forward_method_parallel_output
        output = parallel_lm_logits(
            lm_output,
            self.language_model.embedding.word_embeddings.weight,
            parallel_output)

        if get_key_value:
            output = [output, presents]

        if labels is None:
            return output
        else:
            if self.fp16_lm_cross_entropy:
                assert output.dtype == torch.half
                loss = mpu.vocab_parallel_cross_entropy(output, labels)
            else:
                loss = mpu.vocab_parallel_cross_entropy(output.float(), labels)
            return loss

    def state_dict_for_save_checkpoint(self, destination=None, prefix='',
                                       keep_vars=False):

        state_dict_ = {}
        state_dict_[self._language_model_key] \
            = self.language_model.state_dict_for_save_checkpoint(
                destination, prefix, keep_vars)
        return state_dict_

    def load_state_dict(self, state_dict, strict=True):
        """Customized load."""

        if self._language_model_key in state_dict:
            state_dict = state_dict[self._language_model_key]
        self.language_model.load_state_dict(state_dict, strict=strict)


class GPT2ModelPipe(PipelineModule, MegatronModule):
    """GPT2Model adapted for pipeline parallelism.

    The largest change is flattening the GPTModel class so we can express it as a
    sequence of layers including embedding, transformer layers, and output.
    """

    def __init__(self, num_tokentypes=0, parallel_output=True, add_pooler=False,
                 topology=None):
        args = get_args()

        self.parallel_output = parallel_output
        self.hidden_size = args.hidden_size
        self.num_tokentypes = num_tokentypes
        self.init_method = init_method_normal(args.init_method_std)
        self.output_layer_init_method = scaled_init_method_normal(args.init_method_std,
                                                                  args.num_layers)
        self.add_pooler = add_pooler
        if self.add_pooler:
            raise NotImplementedError('Pipeline pooler not yet implemented. Forward needs pooling_sequence_index')

        # Use torch gelu unless otherwise forced.
        gelu = F.gelu
        if args.openai_gelu:
            gelu = openai_gelu

        #
        # forward() prototype
        #
        self.specs = []

        # Embedding layer
        self.specs.append(TiedLayerSpec('embed',
                                        EmbeddingPipe,
                                        self.hidden_size,
                                        args.padded_vocab_size,
                                        args.max_position_embeddings,
                                        args.hidden_dropout,
                                        self.init_method,
                                        self.num_tokentypes,
                                        tied_weight_attr='word_embeddings_weight'))

        # outputs are now (hidden_states, attention_mask)

        # data format change to avoid explicit tranposes : [b s h] --> [s b h]
        self.specs.append(lambda x: (x[0].transpose(0, 1).contiguous(), x[1]))

        # Transformer layers
        for x in range(args.num_layers):
            self.specs.append(
                LayerSpec(ParallelTransformerLayerPipe,
                          attention_mask_func=gpt2_attention_mask_func,
                          init_method=self.init_method,
                          output_layer_init_method=self.output_layer_init_method,
                          layer_number=x))

        # Undo data format change and drop mask
        self.specs.append(lambda x: x[0].transpose(0, 1).contiguous())

        # Final layernorm after transformer layers
        self.specs.append(
            LayerSpec(LayerNorm,
                      args.hidden_size,
                      eps=args.layernorm_epsilon))

        # XXX forward_method_parallel_output is assumed to be None, but we're not in a
        # fwd method to assert

        def _logits_helper(embedding, lm_output):
            """Just a wrapper to massage inputs/outputs from pipeline. """
            return parallel_lm_logits(
                lm_output,
                embedding.word_embeddings_weight,
                self.parallel_output)

        self.specs.append(
            TiedLayerSpec('embed',
                          EmbeddingPipe,
                          self.hidden_size,
                          args.padded_vocab_size,
                          args.max_position_embeddings,
                          args.hidden_dropout,
                          self.init_method,
                          self.num_tokentypes,
                          forward_fn=_logits_helper,
                          tied_weight_attr='word_embeddings_weight')
        )

        # Should maybe be done in loss_fn() instead?
        if args.fp16:
            self.specs.append(fp16.fp16_to_fp32)

        if args.checkpoint_activations:
            interval = args.checkpoint_num_layers
        else:
            interval = 0
        super().__init__(layers=self.specs,
                         loss_fn=CrossEntropy,
                         topology=topology,
                         activation_checkpoint_interval=interval,
                         partition_method='type:transformer')
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/model/language_model.py (new file, mode 100644)
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Transformer based language model."""

import torch
import torch.nn.functional as F

from megatron import get_args
from megatron import mpu
from megatron.module import MegatronModule
from megatron.model.transformer import ParallelTransformer
from megatron.model.utils import get_linear_layer
from megatron.model.utils import init_method_normal, scaled_init_method_normal


def parallel_lm_logits(input_, word_embeddings_weight, parallel_output,
                       bias=None):
    """LM logits using word embedding weights."""
    # Parallel logits.
    input_parallel = mpu.copy_to_model_parallel_region(input_)
    # Matrix multiply.
    if bias is None:
        logits_parallel = F.linear(input_parallel, word_embeddings_weight)
    else:
        logits_parallel = F.linear(input_parallel, word_embeddings_weight, bias)
    # Gather if needed.
    if parallel_output:
        return logits_parallel

    return mpu.gather_from_model_parallel_region(logits_parallel)


def get_language_model(attention_mask_func, num_tokentypes, add_pooler,
                       init_method=None, scaled_init_method=None):
    """Build language model and return along with the key to save."""
    args = get_args()

    if init_method is None:
        init_method = init_method_normal(args.init_method_std)

    if scaled_init_method is None:
        scaled_init_method = scaled_init_method_normal(args.init_method_std,
                                                       args.num_layers)

    # Language model.
    language_model = TransformerLanguageModel(
        attention_mask_func=attention_mask_func,
        init_method=init_method,
        output_layer_init_method=scaled_init_method,
        num_tokentypes=num_tokentypes,
        add_pooler=add_pooler)
    # key used for checkpoints.
    language_model_key = 'language_model'

    return language_model, language_model_key


class Pooler(MegatronModule):
    """Pooler layer.

    Pool hidden states of a specific token (for example start of the
    sequence) and add a linear transformation followed by a tanh.

    Arguments:
        hidden_size: hidden size
        init_method: weight initialization method for the linear layer.
            bias is set to zero.
    """

    def __init__(self, hidden_size, init_method):
        super(Pooler, self).__init__()
        self.dense = get_linear_layer(hidden_size, hidden_size, init_method)

    def forward(self, hidden_states, sequence_index=0):
        # hidden_states: [b, s, h]
        # sequence_index: index of the token to pool.
        pooled = hidden_states[:, sequence_index, :]
        pooled = self.dense(pooled)
        pooled = torch.tanh(pooled)
        return pooled


class Embedding(MegatronModule):
    """Language model embeddings.

    Arguments:
        hidden_size: hidden size
        vocab_size: vocabulary size
        max_sequence_length: maximum size of sequence. This
                             is used for positional embedding
        embedding_dropout_prob: dropout probability for embeddings
        init_method: weight initialization method
        num_tokentypes: size of the token-type embeddings. 0 value
                        will ignore this embedding
    """

    def __init__(self,
                 hidden_size,
                 vocab_size,
                 max_sequence_length,
                 embedding_dropout_prob,
                 init_method,
                 num_tokentypes=0):
        super(Embedding, self).__init__()

        self.hidden_size = hidden_size
        self.init_method = init_method
        self.num_tokentypes = num_tokentypes

        # Word embeddings (parallel).
        self.word_embeddings = mpu.VocabParallelEmbedding(
            vocab_size, self.hidden_size, init_method=self.init_method)
        self._word_embeddings_key = 'word_embeddings'

        # Position embedding (serial).
        self.position_embeddings = torch.nn.Embedding(
            max_sequence_length, self.hidden_size)
        self._position_embeddings_key = 'position_embeddings'
        # Initialize the position embeddings.
        self.init_method(self.position_embeddings.weight)

        # Token type embedding.
        # Add this as an optional field that can be added through
        # method call so we can load a pretrain model without
        # token types and add them as needed.
        self._tokentype_embeddings_key = 'tokentype_embeddings'
        if self.num_tokentypes > 0:
            self.tokentype_embeddings = torch.nn.Embedding(self.num_tokentypes,
                                                           self.hidden_size)
            # Initialize the token-type embeddings.
            self.init_method(self.tokentype_embeddings.weight)
        else:
            self.tokentype_embeddings = None

        # Embeddings dropout
        self.embedding_dropout = torch.nn.Dropout(embedding_dropout_prob)

    def add_tokentype_embeddings(self, num_tokentypes):
        """Add token-type embedding. This function is provided so we can add
        token-type embeddings in case the pretrained model does not have it.
        This allows us to load the model normally and then add this embedding.
        """
        if self.tokentype_embeddings is not None:
            raise Exception('tokentype embeddings is already initialized')
        if torch.distributed.get_rank() == 0:
            print('adding embedding for {} tokentypes'.format(num_tokentypes),
                  flush=True)
        self.num_tokentypes = num_tokentypes
        self.tokentype_embeddings = torch.nn.Embedding(num_tokentypes,
                                                       self.hidden_size)
        # Initialize the token-type embeddings.
        self.init_method(self.tokentype_embeddings.weight)

    def forward(self, input_ids, position_ids, tokentype_ids=None):
        # Embeddings.
        words_embeddings = self.word_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        embeddings = words_embeddings + position_embeddings
        if tokentype_ids is not None:
            assert self.tokentype_embeddings is not None
            embeddings = embeddings + self.tokentype_embeddings(tokentype_ids)
        else:
            assert self.tokentype_embeddings is None

        # Dropout.
        embeddings = self.embedding_dropout(embeddings)

        return embeddings

    def state_dict_for_save_checkpoint(self, destination=None, prefix='',
                                       keep_vars=False):
        """For easy load."""

        state_dict_ = {}
        state_dict_[self._word_embeddings_key] \
            = self.word_embeddings.state_dict(destination, prefix, keep_vars)
        state_dict_[self._position_embeddings_key] \
            = self.position_embeddings.state_dict(
                destination, prefix, keep_vars)
        if self.num_tokentypes > 0:
            state_dict_[self._tokentype_embeddings_key] \
                = self.tokentype_embeddings.state_dict(
                    destination, prefix, keep_vars)

        return state_dict_

    def load_state_dict(self, state_dict, strict=True):
        """Customized load."""

        # Word embedding.
        if self._word_embeddings_key in state_dict:
            state_dict_ = state_dict[self._word_embeddings_key]
        else:
            # for backward compatibility.
            state_dict_ = {}
            for key in state_dict.keys():
                if 'word_embeddings' in key:
                    state_dict_[key.split('word_embeddings.')[1]] \
                        = state_dict[key]
        self.word_embeddings.load_state_dict(state_dict_, strict=strict)

        # Position embedding.
        if self._position_embeddings_key in state_dict:
            state_dict_ = state_dict[self._position_embeddings_key]
        else:
            # for backward compatibility.
            state_dict_ = {}
            for key in state_dict.keys():
                if 'position_embeddings' in key:
                    state_dict_[key.split('position_embeddings.')[1]] \
                        = state_dict[key]
        self.position_embeddings.load_state_dict(state_dict_, strict=strict)

        # Tokentype embedding.
        if self.num_tokentypes > 0:
            state_dict_ = {}
            if self._tokentype_embeddings_key in state_dict:
                state_dict_ = state_dict[self._tokentype_embeddings_key]
            else:
                # for backward compatibility.
                for key in state_dict.keys():
                    if 'tokentype_embeddings' in key:
                        state_dict_[key.split('tokentype_embeddings.')[1]] \
                            = state_dict[key]
            if len(state_dict_.keys()) > 0:
                self.tokentype_embeddings.load_state_dict(state_dict_,
                                                          strict=strict)
            else:
                print('***WARNING*** expected tokentype embeddings in the '
                      'checkpoint but could not find it', flush=True)


class EmbeddingPipe(Embedding):
    """Extends Embedding to forward attention_mask through the pipeline."""

    @property
    def word_embeddings_weight(self):
        """Easy accessory for the pipeline engine to tie embeddings across stages."""
        return self.word_embeddings.weight

    def forward(self, args):
        input_ids = args[0]
        position_ids = args[1]
        attention_mask = args[2]
        if len(args) == 4:
            tokentype_ids = args[3]
        else:
            tokentype_ids = None

        embeddings = super().forward(input_ids, position_ids,
                                     tokentype_ids=tokentype_ids)
        return embeddings, attention_mask


class TransformerLanguageModel(MegatronModule):
    """Transformer language model.

    Arguments:
        transformer_hparams: transformer hyperparameters
        attention_mask_func: a function that takes `unmaksed-attention-scores`
            with size [b, np, s, s] and an `attention-mask` and will apply
            the masking. The function should return a masked score of the
            same size [b, np, s, s].
               masked-attention-scores = attention_mask_func(
                                     unmaksed-attention-scores, attention-mask)
        vocab_size: vocabulary size
        max_sequence_length: maximum size of sequence. This
                             is used for positional embedding
        embedding_dropout_prob: dropout probability for embeddings
        num_tokentypes: size of the token-type embeddings. 0 value
                        will ignore this embedding
    """

    def __init__(self,
                 attention_mask_func,
                 init_method,
                 output_layer_init_method,
                 num_tokentypes=0,
                 add_pooler=False):
        super(TransformerLanguageModel, self).__init__()
        args = get_args()

        self.hidden_size = args.hidden_size
        self.num_tokentypes = num_tokentypes
        self.init_method = init_method
        self.add_pooler = add_pooler

        # Embeddings
        self.embedding = Embedding(self.hidden_size,
                                   args.padded_vocab_size,
                                   args.max_position_embeddings,
                                   args.hidden_dropout,
                                   self.init_method,
                                   self.num_tokentypes)
        self._embedding_key = 'embedding'

        # Transformer
        self.transformer = ParallelTransformer(
            attention_mask_func, self.init_method,
            output_layer_init_method)
        self._transformer_key = 'transformer'

        # Pooler
        if self.add_pooler:
            self.pooler = Pooler(self.hidden_size, self.init_method)
            self._pooler_key = 'pooler'

    def forward(self, input_ids, position_ids, attention_mask,
                tokentype_ids=None, layer_past=None, get_key_value=False,
                pooling_sequence_index=0):

        # Embeddings.
        embedding_output = self.embedding(input_ids, position_ids,
                                          tokentype_ids=tokentype_ids)

        # Transformer.
        transformer_output = self.transformer(embedding_output,
                                              attention_mask,
                                              layer_past=layer_past,
                                              get_key_value=get_key_value)

        if self.add_pooler:
            pooled_output = self.pooler(transformer_output,
                                        pooling_sequence_index)
            return transformer_output, pooled_output

        return transformer_output

    def state_dict_for_save_checkpoint(self, destination=None, prefix='',
                                       keep_vars=False):
        """For easy load."""

        state_dict_ = {}
        state_dict_[self._embedding_key] \
            = self.embedding.state_dict_for_save_checkpoint(
                destination, prefix, keep_vars)
        state_dict_[self._transformer_key] \
            = self.transformer.state_dict_for_save_checkpoint(
                destination, prefix, keep_vars)
        if self.add_pooler:
            state_dict_[self._pooler_key] \
                = self.pooler.state_dict_for_save_checkpoint(
                    destination, prefix, keep_vars)

        return state_dict_

    def load_state_dict(self, state_dict, strict=True):
        """Customized load."""

        # Embedding.
        if self._embedding_key in state_dict:
            state_dict_ = state_dict[self._embedding_key]
        else:
            # for backward compatibility.
            state_dict_ = {}
            for key in state_dict.keys():
                if '_embeddings' in key:
                    state_dict_[key] = state_dict[key]
        self.embedding.load_state_dict(state_dict_, strict=strict)

        # Transformer.
        if self._transformer_key in state_dict:
            state_dict_ = state_dict[self._transformer_key]
        else:
            # for backward compatibility.
            state_dict_ = {}
            for key in state_dict.keys():
                if 'transformer.' in key:
                    state_dict_[key.split('transformer.')[1]] = state_dict[key]
        self.transformer.load_state_dict(state_dict_, strict=strict)

        # Pooler.
        if self.add_pooler:
            assert 'pooler' in state_dict, \
                'could not find data for pooler in the checkpoint'
            self.pooler.load_state_dict(state_dict[self._pooler_key],
                                        strict=strict)
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/model/multiple_choice.py (new file, mode 100644)
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Multiple choice model."""

import torch

from megatron import get_args, print_rank_0
from megatron.model.bert_model import bert_attention_mask_func, bert_extended_attention_mask, bert_position_ids
from megatron.model.language_model import get_language_model
from megatron.model.utils import get_linear_layer
from megatron.model.utils import init_method_normal
from megatron.model.utils import scaled_init_method_normal
from megatron.module import MegatronModule


class MultipleChoice(MegatronModule):

    def __init__(self, num_tokentypes=2):
        super(MultipleChoice, self).__init__()
        args = get_args()

        init_method = init_method_normal(args.init_method_std)

        self.language_model, self._language_model_key = get_language_model(
            attention_mask_func=bert_attention_mask_func,
            num_tokentypes=num_tokentypes,
            add_pooler=True,
            init_method=init_method,
            scaled_init_method=scaled_init_method_normal(args.init_method_std,
                                                         args.num_layers))

        # Multi-choice head.
        self.multichoice_dropout = torch.nn.Dropout(args.hidden_dropout)
        self.multichoice_head = get_linear_layer(args.hidden_size, 1,
                                                 init_method)
        self._multichoice_head_key = 'multichoice_head'

    def forward(self, input_ids, attention_mask, tokentype_ids):

        # [batch, choices, sequence] --> [batch * choices, sequence] -->
        #    transformer --> [batch, choices] --> softmax

        # Ensure the shape is [batch-size, choices, sequence]
        assert len(input_ids.shape) == 3
        assert len(attention_mask.shape) == 3
        assert len(tokentype_ids.shape) == 3

        # Reshape and treat choice dimension the same as batch.
        num_choices = input_ids.shape[1]
        input_ids = input_ids.view(-1, input_ids.size(-1))
        attention_mask = attention_mask.view(-1, attention_mask.size(-1))
        tokentype_ids = tokentype_ids.view(-1, tokentype_ids.size(-1))

        extended_attention_mask = bert_extended_attention_mask(
            attention_mask, next(self.language_model.parameters()).dtype)
        position_ids = bert_position_ids(input_ids)

        _, pooled_output = self.language_model(input_ids,
                                               position_ids,
                                               extended_attention_mask,
                                               tokentype_ids=tokentype_ids)

        # Output.
        multichoice_output = self.multichoice_dropout(pooled_output)
        multichoice_logits = self.multichoice_head(multichoice_output)

        # Reshape back to separate choices.
        multichoice_logits = multichoice_logits.view(-1, num_choices)

        return multichoice_logits

    def state_dict_for_save_checkpoint(self, destination=None, prefix='',
                                       keep_vars=False):
        """For easy load when model is combined with other heads,
        add an extra key."""

        state_dict_ = {}
        state_dict_[self._language_model_key] \
            = self.language_model.state_dict_for_save_checkpoint(
                destination, prefix, keep_vars)
        state_dict_[self._multichoice_head_key] \
            = self.multichoice_head.state_dict(
                destination, prefix, keep_vars)
        return state_dict_

    def load_state_dict(self, state_dict, strict=True):
        """Customized load."""

        self.language_model.load_state_dict(
            state_dict[self._language_model_key], strict=strict)
        if self._multichoice_head_key in state_dict:
            self.multichoice_head.load_state_dict(
                state_dict[self._multichoice_head_key], strict=strict)
        else:
            print_rank_0('***WARNING*** could not find {} in the checkpoint, '
                         'initializing to random'.format(
                             self._multichoice_head_key))
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/model/realm_model.py (new file, mode 100644)
import os
import torch

from megatron import get_args, print_rank_0
from megatron.checkpointing import get_checkpoint_tracker_filename, get_checkpoint_name
from megatron.model import BertModel
from megatron.module import MegatronModule
from megatron import mpu
from megatron.model.utils import get_linear_layer
from megatron.model.utils import init_method_normal
from megatron.model.language_model import get_language_model
from megatron.model.utils import scaled_init_method_normal
from megatron.model.bert_model import bert_attention_mask_func, bert_extended_attention_mask, bert_position_ids


def general_ict_model_provider(only_query_model=False, only_block_model=False):
    """Build the model."""
    args = get_args()
    assert args.ict_head_size is not None, \
        "Need to specify --ict-head-size to provide an ICTBertModel"
    assert args.model_parallel_size == 1, \
        "Model parallel size > 1 not supported for ICT"

    print_rank_0('building ICTBertModel...')

    # simpler to just keep using 2 tokentypes since the LM we initialize with has 2 tokentypes
    model = ICTBertModel(
        ict_head_size=args.ict_head_size,
        num_tokentypes=2,
        parallel_output=True,
        only_query_model=only_query_model,
        only_block_model=only_block_model)

    return model


class ICTBertModel(MegatronModule):
    """Bert-based module for Inverse Cloze task."""

    def __init__(self,
                 ict_head_size,
                 num_tokentypes=1,
                 parallel_output=True,
                 only_query_model=False,
                 only_block_model=False):
        super(ICTBertModel, self).__init__()
        bert_kwargs = dict(
            ict_head_size=ict_head_size,
            num_tokentypes=num_tokentypes,
            parallel_output=parallel_output
        )
        assert not (only_block_model and only_query_model)
        self.use_block_model = not only_query_model
        self.use_query_model = not only_block_model

        if self.use_query_model:
            # this model embeds (pseudo-)queries - Embed_input in the paper
            self.query_model = IREncoderBertModel(**bert_kwargs)
            self._query_key = 'question_model'

        if self.use_block_model:
            # this model embeds evidence blocks - Embed_doc in the paper
            self.block_model = IREncoderBertModel(**bert_kwargs)
            self._block_key = 'context_model'

    def forward(self, query_tokens, query_attention_mask,
                block_tokens, block_attention_mask):
        """Run a forward pass for each of the models and return the respective embeddings."""
        query_logits = self.embed_query(query_tokens, query_attention_mask)
        block_logits = self.embed_block(block_tokens, block_attention_mask)
        return query_logits, block_logits

    def embed_query(self, query_tokens, query_attention_mask):
        """Embed a batch of tokens using the query model"""
        if self.use_query_model:
            query_types = torch.cuda.LongTensor(*query_tokens.shape).fill_(0)
            query_ict_logits, _ = self.query_model.forward(
                query_tokens, query_attention_mask, query_types)
            return query_ict_logits
        else:
            raise ValueError("Cannot embed query without query model.")

    def embed_block(self, block_tokens, block_attention_mask):
        """Embed a batch of tokens using the block model"""
        if self.use_block_model:
            block_types = torch.cuda.LongTensor(*block_tokens.shape).fill_(0)
            block_ict_logits, _ = self.block_model.forward(
                block_tokens, block_attention_mask, block_types)
            return block_ict_logits
        else:
            raise ValueError("Cannot embed block without block model.")

    def state_dict_for_save_checkpoint(self, destination=None, prefix='',
                                       keep_vars=False):
        """Save dict with state dicts of each of the models."""
        state_dict_ = {}
        if self.use_query_model:
            state_dict_[self._query_key] \
                = self.query_model.state_dict_for_save_checkpoint(
                    destination, prefix, keep_vars)

        if self.use_block_model:
            state_dict_[self._block_key] \
                = self.block_model.state_dict_for_save_checkpoint(
                    destination, prefix, keep_vars)

        return state_dict_

    def load_state_dict(self, state_dict, strict=True):
        """Load the state dicts of each of the models"""
        if self.use_query_model:
            print("Loading ICT query model", flush=True)
            self.query_model.load_state_dict(
                state_dict[self._query_key], strict=strict)

        if self.use_block_model:
            print("Loading ICT block model", flush=True)
            self.block_model.load_state_dict(
                state_dict[self._block_key], strict=strict)

    def init_state_dict_from_bert(self):
        """Initialize the state from a pretrained BERT model on iteration zero of ICT pretraining"""
        args = get_args()
        tracker_filename = get_checkpoint_tracker_filename(args.bert_load)
        if not os.path.isfile(tracker_filename):
            raise FileNotFoundError("Could not find BERT load for ICT")
        with open(tracker_filename, 'r') as f:
            iteration = int(f.read().strip())
            assert iteration > 0

        checkpoint_name = get_checkpoint_name(args.bert_load, iteration, False)
        if mpu.get_data_parallel_rank() == 0:
            print('global rank {} is loading checkpoint {}'.format(
                torch.distributed.get_rank(), checkpoint_name))

        try:
            state_dict = torch.load(checkpoint_name, map_location='cpu')
        except BaseException:
            raise ValueError("Could not load checkpoint")

        # load the LM state dict into each model
        model_dict = state_dict['model']['language_model']
        self.query_model.language_model.load_state_dict(model_dict)
        self.block_model.language_model.load_state_dict(model_dict)

        # give each model the same ict_head to begin with as well
        query_ict_head_state_dict = self.state_dict_for_save_checkpoint()[self._query_key]['ict_head']
        self.block_model.ict_head.load_state_dict(query_ict_head_state_dict)


class IREncoderBertModel(MegatronModule):
    """BERT-based encoder for queries or blocks used for learned information retrieval."""

    def __init__(self, ict_head_size, num_tokentypes=2, parallel_output=True):
        super(IREncoderBertModel, self).__init__()
        args = get_args()

        self.ict_head_size = ict_head_size
        self.parallel_output = parallel_output
        init_method = init_method_normal(args.init_method_std)
        scaled_init_method = scaled_init_method_normal(args.init_method_std,
                                                       args.num_layers)

        self.language_model, self._language_model_key = get_language_model(
            attention_mask_func=bert_attention_mask_func,
            num_tokentypes=num_tokentypes,
            add_pooler=True,
            init_method=init_method,
            scaled_init_method=scaled_init_method)

        self.ict_head = get_linear_layer(args.hidden_size, ict_head_size,
                                         init_method)
        self._ict_head_key = 'ict_head'

    def forward(self, input_ids, attention_mask, tokentype_ids=None):
        extended_attention_mask = bert_extended_attention_mask(
            attention_mask, next(self.language_model.parameters()).dtype)
        position_ids = bert_position_ids(input_ids)

        lm_output, pooled_output = self.language_model(
            input_ids,
            position_ids,
            extended_attention_mask,
            tokentype_ids=tokentype_ids)

        # Output.
        ict_logits = self.ict_head(pooled_output)
        return ict_logits, None

    def state_dict_for_save_checkpoint(self, destination=None, prefix='',
                                       keep_vars=False):
        """For easy load when model is combined with other heads,
        add an extra key."""
        state_dict_ = {}
        state_dict_[self._language_model_key] \
            = self.language_model.state_dict_for_save_checkpoint(
                destination, prefix, keep_vars)
        state_dict_[self._ict_head_key] \
            = self.ict_head.state_dict(destination, prefix, keep_vars)
        return state_dict_

    def load_state_dict(self, state_dict, strict=True):
        """Customized load."""
        self.language_model.load_state_dict(
            state_dict[self._language_model_key], strict=strict)
        self.ict_head.load_state_dict(
            state_dict[self._ict_head_key], strict=strict)
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/model/transformer.py (new file, mode 100644)
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Transformer."""

import math

import torch
import torch.nn.functional as F

from megatron import get_args
from megatron import mpu
from megatron.mpu import LayerNorm
from megatron.module import MegatronModule
from megatron.checkpointing import get_checkpoint_version
from megatron.model.fused_softmax import FusedScaleMaskSoftmax
from megatron.model.fused_bias_gelu import bias_gelu_impl
from megatron.model.utils import openai_gelu, erf_gelu

import deepspeed

# flags required to enable jit fusion kernels
torch._C._jit_set_profiling_mode(False)
torch._C._jit_set_profiling_executor(False)
torch._C._jit_override_can_fuse_on_cpu(True)
torch._C._jit_override_can_fuse_on_gpu(True)

""" We use the following notation throughout this file:
     h: hidden size
     n: number of attention heads
     p: number of model parallel partitions
     np: n/p
     hp: h/p
     hn: h/n
     b: batch size
     s: sequence length
     l: number of layers
    Transformer takes input of size [s, b, h] and returns a
    tensor of the same size. We use the following arguments:
        hyperparameters: transformer hyperparameters
        attention_mask_func: a function that takes `unmaksed-attention-scores`
            with size [b, np, s, s] and an `attention-mask` and will apply
            the masking. The function should return a masked score of the
            same size [b, np, s, s].
               masked-attention-scores = attention_mask_func(
                                     unmaksed-attention-scores, attention-mask)
"""


class ParallelMLP(MegatronModule):
    """MLP.

    MLP will take the input with h hidden state, project it to 4*h
    hidden dimension, perform nonlinear transformation, and project the
    state back into h hidden dimension. At the end, dropout is also
    applied.
    """

    def __init__(self, init_method, output_layer_init_method):
        super(ParallelMLP, self).__init__()
        args = get_args()

        # Project to 4h.
        self.dense_h_to_4h = mpu.ColumnParallelLinear(
            args.hidden_size,
            4 * args.hidden_size,
            gather_output=False,
            init_method=init_method,
            skip_bias_add=True)

        self.bias_gelu_fusion = args.bias_gelu_fusion
        self.activation_func = F.gelu
        if args.openai_gelu:
            self.activation_func = openai_gelu
        elif args.onnx_safe:
            self.activation_func = erf_gelu

        # Project back to h.
        self.dense_4h_to_h = mpu.RowParallelLinear(
            4 * args.hidden_size,
            args.hidden_size,
            input_is_parallel=True,
            init_method=output_layer_init_method,
            skip_bias_add=True)

    def forward(self, hidden_states):

        # [s, b, 4hp]
        intermediate_parallel, bias_parallel = self.dense_h_to_4h(hidden_states)

        if self.bias_gelu_fusion:
            intermediate_parallel = \
                bias_gelu_impl(intermediate_parallel, bias_parallel)
        else:
            intermediate_parallel = \
                self.activation_func(intermediate_parallel + bias_parallel)

        # [s, b, h]
        output, output_bias = self.dense_4h_to_h(intermediate_parallel)
        return output, output_bias


class ParallelSelfAttention(MegatronModule):
    """Parallel self-attention layer abstract class.

    Self-attention layer takes input with size [b, s, h]
    and returns output of the same size.
    """

    def __init__(self, attention_mask_func, init_method,
                 output_layer_init_method, layer_number):
        super(ParallelSelfAttention, self).__init__()
        args = get_args()
        self.fp16 = args.fp16

        self.attention_mask_func = attention_mask_func
        self.apply_query_key_layer_scaling = args.apply_query_key_layer_scaling
        self.attention_softmax_in_fp32 = args.attention_softmax_in_fp32
        if self.apply_query_key_layer_scaling:
            self.attention_softmax_in_fp32 = True
        self.layer_number = max(1, layer_number)

        # Per attention head and per partition values.
        world_size = mpu.get_model_parallel_world_size()
        self.hidden_size_per_partition = mpu.divide(args.hidden_size,
                                                    world_size)
        self.hidden_size_per_attention_head = mpu.divide(
            args.hidden_size, args.num_attention_heads)
        self.num_attention_heads_per_partition = mpu.divide(
            args.num_attention_heads, world_size)

        # Strided linear layer.
        self.query_key_value = mpu.ColumnParallelLinear(
            args.hidden_size,
            3 * args.hidden_size,
            gather_output=False,
            init_method=init_method)

        coeff = None
        self.norm_factor = math.sqrt(self.hidden_size_per_attention_head)
        if self.apply_query_key_layer_scaling:
            coeff = self.layer_number
            self.norm_factor *= coeff

        self.scale_mask_softmax = FusedScaleMaskSoftmax(
            self.fp16,
            args.scaled_upper_triang_masked_softmax_fusion,
            args.scaled_masked_softmax_fusion,
            self.attention_mask_func,
            self.attention_softmax_in_fp32,
            coeff)

        # Dropout. Note that for a single iteration, this layer will generate
        # different outputs on different number of parallel partitions but
        # on average it should not be partition dependent.
        self.attention_dropout = torch.nn.Dropout(args.attention_dropout)

        # Output.
        self.dense = mpu.RowParallelLinear(
            args.hidden_size,
            args.hidden_size,
            input_is_parallel=True,
            init_method=output_layer_init_method,
            skip_bias_add=True)

        if deepspeed.checkpointing.is_configured():
            global get_cuda_rng_tracker, checkpoint
            get_cuda_rng_tracker = deepspeed.checkpointing.get_cuda_rng_tracker
            checkpoint = deepspeed.checkpointing.checkpoint

    def _transpose_last_dim(self, mixed_layer, num_splits, num_splits_first):
        input_shape = mixed_layer.size()
        if num_splits_first:
            """[s, b, num_splits * np * hn]
            -->(view) [s, b, num_splits, np, hn]
            -->(tranpose) [s, b, np, num_splits, hn]
            -->(view) [s, b, np * num_splits * hn] """

            intermediate_shape = input_shape[:-1] + \
                (num_splits, self.num_attention_heads_per_partition,
                 self.hidden_size_per_attention_head)

            mixed_layer = mixed_layer.view(*intermediate_shape)
            mixed_layer = mixed_layer.transpose(-2, -3).contiguous()
        else:
            """[s, b, np * hn * num_splits]
            -->(view) [s, b, np, hn, num_splits]
            -->(tranpose) [s, b, np, num_splits, hn]
            -->(view) [s, b, np * num_splits * hn] """

            intermediate_shape = input_shape[:-1] + \
                (self.num_attention_heads_per_partition,
                 self.hidden_size_per_attention_head, num_splits)

            mixed_layer = mixed_layer.view(*intermediate_shape)
            mixed_layer = mixed_layer.transpose(-1, -2).contiguous()
        mixed_layer = mixed_layer.view(*input_shape)

        return mixed_layer

    def forward(self, hidden_states, attention_mask, layer_past=None,
                get_key_value=False):
        # hidden_states: [sq, b, h]

        # =====================
        # Query, Key, and Value
        # =====================

        # Attention heads [sq, b, h] --> [sq, b, (np * 3 * hn)]
        mixed_x_layer, _ = self.query_key_value(hidden_states)

        checkpoint_version = get_checkpoint_version()
        if checkpoint_version is not None:
            if checkpoint_version == 0:
                # [s, b, (3 * np * hn)] --> [s, b, (np * 3 * hn)]
                mixed_x_layer = self._transpose_last_dim(mixed_x_layer, 3, True)
            elif checkpoint_version == 1.0:
                # [s, b, (np * hn * 3)] --> [s, b, (np * 3 * hn)]
                mixed_x_layer = self._transpose_last_dim(mixed_x_layer, 3, False)

        # [sq, b, (np * 3 * hn)] --> [sq, b, np, 3 * hn]
        new_tensor_shape = mixed_x_layer.size()[:-1] + \
            (self.num_attention_heads_per_partition,
             3 * self.hidden_size_per_attention_head)
        mixed_x_layer = mixed_x_layer.view(*new_tensor_shape)

        # [sq, b, np, 3 * hn] --> 3 [sq, b, np, hn]
        (query_layer,
         key_layer,
         value_layer) = mpu.split_tensor_along_last_dim(mixed_x_layer, 3)

        # ==================================
        # Adjust key and value for inference
        # ==================================

        if layer_past is not None:
            past_key, past_value = layer_past
            key_layer = torch.cat((past_key.type_as(key_layer),
                                   key_layer), dim=0)
            value_layer = torch.cat((past_value.type_as(value_layer),
                                     value_layer), dim=0)
        if get_key_value:
            present = (key_layer, value_layer)

        # ===================================
        # Raw attention scores. [b, np, s, s]
        # ===================================

        # [b, np, sq, sk]
        output_size = (query_layer.size(1),
                       query_layer.size(2),
                       query_layer.size(0),
                       key_layer.size(0))

        # [sq, b, np, hn] -> [sq, b * np, hn]
        query_layer = query_layer.view(output_size[2],
                                       output_size[0] * output_size[1], -1)
        key_layer = key_layer.view(output_size[3],
                                   output_size[0] * output_size[1], -1)

        # preallocting result tensor: [b * np, sq, sk]
        matmul_result = torch.empty(
            output_size[0] * output_size[1],
            output_size[2],
            output_size[3],
            dtype=query_layer.dtype,
            device=torch.cuda.current_device())

        # Raw attention scores. [b * np, sq, sk]
        matmul_result = torch.baddbmm(
            matmul_result,
            query_layer.transpose(0, 1),   # [b * np, sq, hn]
            key_layer.transpose(0, 1).transpose(1, 2),  # [b * np, hn, sk]
            beta=0.0, alpha=(1.0 / self.norm_factor))

        # change view to [b, np, sq, sk]
        attention_scores = matmul_result.view(*output_size)

        # ==================================================
        # Update attention mask for inference. [b, np, sq, sk]
        # ==================================================

        if get_key_value:
            with torch.no_grad():
                if layer_past is not None:
                    attention_mask = attention_mask[
                        ...,
                        attention_scores.size(3) - 1,
                        :attention_scores.size(3)].unsqueeze(2)
                else:
                    attention_mask = attention_mask[
                        ...,
                        :attention_scores.size(3),
                        :attention_scores.size(3)]

        # ===========================
        # Attention probs and dropout
        # ===========================

        # attention scores and attention mask [b, np, sq, sk]
        attention_probs = self.scale_mask_softmax(attention_scores,
                                                  attention_mask)

        # This is actually dropping out entire tokens to attend to, which might
        # seem a bit unusual, but is taken from the original Transformer paper.
        with mpu.get_cuda_rng_tracker().fork():
            attention_probs = self.attention_dropout(attention_probs)

        # =========================
        # Context layer. [sq, b, hp]
        # =========================

        # value_layer -> context layer.
        # [sk, b, np, hn] --> [b, np, sq, hn]

        # context layer shape: [b, np, sq, hn]
        output_size = (value_layer.size(1),
                       value_layer.size(2),
                       query_layer.size(0),
                       value_layer.size(3))

        # change view [sk, b * np, hn]
        value_layer = value_layer.view(value_layer.size(0),
                                       output_size[0] * output_size[1], -1)

        # change view [b * np, sq, sk]
        attention_probs = attention_probs.view(output_size[0] * output_size[1],
                                               output_size[2], -1)

        # matmul: [b * np, sq, hn]
        context_layer = torch.bmm(attention_probs, value_layer.transpose(0, 1))

        # change view [b, np, sq, hn]
        context_layer = context_layer.view(*output_size)

        # [b, np, sq, hn] --> [sq, b, np, hn]
        context_layer = context_layer.permute(2, 0, 1, 3).contiguous()

        # [sq, b, np, hn] --> [sq, b, hp]
        new_context_layer_shape = context_layer.size()[:-2] + \
            (self.hidden_size_per_partition,)
        context_layer = context_layer.view(*new_context_layer_shape)

        # =================
        # Output. [sq, b, h]
        # =================

        output, bias = self.dense(context_layer)

        if get_key_value:
            output = [output, present]

        return output, bias


def bias_dropout_add(x, bias, residual, prob, training):
    # type: (Tensor, Tensor, Tensor, float, bool) -> Tensor
    out = torch.nn.functional.dropout(x + bias, p=prob, training=training)
    out = residual + out
    return out


def get_bias_dropout_add(training):
    def _bias_dropout_add(x, bias, residual, prob):
        return bias_dropout_add(x, bias, residual, prob, training)
    return _bias_dropout_add


@torch.jit.script
def bias_dropout_add_fused_train(x, bias, residual, prob):
    # type: (Tensor, Tensor, Tensor, float) -> Tensor
    return bias_dropout_add(x, bias, residual, prob, True)


@torch.jit.script
def bias_dropout_add_fused_inference(x, bias, residual, prob):
    # type: (Tensor, Tensor, Tensor, float) -> Tensor
    return bias_dropout_add(x, bias, residual, prob, False)


class ParallelTransformerLayer(MegatronModule):
    """A single transformer layer.

    Transformore layer takes input with size [b, s, h] and returns an
    output of the same size.
    """

    def __init__(self, attention_mask_func, init_method,
                 output_layer_init_method, layer_number):
        args = get_args()

        super(ParallelTransformerLayer, self).__init__()
        self.layer_number = layer_number

        self.apply_residual_connection_post_layernorm \
            = args.apply_residual_connection_post_layernorm

        # Layernorm on the input data.
        self.input_layernorm = LayerNorm(
            args.hidden_size,
            eps=args.layernorm_epsilon)

        # Self attention.
        self.attention = ParallelSelfAttention(attention_mask_func,
                                               init_method,
                                               output_layer_init_method,
                                               layer_number)
        self.hidden_dropout = args.hidden_dropout
        self.bias_dropout_fusion = args.bias_dropout_fusion

        # Layernorm on the input data.
        self.post_attention_layernorm = LayerNorm(
            args.hidden_size,
            eps=args.layernorm_epsilon)

        # MLP
        self.mlp = ParallelMLP(init_method,
                               output_layer_init_method)

    def forward(self, hidden_states, attention_mask, layer_past=None,
                get_key_value=False):
        # hidden_states: [b, s, h]

        # Layer norm at the begining of the transformer layer.
        layernorm_output = self.input_layernorm(hidden_states)
        # Self attention.
        attention_output, attention_bias = \
            self.attention(layernorm_output,
                           attention_mask,
                           layer_past=layer_past,
                           get_key_value=get_key_value)

        if get_key_value:
            attention_output, presents = attention_output

        # Residual connection.
        if self.apply_residual_connection_post_layernorm:
            residual = layernorm_output
        else:
            residual = hidden_states

        # jit scripting for a nn.module (with dropout) is not
        # trigerring the fusion kernel. For now, we use two
        # different nn.functional routines to account for varying
        # dropout semantics during training and inference phases.
        if self.bias_dropout_fusion:
            if self.training:
                bias_dropout_add_func = bias_dropout_add_fused_train
            else:
                bias_dropout_add_func = bias_dropout_add_fused_inference
        else:
            bias_dropout_add_func = get_bias_dropout_add(self.training)

        # re-enable torch grad to enable fused optimization.
        with torch.enable_grad():
            layernorm_input = bias_dropout_add_func(
                attention_output,
                attention_bias.expand_as(residual),
                residual,
                self.hidden_dropout)

        # Layer norm post the self attention.
        layernorm_output = self.post_attention_layernorm(layernorm_input)

        # MLP.
        mlp_output, mlp_bias = self.mlp(layernorm_output)

        # Second residual connection.
        if self.apply_residual_connection_post_layernorm:
            residual = layernorm_output
        else:
            residual = layernorm_input

        # re-enable torch grad to enable fused optimization.
        with torch.enable_grad():
            output = bias_dropout_add_func(
                mlp_output,
                mlp_bias.expand_as(residual),
                residual,
                self.hidden_dropout)

        if get_key_value:
            output = [output, presents]

        return output


class ParallelTransformerLayerPipe(ParallelTransformerLayer):
    """Extends ParallelTransformerLayer to forward attention_mask through the pipeline. """

    def forward(self, args):
        hidden_states, attention_mask = args[0], args[1]
        return super().forward(*args), attention_mask


class ParallelTransformer(MegatronModule):
    """Transformer class."""

    def __init__(self, attention_mask_func,
                 init_method, output_layer_init_method):
        super(ParallelTransformer, self).__init__()
        args = get_args()

        # Store activation checkpoiting flag.
        self.checkpoint_activations = args.checkpoint_activations
        self.checkpoint_num_layers = args.checkpoint_num_layers

        # Number of layers:
        self.num_layers = args.num_layers
        self.num_unique_layers = args.num_unique_layers
        if self.num_unique_layers is None:
            self.num_unique_layers = self.num_layers
        assert self.num_layers % self.num_unique_layers == 0, \
            'number of layers should be divisible by number of unique layers'
        self.param_sharing_style = args.param_sharing_style

        # Transformer layers.
        def build_layer(layer_number):
            return ParallelTransformerLayer(
                attention_mask_func, init_method,
                output_layer_init_method, layer_number)
        self.layers = torch.nn.ModuleList(
            [build_layer(i + 1) for i in range(self.num_unique_layers)])

        # Print layer ordering.
        if self.num_layers != self.num_unique_layers:
            if torch.distributed.get_rank() == 0:
                print('> will be using the following layer ordering:')
                for i in range(self.num_layers):
                    print('   layer id: {:3d} --> unique layer id: '
                          '{:3d}'.format(i, self._get_layer_index(i)),
                          flush=True)

        # Final layer norm before output.
        self.final_layernorm = LayerNorm(
            args.hidden_size,
            eps=args.layernorm_epsilon)

        if deepspeed.checkpointing.is_configured():
            global get_cuda_rng_tracker, checkpoint
            get_cuda_rng_tracker = deepspeed.checkpointing.get_cuda_rng_tracker
            checkpoint = deepspeed.checkpointing.checkpoint

    def _get_layer_index(self, layer_number):
        if self.param_sharing_style == 'grouped':
            return layer_number % self.num_unique_layers
        if self.param_sharing_style == 'spaced':
            return layer_number // (self.num_layers // self.num_unique_layers)
        assert False, 'should not be here'

    def _get_layer(self, layer_number):
        return self.layers[self._get_layer_index(layer_number)]

    def _checkpointed_forward(self, hidden_states, attention_mask):
        """Forward method with activation checkpointing."""
        def custom(start, end):
            def custom_forward(*inputs):
                x_ = inputs[0]
                for index in range(start, end):
                    layer = self._get_layer(index)
                    x_ = layer(x_, inputs[1])
                return x_
            return custom_forward

        # Make sure memory is freed.
        mpu.reset_checkpointed_activations_memory_buffer()
        l = 0
        while l < self.num_layers:
            hidden_states = mpu.checkpoint(
                custom(l, l + self.checkpoint_num_layers),
                hidden_states, attention_mask)
            l += self.checkpoint_num_layers

        return hidden_states

    def forward(self, hidden_states, attention_mask, layer_past=None,
                get_key_value=False):

        # Checks
        if layer_past is not None:
            assert get_key_value, \
                'for not None values in layer_past, ' \
                'expected get_key_value to be set'
        if get_key_value:
            assert not self.checkpoint_activations, \
                'get_key_value does not work with ' \
                'activation checkpointing'

        # data format change to avoid explicit tranposes : [b s h] --> [s b h]
        hidden_states = hidden_states.transpose(0, 1).contiguous()

        if self.checkpoint_activations:
            hidden_states = self._checkpointed_forward(hidden_states,
                                                       attention_mask)
        else:
            if get_key_value:
                presents = []
            for index in range(self.num_layers):
                layer = self._get_layer(index)
                past = None
                if layer_past is not None:
                    past = layer_past[index]
                hidden_states = layer(hidden_states,
                                      attention_mask,
                                      layer_past=past,
                                      get_key_value=get_key_value)
                if get_key_value:
                    hidden_states, present = hidden_states
                    presents.append(present)

        # reverting data format change [s b h] --> [b s h]
        hidden_states = hidden_states.transpose(0, 1).contiguous()

        # Final layer norm.
        output = self.final_layernorm(hidden_states)

        if get_key_value:
            output = [output, presents]

        return output
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/model/utils.py (new file, mode 100644)
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for models."""
import math

import torch

from .transformer import LayerNorm


def init_method_normal(sigma):
    """Init method based on N(0, sigma)."""
    def init_(tensor):
        return torch.nn.init.normal_(tensor, mean=0.0, std=sigma)
    return init_


def scaled_init_method_normal(sigma, num_layers):
    """Init method based on N(0, sigma/sqrt(2*num_layers))."""
    std = sigma / math.sqrt(2.0 * num_layers)

    def init_(tensor):
        return torch.nn.init.normal_(tensor, mean=0.0, std=std)

    return init_


def get_linear_layer(rows, columns, init_method):
    """Simple linear layer with weight initialization."""
    layer = torch.nn.Linear(rows, columns)
    init_method(layer.weight)
    with torch.no_grad():
        layer.bias.zero_()
    return layer


@torch.jit.script
def gelu_impl(x):
    """OpenAI's gelu implementation."""
    return 0.5 * x * (1.0 + torch.tanh(0.7978845608028654 * x *
                                       (1.0 + 0.044715 * x * x)))


def openai_gelu(x):
    return gelu_impl(x)


# This is actually the Python equivalent of torch.nn.functional.gelu(),
# also with type hints for the ONNX exporter.
@torch.jit.script
def erf_gelu(x):
    return x * 0.5 * (torch.erf(x / 1.41421).to(dtype=x.dtype)
                      + torch.ones_like(x).to(dtype=x.dtype))


def get_params_for_weight_decay_optimization(module):
    """Divide params into with-weight-decay and without-weight-decay groups.
    Layernorms and biases will have no weight decay but the rest will.
    """
    weight_decay_params = {'params': []}
    no_weight_decay_params = {'params': [], 'weight_decay': 0.0}
    for module_ in module.modules():
        if isinstance(module_, LayerNorm):
            no_weight_decay_params['params'].extend(
                [p for p in list(module_._parameters.values())
                 if p is not None])
        else:
            weight_decay_params['params'].extend(
                [p for n, p in list(module_._parameters.items())
                 if p is not None and n != 'bias'])
            no_weight_decay_params['params'].extend(
                [p for n, p in list(module_._parameters.items())
                 if p is not None and n == 'bias'])

    return weight_decay_params, no_weight_decay_params
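Editor's note: a quick, self-contained sanity sketch (not part of the commit) showing that the tanh-based approximation used by gelu_impl tracks the erf-based form closely on random inputs; the constant 2**0.5 stands in for the 1.41421 literal above.

import torch

x = torch.randn(1000)
tanh_gelu = 0.5 * x * (1.0 + torch.tanh(0.7978845608028654 * x * (1.0 + 0.044715 * x * x)))
erf_gelu_ref = x * 0.5 * (torch.erf(x / 2 ** 0.5) + 1.0)
print((tanh_gelu - erf_gelu_ref).abs().max())  # on the order of 1e-3 or smaller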
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/module.py
0 → 100644
View file @
316d3f90
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Megatron Module"""
import torch


class MegatronModule(torch.nn.Module):
    """Megatron specific extensions of torch Module."""

    def __init__(self):
        super(MegatronModule, self).__init__()

    def state_dict_for_save_checkpoint(self, destination=None, prefix='',
                                       keep_vars=False):
        """Use this function to override the state dict for
        saving checkpoints."""
        return self.state_dict(destination, prefix, keep_vars)
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/__init__.py
0 → 100644
View file @
316d3f90
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Model parallel utility interface."""
from .cross_entropy import vocab_parallel_cross_entropy
from .data import broadcast_data

from .grads import clip_grad_norm

from .initialize import is_unitialized
from .initialize import destroy_model_parallel
from .initialize import get_data_parallel_group
from .initialize import get_data_parallel_rank
from .initialize import get_data_parallel_world_size
from .initialize import get_model_parallel_group
from .initialize import get_model_parallel_rank, set_model_parallel_rank
from .initialize import get_model_parallel_src_rank
from .initialize import get_model_parallel_world_size, set_model_parallel_world_size
from .initialize import get_topology
from .initialize import get_pipe_parallel_group
from .initialize import get_pipe_parallel_rank
from .initialize import get_pipe_parallel_world_size
from .initialize import get_io_parallel_group
from .initialize import initialize_model_parallel
from .initialize import model_parallel_is_initialized

from .layers import LayerNorm
from .layers import ColumnParallelLinear
from .layers import RowParallelLinear
from .layers import VocabParallelEmbedding

from .mappings import copy_to_model_parallel_region
from .mappings import gather_from_model_parallel_region
from .mappings import reduce_from_model_parallel_region
from .mappings import scatter_to_model_parallel_region

from .random import checkpoint
from .random import get_cuda_rng_tracker
from .random import init_checkpointed_activations_memory_buffer
from .random import model_parallel_cuda_manual_seed
from .random import reset_checkpointed_activations_memory_buffer

from .utils import divide
from .utils import split_tensor_along_last_dim
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/cross_entropy.py
0 → 100644
View file @
316d3f90
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch

from .initialize import get_model_parallel_group
from .initialize import get_model_parallel_rank
from .initialize import get_model_parallel_world_size
from .utils import VocabUtility


class _VocabParallelCrossEntropy(torch.autograd.Function):

    @staticmethod
    def forward(ctx, vocab_parallel_logits, target):

        # Maximum value along vocab dimension across all GPUs.
        logits_max = torch.max(vocab_parallel_logits, dim=-1)[0]
        torch.distributed.all_reduce(logits_max,
                                     op=torch.distributed.ReduceOp.MAX,
                                     group=get_model_parallel_group())
        # Subtract the maximum value.
        vocab_parallel_logits.sub_(logits_max.unsqueeze(dim=-1))

        # Get the partition's vocab indices.
        get_vocab_range = VocabUtility.vocab_range_from_per_partition_vocab_size
        partition_vocab_size = vocab_parallel_logits.size()[-1]
        rank = get_model_parallel_rank()
        world_size = get_model_parallel_world_size()
        vocab_start_index, vocab_end_index = get_vocab_range(
            partition_vocab_size, rank, world_size)

        # Create a mask of valid vocab ids (1 means it needs to be masked).
        target_mask = (target < vocab_start_index) | (target >= vocab_end_index)
        masked_target = target.clone() - vocab_start_index
        masked_target[target_mask] = 0

        # Get predicted-logits = logits[target].
        # For simplicity, we convert logits to a 2-D tensor with size
        # [*, partition-vocab-size] and target to a 1-D tensor of size [*].
        logits_2d = vocab_parallel_logits.view(-1, partition_vocab_size)
        masked_target_1d = masked_target.view(-1)
        arange_1d = torch.arange(start=0, end=logits_2d.size()[0],
                                 device=logits_2d.device)
        predicted_logits_1d = logits_2d[arange_1d, masked_target_1d]
        predicted_logits_1d = predicted_logits_1d.clone().contiguous()
        predicted_logits = predicted_logits_1d.view_as(target)
        predicted_logits[target_mask] = 0.0
        # All reduce is needed to get the chunks from other GPUs.
        torch.distributed.all_reduce(predicted_logits,
                                     op=torch.distributed.ReduceOp.SUM,
                                     group=get_model_parallel_group())

        # Sum of exponential of logits along vocab dimension across all GPUs.
        exp_logits = vocab_parallel_logits
        torch.exp(vocab_parallel_logits, out=exp_logits)
        sum_exp_logits = exp_logits.sum(dim=-1)
        torch.distributed.all_reduce(sum_exp_logits,
                                     op=torch.distributed.ReduceOp.SUM,
                                     group=get_model_parallel_group())

        # Loss = log(sum(exp(logits))) - predicted-logit.
        loss = torch.log(sum_exp_logits) - predicted_logits

        # Store softmax, target-mask and masked-target for backward pass.
        exp_logits.div_(sum_exp_logits.unsqueeze(dim=-1))
        ctx.save_for_backward(exp_logits, target_mask, masked_target_1d)

        return loss

    @staticmethod
    def backward(ctx, grad_output):

        # Retrieve tensors from the forward path.
        softmax, target_mask, masked_target_1d = ctx.saved_tensors

        # All the inputs have softmax as their gradient.
        grad_input = softmax
        # For simplicity, work with the 2D gradient.
        partition_vocab_size = softmax.size()[-1]
        grad_2d = grad_input.view(-1, partition_vocab_size)

        # Add the gradient from matching classes.
        arange_1d = torch.arange(start=0, end=grad_2d.size()[0],
                                 device=grad_2d.device)
        grad_2d[arange_1d, masked_target_1d] -= (
            1.0 - target_mask.view(-1).float())

        # Finally elementwise multiplication with the output gradients.
        grad_input.mul_(grad_output.unsqueeze(dim=-1))

        return grad_input, None


def vocab_parallel_cross_entropy(vocab_parallel_logits, target):
    """Helper function for the cross entropy."""
    return _VocabParallelCrossEntropy.apply(vocab_parallel_logits, target)
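Editor's note: a small, single-process sketch (not part of the commit) verifying the formula used above, loss = log(sum(exp(logits))) - logits[target], against torch.nn.functional.cross_entropy; the max-subtraction cancels exactly, just as it does across vocab partitions.

import torch
import torch.nn.functional as F

logits = torch.randn(4, 7)
target = torch.randint(0, 7, (4,))
shifted = logits - logits.max(dim=-1)[0].unsqueeze(-1)      # subtract the max for stability
predicted = shifted[torch.arange(4), target]                # "predicted logits"
loss = torch.log(torch.exp(shifted).sum(dim=-1)) - predicted
print(torch.allclose(loss, F.cross_entropy(logits, target, reduction='none')))  # True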
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/data.py
0 → 100644
View file @
316d3f90
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch

from .initialize import get_model_parallel_group
from .initialize import get_model_parallel_rank
from .initialize import get_model_parallel_src_rank


_MAX_DATA_DIM = 4


def _check_data_types(keys, data, target_dtype):
    """Check that all the keys have the same target data type."""
    for key in keys:
        assert data[key].dtype == target_dtype, '{} has data type {} which ' \
            'is different than {}'.format(key, data[key].dtype, target_dtype)


def _build_key_size_numel_dictionaries(keys, data):
    """Build the size on rank 0 and broadcast."""
    max_dim = _MAX_DATA_DIM
    sizes = [0 for _ in range(max_dim) for _ in keys]

    # Pack the sizes on rank zero.
    if get_model_parallel_rank() == 0:
        offset = 0
        for key in keys:
            assert data[key].dim() < max_dim, 'you should increase MAX_DATA_DIM'
            size = data[key].size()
            for i, s in enumerate(size):
                sizes[i + offset] = s
            offset += max_dim

    # Move to GPU and broadcast.
    sizes_cuda = torch.cuda.LongTensor(sizes)
    torch.distributed.broadcast(sizes_cuda, get_model_parallel_src_rank(),
                                group=get_model_parallel_group())

    # Move back to cpu and unpack.
    sizes_cpu = sizes_cuda.cpu()
    key_size = {}
    key_numel = {}
    total_numel = 0
    offset = 0
    for key in keys:
        i = 0
        size = []
        numel = 1
        while sizes_cpu[offset + i] > 0:
            this_size = sizes_cpu[offset + i]
            size.append(this_size)
            numel *= this_size
            i += 1
        key_size[key] = size
        key_numel[key] = numel
        total_numel += numel
        offset += max_dim

    return key_size, key_numel, total_numel


def broadcast_data(keys, data, datatype):
    """Broadcast data from rank zero of each model parallel group to the
    members of the same model parallel group.

    Arguments:
        keys: list of keys in the data dictionary to be broadcasted
        data: data dictionary of string keys and cpu tensor values.
        datatype: torch data type of all tensors in data associated
                  with keys.
    """
    # Build (key, size) and (key, number of elements) dictionaries along
    # with the total number of elements on all ranks.
    key_size, key_numel, total_numel = _build_key_size_numel_dictionaries(keys,
                                                                          data)

    # Pack on rank zero.
    if get_model_parallel_rank() == 0:
        # Check that all keys have the same data type.
        _check_data_types(keys, data, datatype)
        # Flatten the data associated with the keys
        flatten_data = torch.cat(
            [data[key].contiguous().view(-1) for key in keys], dim=0).cuda()
    else:
        flatten_data = torch.empty(total_numel,
                                   device=torch.cuda.current_device(),
                                   dtype=datatype)

    # Broadcast
    torch.distributed.broadcast(flatten_data, get_model_parallel_src_rank(),
                                group=get_model_parallel_group())

    # Unpack
    output = {}
    offset = 0
    for key in keys:
        size = key_size[key]
        numel = key_numel[key]
        output[key] = flatten_data.narrow(0, offset, numel).view(size)
        offset += numel

    return output
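Editor's note: an illustrative, CPU-only sketch (not part of the commit) of the pack/unpack idea above: all tensors are flattened into one buffer so a single broadcast suffices, then carved back up with narrow()/view(). The keys and shapes here are made up.

import torch

data = {'text': torch.arange(6).view(2, 3), 'mask': torch.ones(4, dtype=torch.long)}
keys = ['text', 'mask']
flat = torch.cat([data[k].contiguous().view(-1) for k in keys], dim=0)  # one buffer
offset, output = 0, {}
for k in keys:
    numel = data[k].numel()
    output[k] = flat.narrow(0, offset, numel).view(data[k].size())      # carve it back up
    offset += numel
print(torch.equal(output['text'], data['text']))  # True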
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/grads.py
0 → 100644
View file @
316d3f90
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Parts of the code here are adapted from PyTorch
# repo: https://github.com/pytorch/pytorch
import torch
from torch._six import inf

try:
    from apex.multi_tensor_apply import multi_tensor_applier
    import amp_C
except Exception as e:
    print('WARNING: APEX is not installed, multi_tensor_applier will not be available.')

from .initialize import get_model_parallel_group
from .initialize import get_model_parallel_rank


def l2_grad_clipper(parameters, max_norm):
    """Efficient L2 norm gradient clipping."""
    overflow_buf = torch.zeros(1, dtype=torch.int, device='cuda')
    # Make sure we have an iterable.
    if isinstance(parameters, torch.Tensor):
        parameters = [parameters]
    # Filter parameters with gradients.
    parameters_with_grads = list(filter(
        lambda p: p.grad is not None, parameters))
    # Filter parameters for norm calculations.
    mp_rank_is_zero = (get_model_parallel_rank() == 0)
    parameters_for_norm = list(filter(
        lambda p: p.model_parallel or mp_rank_is_zero, parameters_with_grads))
    # Calculate L2 norm.
    norm, _ = multi_tensor_applier(
        amp_C.multi_tensor_l2norm,
        overflow_buf,
        [parameters_for_norm],
        False  # no per-parameter norm
    )
    # Sum across all model parallel GPUs.
    norm_2 = norm * norm
    torch.distributed.all_reduce(norm_2,
                                 op=torch.distributed.ReduceOp.SUM,
                                 group=get_model_parallel_group())
    total_norm = norm_2.item() ** 0.5
    # Scale to get max_norm.
    clip_coef = float(max_norm) / (total_norm + 1.0e-6)
    grads = [p.grad for p in parameters_with_grads]
    if clip_coef < 1.0:
        multi_tensor_applier(
            amp_C.multi_tensor_scale,
            overflow_buf,
            [grads, grads],
            clip_coef)
    return total_norm


def clip_grad_norm(parameters, max_norm, norm_type=2):
    """Clips gradient norm of an iterable of parameters.

    This is adapted from torch.nn.utils.clip_grad.clip_grad_norm_ with
    added functionality to handle model parallel parameters. Note that
    the gradients are modified in place.

    Arguments:
        parameters (Iterable[Tensor] or Tensor): an iterable of Tensors or a
            single Tensor that will have gradients normalized
        max_norm (float or int): max norm of the gradients
        norm_type (float or int): type of the used p-norm. Can be ``'inf'`` for
            infinity norm.

    Returns:
        Total norm of the parameters (viewed as a single vector).
    """
    if isinstance(parameters, torch.Tensor):
        parameters = [parameters]
    parameters = list(filter(lambda p: p.grad is not None, parameters))
    max_norm = float(max_norm)
    norm_type = float(norm_type)
    if norm_type == inf:
        total_norm = max(p.grad.data.abs().max() for p in parameters)
        total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
        # Take max across all GPUs.
        torch.distributed.all_reduce(total_norm_cuda,
                                     op=torch.distributed.ReduceOp.MAX,
                                     group=get_model_parallel_group())
        total_norm = total_norm_cuda[0].item()
        clip_coef = max_norm / (total_norm + 1e-6)
        if clip_coef < 1:
            for p in parameters:
                p.grad.data.mul_(clip_coef)
    #elif norm_type == 2:
    #    total_norm = l2_grad_clipper(parameters, max_norm)
    else:
        total_norm = 0
        for p in parameters:
            if p.model_parallel or (get_model_parallel_rank() == 0):
                param_norm = p.grad.data.norm(norm_type)
                total_norm += param_norm.item() ** norm_type
        # Sum across all model parallel GPUs.
        total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
        torch.distributed.all_reduce(total_norm_cuda,
                                     op=torch.distributed.ReduceOp.SUM,
                                     group=get_model_parallel_group())
        total_norm = total_norm_cuda[0].item() ** (1. / norm_type)
        clip_coef = max_norm / (total_norm + 1e-6)
        if clip_coef < 1:
            for p in parameters:
                p.grad.data.mul_(clip_coef)
    return total_norm
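Editor's note: a tiny arithmetic sketch (not part of the commit) of how the model-parallel norm is combined above: each rank contributes sum(|g|^p) only for its own shards (plus replicated parameters on rank 0), and the global norm is the p-th root of the all-reduced sum. The per-rank values here are invented.

norm_type = 2.0
local_sums = [3.0 ** norm_type, 4.0 ** norm_type]   # pretend per-rank partial sums of |g|^p
total_norm = sum(local_sums) ** (1.0 / norm_type)   # what all_reduce(SUM) followed by the root gives
print(total_norm)  # 5.0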
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/initialize.py
0 → 100644
View file @
316d3f90
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Model and data parallel groups."""
import torch

from .utils import ensure_divisibility


# Model parallel group that the current rank belongs to.
_MODEL_PARALLEL_GROUP = None
# Data parallel group that the current rank belongs to.
_DATA_PARALLEL_GROUP = None
# Pipeline parallel group that the current rank belongs to.
_PIPE_PARALLEL_GROUP = None
# A group used to sync during the IO process. Usually this is data_parallel_group(),
# but with pipeline parallelism it must also involve the last stage (which is not in
# the DP group of rank 0).
_IO_PARALLEL_GROUP = None

# These values enable us to change the mpu sizes on the fly.
_MPU_WORLD_SIZE = None
_MPU_RANK = None

# Used to query 3D topology
_MPU_TOPOLOGY = None


def is_unitialized():
    """Useful for code segments that may be accessed with or without mpu initialization."""
    return _DATA_PARALLEL_GROUP is None


def initialize_model_parallel(model_parallel_size_, topology=None):
    """
    Initialize model data parallel groups.

    Arguments:
        model_parallel_size: number of GPUs used to parallelize model.

    Let's say we have a total of 8 GPUs denoted by g0 ... g7 and we
    use 2 GPUs to parallelize the model. The present function will
    create 4 model parallel groups and 2 data parallel groups as:
        4 model parallel groups:
            [g0, g1], [g2, g3], [g4, g5], [g6, g7]
        2 data parallel groups:
            [g0, g2, g4, g6], [g1, g3, g5, g7]
    Note that for efficiency, the caller should make sure adjacent ranks
    are on the same DGX box. For example if we are using 2 DGX-1 boxes
    with a total of 16 GPUs, rank 0 to 7 belong to the first box and
    ranks 8 to 15 belong to the second box.
    """
    if torch.distributed.get_rank() == 0:
        print('> initializing model parallel with size {}'.format(
            model_parallel_size_))
    # Get world size and rank. Ensure some consistencies.
    assert torch.distributed.is_initialized()
    world_size = torch.distributed.get_world_size()
    model_parallel_size = min(model_parallel_size_, world_size)
    ensure_divisibility(world_size, model_parallel_size)
    rank = torch.distributed.get_rank()

    global _MPU_TOPOLOGY
    if topology:
        _MPU_TOPOLOGY = topology

    # Build the data parallel groups.
    global _DATA_PARALLEL_GROUP
    assert _DATA_PARALLEL_GROUP is None, \
        'data parallel group is already initialized'
    if topology:
        for dp_group in topology.get_axis_comm_lists('data'):
            group = torch.distributed.new_group(ranks=dp_group)
            if rank == 0:
                print(f'MPU DP:', dp_group)
            if rank in dp_group:
                _DATA_PARALLEL_GROUP = group
    else:
        for i in range(model_parallel_size):
            ranks = range(i, world_size, model_parallel_size)
            group = torch.distributed.new_group(ranks)
            if i == (rank % model_parallel_size):
                _DATA_PARALLEL_GROUP = group

    # Build pipeline parallel group
    if topology is not None:
        global _PIPE_PARALLEL_GROUP
        for pp_group in topology.get_axis_comm_lists('pipe'):
            group = torch.distributed.new_group(ranks=pp_group)
            if rank == 0:
                print(f'MPU PP:', pp_group)
            if rank in pp_group:
                _PIPE_PARALLEL_GROUP = group

    # Build IO group
    global _IO_PARALLEL_GROUP
    if topology and topology.get_dim('pipe') > 1:
        io_stages = [0, topology.get_dim('pipe') - 1]
        io_group = []
        for stage in io_stages:
            io_group.extend(topology.filter_match(pipe=stage, model=0))
        if rank == 0:
            print(f'MPU IO:', io_group)
        group = torch.distributed.new_group(ranks=io_group)
        if rank in io_group:
            _IO_PARALLEL_GROUP = group
    else:
        _IO_PARALLEL_GROUP = get_data_parallel_group()

    # Build the model parallel groups.
    global _MODEL_PARALLEL_GROUP
    assert _MODEL_PARALLEL_GROUP is None, \
        'model parallel group is already initialized'
    if topology:
        # Short circuit case without model parallelism.
        # TODO: it would be nice to avoid this branching case?
        if model_parallel_size == 1:
            for group_rank in range(world_size):
                group = torch.distributed.new_group(ranks=[group_rank])
                if rank == 0:
                    print(f'MPU MP:', [group_rank])
                if rank == group_rank:
                    _MODEL_PARALLEL_GROUP = group
            return

        for mp_group in topology.get_axis_comm_lists('model'):
            group = torch.distributed.new_group(ranks=mp_group)
            if rank == 0:
                print(f'MPU MP:', mp_group)
            if rank in mp_group:
                _MODEL_PARALLEL_GROUP = group
    else:
        for i in range(world_size // model_parallel_size):
            ranks = range(i * model_parallel_size,
                          (i + 1) * model_parallel_size)
            group = torch.distributed.new_group(ranks)
            if i == (rank // model_parallel_size):
                _MODEL_PARALLEL_GROUP = group


def model_parallel_is_initialized():
    """Check if model and data parallel groups are initialized."""
    if _MODEL_PARALLEL_GROUP is None or _DATA_PARALLEL_GROUP is None:
        return False
    return True


def get_model_parallel_group():
    """Get the model parallel group the caller rank belongs to."""
    assert _MODEL_PARALLEL_GROUP is not None, \
        'model parallel group is not initialized'
    return _MODEL_PARALLEL_GROUP


def get_data_parallel_group():
    """Get the data parallel group the caller rank belongs to."""
    assert _DATA_PARALLEL_GROUP is not None, \
        'data parallel group is not initialized'
    return _DATA_PARALLEL_GROUP


def get_io_parallel_group():
    """Get the IO parallel group the caller rank belongs to."""
    assert _IO_PARALLEL_GROUP is not None, \
        'IO parallel group is not initialized'
    return _IO_PARALLEL_GROUP


def set_model_parallel_world_size(world_size):
    """Set the model parallel size."""
    global _MPU_WORLD_SIZE
    _MPU_WORLD_SIZE = world_size


def get_model_parallel_world_size():
    """Return world size for the model parallel group."""
    global _MPU_WORLD_SIZE
    if _MPU_WORLD_SIZE is not None:
        return _MPU_WORLD_SIZE
    return torch.distributed.get_world_size(group=get_model_parallel_group())


def set_model_parallel_rank(rank):
    """Set model parallel rank."""
    global _MPU_RANK
    _MPU_RANK = rank


def get_model_parallel_rank():
    """Return my rank for the model parallel group."""
    global _MPU_RANK
    if _MPU_RANK is not None:
        return _MPU_RANK
    return torch.distributed.get_rank(group=get_model_parallel_group())


def get_model_parallel_src_rank():
    """Calculate the global rank corresponding to local rank zero
    in the model parallel group."""
    global_rank = torch.distributed.get_rank()
    local_world_size = get_model_parallel_world_size()
    return (global_rank // local_world_size) * local_world_size


def get_data_parallel_world_size():
    """Return world size for the data parallel group."""
    return torch.distributed.get_world_size(group=get_data_parallel_group())


def get_data_parallel_rank():
    """Return my rank for the data parallel group."""
    return torch.distributed.get_rank(group=get_data_parallel_group())


def get_topology():
    return _MPU_TOPOLOGY


def get_pipe_parallel_group():
    """Get the pipe parallel group the caller rank belongs to."""
    assert _PIPE_PARALLEL_GROUP is not None, \
        'pipe parallel group is not initialized'
    return _PIPE_PARALLEL_GROUP


def get_pipe_parallel_rank():
    """Return my rank for the pipe parallel group."""
    return torch.distributed.get_rank(group=get_pipe_parallel_group())


def get_pipe_parallel_world_size():
    """Return world size for the pipe parallel group."""
    return torch.distributed.get_world_size(group=get_pipe_parallel_group())


def destroy_model_parallel():
    """Set the groups to none."""
    global _MODEL_PARALLEL_GROUP
    _MODEL_PARALLEL_GROUP = None
    global _DATA_PARALLEL_GROUP
    _DATA_PARALLEL_GROUP = None
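Editor's note: without a topology object the group construction above is pure modular arithmetic. The illustrative, standalone sketch below (not part of the commit) reproduces the groups listed in the docstring for world_size=8 and model_parallel_size=2.

world_size, model_parallel_size = 8, 2
data_parallel_groups = [list(range(i, world_size, model_parallel_size))
                        for i in range(model_parallel_size)]
model_parallel_groups = [list(range(i * model_parallel_size, (i + 1) * model_parallel_size))
                         for i in range(world_size // model_parallel_size)]
print(data_parallel_groups)    # [[0, 2, 4, 6], [1, 3, 5, 7]]
print(model_parallel_groups)   # [[0, 1], [2, 3], [4, 5], [6, 7]]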
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/layers.py
0 → 100644
View file @
316d3f90
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Parts of the code here are adapted from PyTorch
# repo: https://github.com/pytorch/pytorch
import math

import torch
import torch.nn.functional as F
import torch.nn.init as init
from torch.nn.parameter import Parameter

try:
    from apex.normalization.fused_layer_norm import FusedLayerNorm as LayerNorm
    # Try to use FusedLayerNorm from Apex - this will trigger an error.
    _ = LayerNorm(8, eps=1e-5)
except Exception as e:
    print('WARNING: APEX is not installed, using torch.nn.LayerNorm '
          'instead of apex.normalization.FusedLayerNorm!')
    from torch.nn import LayerNorm

from .initialize import get_model_parallel_rank
from .initialize import get_model_parallel_world_size
from .mappings import copy_to_model_parallel_region
from .mappings import gather_from_model_parallel_region
from .mappings import reduce_from_model_parallel_region
from .mappings import scatter_to_model_parallel_region
from .random import get_cuda_rng_tracker
from .utils import divide
from .utils import split_tensor_along_last_dim
from .utils import VocabUtility
from megatron import get_args


def _initialize_affine_weight_gpu(weight, init_method,
                                  partition_dim, stride=1):
    """Initialize affine weight for model parallel on GPU."""
    weight.model_parallel = True
    weight.partition_dim = partition_dim
    weight.partition_stride = stride

    with get_cuda_rng_tracker().fork():
        init_method(weight)


def _initialize_affine_weight_cpu(weight, output_size, input_size,
                                  per_partition_size, partition_dim,
                                  init_method, stride=1,
                                  return_master_weight=False):
    """Initialize affine weight for model parallel.

    Build the master weight on all processes and scatter
    the relevant chunk."""

    weight.model_parallel = True
    weight.partition_dim = partition_dim
    weight.partition_stride = stride

    # Initialize master weight
    master_weight = torch.empty(output_size, input_size,
                                dtype=torch.float,
                                requires_grad=False)
    init_method(master_weight)
    args = get_args()
    master_weight = master_weight.to(dtype=args.params_dtype)

    # Split and copy
    per_partition_per_stride_size = divide(per_partition_size, stride)
    weight_list = torch.split(master_weight, per_partition_per_stride_size,
                              dim=partition_dim)
    rank = get_model_parallel_rank()
    world_size = get_model_parallel_world_size()
    my_weight_list = weight_list[rank::world_size]

    with torch.no_grad():
        torch.cat(my_weight_list, dim=partition_dim, out=weight)
    if return_master_weight:
        return master_weight
    return None


class VocabParallelEmbedding(torch.nn.Module):
    """Embedding parallelized in the vocabulary dimension.

    This is mainly adapted from torch.nn.Embedding and all the default
    values are kept.
    Arguments:
        num_embeddings: vocabulary size.
        embedding_dim: size of hidden state.
        init_method: method to initialize weights.
    """

    def __init__(self, num_embeddings, embedding_dim,
                 init_method=init.xavier_normal_):
        super(VocabParallelEmbedding, self).__init__()
        # Keep the input dimensions.
        self.num_embeddings = num_embeddings
        self.embedding_dim = embedding_dim
        # Set the defaults for compatibility.
        self.padding_idx = None
        self.max_norm = None
        self.norm_type = 2.
        self.scale_grad_by_freq = False
        self.sparse = False
        self._weight = None
        self.model_parallel_size = get_model_parallel_world_size()
        # Divide the weight matrix along the vocabulary dimension.
        self.vocab_start_index, self.vocab_end_index = \
            VocabUtility.vocab_range_from_global_vocab_size(
                self.num_embeddings, get_model_parallel_rank(),
                self.model_parallel_size)
        self.num_embeddings_per_partition = self.vocab_end_index - \
            self.vocab_start_index

        # Allocate weights and initialize.
        args = get_args()
        if args.use_cpu_initialization:
            self.weight = Parameter(torch.empty(
                self.num_embeddings_per_partition, self.embedding_dim,
                dtype=args.params_dtype))
            _initialize_affine_weight_cpu(
                self.weight, self.num_embeddings, self.embedding_dim,
                self.num_embeddings_per_partition, 0, init_method)
        else:
            self.weight = Parameter(torch.empty(
                self.num_embeddings_per_partition, self.embedding_dim,
                device=torch.cuda.current_device(), dtype=args.params_dtype))
            _initialize_affine_weight_gpu(self.weight, init_method,
                                          partition_dim=0, stride=1)

    def forward(self, input_):
        if self.model_parallel_size > 1:
            # Build the mask.
            input_mask = (input_ < self.vocab_start_index) | \
                         (input_ >= self.vocab_end_index)
            # Mask the input.
            masked_input = input_.clone() - self.vocab_start_index
            masked_input[input_mask] = 0
        else:
            masked_input = input_
        # Get the embeddings.
        output_parallel = F.embedding(masked_input, self.weight,
                                      self.padding_idx, self.max_norm,
                                      self.norm_type, self.scale_grad_by_freq,
                                      self.sparse)
        # Mask the output embedding.
        if self.model_parallel_size > 1:
            output_parallel[input_mask, :] = 0.0
        # Reduce across all the model parallel GPUs.
        output = reduce_from_model_parallel_region(output_parallel)
        return output


class ColumnParallelLinear(torch.nn.Module):
    """Linear layer with column parallelism.

    The linear layer is defined as Y = XA + b. A is parallelized along
    its second dimension as A = [A_1, ..., A_p].

    Arguments:
        input_size: first dimension of matrix A.
        output_size: second dimension of matrix A.
        bias: If true, add bias
        gather_output: If true, call all-gather on output and make Y available
                       to all GPUs, otherwise, every GPU will have its output
                       which is Y_i = XA_i
        init_method: method to initialize weights. Note that bias is always set
                     to zero.
        stride: For the strided linear layers.
        keep_master_weight_for_test: This was added for testing and should be
                                     set to False. It returns the master weights
                                     used for initialization.
        skip_bias_add: This was added to enable performance optimizations where
                       bias can be fused with other elementwise operations. We
                       skip adding bias but instead return it.
    """

    def __init__(self, input_size, output_size, bias=True, gather_output=True,
                 init_method=init.xavier_normal_, stride=1,
                 keep_master_weight_for_test=False,
                 skip_bias_add=False):
        super(ColumnParallelLinear, self).__init__()

        # Keep input parameters
        self.input_size = input_size
        self.output_size = output_size
        self.gather_output = gather_output
        # Divide the weight matrix along the last dimension.
        world_size = get_model_parallel_world_size()
        self.output_size_per_partition = divide(output_size, world_size)
        self.skip_bias_add = skip_bias_add

        # Parameters.
        # Note: torch.nn.functional.linear performs XA^T + b and as a result
        # we allocate the transpose.
        # Initialize weight.
        args = get_args()
        if args.use_cpu_initialization:
            self.weight = Parameter(torch.empty(self.output_size_per_partition,
                                                self.input_size,
                                                dtype=args.params_dtype))
            self.master_weight = _initialize_affine_weight_cpu(
                self.weight, self.output_size, self.input_size,
                self.output_size_per_partition, 0, init_method,
                stride=stride, return_master_weight=keep_master_weight_for_test)
        else:
            self.weight = Parameter(torch.empty(
                self.output_size_per_partition, self.input_size,
                device=torch.cuda.current_device(), dtype=args.params_dtype))
            _initialize_affine_weight_gpu(self.weight, init_method,
                                          partition_dim=0, stride=stride)

        if bias:
            if args.use_cpu_initialization:
                self.bias = Parameter(torch.empty(
                    self.output_size_per_partition, dtype=args.params_dtype))
            else:
                self.bias = Parameter(torch.empty(
                    self.output_size_per_partition,
                    device=torch.cuda.current_device(),
                    dtype=args.params_dtype))
            self.bias.model_parallel = True
            self.bias.partition_dim = 0
            self.bias.stride = stride
            # Always initialize bias to zero.
            with torch.no_grad():
                self.bias.zero_()
        else:
            self.register_parameter('bias', None)

    def forward(self, input_):
        # Set up backprop all-reduce.
        input_parallel = copy_to_model_parallel_region(input_)
        # Matrix multiply.
        bias = self.bias if not self.skip_bias_add else None
        output_parallel = F.linear(input_parallel, self.weight, bias)
        if self.gather_output:
            # All-gather across the partitions.
            output = gather_from_model_parallel_region(output_parallel)
        else:
            output = output_parallel
        output_bias = self.bias if self.skip_bias_add else None
        return output, output_bias


class RowParallelLinear(torch.nn.Module):
    """Linear layer with row parallelism.

    The linear layer is defined as Y = XA + b. A is parallelized along
    its first dimension and X along its second dimension as:
               -   -
              | A_1 |
              | .   |
          A = | .   |        X = [X_1, ..., X_p]
              | .   |
              | A_p |
               -   -
    Arguments:
        input_size: first dimension of matrix A.
        output_size: second dimension of matrix A.
        bias: If true, add bias. Note that bias is not parallelized.
        input_is_parallel: If true, we assume that the input is already
                           split across the GPUs and we do not split
                           again.
        init_method: method to initialize weights. Note that bias is always set
                     to zero.
        stride: For the strided linear layers.
        keep_master_weight_for_test: This was added for testing and should be
                                     set to False. It returns the master weights
                                     used for initialization.
        skip_bias_add: This was added to enable performance optimizations where
                       bias can be fused with other elementwise operations. We
                       skip adding bias but instead return it.
    """

    def __init__(self, input_size, output_size, bias=True,
                 input_is_parallel=False,
                 init_method=init.xavier_normal_, stride=1,
                 keep_master_weight_for_test=False,
                 skip_bias_add=False):
        super(RowParallelLinear, self).__init__()

        # Keep input parameters
        self.input_size = input_size
        self.output_size = output_size
        self.input_is_parallel = input_is_parallel
        # Divide the weight matrix along the last dimension.
        world_size = get_model_parallel_world_size()
        self.input_size_per_partition = divide(input_size, world_size)
        self.skip_bias_add = skip_bias_add

        # Parameters.
        # Note: torch.nn.functional.linear performs XA^T + b and as a result
        # we allocate the transpose.
        # Initialize weight.
        args = get_args()
        if args.use_cpu_initialization:
            self.weight = Parameter(torch.empty(self.output_size,
                                                self.input_size_per_partition,
                                                dtype=args.params_dtype))
            self.master_weight = _initialize_affine_weight_cpu(
                self.weight, self.output_size, self.input_size,
                self.input_size_per_partition, 1, init_method,
                stride=stride, return_master_weight=keep_master_weight_for_test)
        else:
            self.weight = Parameter(torch.empty(
                self.output_size, self.input_size_per_partition,
                device=torch.cuda.current_device(), dtype=args.params_dtype))
            _initialize_affine_weight_gpu(self.weight, init_method,
                                          partition_dim=1, stride=stride)
        if bias:
            if args.use_cpu_initialization:
                self.bias = Parameter(torch.empty(self.output_size,
                                                  dtype=args.params_dtype))
            else:
                self.bias = Parameter(torch.empty(
                    self.output_size, device=torch.cuda.current_device(),
                    dtype=args.params_dtype))
            # Always initialize bias to zero.
            with torch.no_grad():
                self.bias.zero_()
        else:
            self.register_parameter('bias', None)

    def forward(self, input_):
        # Set up backprop all-reduce.
        if self.input_is_parallel:
            input_parallel = input_
        else:
            input_parallel = scatter_to_model_parallel_region(input_)
        # Matrix multiply.
        output_parallel = F.linear(input_parallel, self.weight)
        # All-reduce across all the partitions.
        output_ = reduce_from_model_parallel_region(output_parallel)
        if not self.skip_bias_add:
            output = output_ + self.bias if self.bias is not None else output_
            output_bias = None
        else:
            output = output_
            output_bias = self.bias
        return output, output_bias
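Editor's note: an illustrative, arithmetic-only sketch (not part of the commit) of why a column-parallel linear followed by a row-parallel linear (the usual MLP pattern) is attractive: the large intermediate activation and both weight matrices stay sharded, and only the final output needs an all-reduce. The hidden/ffn sizes below are hypothetical.

hidden, ffn, world_size = 1024, 4096, 4
col_shard = (ffn // world_size, hidden)   # ColumnParallelLinear.weight per rank (transpose of A_i)
row_shard = (hidden, ffn // world_size)   # RowParallelLinear.weight per rank (transpose of B_i)
full_params = ffn * hidden * 2
shard_params = col_shard[0] * col_shard[1] + row_shard[0] * row_shard[1]
print(full_params // shard_params)        # 4: each rank stores 1/world_size of the MLP weights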
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/mappings.py
0 → 100644
View file @
316d3f90
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch

from .initialize import get_model_parallel_group, get_model_parallel_world_size, get_model_parallel_rank
from .utils import split_tensor_along_last_dim


def _reduce(input_):
    """All-reduce the input tensor across the model parallel group."""
    # Bypass the function if we are using only 1 GPU.
    if get_model_parallel_world_size() == 1:
        return input_

    # All-reduce.
    torch.distributed.all_reduce(input_, group=get_model_parallel_group())

    return input_


def _split(input_):
    """Split the tensor along its last dimension and keep the
    corresponding slice."""
    world_size = get_model_parallel_world_size()
    # Bypass the function if we are using only 1 GPU.
    if world_size == 1:
        return input_

    # Split along last dimension.
    input_list = split_tensor_along_last_dim(input_, world_size)

    # Note: torch.split does not create contiguous tensors by default.
    rank = get_model_parallel_rank()
    output = input_list[rank].contiguous()

    return output


def _gather(input_):
    """Gather tensors and concatenate along the last dimension."""
    world_size = get_model_parallel_world_size()
    # Bypass the function if we are using only 1 GPU.
    if world_size == 1:
        return input_

    # Size and dimension.
    last_dim = input_.dim() - 1
    rank = get_model_parallel_rank()

    tensor_list = [torch.empty_like(input_) for _ in range(world_size)]
    tensor_list[rank] = input_
    torch.distributed.all_gather(tensor_list, input_,
                                 group=get_model_parallel_group())

    # Note: torch.cat already creates a contiguous tensor.
    output = torch.cat(tensor_list, dim=last_dim).contiguous()

    return output


class _CopyToModelParallelRegion(torch.autograd.Function):
    """Pass the input to the model parallel region."""

    @staticmethod
    def symbolic(graph, input_):
        return input_

    @staticmethod
    def forward(ctx, input_):
        return input_

    @staticmethod
    def backward(ctx, grad_output):
        return _reduce(grad_output)


class _ReduceFromModelParallelRegion(torch.autograd.Function):
    """All-reduce the input from the model parallel region."""

    @staticmethod
    def symbolic(graph, input_):
        return _reduce(input_)

    @staticmethod
    def forward(ctx, input_):
        return _reduce(input_)

    @staticmethod
    def backward(ctx, grad_output):
        return grad_output


class _ScatterToModelParallelRegion(torch.autograd.Function):
    """Split the input and keep only the chunk corresponding to the rank."""

    @staticmethod
    def symbolic(graph, input_):
        return _split(input_)

    @staticmethod
    def forward(ctx, input_):
        return _split(input_)

    @staticmethod
    def backward(ctx, grad_output):
        return _gather(grad_output)


class _GatherFromModelParallelRegion(torch.autograd.Function):
    """Gather the input from the model parallel region and concatenate."""

    @staticmethod
    def symbolic(graph, input_):
        return _gather(input_)

    @staticmethod
    def forward(ctx, input_):
        return _gather(input_)

    @staticmethod
    def backward(ctx, grad_output):
        return _split(grad_output)


# -----------------
# Helper functions.
# -----------------

def copy_to_model_parallel_region(input_):
    return _CopyToModelParallelRegion.apply(input_)


def reduce_from_model_parallel_region(input_):
    return _ReduceFromModelParallelRegion.apply(input_)


def scatter_to_model_parallel_region(input_):
    return _ScatterToModelParallelRegion.apply(input_)


def gather_from_model_parallel_region(input_):
    return _GatherFromModelParallelRegion.apply(input_)
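Editor's note: the four autograd functions above come in conjugate pairs -- copy/reduce and scatter/gather -- so whatever one does in forward, its partner does in backward. A tiny single-process stand-in (not part of the commit; fake_all_reduce is an invented helper that imitates an all-reduce over two "ranks"):

import torch

def fake_all_reduce(shards):          # stands in for torch.distributed.all_reduce
    total = sum(shards)
    return [total.clone() for _ in shards]

per_rank = [torch.ones(3), torch.ones(3) * 2.0]
print(fake_all_reduce(per_rank)[0])   # tensor([3., 3., 3.]) seen by every "rank"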
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/random.py
0 → 100644
View file @
316d3f90
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Parts of the code here are adapted from PyTorch
# repo: https://github.com/pytorch/pytorch
import contextlib

import torch
from torch import _C
from torch.cuda import _lazy_call, device as device_ctx_manager
from torch.utils.checkpoint import detach_variable

from megatron import get_args
from megatron.memory import allocate_mem_buff

from .initialize import get_data_parallel_rank
from .initialize import get_model_parallel_group
from .initialize import get_model_parallel_rank
from .initialize import get_model_parallel_world_size


# Default name for the model parallel rng tracker.
_MODEL_PARALLEL_RNG_TRACKER_NAME = 'model-parallel-rng'

# Whether to apply model parallelism to checkpointed hidden states.
_CHECKPOINTED_ACTIVATIONS_MEMORY_BUFFER = None


def init_checkpointed_activations_memory_buffer():
    """Initialize the memory buffer for the checkpointed activations."""
    args = get_args()

    per_layer = args.batch_size * args.max_position_embeddings * \
        args.hidden_size // args.model_parallel_size
    assert args.num_layers % args.checkpoint_num_layers == 0, \
        'number of layers is not divisible by checkpoint-num-layers'
    num_checkpointer_layers = args.num_layers // args.checkpoint_num_layers
    numel = per_layer * num_checkpointer_layers
    dtype = torch.half
    if not args.fp16:
        dtype = torch.float

    global _CHECKPOINTED_ACTIVATIONS_MEMORY_BUFFER
    assert _CHECKPOINTED_ACTIVATIONS_MEMORY_BUFFER is None, \
        'checkpointed activations memory buffer is already allocated.'
    _CHECKPOINTED_ACTIVATIONS_MEMORY_BUFFER = allocate_mem_buff(
        'checkpointed activations', numel, dtype, track_usage=False)


def reset_checkpointed_activations_memory_buffer():
    """Reset the memory used for checkpointing."""
    if _CHECKPOINTED_ACTIVATIONS_MEMORY_BUFFER is not None:
        _CHECKPOINTED_ACTIVATIONS_MEMORY_BUFFER.reset()


def _set_cuda_rng_state(new_state, device=-1):
    """Sets the random number generator state of the current GPU.

    Arguments:
        new_state (torch.ByteTensor): The desired state
    This function is adapted from PyTorch repo (torch.cuda.set_rng_state)
    with a single change: the input state is not cloned. Cloning caused
    major performance issues for +4 GPU cases.
    """
    if hasattr(_C, '_cuda_setRNGState') and callable(_C._cuda_setRNGState):
        # older PyTorch
        def cb():
            with device_ctx_manager(device):
                _C._cuda_setRNGState(new_state)
    else:
        # newer PyTorch
        if device == -1:
            device = torch.device('cuda')
        elif isinstance(device, str):
            device = torch.device(device)
        elif isinstance(device, int):
            device = torch.device('cuda', device)

        def cb():
            idx = device.index
            if idx is None:
                idx = torch.cuda.current_device()
            default_generator = torch.cuda.default_generators[idx]
            default_generator.set_state(new_state)

    _lazy_call(cb)


def split_tensor_into_1d_equal_chunks(tensor):
    """Break a tensor into equal 1D chunks."""
    data = tensor.view(-1)
    partition_size = torch.numel(data) // get_model_parallel_world_size()
    start_index = partition_size * get_model_parallel_rank()
    end_index = start_index + partition_size
    return data[start_index:end_index]


def gather_split_1d_tensor(tensor):
    """Opposite of above function, gather values from model parallel ranks."""
    world_size = get_model_parallel_world_size()
    numel = torch.numel(tensor)
    numel_gathered = world_size * numel
    gathered = torch.empty(numel_gathered, dtype=tensor.dtype,
                           device=torch.cuda.current_device(),
                           requires_grad=False)
    chunks = [gathered[i * numel:(i + 1) * numel] for i in range(world_size)]
    torch.distributed.all_gather(chunks, tensor,
                                 group=get_model_parallel_group())
    return gathered


class CudaRNGStatesTracker:
    """Tracker for the cuda RNG states.

    Using the `add` method, a cuda rng state is initialized based on
    the input `seed` and is assigned to `name`. Later, by forking the
    rng state, we can perform operations and return to our starting
    cuda state.
    """

    def __init__(self):
        # Map from a string name to the cuda rng state.
        self.states_ = {}
        # Seeds are just for book keeping and ensure no seed is set twice.
        self.seeds_ = set()

    def reset(self):
        """Set to the initial state (no tracker)."""
        self.states_ = {}
        self.seeds_ = set()

    def get_states(self):
        """Get rng states. Copy the dictionary so we have direct
        pointers to the states, not just a pointer to the dictionary."""
        states = {}
        for name in self.states_:
            states[name] = self.states_[name]
        return states

    def set_states(self, states):
        """Set the rng states. For efficiency purposes, we do not check
        the size of seed for compatibility."""
        self.states_ = states

    def add(self, name, seed):
        """Track the rng state."""
        # Check seed is not already used.
        if seed in self.seeds_:
            raise Exception('seed {} already exists'.format(seed))
        self.seeds_.add(seed)
        # Check that state is not already defined.
        if name in self.states_:
            raise Exception('cuda rng state {} already exists'.format(name))
        # Get the current rng state.
        orig_rng_state = torch.cuda.get_rng_state()
        # Set the new state and store it.
        torch.cuda.manual_seed(seed)
        self.states_[name] = torch.cuda.get_rng_state()
        # Reset rng state to what it was.
        _set_cuda_rng_state(orig_rng_state)

    @contextlib.contextmanager
    def fork(self, name=_MODEL_PARALLEL_RNG_TRACKER_NAME):
        """Fork the cuda rng state, perform operations, and exit with
        the original state."""
        # Check if we have added the state
        if name not in self.states_:
            raise Exception('cuda rng state {} is not added'.format(name))
        # Store current rng state.
        orig_cuda_rng_state = torch.cuda.get_rng_state()
        # Set rng state to the desired one
        _set_cuda_rng_state(self.states_[name])
        # Do the stuff we wanted to do.
        try:
            yield
        finally:
            # Update the current rng state for later use.
            self.states_[name] = torch.cuda.get_rng_state()
            # And set the state to the original state we started with.
            _set_cuda_rng_state(orig_cuda_rng_state)


# RNG tracker object.
_CUDA_RNG_STATE_TRACKER = CudaRNGStatesTracker()


def get_cuda_rng_tracker():
    """Get cuda rng tracker."""
    return _CUDA_RNG_STATE_TRACKER


def model_parallel_cuda_manual_seed(seed):
    """Initialize model parallel cuda seed.

    This function should be called after the model parallel is
    initialized. Also, no torch.cuda.manual_seed should be called
    after this function. Basically, this is a replacement for that
    function.
    Two sets of RNG states are tracked:
        default state: This is for data parallelism and is the same among a
                       set of model parallel GPUs but different across
                       different model parallel groups. This is used for
                       example for dropout in the non-model-parallel regions.
        model-parallel state: This state is different among a set of model
                              parallel GPUs, but the same across data parallel
                              groups. This is used for example for dropout in
                              model parallel regions.
    """
    # 2718 is just for fun and any POSITIVE value will work.
    offset = seed + 2718
    model_parallel_seed = offset + get_model_parallel_rank()
    # Data parallel gets the original seed.
    data_parallel_seed = seed

    if torch.distributed.get_rank() == 0:
        print('> initializing model parallel cuda seeds on global rank {}, '
              'model parallel rank {}, and data parallel rank {} with '
              'model parallel seed: {} and data parallel seed: {}'.format(
                  torch.distributed.get_rank(), get_model_parallel_rank(),
                  get_data_parallel_rank(), model_parallel_seed,
                  data_parallel_seed), flush=True)
    _CUDA_RNG_STATE_TRACKER.reset()
    # Set the default state.
    torch.cuda.manual_seed(data_parallel_seed)
    # and model parallel state.
    _CUDA_RNG_STATE_TRACKER.add(_MODEL_PARALLEL_RNG_TRACKER_NAME,
                                model_parallel_seed)


class CheckpointFunction(torch.autograd.Function):
    """This function is adapted from torch.utils.checkpoint with
    two main changes:
        1) torch.cuda.set_rng_state is replaced with `_set_cuda_rng_state`
        2) the states in the model parallel tracker are also properly
           tracked/set/reset.
    """

    @staticmethod
    def forward(ctx, run_function, *args):
        ctx.run_function = run_function

        # Copy the rng states.
        ctx.fwd_cpu_rng_state = torch.get_rng_state()
        ctx.fwd_cuda_rng_state = torch.cuda.get_rng_state()
        ctx.fwd_cuda_rng_state_tracker = get_cuda_rng_tracker().get_states()

        with torch.no_grad():
            outputs = run_function(*args)

        # Divide hidden states across model parallel group and only keep
        # the chunk corresponding to the current rank.
        if _CHECKPOINTED_ACTIVATIONS_MEMORY_BUFFER is not None:
            ctx.input_0_shape = args[0].data.shape
            args[0].data = split_tensor_into_1d_equal_chunks(args[0].data)
            args[0].data = _CHECKPOINTED_ACTIVATIONS_MEMORY_BUFFER.add(
                args[0].data)

        # Store everything.
        ctx.save_for_backward(*args)

        return outputs

    @staticmethod
    def backward(ctx, *args):
        if not torch.autograd._is_checkpoint_valid():
            raise RuntimeError("Checkpointing is not compatible with .grad(), "
                               "please use .backward() if possible")
        inputs = ctx.saved_tensors
        if _CHECKPOINTED_ACTIVATIONS_MEMORY_BUFFER is not None:
            inputs[0].data = gather_split_1d_tensor(inputs[0].data)
            inputs[0].data = inputs[0].data.view(ctx.input_0_shape)

        # Store the current states.
        bwd_cpu_rng_state = torch.get_rng_state()
        bwd_cuda_rng_state = torch.cuda.get_rng_state()
        bwd_cuda_rng_state_tracker = get_cuda_rng_tracker().get_states()

        # Set the states to what they were before the forward pass.
        torch.set_rng_state(ctx.fwd_cpu_rng_state)
        _set_cuda_rng_state(ctx.fwd_cuda_rng_state)
        get_cuda_rng_tracker().set_states(ctx.fwd_cuda_rng_state_tracker)

        # Compute the forward pass.
        detached_inputs = detach_variable(inputs)
        with torch.enable_grad():
            outputs = ctx.run_function(*detached_inputs)

        # Set the states back to what they were at the start of this function.
        torch.set_rng_state(bwd_cpu_rng_state)
        _set_cuda_rng_state(bwd_cuda_rng_state)
        get_cuda_rng_tracker().set_states(bwd_cuda_rng_state_tracker)

        if isinstance(outputs, torch.Tensor):
            outputs = (outputs,)
        torch.autograd.backward(outputs, args)
        grads = tuple(inp.grad if isinstance(inp, torch.Tensor) else inp
                      for inp in detached_inputs)
        return (None,) + grads


def checkpoint(function, *args):
    """Checkpoint a model or part of the model.
    This has been directly copied from torch.utils.checkpoint."""
    return CheckpointFunction.apply(function, *args)
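Editor's note: an illustrative usage sketch (not part of the commit; it assumes a CUDA device is available and uses the CudaRNGStatesTracker class defined above). Dropout drawn inside fork() comes from the tracked model-parallel stream, so the default (data-parallel) CUDA RNG stream is left untouched outside the with-block.

import torch

tracker = CudaRNGStatesTracker()
tracker.add('model-parallel-rng', seed=1234)       # register a named CUDA RNG state
x = torch.ones(4, device='cuda')
with tracker.fork('model-parallel-rng'):
    y = torch.nn.functional.dropout(x, p=0.5, training=True)
# Outside the block, the default CUDA generator state is exactly as it was before.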
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/tests/__init__.py
0 → 100644
View file @
316d3f90
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/tests/commons.py
0 → 100644
View file @
316d3f90
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import os
import random

import numpy
import torch

import mpu


class IdentityLayer(torch.nn.Module):
    def __init__(self, size, scale=1.0):
        super(IdentityLayer, self).__init__()
        self.weight = torch.nn.Parameter(scale * torch.randn(size))

    def forward(self):
        return self.weight


def set_random_seed(seed):
    """Set random seed for reproducibility."""
    random.seed(seed)
    numpy.random.seed(seed)
    torch.manual_seed(seed)
    mpu.model_parallel_cuda_manual_seed(seed)


def initialize_distributed(backend='nccl'):
    """Initialize torch.distributed."""
    # Get local rank in case it is provided.
    parser = argparse.ArgumentParser()
    parser.add_argument('--local_rank', type=int, default=None,
                        help='local rank passed from distributed launcher')
    args = parser.parse_args()
    local_rank = args.local_rank

    # Get rank and world size.
    rank = int(os.getenv('RANK', '0'))
    world_size = int(os.getenv("WORLD_SIZE", '1'))

    print('> initializing torch.distributed with local rank: {}, '
          'rank: {}, world size: {}'.format(local_rank, rank, world_size))

    # Set the device id.
    device = rank % torch.cuda.device_count()
    if local_rank is not None:
        device = local_rank
    torch.cuda.set_device(device)

    # Call the init process.
    init_method = 'tcp://'
    master_ip = os.getenv('MASTER_ADDR', 'localhost')
    master_port = os.getenv('MASTER_PORT', '6000')
    init_method += master_ip + ':' + master_port
    torch.distributed.init_process_group(
        backend=backend,
        world_size=world_size,
        rank=rank,
        init_method=init_method)


def print_separator(message):
    torch.distributed.barrier()
    filler_len = (78 - len(message)) // 2
    filler = '-' * filler_len
    string = '\n' + filler + ' {} '.format(message) + filler
    if torch.distributed.get_rank() == 0:
        print(string, flush=True)
    torch.distributed.barrier()
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/tests/test_cross_entropy.py
0 → 100644
View file @
316d3f90
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path.append("../..")  # must run before the mpu imports below can resolve

from commons import set_random_seed
from commons import IdentityLayer
from commons import print_separator
from commons import initialize_distributed
from mpu.cross_entropy import vocab_parallel_cross_entropy
import mpu
import torch.nn.functional as F
import torch
import random


def torch_cross_entropy(batch_size, seq_length, vocab_size, logits_scale,
                        seed):
    set_random_seed(seed)
    identity = IdentityLayer((batch_size, seq_length, vocab_size),
                             scale=logits_scale).cuda()
    logits = identity()
    target = torch.cuda.LongTensor(
        size=(batch_size, seq_length)).random_(0, vocab_size)
    loss = F.cross_entropy(logits.view(-1, logits.size()[-1]),
                           target.view(-1),
                           reduction='none').view_as(target).mean()
    loss.backward()
    return loss, identity.weight.grad


def mpu_cross_entropy(batch_size, seq_length, vocab_size, logits_scale,
                      seed):
    set_random_seed(seed)
    identity = IdentityLayer((batch_size, seq_length, vocab_size),
                             scale=logits_scale).cuda()
    logits = identity()
    logits_parallel = mpu.scatter_to_model_parallel_region(logits)
    target = torch.cuda.LongTensor(
        size=(batch_size, seq_length)).random_(0, vocab_size)
    loss = vocab_parallel_cross_entropy(logits_parallel, target).mean()
    loss.backward()
    return loss, identity.weight.grad


def test_cross_entropy(model_parallel_size):

    if torch.distributed.get_rank() == 0:
        print('> testing cross entropy with model parallel size {} ...'.
              format(model_parallel_size))

    mpu.initialize_model_parallel(model_parallel_size)
    model_parallel_size = mpu.get_model_parallel_world_size()

    batch_size = 13
    seq_length = 17
    vocab_size_per_partition = 11
    logits_scale = 1000.0
    vocab_size = vocab_size_per_partition * model_parallel_size
    seed = 1234

    loss_torch, grad_torch = torch_cross_entropy(batch_size, seq_length,
                                                 vocab_size, logits_scale,
                                                 seed)
    loss_mpu, grad_mpu = mpu_cross_entropy(batch_size, seq_length,
                                           vocab_size, logits_scale,
                                           seed)

    error = loss_torch.sub_(loss_mpu).abs().max()
    print('   max error in loss on global rank {}: {}'.format(
        torch.distributed.get_rank(), error))
    assert error < 1.0e-6

    error = grad_torch.sub_(grad_mpu).abs().max()
    print('   max error in grad on global rank {}: {}'.format(
        torch.distributed.get_rank(), error))
    assert error < 1.0e-6

    # Reset groups
    mpu.destroy_model_parallel()

    torch.distributed.barrier()
    if torch.distributed.get_rank() == 0:
        print('>> passed the test :-)')


if __name__ == '__main__':

    initialize_distributed()
    world_size = torch.distributed.get_world_size()

    model_parallel_size = 1
    while model_parallel_size <= world_size:
        print_separator('test cross entropy')
        test_cross_entropy(model_parallel_size)
        model_parallel_size *= 2
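test_cross_entropy.py checks that vocab_parallel_cross_entropy, computed from logits scattered across the vocabulary dimension, matches a plain F.cross_entropy over the full logits to within 1e-6 in both the loss and the gradient. The identity being reproduced can be sanity-checked on a single device without any model parallelism; the sketch below is hypothetical (arbitrary shapes, not part of the commit) and relies on cross_entropy = logsumexp(logits) - logits[target].

import torch
import torch.nn.functional as F

torch.manual_seed(1234)
logits = torch.randn(13, 17, 29)                 # (batch, seq_length, vocab)
target = torch.randint(0, 29, (13, 17))

# Reference: framework cross entropy over the flattened tokens.
reference = F.cross_entropy(logits.view(-1, 29), target.view(-1),
                            reduction='none').view_as(target)

# Same quantity computed explicitly; a vocab-parallel implementation only has
# to reproduce the logsumexp and the target-logit lookup across partitions.
manual = (torch.logsumexp(logits, dim=-1)
          - logits.gather(-1, target.unsqueeze(-1)).squeeze(-1))

assert torch.allclose(reference, manual, atol=1e-5)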
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/tests/test_data.py
0 → 100644
View file @
316d3f90
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path.append("../..")  # must run before the mpu imports below can resolve

from commons import print_separator
from commons import initialize_distributed
from mpu import data as data_utils
import mpu
import torch
import functools
import operator


def test_boradcast_data(model_parallel_size):

    if torch.distributed.get_rank() == 0:
        print('> testing boradcast_data with model parallel size {} ...'.
              format(model_parallel_size))

    mpu.initialize_model_parallel(model_parallel_size)
    torch.manual_seed(1234 + mpu.get_data_parallel_rank())
    model_parallel_size = mpu.get_model_parallel_world_size()

    key_size_t = {'key1': [7, 11],
                  'key2': [8, 2, 1],
                  'key3': [13],
                  'key4': [5, 1, 2],
                  'key5': [5, 12]}
    keys = list(key_size_t.keys())

    data = {}
    data_t = {}
    for key in key_size_t:
        data[key] = torch.LongTensor(size=key_size_t[key]).random_(0, 1000)
        data_t[key] = data[key].clone()
    data['keyX'] = torch.FloatTensor(size=(5, )).random_(0, 1000)
    data_t['keyX'] = data['keyX'].clone()
    if mpu.get_model_parallel_rank() != 0:
        data = None

    data_utils._check_data_types(keys, data_t, torch.int64)
    key_size, key_numel, \
        total_numel = data_utils._build_key_size_numel_dictionaries(keys, data)
    for key in keys:
        assert key_size[key] == key_size_t[key]
    total_numel_t = 0
    for key in keys:
        target_size = functools.reduce(operator.mul, key_size_t[key], 1)
        assert key_numel[key] == target_size
        total_numel_t += target_size
    assert total_numel == total_numel_t

    data_b = data_utils.broadcast_data(keys, data, torch.int64)
    for key in keys:
        tensor = data_t[key].cuda()
        assert data_b[key].sub(tensor).abs().max() == 0

    # Reset groups
    mpu.destroy_model_parallel()

    torch.distributed.barrier()
    if torch.distributed.get_rank() == 0:
        print('>> passed the test :-)')


if __name__ == '__main__':

    initialize_distributed()
    world_size = torch.distributed.get_world_size()

    model_parallel_size = 1
    while model_parallel_size <= world_size:
        print_separator('test test boradcast data')
        test_boradcast_data(model_parallel_size)
        model_parallel_size *= 2
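test_data.py exercises broadcast_data, which ships a dictionary of int64 CPU tensors from model-parallel rank 0 to the rest of its model-parallel group by first agreeing on per-key shapes and element counts and then moving a single flattened buffer. The bookkeeping that the assertions above recompute can be illustrated in one process without mpu; the following sketch is hypothetical and reuses the same key sizes as the test.

import functools
import operator

import torch

key_size_t = {'key1': [7, 11], 'key2': [8, 2, 1], 'key3': [13],
              'key4': [5, 1, 2], 'key5': [5, 12]}
data = {key: torch.LongTensor(size=size).random_(0, 1000)
        for key, size in key_size_t.items()}

# Element count per key and in total, exactly as the test recomputes them.
key_numel = {key: functools.reduce(operator.mul, size, 1)
             for key, size in key_size_t.items()}
total_numel = sum(key_numel.values())

# One flat int64 buffer of total_numel elements is what would actually be
# broadcast; receivers then split it back into the original shapes.
flat = torch.cat([data[key].contiguous().view(-1) for key in key_size_t])
assert flat.numel() == total_numel

offset = 0
for key, size in key_size_t.items():
    view = flat[offset:offset + key_numel[key]].view(size)
    offset += key_numel[key]
    assert torch.equal(view, data[key])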
Deepspeed/Megatron-LM-v1.1.5-3D_parallelism/megatron/mpu/tests/test_initialize.py
0 → 100644
View file @
316d3f90
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path.append("../..")  # must run before the mpu import below can resolve

from commons import print_separator
from commons import initialize_distributed
import mpu
import torch


def test_initialize_model_parallel(model_parallel_size):

    if torch.distributed.get_rank() == 0:
        print('> testing initialize_model_parallel with size {} ...'.format(
            model_parallel_size))
    model_parallel_size_ = min(model_parallel_size,
                               torch.distributed.get_world_size())
    assert not mpu.model_parallel_is_initialized()
    mpu.initialize_model_parallel(model_parallel_size_)
    assert mpu.model_parallel_is_initialized()

    # Checks.
    def check(group, world_size, rank):
        assert world_size == torch.distributed.get_world_size(group=group)
        assert rank == torch.distributed.get_rank(group=group)

    # Model parallel.
    world_size = model_parallel_size_
    rank = torch.distributed.get_rank() % model_parallel_size_
    assert world_size == mpu.get_model_parallel_world_size()
    assert rank == mpu.get_model_parallel_rank()
    check(mpu.get_model_parallel_group(), world_size, rank)

    # Data parallel.
    world_size = torch.distributed.get_world_size() // model_parallel_size_
    rank = torch.distributed.get_rank() // model_parallel_size_
    assert world_size == mpu.get_data_parallel_world_size()
    assert rank == mpu.get_data_parallel_rank()
    check(mpu.get_data_parallel_group(), world_size, rank)

    # Reset groups
    mpu.destroy_model_parallel()

    torch.distributed.barrier()
    if torch.distributed.get_rank() == 0:
        print('>> passed the test :-)')


def test_get_model_parallel_src_rank(model_parallel_size_):

    if torch.distributed.get_rank() == 0:
        print('> testing get_model_parallel_src_rank with size {} ...'.format(
            model_parallel_size_))
    model_parallel_size = min(model_parallel_size_,
                              torch.distributed.get_world_size())
    assert not mpu.model_parallel_is_initialized()
    mpu.initialize_model_parallel(model_parallel_size)
    assert mpu.model_parallel_is_initialized()

    # Checks
    src_rank = torch.distributed.get_rank() - mpu.get_model_parallel_rank()
    assert mpu.get_model_parallel_src_rank() == src_rank

    # Reset groups
    mpu.destroy_model_parallel()

    torch.distributed.barrier()
    if torch.distributed.get_rank() == 0:
        print('>> passed the test :-)')


if __name__ == '__main__':

    initialize_distributed()
    world_size = torch.distributed.get_world_size()

    model_parallel_size = 1
    while model_parallel_size <= world_size:
        print_separator('test initialize model parallel')
        test_initialize_model_parallel(model_parallel_size)
        print_separator('test model parallel source rank')
        test_get_model_parallel_src_rank(model_parallel_size)
        model_parallel_size *= 2
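Both tests above come down to simple arithmetic on the global rank: for a model-parallel size p, a rank's model-parallel rank is rank % p, its data-parallel rank is rank // p, and the model-parallel source rank is the first rank of its group, rank - rank % p. A plain-Python illustration for 8 ranks and p = 2 (hypothetical, no torch.distributed required):

WORLD_SIZE = 8
MODEL_PARALLEL_SIZE = 2

for rank in range(WORLD_SIZE):
    mp_rank = rank % MODEL_PARALLEL_SIZE    # position inside its model-parallel group
    dp_rank = rank // MODEL_PARALLEL_SIZE   # position inside its data-parallel group
    src_rank = rank - mp_rank               # first rank of the model-parallel group
    print('rank {}: mp_rank={} dp_rank={} src_rank={}'.format(
        rank, mp_rank, dp_rank, src_rank))

With 8 ranks and p = 2 this yields model-parallel groups {0,1}, {2,3}, {4,5}, {6,7} and data-parallel groups {0,2,4,6} and {1,3,5,7}, which matches the grouping the assertions in the tests above expect from mpu.initialize_model_parallel.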