Unverified commit ac5c79cd authored by Quan (Andy) Gan, committed by GitHub

[Bug] Fixes issues in feature pickling and transmission for subgraphs and blocks (#2139)

* bug fixes

* remove __deepcopy__; not sure what the behavior should be

* lint

* skip gpu test

* fix

* fix dist dataloader

* add comment

* remove assert
parent 7e7376eb
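
The user-visible effect of the fix: with multiprocess sampling, the features sliced onto blocks and subgraphs stay accessible in the main process without shipping the parent graph's full feature tensors between processes. A minimal sketch of the affected code path (assuming a DGL build containing this commit with the PyTorch backend; the sizes and the 'x' feature name are arbitrary):

    import dgl
    import torch

    g = dgl.rand_graph(10000, 100000)
    g.ndata['x'] = torch.randn(10000, 4)

    sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
    dataloader = dgl.dataloading.NodeDataLoader(
        g, torch.arange(g.number_of_nodes()), sampler,
        batch_size=1000, shuffle=True, num_workers=4)  # sampling runs in subprocesses

    for input_nodes, output_nodes, blocks in dataloader:
        # Each block only carries an index into g's features until this access,
        # so transmitting it from the sampler process is cheap.
        x = blocks[0].srcdata['x']
        break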
@@ -13,6 +13,157 @@ def _remove_kwargs_dist(kwargs):
print('Distributed DataLoader does not support pin_memory')
return kwargs
# The following code is a fix to the PyTorch-specific issue in
# https://github.com/dmlc/dgl/issues/2137
#
# Basically, the sampled blocks/subgraphs contain features extracted from the
# parent graph. In DGL, a block/subgraph holds a reference to the parent
# graph's feature tensor plus an index tensor, so that the features can be
# sliced out lazily upon request. However, with multiprocess sampling we do
# not need to transmit the parent graph's feature tensor from the subprocess
# to the main process, since the main process holds exactly the same tensor,
# and transmitting a large tensor between processes is costly in PyTorch as
# it goes through shared memory. We work around this with the following trick:
#
# In the collator running in the sampler processes:
# For each frame in the block, we compare each column with the column of the
# same name in the corresponding parent frame. If the storage of the former
# column is the same object as the storage of the latter, the former must be
# a subcolumn of the latter, so we set the former column's storage to None.
#
# In the iterator of the main process:
# For each frame in the block, we again compare each column with the column
# of the same name in the corresponding parent frame. If the storage of the
# former column is None, we replace it with the storage of the latter.
def _pop_subframe_storage(subframe, frame):
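    # A column whose storage is the very same object as the parent frame's
    # storage must be a lazily-sliced subcolumn, so it is safe to drop the
    # storage here and recover it from the parent in the main process.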
for key, col in subframe._columns.items():
if key in frame._columns and col.storage is frame._columns[key].storage:
col.storage = None
def _pop_subgraph_storage(subg, g):
for ntype in subg.ntypes:
if ntype not in g.ntypes:
continue
subframe = subg._node_frames[subg.get_ntype_id(ntype)]
frame = g._node_frames[g.get_ntype_id(ntype)]
_pop_subframe_storage(subframe, frame)
for etype in subg.canonical_etypes:
if etype not in g.canonical_etypes:
continue
subframe = subg._edge_frames[subg.get_etype_id(etype)]
frame = g._edge_frames[g.get_etype_id(etype)]
_pop_subframe_storage(subframe, frame)
def _pop_blocks_storage(blocks, g):
for block in blocks:
for ntype in block.srctypes:
if ntype not in g.ntypes:
continue
subframe = block._node_frames[block.get_ntype_id_from_src(ntype)]
frame = g._node_frames[g.get_ntype_id(ntype)]
_pop_subframe_storage(subframe, frame)
for ntype in block.dsttypes:
if ntype not in g.ntypes:
continue
subframe = block._node_frames[block.get_ntype_id_from_dst(ntype)]
frame = g._node_frames[g.get_ntype_id(ntype)]
_pop_subframe_storage(subframe, frame)
for etype in block.canonical_etypes:
if etype not in g.canonical_etypes:
continue
subframe = block._edge_frames[block.get_etype_id(etype)]
frame = g._edge_frames[g.get_etype_id(etype)]
_pop_subframe_storage(subframe, frame)
def _restore_subframe_storage(subframe, frame):
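    # Reattach the parent frame's storage to every column that was stripped
    # by _pop_subframe_storage on the sampler-process side.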
for key, col in subframe._columns.items():
if col.storage is None:
col.storage = frame._columns[key].storage
def _restore_subgraph_storage(subg, g):
for ntype in subg.ntypes:
if ntype not in g.ntypes:
continue
subframe = subg._node_frames[subg.get_ntype_id(ntype)]
frame = g._node_frames[g.get_ntype_id(ntype)]
_restore_subframe_storage(subframe, frame)
for etype in subg.canonical_etypes:
if etype not in g.canonical_etypes:
continue
subframe = subg._edge_frames[subg.get_etype_id(etype)]
frame = g._edge_frames[g.get_etype_id(etype)]
_restore_subframe_storage(subframe, frame)
def _restore_blocks_storage(blocks, g):
for block in blocks:
for ntype in block.srctypes:
if ntype not in g.ntypes:
continue
subframe = block._node_frames[block.get_ntype_id_from_src(ntype)]
frame = g._node_frames[g.get_ntype_id(ntype)]
_restore_subframe_storage(subframe, frame)
for ntype in block.dsttypes:
if ntype not in g.ntypes:
continue
subframe = block._node_frames[block.get_ntype_id_from_dst(ntype)]
frame = g._node_frames[g.get_ntype_id(ntype)]
_restore_subframe_storage(subframe, frame)
for etype in block.canonical_etypes:
if etype not in g.canonical_etypes:
continue
subframe = block._edge_frames[block.get_etype_id(etype)]
frame = g._edge_frames[g.get_etype_id(etype)]
_restore_subframe_storage(subframe, frame)
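# The pieces above are tied together by the collator/iterator pairs below:
# the collators run inside the sampler processes and strip the shared storages
# right before a batch is pickled, and the dataloader iterators run in the
# main process and reattach the storages right after a batch is unpickled.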
class _NodeCollator(NodeCollator):
def collate(self, items):
input_nodes, output_nodes, blocks = super().collate(items)
_pop_blocks_storage(blocks, self.g)
return input_nodes, output_nodes, blocks
class _EdgeCollator(EdgeCollator):
def collate(self, items):
if self.negative_sampler is None:
input_nodes, pair_graph, blocks = super().collate(items)
_pop_subgraph_storage(pair_graph, self.g)
_pop_blocks_storage(blocks, self.g_sampling)
return input_nodes, pair_graph, blocks
else:
input_nodes, pair_graph, neg_pair_graph, blocks = super().collate(items)
_pop_subgraph_storage(pair_graph, self.g)
_pop_subgraph_storage(neg_pair_graph, self.g)
_pop_blocks_storage(blocks, self.g_sampling)
return input_nodes, pair_graph, neg_pair_graph, blocks
class _NodeDataLoaderIter:
def __init__(self, node_dataloader):
self.node_dataloader = node_dataloader
self.iter_ = iter(node_dataloader.dataloader)
def __next__(self):
input_nodes, output_nodes, blocks = next(self.iter_)
_restore_blocks_storage(blocks, self.node_dataloader.collator.g)
return input_nodes, output_nodes, blocks
class _EdgeDataLoaderIter:
def __init__(self, edge_dataloader):
self.edge_dataloader = edge_dataloader
self.iter_ = iter(edge_dataloader.dataloader)
def __next__(self):
if self.edge_dataloader.collator.negative_sampler is None:
input_nodes, pair_graph, blocks = next(self.iter_)
_restore_subgraph_storage(pair_graph, self.edge_dataloader.collator.g)
_restore_blocks_storage(blocks, self.edge_dataloader.collator.g_sampling)
return input_nodes, pair_graph, blocks
else:
input_nodes, pair_graph, neg_pair_graph, blocks = next(self.iter_)
_restore_subgraph_storage(pair_graph, self.edge_dataloader.collator.g)
_restore_subgraph_storage(neg_pair_graph, self.edge_dataloader.collator.g)
_restore_blocks_storage(blocks, self.edge_dataloader.collator.g_sampling)
return input_nodes, pair_graph, neg_pair_graph, blocks
class NodeDataLoader:
"""PyTorch dataloader for batch-iterating over a set of nodes, generating the list
of blocks as computation dependency of the said minibatch.
@@ -51,34 +202,36 @@ class NodeDataLoader:
collator_kwargs[k] = v
else:
dataloader_kwargs[k] = v
-        self.collator = NodeCollator(g, nids, block_sampler, **collator_kwargs)
         if isinstance(g, DistGraph):
+            # Distributed DataLoader currently does not support heterogeneous graphs
+            # and does not copy features, so fall back to the plain NodeCollator.
+            self.collator = NodeCollator(g, nids, block_sampler, **collator_kwargs)
             _remove_kwargs_dist(dataloader_kwargs)
             self.dataloader = DistDataLoader(self.collator.dataset,
                                              collate_fn=self.collator.collate,
                                              **dataloader_kwargs)
+            self.is_distributed = True
         else:
+            self.collator = _NodeCollator(g, nids, block_sampler, **collator_kwargs)
             self.dataloader = DataLoader(self.collator.dataset,
                                          collate_fn=self.collator.collate,
                                          **dataloader_kwargs)

-    def __next__(self):
-        """Return the next element of the data loader.
-        Only works when the data loader is created from :class:`dgl.distributed.DistGraph`.
-        """
-        return next(self.dataloader)
+            self.is_distributed = False

     def __iter__(self):
         """Return the iterator of the data loader."""
-        return iter(self.dataloader)
+        if self.is_distributed:
+            # Directly use the iterator of DistDataLoader, which does not copy features anyway.
+            return iter(self.dataloader)
+        else:
+            return _NodeDataLoaderIter(self)

+    def __len__(self):
+        """Return the number of batches of the data loader."""
+        return len(self.dataloader)
-class EdgeDataLoader(DataLoader):
+class EdgeDataLoader:
"""PyTorch dataloader for batch-iterating over a set of edges, generating the list
of blocks as computation dependency of the said minibatch for edge classification,
edge regression, and link prediction.
@@ -228,10 +381,18 @@ class EdgeDataLoader(DataLoader):
collator_kwargs[k] = v
else:
dataloader_kwargs[k] = v
-        self.collator = EdgeCollator(g, eids, block_sampler, **collator_kwargs)
+        self.collator = _EdgeCollator(g, eids, block_sampler, **collator_kwargs)
assert not isinstance(g, DistGraph), \
'EdgeDataLoader does not support DistGraph for now. ' \
+ 'Please use DistDataLoader directly.'
-        super().__init__(
+        self.dataloader = DataLoader(
self.collator.dataset, collate_fn=self.collator.collate, **dataloader_kwargs)
def __iter__(self):
"""Return the iterator of the data loader."""
return _EdgeDataLoaderIter(self)
def __len__(self):
"""Return the number of batches of the data loader."""
return len(self.dataloader)
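
For reference, a single-process sketch of the round trip that the collator/iterator pair above performs across the process boundary. It calls the module-private helpers directly and is purely illustrative; these helpers are internal and may change:

    import pickle
    import torch
    import dgl

    g = dgl.rand_graph(1000, 5000)
    g.ndata['x'] = torch.randn(1000, 4)
    sampler = dgl.dataloading.MultiLayerFullNeighborSampler(1)
    collator = dgl.dataloading.NodeCollator(g, torch.arange(1000), sampler)
    input_nodes, output_nodes, blocks = collator.collate(list(range(64)))

    _pop_blocks_storage(blocks, g)        # sampler-process side: drop storages shared with g
    payload = pickle.dumps(blocks)        # cheap: no parent feature tensor inside

    blocks = pickle.loads(payload)        # main-process side
    _restore_blocks_storage(blocks, g)    # reattach the parent storages
    x = blocks[0].srcdata['x']            # the slice materializes lazily here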
@@ -77,6 +77,11 @@ class Column(object):
tensor of this column when the index tensor is not None.
This typically happens when the column is extracted from another
column using the `subcolumn` method.
        It can also be None, which should only happen when a not-yet-materialized
        subcolumn is being transmitted from a subprocess to the main process.
        In this case, the main process already maintains the content of the
        storage, and is responsible for restoring the subcolumn's storage pointer.
data : Tensor
The actual data tensor of this column.
scheme : Scheme
@@ -225,7 +230,7 @@ class Column(object):
The operation triggers index selection.
"""
-        return Column(F.clone(self.data), self.scheme)
+        return Column(F.clone(self.data), copy.deepcopy(self.scheme))
def subcolumn(self, rowids):
"""Return a subcolumn.
@@ -261,6 +266,14 @@
def __repr__(self):
return repr(self.data)
def __getstate__(self):
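        # If the storage is still attached, force the lazy slice to evaluate so
        # that the pickle carries the small sliced tensor instead of keeping a
        # reference to the full parent tensor.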
if self.storage is not None:
_ = self.data # evaluate feature slicing
return self.__dict__
def __copy__(self):
return self.clone()
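    # Illustration of the storage contract documented above (a sketch, not part
    # of the original file; Column is an internal API):
    #
    #     parent = Column(F.randn((100, 4)))
    #     sub = parent.subcolumn(F.tensor([0, 1, 2]))
    #     assert sub.storage is parent.storage   # lazy slice shares storage
    #     _ = sub.data                           # materializes the slice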
class Frame(MutableMapping):
"""The columnar storage for node/edge features.
@@ -165,6 +165,43 @@ def test_pickling_batched_heterograph():
new_bg = _reconstruct_pickle(bg)
test_utils.check_graph_equal(bg, new_bg)
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU edge_subgraph w/ relabeling not implemented")
def test_pickling_subgraph():
f1 = io.BytesIO()
f2 = io.BytesIO()
g = dgl.rand_graph(10000, 100000)
g.ndata['x'] = F.randn((10000, 4))
g.edata['x'] = F.randn((100000, 5))
pickle.dump(g, f1)
sg = g.subgraph([0, 1])
sgx = sg.ndata['x'] # materialize
pickle.dump(sg, f2)
    # TODO(BarclayII): how should I test that the subgraph's pickle is much smaller
    # than the parent graph's pickle?  For now, require at least a 50x size gap.
    assert f1.tell() > f2.tell() * 50
f2.seek(0)
f2.truncate()
sgx = sg.edata['x'] # materialize
pickle.dump(sg, f2)
assert f1.tell() > f2.tell() * 50
f2.seek(0)
f2.truncate()
sg = g.edge_subgraph([0])
sgx = sg.edata['x'] # materialize
pickle.dump(sg, f2)
assert f1.tell() > f2.tell() * 50
f2.seek(0)
f2.truncate()
sgx = sg.ndata['x'] # materialize
pickle.dump(sg, f2)
assert f1.tell() > f2.tell() * 50
f1.close()
f2.close()
if __name__ == '__main__':
test_pickling_index()
test_pickling_graph_index()
@@ -35,12 +35,19 @@ def _check_neighbor_sampling_dataloader(g, nids, dl, mode):
uu, vv = block.all_edges(order='eid', etype=canonical_etype)
src = block.srcnodes[utype].data[dgl.NID]
dst = block.dstnodes[vtype].data[dgl.NID]
assert F.array_equal(
block.srcnodes[utype].data['feat'], g.nodes[utype].data['feat'][src])
assert F.array_equal(
block.dstnodes[vtype].data['feat'], g.nodes[vtype].data['feat'][dst])
if prev_dst[utype] is not None:
assert F.array_equal(src, prev_dst[utype])
u = src[uu]
v = dst[vv]
assert F.asnumpy(g.has_edges_between(u, v, etype=canonical_etype)).all()
eid = block.edges[canonical_etype].data[dgl.EID]
assert F.array_equal(
block.edges[canonical_etype].data['feat'],
g.edges[canonical_etype].data['feat'][eid])
ufound, vfound = g.find_edges(eid, etype=canonical_etype)
assert F.array_equal(ufound, u)
assert F.array_equal(vfound, v)
@@ -75,6 +82,8 @@ def test_neighbor_sampler_dataloader():
g = dgl.heterograph({('user', 'follow', 'user'): ([0, 0, 0, 1, 1], [1, 2, 3, 3, 4])},
{'user': 6}).long()
g = dgl.to_bidirected(g)
g.ndata['feat'] = F.randn((6, 8))
g.edata['feat'] = F.randn((10, 4))
reverse_eids = F.tensor([5, 6, 7, 8, 9, 0, 1, 2, 3, 4], dtype=F.int64)
g_sampler1 = dgl.dataloading.MultiLayerNeighborSampler([2, 2], return_eids=True)
g_sampler2 = dgl.dataloading.MultiLayerFullNeighborSampler(2, return_eids=True)
@@ -85,6 +94,10 @@ def test_neighbor_sampler_dataloader():
('user', 'play', 'game'): ([0, 1, 1, 3, 5], [0, 1, 2, 0, 2]),
('game', 'played-by', 'user'): ([0, 1, 2, 0, 2], [0, 1, 1, 3, 5])
}).long()
for ntype in hg.ntypes:
hg.nodes[ntype].data['feat'] = F.randn((hg.number_of_nodes(ntype), 8))
for etype in hg.canonical_etypes:
hg.edges[etype].data['feat'] = F.randn((hg.number_of_edges(etype), 4))
hg_sampler1 = dgl.dataloading.MultiLayerNeighborSampler(
[{'play': 1, 'played-by': 1, 'follow': 2, 'followed-by': 1}] * 2, return_eids=True)
hg_sampler2 = dgl.dataloading.MultiLayerFullNeighborSampler(2, return_eids=True)