OpenDAS / ColossalAI, commit 12c8bf38 (unverified)

[Pipeline] Refine GPT PP Example

Authored Jan 06, 2023 by Jiarui Fang; committed by GitHub on Jan 06, 2023.
Parents: c3d9e232, ad00894f
Showing 5 changed files with 281 additions and 0 deletions (+281 −0):

- colossalai/pipeline/rpc/_pipeline_base.py (+2 −0)
- examples/language/gpt/experiments/pipeline_parallel/README.md (+38 −0)
- examples/language/gpt/experiments/pipeline_parallel/model_zoo.py (+73 −0)
- examples/language/gpt/experiments/pipeline_parallel/run.sh (+7 −0)
- examples/language/gpt/experiments/pipeline_parallel/train_gpt_pp.py (+161 −0)
colossalai/pipeline/rpc/_pipeline_base.py

```diff
@@ -789,6 +789,8 @@ class WorkerBase(ABC):
         args_kwargs = pyobj_map(args_kwargs, fn=lambda x: x.to(self.device).detach(),
                                 process_types=torch.Tensor)    # torch rpc doesn't support args or rets in GPU
+        args_kwargs = pyobj_map(args_kwargs, fn=lambda x: self.device,
+                                process_types=torch.device)    # change devices from last stage to current device
         args, kwargs = data_process_func(args_kwargs)
```
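For context on the two added lines: torch RPC cannot ship CUDA tensors between workers, so incoming args/kwargs are first moved to the local device and detached, and any `torch.device` values carried over from the previous stage are then rewritten to the current stage's device. A minimal sketch of what a `pyobj_map`-style recursive mapper does (hypothetical simplified reimplementation for illustration, not ColossalAI's actual `pyobj_map`):

```python
# Hypothetical, simplified sketch of a pyobj_map-style helper: recursively walk
# nested tuples/lists/dicts and apply fn to every value of the given types.
def pyobj_map_sketch(obj, fn, process_types):
    if isinstance(obj, process_types):
        return fn(obj)
    if isinstance(obj, (list, tuple)):
        return type(obj)(pyobj_map_sketch(o, fn, process_types) for o in obj)
    if isinstance(obj, dict):
        return {k: pyobj_map_sketch(v, fn, process_types) for k, v in obj.items()}
    return obj

# With fn=lambda x: self.device and process_types=torch.device, every stale
# torch.device received over RPC from the previous stage is replaced by the
# current worker's own device.
```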
examples/language/gpt/experiments/pipeline_parallel/README.md (new file, mode 100644)

# Pipeline Parallelism Demo with GPT2

## Requirements

Before you can launch training, you need to install the following requirements.

### Install PyTorch

```bash
# conda
conda install pytorch==1.12.0 torchvision==0.13.0 torchaudio==0.12.0 cudatoolkit=11.3 -c pytorch
# pip
pip install torch==1.12.0+cu113 torchvision==0.13.0+cu113 torchaudio==0.12.0 --extra-index-url https://download.pytorch.org/whl/cu113
```

### Install [Colossal-AI v0.2.0](https://colossalai.org/download/) from the Official Website

```bash
pip install colossalai==0.2.0+torch1.12cu11.3 -f https://release.colossalai.org
```

### Install transformers

```bash
pip install transformers
```

## Dataset

For simplicity, the input data is randomly generated here.

## Training

```bash
# Run pipeline parallel training on GPT with the default settings and a dummy dataset.
# You can change the GPU number or microbatch number in run.sh.
bash run.sh
```
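Because `run.sh` reads its settings from environment variables with defaults (`GPUNUM`, `BATCH_SIZE`, `MODEL_TYPE`, `NUM_MICROBATCH`; see the script below), they can also be overridden inline without editing the script, e.g. `GPUNUM=8 MODEL_TYPE=gpt2_xl bash run.sh` (values here are illustrative).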
examples/language/gpt/experiments/pipeline_parallel/model_zoo.py (new file, mode 100644)

```python
from torch import nn
from transformers import GPT2Config, GPT2LMHeadModel


## Define the Model and Loss Based on Huggingface transformers GPT2LMHeadModel
class GPTLMModel(nn.Module):

    def __init__(self,
                 hidden_size=768,
                 num_layers=12,
                 num_attention_heads=12,
                 max_seq_len=1024,
                 vocab_size=50257,
                 checkpoint=False):
        super().__init__()
        self.checkpoint = checkpoint
        self.config = GPT2Config(n_embd=hidden_size,
                                 n_layer=num_layers,
                                 n_head=num_attention_heads,
                                 n_positions=max_seq_len,
                                 n_ctx=max_seq_len,
                                 vocab_size=vocab_size)
        self.model = GPT2LMHeadModel(self.config)
        if checkpoint:
            self.model.gradient_checkpointing_enable()

    def forward(self, input_ids, attention_mask):
        # Only return lm_logits
        return self.model(input_ids=input_ids, attention_mask=attention_mask, use_cache=not self.checkpoint)[0]


def gpt2_medium(checkpoint=False):
    return GPTLMModel(hidden_size=1024, num_layers=24, num_attention_heads=16, checkpoint=checkpoint)


def gpt2_xl(checkpoint=True):
    return GPTLMModel(hidden_size=1600, num_layers=48, num_attention_heads=32, checkpoint=checkpoint)


def gpt2_10b(checkpoint=True):
    return GPTLMModel(hidden_size=4096, num_layers=50, num_attention_heads=16, checkpoint=checkpoint)


def gpt2_14b(checkpoint=True):
    return GPTLMModel(hidden_size=4096, num_layers=70, num_attention_heads=16, checkpoint=checkpoint)


def gpt2_20b(checkpoint=True):
    return GPTLMModel(hidden_size=8192, num_layers=25, num_attention_heads=16, checkpoint=checkpoint)


def gpt2_24b(checkpoint=True):
    return GPTLMModel(hidden_size=8192, num_layers=30, num_attention_heads=16, checkpoint=checkpoint)


def model_builder(model_size: str) -> callable:
    if model_size == "gpt2_medium":
        return gpt2_medium
    elif model_size == "gpt2_xl":
        return gpt2_xl
    elif model_size == "gpt2_10b":
        return gpt2_10b
    elif model_size == "gpt2_14b":
        return gpt2_14b
    elif model_size == "gpt2_20b":
        return gpt2_20b
    elif model_size == "gpt2_24b":
        return gpt2_24b
    else:
        raise TypeError(f"model_builder {model_size}")


__all__ = ['model_builder']
```
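As a usage note: `train_gpt_pp.py` below selects the constructor by name and then calls it, as in `model_builder(model_type)(checkpoint=False)`. A minimal standalone sketch of the same pattern (assumes it is run from this example directory with `transformers` installed):

```python
# Minimal usage sketch: pick a config by name, then build the module.
# model_builder returns the constructor itself, which is then called with
# checkpoint=True/False to toggle gradient checkpointing.
from model_zoo import model_builder

build_fn = model_builder("gpt2_medium")    # returns the gpt2_medium function
model = build_fn(checkpoint=False)         # GPTLMModel: hidden size 1024, 24 layers
print(sum(p.numel() for p in model.parameters()))    # parameter count
```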
examples/language/gpt/experiments/pipeline_parallel/run.sh (new file, mode 100644)

```bash
export GPUNUM=${GPUNUM:-4}
export BATCH_SIZE=${BATCH_SIZE:-16}
export MODEL_TYPE=${MODEL_TYPE:-"gpt2_medium"}
export NUM_MICROBATCH=${NUM_MICROBATCH:-8}

mkdir -p pp_logs
python train_gpt_pp.py --device="cuda" --model_type=${MODEL_TYPE} --num_microbatches=${NUM_MICROBATCH} \
    --world_size=${GPUNUM} --batch_size=${BATCH_SIZE} 2>&1 | tee ./pp_logs/${MODEL_TYPE}_gpu_${GPUNUM}_bs_${BATCH_SIZE}_nm_${NUM_MICROBATCH}.log
```
examples/language/gpt/experiments/pipeline_parallel/train_gpt_pp.py (new file, mode 100644)

```python
import argparse
import time
from functools import partial

import torch
from model_zoo import model_builder
from torch import nn
from tqdm import tqdm

from colossalai.fx import ColoTracer
from colossalai.fx.passes.adding_split_node_pass import avgnode_split_pass, split_with_split_nodes_pass
from colossalai.logging import disable_existing_loggers, get_dist_logger
from colossalai.nn.optimizer import HybridAdam
from colossalai.pipeline.middleware.adaptor import get_fx_topology
from colossalai.pipeline.rpc._pipeline_schedule import OneFOneBPipelineEngine
from colossalai.pipeline.rpc.utils import rpc_run


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--model_type', type=str, default="gpt2_medium")
    parser.add_argument('--world_size', type=int, default=2)
    parser.add_argument('--batch_size', type=int, default=16)
    parser.add_argument('--dp_degree', type=int, default=1)
    parser.add_argument('--tp_degree', type=int, default=1)
    parser.add_argument('--num_microbatches', type=int, default=2)
    parser.add_argument('--device', type=str, choices=['cpu', 'cuda'], default='cuda')
    parser.add_argument('--master_addr', type=str, default='localhost')
    parser.add_argument('--master_port', type=str, default='29011')
    parser.add_argument('--num_worker_threads', type=int, default=128)
    return parser.parse_args()


class GPTLMLoss(nn.Module):

    def __init__(self):
        super().__init__()
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, logits, labels):
        shift_logits = logits[..., :-1, :].contiguous()
        shift_labels = labels[..., 1:].contiguous()
        # Flatten the tokens
        return self.loss_fn(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))


# Randomly Generated Data
def get_data(batch_size, seq_len, vocab_size):
    input_ids = torch.randint(0, vocab_size, (batch_size, seq_len), device=torch.cuda.current_device())
    attention_mask = torch.ones_like(input_ids)
    return input_ids, attention_mask


def get_tflops(model_numel, batch_size, seq_len, step_time):
    return model_numel * batch_size * seq_len * 8 / 1e12 / (step_time + 1e-12)


def create_partition_module(pp_rank: int, stage_num: int, model, data_kwargs):
    tracer = ColoTracer()
    meta_args = {k: v.to('meta') for k, v in data_kwargs.items()}
    graph = tracer.trace(root=model, meta_args=meta_args)
    gm = torch.fx.GraphModule(model, graph, model.__class__.__name__)
    annotated_model = avgnode_split_pass(gm, stage_num)

    top_module, split_submodules = split_with_split_nodes_pass(annotated_model, merge_output=True)
    topo = get_fx_topology(top_module)
    for submodule in split_submodules:
        if isinstance(submodule, torch.fx.GraphModule):
            setattr(submodule, '_topo', topo)
    return split_submodules[pp_rank + 1]


def partition(model, data_kwargs, pp_rank: int, chunk: int, stage_num: int):
    module = create_partition_module(pp_rank, stage_num, model, data_kwargs)
    return module


def run_master(args):
    batch_size = args.batch_size
    device = args.device
    world_size = args.world_size
    stage_num = world_size
    num_microbatches = args.num_microbatches
    model_type = args.model_type
    # batch size per DP degree
    SEQ_LEN = 1024
    VOCAB_SIZE = 50257
    NUM_STEPS = 10
    WARMUP_STEPS = 1

    disable_existing_loggers()
    logger = get_dist_logger()
    logger.info(f"{args.model_type}, batch size {batch_size}, num stage {stage_num}, num microbatch {num_microbatches}",
                ranks=[0])

    torch.manual_seed(123)

    # build criterion
    criterion = GPTLMLoss()

    # warm up pipeline fx partition
    input_ids, attn_mask = get_data(batch_size, SEQ_LEN, VOCAB_SIZE)
    warmup_data_kwargs = {'input_ids': input_ids, 'attention_mask': attn_mask}

    # create model
    model = model_builder(model_type)(checkpoint=False)

    # set 1f1b pipeline engine
    pp_engine = OneFOneBPipelineEngine(partition_fn=partial(partition, model, warmup_data_kwargs),
                                       stage_num=stage_num,
                                       num_microbatches=num_microbatches,
                                       device=device,
                                       chunk=1,
                                       criterion=criterion,
                                       metric=None,
                                       checkpoint=False)

    partition_numels = pp_engine.remote_numels()
    for rank, numel in partition_numels.items():
        logger.info(f'{rank=} numel in the partition: {numel}')

    # build optim
    pp_engine.initialize_optimizer(HybridAdam, lr=1e-3)

    ranks_tflops = {}
    for n in range(NUM_STEPS):
        # we just use randomly generated data here
        input_ids, attn_mask = get_data(batch_size, SEQ_LEN, VOCAB_SIZE)
        batch = {'input_ids': input_ids, 'attention_mask': attn_mask}

        start = time.time()
        outputs = pp_engine.forward_backward(batch=batch, labels=input_ids, forward_only=False)
        step_time = time.time() - start
        for rank, numel in partition_numels.items():
            if rank not in ranks_tflops:
                ranks_tflops[rank] = []
            step_tflops = get_tflops(numel, batch_size, SEQ_LEN, step_time)

            logger.info(
                f"Rank{rank} , [{n + 1}/{NUM_STEPS}] , Step time: {step_time:.3f}s, TFLOPS: {get_tflops(numel, batch_size, SEQ_LEN, step_time):.3f}",
                ranks=[0],
            )

            if n >= WARMUP_STEPS:
                ranks_tflops[rank].append(step_tflops)

    median_index = ((NUM_STEPS - WARMUP_STEPS) >> 1) + WARMUP_STEPS
    gpu_tflops = []
    for rank, tflops_list in ranks_tflops.items():
        tflops_list.sort()
        gpu_tflops.append(tflops_list[median_index])
        logger.info(f"GPU{rank} Median TFLOPS is {tflops_list[median_index]:.3f}")

    logger.info(f"Total TFLOPS is {sum(gpu_tflops):.3f}")
    logger.info(f"Avg TFLOPS per GPU is {sum(gpu_tflops) / world_size:.3f}")


if __name__ == '__main__':
    args = parse_args()
    rpc_run(args, run_master)
```
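One note on the throughput metric: the factor 8 in `get_tflops` appears to follow the common estimate of 8 FLOPs per parameter per token (roughly 2 for the forward pass, 4 for the backward pass, plus 2 for activation recomputation under checkpointing), scaled to teraFLOPs and divided by the step time; this interpretation of the constant is an assumption, not stated in the commit. A worked example with illustrative numbers:

```python
# Worked example of the get_tflops estimate (all values illustrative).
# 8 FLOPs per parameter per token ~ forward (2) + backward (4) + recompute (2);
# this reading of the constant is an assumption about the script's intent.
model_numel = 355e6        # roughly GPT2-medium sized
batch_size, seq_len = 16, 1024
step_time = 1.0            # seconds
tflops = model_numel * batch_size * seq_len * 8 / 1e12 / (step_time + 1e-12)
print(f"{tflops:.1f} TFLOPS")    # ~46.5
```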