"git@developer.sourcefind.cn:renzhc/diffusers_dcu.git" did not exist on "50e66f2f9560dec6a6e0fa2d50cdb14f1ae56232"
Unverified Commit dc8a0279 authored by czkkkkkk's avatar czkkkkkk Committed by GitHub
Browse files

[Graphbolt] Enable CSCSamplingGraph copy to/load from shared memory and add unittests in Python

[Graphbolt] Enable CSCSamplingGraph copy to/load from shared memory and add unittests in Python (#5742)
parent a93e5578
......@@ -194,6 +194,24 @@ class CSCSamplingGraph:
), "Nodes cannot have duplicate values."
return self._c_csc_graph.in_subgraph(nodes)
def copy_to_shared_memory(self, shared_memory_name: str):
    """Copy the graph to shared memory.

    Parameters
    ----------
    shared_memory_name : str
        Name of the shared memory.

    Returns
    -------
    CSCSamplingGraph
        The copied CSCSamplingGraph object on shared memory.
    """
    shared_c_graph = self._c_csc_graph.copy_to_shared_memory(
        shared_memory_name
    )
    # Only the C-side graph lives in shared memory; the Python-side
    # metadata object is carried over as-is.
    return CSCSamplingGraph(shared_c_graph, self._metadata)
def from_csc(
csc_indptr: torch.Tensor,
......@@ -251,6 +269,28 @@ def from_csc(
)
def load_from_shared_memory(
    shared_memory_name: str,
    metadata: Optional[GraphMetadata] = None,
) -> CSCSamplingGraph:
    """Load a CSCSamplingGraph object from shared memory.

    Parameters
    ----------
    shared_memory_name : str
        Name of the shared memory.
    metadata : Optional[GraphMetadata]
        Metadata of the graph, if any. The metadata is not stored in shared
        memory, so the caller must supply it separately (e.g. the
        ``metadata`` of the graph that was copied). Default: None.

    Returns
    -------
    CSCSamplingGraph
        The loaded CSCSamplingGraph object on shared memory.
    """
    return CSCSamplingGraph(
        torch.ops.graphbolt.load_from_shared_memory(shared_memory_name),
        metadata,
    )
def _csc_sampling_graph_str(graph: CSCSamplingGraph) -> str:
"""Internal function for converting a csc sampling graph to string
representation.
......
......@@ -383,3 +383,119 @@ def test_in_subgraph_heterogeneous():
assert torch.equal(
in_subgraph.type_per_edge, torch.LongTensor([2, 2, 1, 3, 1, 3, 3])
)
def check_tensors_on_the_same_shared_memory(t1: torch.Tensor, t2: torch.Tensor):
    """Check that two tensors are backed by the same shared memory.

    Writes a random value through ``t1`` and verifies that ``t2`` observes
    it, while also confirming that the two tensors are distinct mappings
    (different data pointers). ``t1`` is restored to its original content
    before returning.
    """
    # They must be distinct tensors, not two references to the same view.
    assert t1.data_ptr() != t2.data_ptr()
    backup = t1.clone()
    t1[:] = torch.randint_like(t1, 100)
    # If the write through t1 is visible via t2, they share the memory.
    assert torch.equal(t1, t2)
    t1[:] = backup
@unittest.skipIf(
    F._default_context_str == "gpu",
    reason="CSCSamplingGraph is only supported on CPU.",
)
@pytest.mark.parametrize(
    "num_nodes, num_edges", [(1, 1), (100, 1), (10, 50), (1000, 50000)]
)
def test_homo_graph_on_shared_memory(num_nodes, num_edges):
    """Copy a homogeneous graph to shared memory, load it back, and verify
    that both handles expose the same data on the same shared memory."""
    csc_indptr, indices = random_homo_graph(num_nodes, num_edges)
    graph = gb.from_csc(csc_indptr, indices)

    shm_name = "test_homo_g"
    graph1 = graph.copy_to_shared_memory(shm_name)
    graph2 = gb.load_from_shared_memory(shm_name, graph.metadata)

    # Both the copied and the loaded graph must report the correct sizes.
    # (Fixed copy-paste bug: the original asserted graph1.num_nodes and
    # graph2.num_edges twice each, and never checked graph2.num_nodes or
    # graph1.num_edges.)
    assert graph1.num_nodes == num_nodes
    assert graph2.num_nodes == num_nodes
    assert graph1.num_edges == num_edges
    assert graph2.num_edges == num_edges

    # Test the value of graph1 is correct
    assert torch.equal(graph1.csc_indptr, csc_indptr)
    assert torch.equal(graph1.indices, indices)
    # Test the value of graph2 is correct
    assert torch.equal(graph2.csc_indptr, csc_indptr)
    assert torch.equal(graph2.indices, indices)
    # Test the memory of graph1 and graph2 is on shared memory
    check_tensors_on_the_same_shared_memory(
        graph1.csc_indptr, graph2.csc_indptr
    )
    check_tensors_on_the_same_shared_memory(graph1.indices, graph2.indices)

    # A homogeneous graph carries no heterogeneous attributes.
    assert graph1.metadata is None and graph2.metadata is None
    assert graph1.node_type_offset is None and graph2.node_type_offset is None
    assert graph1.type_per_edge is None and graph2.type_per_edge is None
@unittest.skipIf(
    F._default_context_str == "gpu",
    reason="CSCSamplingGraph is only supported on CPU.",
)
@pytest.mark.parametrize(
    "num_nodes, num_edges", [(1, 1), (100, 1), (10, 50), (1000, 50000)]
)
@pytest.mark.parametrize("num_ntypes, num_etypes", [(1, 1), (3, 5), (100, 1)])
def test_hetero_graph_on_shared_memory(
    num_nodes, num_edges, num_ntypes, num_etypes
):
    """Copy a heterogeneous graph to shared memory, load it back, and verify
    that both handles expose the same data on the same shared memory."""
    (
        csc_indptr,
        indices,
        node_type_offset,
        type_per_edge,
        metadata,
    ) = random_hetero_graph(num_nodes, num_edges, num_ntypes, num_etypes)
    graph = gb.from_csc(
        csc_indptr, indices, node_type_offset, type_per_edge, metadata
    )

    shm_name = "test_hetero_g"
    graph1 = graph.copy_to_shared_memory(shm_name)
    graph2 = gb.load_from_shared_memory(shm_name, graph.metadata)

    # Both the copied and the loaded graph must report the correct sizes.
    # (Fixed copy-paste bug: the original asserted graph1.num_nodes and
    # graph2.num_edges twice each, and never checked graph2.num_nodes or
    # graph1.num_edges.)
    assert graph1.num_nodes == num_nodes
    assert graph2.num_nodes == num_nodes
    assert graph1.num_edges == num_edges
    assert graph2.num_edges == num_edges

    # Test the value of graph1 is correct
    assert torch.equal(graph1.csc_indptr, csc_indptr)
    assert torch.equal(graph1.indices, indices)
    assert torch.equal(graph1.node_type_offset, node_type_offset)
    assert torch.equal(graph1.type_per_edge, type_per_edge)
    # Test the value of graph2 is correct
    assert torch.equal(graph2.csc_indptr, csc_indptr)
    assert torch.equal(graph2.indices, indices)
    assert torch.equal(graph2.node_type_offset, node_type_offset)
    assert torch.equal(graph2.type_per_edge, type_per_edge)
    # Test the memory of graph1 and graph2 is on shared memory
    check_tensors_on_the_same_shared_memory(
        graph1.csc_indptr, graph2.csc_indptr
    )
    check_tensors_on_the_same_shared_memory(graph1.indices, graph2.indices)
    check_tensors_on_the_same_shared_memory(
        graph1.node_type_offset, graph2.node_type_offset
    )
    check_tensors_on_the_same_shared_memory(
        graph1.type_per_edge, graph2.type_per_edge
    )

    # Metadata is not stored in shared memory; it was passed through to the
    # loaded graph and must match the original on both handles.
    assert metadata.node_type_to_id == graph1.metadata.node_type_to_id
    assert metadata.edge_type_to_id == graph1.metadata.edge_type_to_id
    assert metadata.node_type_to_id == graph2.metadata.node_type_to_id
    assert metadata.edge_type_to_id == graph2.metadata.edge_type_to_id
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment