Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
OpenDAS
ColossalAI
Commits
bf020460
Unverified
Commit
bf020460
authored
Feb 20, 2023
by
Jiarui Fang
Committed by
GitHub
Feb 20, 2023
Browse files
[exmaple] add bert and albert (#2824)
parent
cf6409dd
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
356 additions
and
0 deletions
+356
-0
examples/language/bert/run_gemini.sh
examples/language/bert/run_gemini.sh
+22
-0
examples/language/bert/test_ci.sh
examples/language/bert/test_ci.sh
+2
-0
examples/language/bert/train_bert_demo.py
examples/language/bert/train_bert_demo.py
+332
-0
No files found.
examples/language/bert/run_gemini.sh
0 → 100644
View file @
bf020460
set
-x
# distplan in ["CAI_ZeRO1", "CAI_ZeRO2", "CAI_Gemini", "Pytorch_DDP", "Pytorch_ZeRO"]
export
DISTPLAN
=
${
DISTPLAN
:-
"CAI_Gemini"
}
# The following options only valid when DISTPLAN="colossalai"
export
GPUNUM
=
${
GPUNUM
:-
1
}
export
PLACEMENT
=
${
PLACEMENT
:-
"cpu"
}
export
BATCH_SIZE
=
${
BATCH_SIZE
:-
16
}
# bert | albert
export
MODEL_TYPE
=
${
MODEL_TYPE
:-
"bert"
}
export
TRAIN_STEP
=
${
TRAIN_STEP
:-
10
}
mkdir
-p
gemini_logs
env
CUDA_LAUNCH_BLOCKING
=
1 torchrun
--standalone
--nproc_per_node
=
${
GPUNUM
}
./train_bert_demo.py
\
--model_type
=
${
MODEL_TYPE
}
\
--batch_size
=
${
BATCH_SIZE
}
\
--placement
=
${
PLACEMENT
}
\
--distplan
=
${
DISTPLAN
}
\
--train_step
=
${
TRAIN_STEP
}
\
2>&1 |
tee
./gemini_logs/
${
MODEL_TYPE
}
_
${
DISTPLAN
}
_gpu_
${
GPUNUM
}
_bs_
${
BATCH_SIZE
}
_
${
PLACEMENT
}
.log
examples/language/bert/test_ci.sh
0 → 100644
View file @
bf020460
set
-x
env
GPUNUM
=
1 bash run_gemini.sh
examples/language/bert/train_bert_demo.py
0 → 100644
View file @
bf020460
import
os
from
functools
import
partial
from
time
import
time
import
psutil
import
torch
from
packaging
import
version
from
torch
import
nn
from
torch.nn.parallel
import
DistributedDataParallel
as
DDP
from
transformers
import
AlbertConfig
,
AlbertForSequenceClassification
,
BertConfig
,
BertForSequenceClassification
import
colossalai
from
colossalai.logging
import
disable_existing_loggers
,
get_dist_logger
from
colossalai.nn.optimizer
import
HybridAdam
from
colossalai.nn.parallel
import
zero_model_wrapper
,
zero_optim_wrapper
from
colossalai.tensor
import
ColoParameter
,
ComputePattern
,
ComputeSpec
,
ProcessGroup
,
ReplicaSpec
,
ShardSpec
from
colossalai.utils
import
get_current_device
from
colossalai.utils.model.colo_init_context
import
ColoInitContext
CAI_VERSION
=
colossalai
.
__version__
def
get_tflops
(
model_numel
,
batch_size
,
seq_len
,
step_time
):
return
model_numel
*
batch_size
*
seq_len
*
8
/
1e12
/
(
step_time
+
1e-12
)
def
get_profile_context
(
enable_flag
,
warmup_steps
,
active_steps
,
save_dir
):
from
contextlib
import
nullcontext
from
torch.profiler
import
ProfilerActivity
,
profile
,
schedule
,
tensorboard_trace_handler
if
enable_flag
:
return
profile
(
activities
=
[
ProfilerActivity
.
CPU
,
ProfilerActivity
.
CUDA
],
schedule
=
schedule
(
wait
=
0
,
warmup
=
warmup_steps
,
active
=
active_steps
),
on_trace_ready
=
tensorboard_trace_handler
(
save_dir
),
record_shapes
=
True
,
profile_memory
=
True
)
else
:
class
DummyProfiler
:
def
__init__
(
self
):
self
.
step_number
=
0
def
step
(
self
):
self
.
step_number
+=
1
return
nullcontext
(
DummyProfiler
())
def
get_time_stamp
():
import
time
cur_time
=
time
.
strftime
(
"%d-%H:%M"
,
time
.
localtime
())
return
cur_time
def
get_bert_data
(
batch_size
:
int
,
sequence_length
:
int
,
vacob_size
:
int
,
n_class
:
int
,
device
:
torch
.
device
):
input
=
torch
.
randint
(
low
=
0
,
high
=
vacob_size
,
size
=
(
batch_size
,
sequence_length
),
device
=
device
,
dtype
=
torch
.
long
,
)
label
=
torch
.
randint
(
low
=
0
,
high
=
n_class
,
size
=
(
batch_size
,),
device
=
device
,
dtype
=
torch
.
long
)
return
input
,
label
def
parse_args
():
parser
=
colossalai
.
get_default_parser
()
parser
.
add_argument
(
"--distplan"
,
type
=
str
,
default
=
'CAI_Gemini'
,
help
=
"The distributed plan [colossalai, zero1, zero2, torch_ddp, torch_zero]."
,
)
parser
.
add_argument
(
"--placement"
,
type
=
str
,
default
=
'cpu'
,
help
=
"Placement Policy for Gemini. Valid when using colossalai as dist plan."
,
)
parser
.
add_argument
(
"--batch_size"
,
type
=
int
,
default
=
8
,
help
=
"batch size per DP group of training."
,
)
parser
.
add_argument
(
"--model_type"
,
type
=
str
,
default
=
"bert"
,
help
=
"bert or albert"
,
)
parser
.
add_argument
(
"--train_step"
,
type
=
int
,
default
=
10
,
help
=
"training iterations for test"
,
)
args
=
parser
.
parse_args
()
return
args
SEQ_LEN
=
512
VOCAB_SIZE
=
1000
NUM_LABELS
=
10
# Parameter Sharding Strategies for Tensor Parallelism
def
split_param_single_dim_tp1d
(
dim
:
int
,
param
:
ColoParameter
,
pg
:
ProcessGroup
):
spec
=
(
ShardSpec
([
dim
],
[
pg
.
tp_world_size
()]),
ComputeSpec
(
ComputePattern
.
TP1D
))
param
.
set_tensor_spec
(
*
spec
)
def
split_param_row_tp1d
(
param
:
ColoParameter
,
pg
:
ProcessGroup
):
split_param_single_dim_tp1d
(
0
,
param
,
pg
)
def
split_param_col_tp1d
(
param
:
ColoParameter
,
pg
:
ProcessGroup
):
split_param_single_dim_tp1d
(
-
1
,
param
,
pg
)
def
get_cpu_mem
():
return
psutil
.
Process
().
memory_info
().
rss
/
1024
**
2
def
get_gpu_mem
():
return
torch
.
cuda
.
memory_allocated
()
/
1024
**
2
def
get_mem_info
(
prefix
=
''
):
return
f
'
{
prefix
}
GPU memory usage:
{
get_gpu_mem
():.
2
f
}
MB, CPU memory usage:
{
get_cpu_mem
():.
2
f
}
MB'
def
get_model_size
(
model
:
nn
.
Module
):
total_numel
=
0
for
module
in
model
.
modules
():
for
p
in
module
.
parameters
(
recurse
=
False
):
total_numel
+=
p
.
numel
()
return
total_numel
def
model_builder
(
args
):
if
args
.
model_type
==
"bert"
:
cfg
=
BertConfig
(
vocab_size
=
VOCAB_SIZE
,
num_labels
=
NUM_LABELS
)
return
BertForSequenceClassification
(
cfg
)
elif
args
.
model_type
==
"albert"
:
cfg
=
AlbertConfig
(
vocab_size
=
VOCAB_SIZE
,
num_labels
=
NUM_LABELS
)
return
AlbertForSequenceClassification
(
cfg
)
else
:
raise
RuntimeError
def
model_size_formatter
(
numel
:
int
)
->
str
:
GB_SIZE
=
10
**
9
MB_SIZE
=
10
**
6
KB_SIZE
=
10
**
3
if
numel
>=
GB_SIZE
:
return
f
'
{
numel
/
GB_SIZE
:.
1
f
}
B'
elif
numel
>=
MB_SIZE
:
return
f
'
{
numel
/
MB_SIZE
:.
1
f
}
M'
elif
numel
>=
KB_SIZE
:
return
f
'
{
numel
/
KB_SIZE
:.
1
f
}
K'
else
:
return
str
(
numel
)
def
set_cpu_maximum_parallelism
():
conf_str
=
torch
.
__config__
.
parallel_info
()
inter_str
=
conf_str
.
split
(
"hardware_concurrency() : "
)[
1
]
max_concurrency
=
inter_str
.
split
(
'
\n
'
)[
0
]
os
.
environ
[
"OMP_NUM_THREADS"
]
=
max_concurrency
print
(
f
"environmental variable OMP_NUM_THREADS is set to
{
max_concurrency
}
."
)
def
main
():
# version check
# this example is supposed to work for versions greater than 0.2.0
assert
version
.
parse
(
CAI_VERSION
)
>=
version
.
parse
(
"0.2.0"
)
set_cpu_maximum_parallelism
()
args
=
parse_args
()
# if args.distplan not in ["colossalai", "torch_ddp", "torch_zero", "zero1", "zero2"]:
if
args
.
distplan
not
in
[
"CAI_ZeRO1"
,
"CAI_ZeRO2"
,
"CAI_Gemini"
,
"Pytorch_DDP"
,
"Pytorch_ZeRO"
]:
raise
TypeError
(
f
"
{
args
.
distplan
}
is error"
)
# batch size per DP degree
BATCH_SIZE
=
args
.
batch_size
NUM_STEPS
=
args
.
train_step
WARMUP_STEPS
=
1
assert
WARMUP_STEPS
<
NUM_STEPS
,
"warmup steps should smaller than the total steps"
assert
(
NUM_STEPS
-
WARMUP_STEPS
)
%
2
==
1
,
"the number of valid steps should be odd to take the median"
PROF_FLAG
=
False
# The flag of profiling, False by default
disable_existing_loggers
()
colossalai
.
launch_from_torch
(
config
=
{})
logger
=
get_dist_logger
()
logger
.
info
(
f
"
{
args
.
distplan
}
, batch size
{
BATCH_SIZE
}
"
,
ranks
=
[
0
])
torch
.
manual_seed
(
123
)
if
args
.
distplan
.
startswith
(
"CAI"
):
# all param must use the same process group.
world_size
=
torch
.
distributed
.
get_world_size
()
# build a base-bert model
with
ColoInitContext
(
device
=
get_current_device
(),
dtype
=
torch
.
half
):
model
=
model_builder
(
args
)
# model = BertForSequenceClassification(BertConfig(vocal_size = VOCAB_SIZE))
# asign running configurations
gemini_config
=
None
if
args
.
distplan
.
startswith
(
"CAI_ZeRO"
):
optim_config
=
dict
(
reduce_bucket_size
=
12
*
1024
*
1024
,
overlap_communication
=
True
,
verbose
=
True
)
elif
args
.
distplan
==
"CAI_Gemini"
:
gemini_config
=
dict
(
strict_ddp_mode
=
True
,
device
=
get_current_device
(),
placement_policy
=
args
.
placement
,
pin_memory
=
True
,
hidden_dim
=
model
.
config
.
hidden_size
,
search_range_mb
=
128
)
optim_config
=
dict
(
gpu_margin_mem_ratio
=
0.
)
else
:
raise
RuntimeError
# build a highly optimized gpu/cpu optimizer
optimizer
=
HybridAdam
(
model
.
parameters
(),
lr
=
1e-3
)
if
args
.
distplan
==
"CAI_ZeRO1"
:
zero_stage
=
1
elif
args
.
distplan
==
"CAI_ZeRO2"
:
zero_stage
=
2
elif
args
.
distplan
==
"CAI_Gemini"
:
zero_stage
=
3
else
:
raise
RuntimeError
# wrap your model and optimizer
model
=
zero_model_wrapper
(
model
,
zero_stage
,
gemini_config
)
optimizer
=
zero_optim_wrapper
(
model
,
optimizer
,
optim_config
=
optim_config
)
logger
.
info
(
get_mem_info
(
prefix
=
'After init optim, '
),
ranks
=
[
0
])
elif
args
.
distplan
.
startswith
(
"Pytorch"
):
model
=
model_builder
(
args
).
cuda
()
model
=
DDP
(
model
)
if
args
.
distplan
.
endswith
(
"DDP"
):
optimizer
=
torch
.
optim
.
Adam
(
model
.
parameters
(),
lr
=
1e-3
)
elif
args
.
distplan
.
endswith
(
"ZeRO"
):
from
torch.distributed.optim
import
ZeroRedundancyOptimizer
optimizer
=
ZeroRedundancyOptimizer
(
model
.
parameters
(),
optimizer_class
=
torch
.
optim
.
Adam
,
lr
=
1e-3
)
else
:
raise
RuntimeError
# model is shared after TP
numel
=
get_model_size
(
model
)
logger
.
info
(
f
"the size of testing model size is
{
model_size_formatter
(
numel
)
}
."
)
logger
.
info
(
get_mem_info
(
prefix
=
'After init model, '
),
ranks
=
[
0
])
# Tflops_per_GPU = global_batch * global_numel * seq_len * 8 / #gpu
# = (batch_per_DP_group * dp_degree) * (numel * tp_degree) * seq_len * 8 / (tp_degree * dp_degree)
# = batch_per_DP_group * numel * seq_len * 8
get_tflops_func
=
partial
(
get_tflops
,
numel
,
BATCH_SIZE
,
SEQ_LEN
)
torch
.
cuda
.
synchronize
()
model
.
train
()
tflops_list
=
[]
def
train_step
():
# we just use randomly generated data here
input_ids
,
labels
=
get_bert_data
(
BATCH_SIZE
,
SEQ_LEN
,
VOCAB_SIZE
,
NUM_LABELS
,
device
=
torch
.
cuda
.
current_device
())
optimizer
.
zero_grad
()
start
=
time
()
outputs
=
model
(
input_ids
,
labels
=
labels
)
loss
,
logits
=
outputs
[:
2
]
torch
.
cuda
.
synchronize
()
fwd_end
=
time
()
fwd_time
=
fwd_end
-
start
logger
.
info
(
get_mem_info
(
prefix
=
f
'[
{
n
+
1
}
/
{
NUM_STEPS
}
] Forward '
),
ranks
=
[
0
])
if
args
.
distplan
.
startswith
(
"CAI"
):
optimizer
.
backward
(
loss
)
elif
args
.
distplan
.
startswith
(
"Pytorch"
):
loss
.
backward
()
else
:
raise
RuntimeError
torch
.
cuda
.
synchronize
()
bwd_end
=
time
()
bwd_time
=
bwd_end
-
fwd_end
logger
.
info
(
get_mem_info
(
prefix
=
f
'[
{
n
+
1
}
/
{
NUM_STEPS
}
] Backward '
),
ranks
=
[
0
])
optimizer
.
step
()
torch
.
cuda
.
synchronize
()
optim_time
=
time
()
-
bwd_end
step_time
=
time
()
-
start
logger
.
info
(
get_mem_info
(
prefix
=
f
'[
{
n
+
1
}
/
{
NUM_STEPS
}
] Optimizer step '
),
ranks
=
[
0
])
step_tflops
=
get_tflops_func
(
step_time
)
logger
.
info
(
f
"[
{
n
+
1
}
/
{
NUM_STEPS
}
] Loss:
{
loss
.
item
():.
3
f
}
, Step time:
{
step_time
:.
3
f
}
s, TFLOPS:
{
get_tflops_func
(
step_time
):.
3
f
}
, FWD time:
{
fwd_time
:.
3
f
}
s, BWD time:
{
bwd_time
:.
3
f
}
s, OPTIM time:
{
optim_time
:.
3
f
}
s"
,
ranks
=
[
0
],
)
if
n
>=
WARMUP_STEPS
:
tflops_list
.
append
(
step_tflops
)
demo_profiler
=
get_profile_context
(
PROF_FLAG
,
WARMUP_STEPS
,
NUM_STEPS
-
WARMUP_STEPS
,
save_dir
=
f
"profile/
{
get_time_stamp
()
}
-demo"
)
with
demo_profiler
as
prof
:
for
n
in
range
(
NUM_STEPS
):
train_step
()
prof
.
step
()
tflops_list
.
sort
()
median_index
=
((
NUM_STEPS
-
WARMUP_STEPS
)
>>
1
)
+
WARMUP_STEPS
logger
.
info
(
f
"Median TFLOPS is
{
tflops_list
[
median_index
]:.
3
f
}
"
)
torch
.
cuda
.
synchronize
()
if
__name__
==
'__main__'
:
main
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment