OpenDAS / dgl · Commits

Commit 43912418 (Unverified)
Authored Feb 06, 2024 by Muhammed Fatih BALIN; committed by GitHub on Feb 06, 2024

[GraphBolt][CUDA] puregpu option for the multiGPU example. (#7089)

Parent: 845864d2
Showing 3 changed files with 30 additions and 22 deletions (+30 −22):

- examples/multigpu/graphbolt/node_classification.py (+19 −18)
- python/dgl/graphbolt/impl/fused_csc_sampling_graph.py (+3 −2)
- python/dgl/graphbolt/impl/torch_based_feature_store.py (+8 −2)
examples/multigpu/graphbolt/node_classification.py
```diff
@@ -151,9 +151,7 @@ def evaluate(rank, model, dataloader, num_classes, device):
     y = []
     y_hats = []
-    for step, data in (
-        tqdm.tqdm(enumerate(dataloader)) if rank == 0 else enumerate(dataloader)
-    ):
+    for data in tqdm.tqdm(dataloader) if rank == 0 else dataloader:
         blocks = data.blocks
         x = data.node_features["feat"]
         y.append(data.labels)
```
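A side note on the simplified loop (a sketch, not commit code): `tqdm.tqdm` wraps any iterable, so the example can hand the bare dataloader to every rank and add a progress bar on rank 0 only.

```python
import tqdm

def maybe_progress(iterable, rank):
    # Wrap with a progress bar on rank 0 only; other ranks iterate silently.
    return tqdm.tqdm(iterable) if rank == 0 else iterable

for data in maybe_progress(range(3), rank=0):
    pass  # stand-in for the evaluation body above
```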
```diff
@@ -271,8 +269,11 @@ def run(rank, world_size, args, devices, dataset):
     # Pin the graph and features to enable GPU access.
     if args.storage_device == "pinned":
-        dataset.graph.pin_memory_()
-        dataset.feature.pin_memory_()
+        graph = dataset.graph.pin_memory_()
+        feature = dataset.feature.pin_memory_()
+    else:
+        graph = dataset.graph.to(args.storage_device)
+        feature = dataset.feature.to(args.storage_device)

     train_set = dataset.tasks[0].train_set
     valid_set = dataset.tasks[0].validation_set
```
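For readers unfamiliar with the placements this hunk now dispatches between, here is a hedged, standalone illustration using plain PyTorch tensors as stand-ins for the GraphBolt graph and feature objects:

```python
import torch

t = torch.randn(4, 8)  # stand-in for a graph or feature tensor

# "pinned": keep data in host RAM, but page-locked so the GPU can read it
# directly; torch.Tensor.pin_memory() returns a pinned copy.
pinned = t.pin_memory()
assert pinned.is_pinned()

# "cuda": copy the data into GPU memory, the new pure-GPU path
# (requires a CUDA-enabled build and an available device).
if torch.cuda.is_available():
    on_gpu = t.to("cuda")
    assert on_gpu.is_cuda
```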
```diff
@@ -280,13 +281,13 @@ def run(rank, world_size, args, devices, dataset):
     args.fanout = list(map(int, args.fanout.split(",")))
     num_classes = dataset.tasks[0].metadata["num_classes"]
-    in_size = dataset.feature.size("node", None, "feat")[0]
+    in_size = feature.size("node", None, "feat")[0]
     hidden_size = 256
     out_size = num_classes

-    if args.gpu_cache_size > 0:
-        dataset.feature._features[("node", None, "feat")] = gb.GPUCachedFeature(
-            dataset.feature._features[("node", None, "feat")],
+    if args.gpu_cache_size > 0 and args.storage_device != "cuda":
+        feature._features[("node", None, "feat")] = gb.GPUCachedFeature(
+            feature._features[("node", None, "feat")],
             args.gpu_cache_size,
         )
```
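The extra `args.storage_device != "cuda"` condition is worth spelling out: `gb.GPUCachedFeature` keeps a GPU-resident cache of hot rows in front of a slower fallback feature, which is pointless once the feature itself already lives in GPU memory. A hedged sketch of the guard as a helper (the `gb` alias and constructor arguments follow the diff; the helper function itself is hypothetical):

```python
import dgl.graphbolt as gb  # assumed import behind the `gb` alias


def maybe_add_gpu_cache(feature_store, key, cache_size, storage_device):
    # Only layer a GPU cache over features that do NOT already live on GPU.
    if cache_size > 0 and storage_device != "cuda":
        feature_store._features[key] = gb.GPUCachedFeature(
            feature_store._features[key], cache_size
        )
```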
```diff
@@ -297,24 +298,24 @@ def run(rank, world_size, args, devices, dataset):
     # Create data loaders.
     train_dataloader = create_dataloader(
         args,
-        dataset.graph,
-        dataset.feature,
+        graph,
+        feature,
         train_set,
         device,
         is_train=True,
     )
     valid_dataloader = create_dataloader(
         args,
-        dataset.graph,
-        dataset.feature,
+        graph,
+        feature,
         valid_set,
         device,
         is_train=False,
     )
     test_dataloader = create_dataloader(
         args,
-        dataset.graph,
-        dataset.feature,
+        graph,
+        feature,
         test_set,
         device,
         is_train=False,
```
```diff
@@ -396,9 +397,9 @@ def parse_args():
     parser.add_argument(
         "--mode",
         default="pinned-cuda",
-        choices=["cpu-cuda", "pinned-cuda"],
+        choices=["cpu-cuda", "pinned-cuda", "cuda-cuda"],
         help="Dataset storage placement and Train device: 'cpu' for CPU and RAM,"
         " 'pinned' for pinned memory in RAM, 'cuda' for GPU and GPU memory.",
     )
     return parser.parse_args()
```
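With the new choice in place, `--mode cuda-cuda` selects the pure-GPU configuration this commit is named after. A hedged sketch of how the flag value presumably feeds `args.storage_device` used above (the parsing step happens outside this diff and is an assumption here):

```python
args = parse_args()  # e.g. invoked with --mode cuda-cuda
# Hypothetical parsing step, consistent with how args.storage_device is
# used in run() above: "storage-train" splits into the dataset storage
# placement and the training device.
args.storage_device, args.device = args.mode.split("-")
```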
python/dgl/graphbolt/impl/fused_csc_sampling_graph.py
```diff
@@ -1092,7 +1092,8 @@ class FusedCSCSamplingGraph(SamplingGraph):
         return self2._apply_to_members(_pin if device == "pinned" else _to)

     def pin_memory_(self):
-        """Copy `FusedCSCSamplingGraph` to the pinned memory in-place."""
+        """Copy `FusedCSCSamplingGraph` to the pinned memory in-place. Returns
+        the same object modified in-place."""
         # torch.Tensor.pin_memory() is not an inplace operation. To make it
         # truly in-place, we need to use cudaHostRegister. Then, we need to use
         # cudaHostUnregister to unpin the tensor in the destructor.
```
```diff
@@ -1123,7 +1124,7 @@ class FusedCSCSamplingGraph(SamplingGraph):
             return x

-        self._apply_to_members(_pin)
+        return self._apply_to_members(_pin)


 def fused_csc_sampling_graph(
```
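The added `return` is what lets call sites such as `graph = dataset.graph.pin_memory_()` in the example above work. A minimal, runnable illustration of the contract (hypothetical class, not GraphBolt code):

```python
class Pinnable:
    def pin_memory_(self):
        # ... pin members in place (cudaHostRegister in the real code) ...
        return self  # returning self lets callers bind the result


g = Pinnable()
assert g.pin_memory_() is g  # same object, modified in place
```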
python/dgl/graphbolt/impl/torch_based_feature_store.py
```diff
@@ -175,7 +175,8 @@ class TorchBasedFeature(Feature):
         )

     def pin_memory_(self):
-        """In-place operation to copy the feature to pinned memory."""
+        """In-place operation to copy the feature to pinned memory. Returns the
+        same object modified in-place."""
         # torch.Tensor.pin_memory() is not an inplace operation. To make it
         # truly in-place, we need to use cudaHostRegister. Then, we need to use
         # cudaHostUnregister to unpin the tensor in the destructor.
```
```diff
@@ -194,6 +195,8 @@ class TorchBasedFeature(Feature):
         self._is_inplace_pinned.add(x)

+        return self
+
     def is_pinned(self):
         """Returns True if the stored feature is pinned."""
         return self._tensor.is_pinned()
```
```diff
@@ -289,10 +292,13 @@ class TorchBasedFeatureStore(BasicFeatureStore):
         super().__init__(features)

     def pin_memory_(self):
-        """In-place operation to copy the feature store to pinned memory."""
+        """In-place operation to copy the feature store to pinned memory.
+        Returns the same object modified in-place."""
         for feature in self._features.values():
             feature.pin_memory_()

+        return self
+
     def is_pinned(self):
         """Returns True if all the stored features are pinned."""
         return all(feature.is_pinned() for feature in self._features.values())
```
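Taken together, the store now pins every contained feature in place, returns itself, and reports pinned only when all members are. A minimal runnable sketch of that contract with hypothetical stand-in classes (not GraphBolt code):

```python
class FakeFeature:
    def __init__(self):
        self._pinned = False

    def pin_memory_(self):
        self._pinned = True
        return self

    def is_pinned(self):
        return self._pinned


class FakeFeatureStore:
    def __init__(self, features):
        self._features = features

    def pin_memory_(self):
        # Pin every contained feature in place, then return the store itself.
        for feature in self._features.values():
            feature.pin_memory_()
        return self

    def is_pinned(self):
        # Pinned only if every member feature is pinned.
        return all(f.is_pinned() for f in self._features.values())


store = FakeFeatureStore({("node", None, "feat"): FakeFeature()})
assert store.pin_memory_() is store and store.is_pinned()
```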