OpenDAS / ColossalAI · Commits

Commit 95c35f73 (unverified)
Authored Sep 23, 2022 by HELSON, committed by GitHub on Sep 23, 2022
Parent: e57df803

[moe] initialize MoE groups by ProcessGroup (#1640)
Changes: 2 changed files with 67 additions and 35 deletions (+67 -35)

  colossalai/context/moe_context.py       +4   -35
  tests/test_moe/test_moe_colo_init.py    +63  -0
colossalai/context/moe_context.py (view file @ 95c35f73)
@@ -3,6 +3,7 @@ import torch.distributed as dist

 from colossalai.context.parallel_mode import ParallelMode
 from colossalai.context.singleton_meta import SingletonMeta
+from colossalai.tensor import ProcessGroup
 from typing import Tuple

@@ -22,41 +23,9 @@ class MoeParallelInfo:

         _check_sanity()
         self.ep_size = ep_size
         self.dp_size = dp_size
-        self.ep_group = None
-        # data parallel group for experts, since ep_group is different
-        # we may have different dp_group from get_group(ParallelMode.DATA)
-        self.dp_group = None
-
-        # Here we assume tensor parallel size = 1
-        # Otherwise, MoE can't be used
-        # Since TENSOR parallel group and DATA parallel group
-        # have been created, we can use them directly.
-        if ep_size == 1:
-            from colossalai.core import global_context as gpc
-            self.ep_group = gpc.get_group(ParallelMode.TENSOR)
-            self.dp_group = gpc.get_group(ParallelMode.DATA)
-            return
-
-        if dp_size == 1:
-            from colossalai.core import global_context as gpc
-            self.ep_group = gpc.get_group(ParallelMode.DATA)
-            self.dp_group = gpc.get_group(ParallelMode.TENSOR)
-            return
-
-        rank = dist.get_rank()
-        # Create expert parallel group
-        for i in range(dp_size):
-            ranks = [i * ep_size + j for j in range(ep_size)]
-            group = dist.new_group(ranks)
-            if rank in ranks:
-                self.ep_group = group
-        # Create data parallel group
-        for j in range(ep_size):
-            ranks = [i * ep_size + j for i in range(dp_size)]
-            group = dist.new_group(ranks)
-            if rank in ranks:
-                self.dp_group = group
+        self.pg = ProcessGroup(tp_degree=ep_size, dp_degree=dp_size)
+        self.ep_group = self.pg.tp_process_group()
+        self.dp_group = self.pg.dp_process_group()


 class MoeContext(metaclass=SingletonMeta):
 ...
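For context on what the replaced code computed: the removed loops placed each rank into an expert-parallel group of consecutive ranks and a data-parallel group of strided ranks, which is the same two-dimensional layout that ProcessGroup(tp_degree=ep_size, dp_degree=dp_size) is now asked to provide. Below is a minimal standalone sketch, not part of the commit, that reproduces those rank lists in pure Python without any distributed backend; the helper name moe_rank_layout is hypothetical.

# Sketch only: the rank layout that the removed dist.new_group loops built.
def moe_rank_layout(ep_size: int, dp_size: int):
    # Expert-parallel groups: consecutive ranks within one data-parallel slice.
    ep_groups = [[i * ep_size + j for j in range(ep_size)] for i in range(dp_size)]
    # Data-parallel groups for experts: ranks sharing the same expert-parallel index.
    dp_groups = [[i * ep_size + j for i in range(dp_size)] for j in range(ep_size)]
    return ep_groups, dp_groups

# Example with 4 ranks, ep_size=2, dp_size=2:
#   ep_groups == [[0, 1], [2, 3]]
#   dp_groups == [[0, 2], [1, 3]]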
tests/test_moe/test_moe_colo_init.py (new file, mode 100644, view file @ 95c35f73)
from functools import partial

import colossalai
import pytest
import torch
import torch.multiprocessing as mp
import torch.distributed as dist
from colossalai.testing import parameterize
from colossalai.utils import free_port
from colossalai.context import MOE_CONTEXT
from colossalai.tensor import ColoParameter
from colossalai.utils.model.colo_init_context import ColoInitContext
from colossalai.testing import rerun_if_address_is_in_use
from colossalai.utils import get_current_device

from tests.test_zero.common import CONFIG
from tests.test_moe.test_moe_zero_init import MoeModel
from tests.test_tensor.common_utils import debug_print


@parameterize("init_device_type", ['cpu', 'cuda'])
def exam_moe_colo_init(init_device_type):
    world_size = dist.get_world_size()

    if init_device_type == 'cuda':
        init_device = get_current_device()
    elif init_device_type == 'cpu':
        init_device = torch.device("cpu")
    else:
        raise NotImplementedError("Unknown device found.")

    with ColoInitContext(device=init_device):
        model = MoeModel(checkpoint=True)

    for name, param in model.named_parameters():
        assert isinstance(param, ColoParameter), "parameter `{}` has an init problem".format(name)

        if hasattr(param, "moe_info"):
            param.set_process_group(param.moe_info.pg)

        if hasattr(param, "moe_info"):
            assert param.process_group.dp_world_size() == param.moe_info.dp_size
        else:
            assert param.process_group.dp_world_size() == world_size


def _run_dist(rank, world_size, port):
    colossalai.launch(config=CONFIG, rank=rank, world_size=world_size, host='localhost', port=port, backend='nccl')
    MOE_CONTEXT.setup(seed=42)
    exam_moe_colo_init()


@pytest.mark.dist
@pytest.mark.parametrize("world_size", [4])
@rerun_if_address_is_in_use()
def test_moe_colo_init(world_size):
    run_func = partial(_run_dist, world_size=world_size, port=free_port())
    mp.spawn(run_func, nprocs=world_size)


if __name__ == '__main__':
    test_moe_colo_init(world_size=4)
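As a brief usage note, here is a sketch of the invariant the assertions above exercise: once an expert parameter is bound to its moe_info.pg, its data-parallel world size should match the expert layout's dp_size, while ordinary parameters still see the full world size (4 in this test). The helper name expected_dp_world_size is hypothetical and only restates the test's checks.

# Sketch only (hypothetical helper, not part of the commit): restates the
# checks performed in exam_moe_colo_init above.
def expected_dp_world_size(param, global_world_size: int) -> int:
    if hasattr(param, "moe_info"):
        # Expert parameters are replicated only across the expert data-parallel group.
        return param.moe_info.dp_size
    # Non-expert parameters keep the full data-parallel world.
    return global_world_size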