"...text-generation-inference.git" did not exist on "762dbf3f198a9fbe7edffd60edc909e60e66878a"
Unverified Commit 40950629 authored by Qidong Su's avatar Qidong Su Committed by GitHub
Browse files

[Feature] Shared memory utilities (#1807)



* update

* update

* update

* update

* fix

* update

* fix

* update

* update

* win32

* update

* fix

* update

* update

* update

* updat

* update

* update

* fix

* update

* update

* update

* update

* update

* fix

* TODO

* 111

* fix

* minor fix

* minor fix

* fox

* Update shared_mem_manager.cc

* update

* update

* update

* update metis

* update metis

* update
Co-authored-by: default avatarVoVAllen <jz1749@nyu.edu>
Co-authored-by: default avatarJinjing Zhou <VoVAllen@users.noreply.github.com>
parent 1c1bb772
......@@ -12,6 +12,7 @@
#include <vector>
#include <utility>
#include <tuple>
#include <string>
#include "./types.h"
#include "./array_ops.h"
#include "./spmat.h"
......
......@@ -10,6 +10,7 @@
#include <dmlc/serializer.h>
#include <vector>
#include <tuple>
#include <string>
#include "./types.h"
#include "./array_ops.h"
#include "./spmat.h"
......
......@@ -11,7 +11,6 @@
namespace dgl {
namespace runtime {
#ifndef _WIN32
/*
* \brief This class owns shared memory.
*
......@@ -79,7 +78,6 @@ class SharedMemory {
*/
static bool Exist(const std::string &name);
};
#endif // _WIN32
} // namespace runtime
} // namespace dgl
......
......@@ -15,6 +15,7 @@ __all__ = [
'graph',
'bipartite',
'hetero_from_relations',
'hetero_from_shared_memory',
'heterograph',
'to_hetero',
'to_homo',
......@@ -414,6 +415,24 @@ def hetero_from_relations(rel_graphs, num_nodes_per_type=None):
retg._edge_frames[i].update(rgrh._edge_frames[0])
return retg
def hetero_from_shared_memory(name):
    """Create a heterograph from shared memory with the given name.

    The newly created graph will have the same node types and edge types as
    the original graph.  But it does not have node features or edge features.

    Parameters
    ----------
    name : str
        The name of the shared memory.

    Returns
    -------
    HeteroGraph (in shared memory)
    """
    g, ntypes, etypes = heterograph_index.create_heterograph_from_shared_memory(name)
    return DGLHeteroGraph(g, ntypes, etypes)
def heterograph(data_dict,
num_nodes_dict=None,
validate=True,
......
......@@ -4827,6 +4827,34 @@ class DGLHeteroGraph(object):
ret._graph = self._graph.asbits(bits)
return ret
# TODO: Formats should not be specified, just saving all the materialized formats
def shared_memory(self, name, formats=('coo', 'csr', 'csc')):
    """Return a copy of this graph in shared memory, without node data or edge data.

    The graph index is moved into a shared-memory segment and a new
    DGLHeteroGraph is returned with the same structure, node types and edge
    types, but with no node or edge features attached.

    Parameters
    ----------
    name : str
        The name of the shared memory.
    formats : list of str (optional)
        Desired formats to be materialized.

    Returns
    -------
    HeteroGraph
        The graph in shared memory
    """
    assert len(name) > 0, "The name of shared memory cannot be empty"
    assert len(formats) > 0
    valid_formats = ("coo", "csr", "csc")
    for fmt in formats:
        assert fmt in valid_formats
    shared_gidx = self._graph.shared_memory(name, self.ntypes, self.etypes, formats)
    return DGLHeteroGraph(shared_gidx, self.ntypes, self.etypes)
def long(self):
"""Cast this graph to use int64 IDs.
......
......@@ -233,6 +233,33 @@ class HeteroGraphIndex(ObjectBase):
"""
return _CAPI_DGLHeteroCopyTo(self, ctx.device_type, ctx.device_id)
def shared_memory(self, name, ntypes=None, etypes=None, formats=('coo', 'csr', 'csc')):
    """Return a copy of this graph in shared memory

    Parameters
    ----------
    name : str
        The name of the shared memory.
    ntypes : list of str (optional)
        Name of node types.  Defaults to an empty list.
    etypes : list of str (optional)
        Name of edge types.  Defaults to an empty list.
    formats : list of str (optional)
        Desired formats to be materialized.

    Returns
    -------
    HeteroGraphIndex
        The graph index in shared memory
    """
    assert len(name) > 0, "The name of shared memory cannot be empty"
    assert len(formats) > 0
    for fmt in formats:
        assert fmt in ("coo", "csr", "csc")
    ntypes = [] if ntypes is None else ntypes
    etypes = [] if etypes is None else etypes
    return _CAPI_DGLHeteroCopyToSharedMem(self, name, ntypes, etypes, formats)
def is_multigraph(self):
"""Return whether the graph is a multigraph
The time cost will be O(E)
......@@ -1026,6 +1053,25 @@ def create_heterograph_from_relations(metagraph, rel_graphs, num_nodes_per_type)
return _CAPI_DGLHeteroCreateHeteroGraphWithNumNodes(
metagraph, rel_graphs, num_nodes_per_type.todgltensor())
def create_heterograph_from_shared_memory(name):
    """Create a heterograph from shared memory with the given name.

    Parameters
    ----------
    name : str
        The name of the shared memory.

    Returns
    -------
    HeteroGraphIndex (in shared memory)
    ntypes : list of str
        Names of node types
    etypes : list of str
        Names of edge types
    """
    g, ntypes, etypes = _CAPI_DGLHeteroCreateFromSharedMem(name)
    return g, list(ntypes), list(etypes)
def joint_union(metagraph, gidx_list):
"""Return a joint union of the input heterographs.
......
......@@ -14,7 +14,7 @@ import numpy as _np
from ._ffi.object import register_object, ObjectBase
from ._ffi.function import _init_api
from ._ffi.ndarray import DGLContext, DGLType, NDArrayBase
from ._ffi.ndarray import context, empty, from_dlpack, numpyasarray
from ._ffi.ndarray import context, empty, empty_shared_mem, from_dlpack, numpyasarray
from ._ffi.ndarray import _set_class_ndarray
from . import backend as F
......@@ -23,6 +23,20 @@ class NDArray(NDArrayBase):
def __len__(self):
return functools.reduce(operator.mul, self.shape, 1)
def shared_memory(self, name):
    """Copy this ndarray into shared memory and return the shared copy.

    Parameters
    ----------
    name : str
        The name of the shared memory

    Returns
    -------
    NDArray
    """
    shared_arr = empty_shared_mem(name, True, self.shape, self.dtype)
    return shared_arr.copyfrom(self)
def cpu(dev_id=0):
"""Construct a CPU device
......
......@@ -7,6 +7,8 @@
#include <dgl/array.h>
#include <dgl/immutable_graph.h>
#include <dgl/graph_serializer.h>
#include <dmlc/memory_io.h>
#include <memory>
#include <vector>
#include <tuple>
#include <utility>
......@@ -262,6 +264,112 @@ HeteroGraphPtr HeteroGraph::CopyTo(HeteroGraphPtr g, const DLContext& ctx) {
hgindex->num_verts_per_type_));
}
std::string HeteroGraph::SharedMemName() const {
  // Name of the backing shared-memory segment; empty when this graph is
  // not stored in shared memory.
  if (shared_mem_) {
    return shared_mem_->GetName();
  }
  return "";
}
// Copy a heterograph into a named shared-memory segment.
// Serializes the metadata (format flags, metagraph, vertex counts, type
// names) into the segment via SharedMemManager, and copies each relation
// graph's sparse arrays into per-edge-type shared-memory arrays.
// NOTE: CreateFromSharedMem must read everything back in the same order.
HeteroGraphPtr HeteroGraph::CopyToSharedMem(
    HeteroGraphPtr g, const std::string& name, const std::vector<std::string>& ntypes,
    const std::vector<std::string>& etypes, const std::set<std::string>& fmts) {
  // TODO(JJ): Raise error when calling shared_memory if graph index is on gpu
  auto hg = std::dynamic_pointer_cast<HeteroGraph>(g);
  CHECK_NOTNULL(hg);
  // Already backed by a segment of this name: reuse as-is.
  if (hg->SharedMemName() == name)
    return g;

  // Copy buffer to share memory
  auto mem = std::make_shared<SharedMemory>(name);
  auto mem_buf = mem->CreateNew(SHARED_MEM_METAINFO_SIZE_MAX);
  dmlc::MemoryFixedSizeStream strm(mem_buf, SHARED_MEM_METAINFO_SIZE_MAX);
  SharedMemManager shm(name, &strm);

  bool has_coo = fmts.find("coo") != fmts.end();
  bool has_csr = fmts.find("csr") != fmts.end();
  bool has_csc = fmts.find("csc") != fmts.end();

  // Header: bit width, which formats were materialized, then the metagraph
  // and per-type vertex counts.
  shm.Write(g->NumBits());
  shm.Write(has_coo);
  shm.Write(has_csr);
  shm.Write(has_csc);
  shm.Write(ImmutableGraph::ToImmutable(hg->meta_graph_));
  shm.Write(hg->num_verts_per_type_);

  // One relation graph per edge type; each requested format is copied into
  // shared-memory arrays named "<name>_<etype>_<fmt>...".
  std::vector<HeteroGraphPtr> relgraphs(g->NumEdgeTypes());
  for (dgl_type_t etype = 0 ; etype < g->NumEdgeTypes() ; ++etype) {
    aten::COOMatrix coo;
    aten::CSRMatrix csr, csc;
    std::string prefix = name + "_" + std::to_string(etype);
    if (has_coo) {
      coo = shm.CopyToSharedMem(hg->GetCOOMatrix(etype), prefix + "_coo");
    }
    if (has_csr) {
      csr = shm.CopyToSharedMem(hg->GetCSRMatrix(etype), prefix + "_csr");
    }
    if (has_csc) {
      csc = shm.CopyToSharedMem(hg->GetCSCMatrix(etype), prefix + "_csc");
    }
    relgraphs[etype] = UnitGraph::CreateHomographFrom(csc, csr, coo, has_csc, has_csr, has_coo);
  }

  auto ret = std::shared_ptr<HeteroGraph>(
      new HeteroGraph(hg->meta_graph_, relgraphs, hg->num_verts_per_type_));
  // Holding the SharedMemory object keeps the segment alive with the graph.
  ret->shared_mem_ = mem;

  // Type names are appended last, after the per-edge-type payloads.
  shm.Write(ntypes);
  shm.Write(etypes);
  return ret;
}
// Rebuild a heterograph from the shared-memory segment written by
// CopyToSharedMem.  Everything is deserialized in the exact order it was
// written: header flags, metagraph, vertex counts, per-edge-type arrays,
// then the node/edge type names.
std::tuple<HeteroGraphPtr, std::vector<std::string>, std::vector<std::string>>
HeteroGraph::CreateFromSharedMem(const std::string &name) {
  auto mem = std::make_shared<SharedMemory>(name);
  auto mem_buf = mem->Open(SHARED_MEM_METAINFO_SIZE_MAX);
  dmlc::MemoryFixedSizeStream strm(mem_buf, SHARED_MEM_METAINFO_SIZE_MAX);
  SharedMemManager shm(name, &strm);

  // nbits is consumed to advance the stream past the header even though it
  // is not used below.
  uint8_t nbits;
  CHECK(shm.Read(&nbits)) << "Invalid nbits (uint8_t)";
  bool has_coo, has_csr, has_csc;
  // Fixed: these messages previously reported the wrong field/type
  // ("invalid nbits (unit8_t)") for the format flags, which are bools.
  CHECK(shm.Read(&has_coo)) << "Invalid coo flag (bool)";
  CHECK(shm.Read(&has_csr)) << "Invalid csr flag (bool)";
  CHECK(shm.Read(&has_csc)) << "Invalid csc flag (bool)";

  auto meta_imgraph = Serializer::make_shared<ImmutableGraph>();
  CHECK(shm.Read(&meta_imgraph)) << "Invalid meta graph";
  GraphPtr metagraph = meta_imgraph;

  std::vector<int64_t> num_verts_per_type;
  CHECK(shm.Read(&num_verts_per_type)) << "Invalid number of vertices per type";

  // One relation graph per metagraph edge, attached to the shared-memory
  // arrays named "<name>_<etype>_<fmt>..." created by the writer.
  std::vector<HeteroGraphPtr> relgraphs(metagraph->NumEdges());
  for (dgl_type_t etype = 0 ; etype < metagraph->NumEdges() ; ++etype) {
    aten::COOMatrix coo;
    aten::CSRMatrix csr, csc;
    std::string prefix = name + "_" + std::to_string(etype);
    if (has_coo) {
      shm.CreateFromSharedMem(&coo, prefix + "_coo");
    }
    if (has_csr) {
      shm.CreateFromSharedMem(&csr, prefix + "_csr");
    }
    if (has_csc) {
      shm.CreateFromSharedMem(&csc, prefix + "_csc");
    }
    relgraphs[etype] = UnitGraph::CreateHomographFrom(csc, csr, coo, has_csc, has_csr, has_coo);
  }

  auto ret = std::make_shared<HeteroGraph>(metagraph, relgraphs, num_verts_per_type);
  // Keep the segment mapped for the lifetime of the returned graph.
  ret->shared_mem_ = mem;

  std::vector<std::string> ntypes;
  std::vector<std::string> etypes;
  CHECK(shm.Read(&ntypes)) << "invalid ntypes";
  CHECK(shm.Read(&etypes)) << "invalid etypes";
  return std::make_tuple(ret, ntypes, etypes);
}
HeteroGraphPtr HeteroGraph::GetGraphInFormat(dgl_format_code_t formats) const {
std::vector<HeteroGraphPtr> format_rels(NumEdgeTypes());
for (dgl_type_t etype = 0; etype < NumEdgeTypes(); ++etype) {
......
......@@ -7,12 +7,16 @@
#ifndef DGL_GRAPH_HETEROGRAPH_H_
#define DGL_GRAPH_HETEROGRAPH_H_
#include <dgl/runtime/shared_mem.h>
#include <dgl/base_heterograph.h>
#include <dgl/lazy.h>
#include <utility>
#include <string>
#include <vector>
#include <set>
#include <tuple>
#include "./unit_graph.h"
#include "shared_mem_manager.h"
namespace dgl {
......@@ -222,6 +226,19 @@ class HeteroGraph : public BaseHeteroGraph {
/*! \brief Copy the data to another context */
static HeteroGraphPtr CopyTo(HeteroGraphPtr g, const DLContext& ctx);
/*! \brief Copy the data to shared memory.
*
* Also save names of node types and edge types of the HeteroGraph object to shared memory
*/
static HeteroGraphPtr CopyToSharedMem(
HeteroGraphPtr g, const std::string& name, const std::vector<std::string>& ntypes,
const std::vector<std::string>& etypes, const std::set<std::string>& fmts);
/*! \brief Create a heterograph from
* \return the HeteroGraphPtr, names of node types, names of edge types
*/
static std::tuple<HeteroGraphPtr, std::vector<std::string>, std::vector<std::string>>
CreateFromSharedMem(const std::string &name);
/*! \brief Creat a LineGraph of self */
HeteroGraphPtr LineGraph(bool backtracking) const;
......@@ -243,6 +260,12 @@ class HeteroGraph : public BaseHeteroGraph {
/*! \brief A map from vert type to the number of verts in the type */
std::vector<int64_t> num_verts_per_type_;
/*! \brief The shared memory object for meta info*/
std::shared_ptr<runtime::SharedMemory> shared_mem_;
/*! \brief The name of the shared memory. Return empty string if it is not in shared memory. */
std::string SharedMemName() const;
/*! \brief template class for Flatten operation
*
* \tparam IdType Graph's index data type, can be int32_t or int64_t
......
......@@ -7,6 +7,7 @@
#include <dgl/packed_func_ext.h>
#include <dgl/immutable_graph.h>
#include <dgl/runtime/container.h>
#include <set>
#include "../c_api_common.h"
#include "./heterograph.h"
......@@ -456,6 +457,45 @@ DGL_REGISTER_GLOBAL("heterograph_index._CAPI_DGLHeteroCopyTo")
*rv = HeteroGraphRef(hg_new);
});
// Python binding: copy a heterograph into shared memory.
// args: (graph, name, ntypes, etypes, formats) -> HeteroGraphRef
DGL_REGISTER_GLOBAL("heterograph_index._CAPI_DGLHeteroCopyToSharedMem")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    HeteroGraphRef hg = args[0];
    std::string name = args[1];
    List<Value> ntypes = args[2];
    List<Value> etypes = args[3];
    List<Value> fmts = args[4];
    auto ntypes_vec = ListValueToVector<std::string>(ntypes);
    auto etypes_vec = ListValueToVector<std::string>(etypes);
    // CopyToSharedMem takes a set of format names; de-duplicate here.
    std::set<std::string> fmts_set;
    for (const auto &fmt : fmts) {
      std::string fmt_data = fmt->data;
      fmts_set.insert(fmt_data);
    }
    auto hg_share = HeteroGraph::CopyToSharedMem(
      hg.sptr(), name, ntypes_vec, etypes_vec, fmts_set);
    *rv = HeteroGraphRef(hg_share);
  });
// Python binding: rebuild a heterograph from shared memory by name.
// args: (name,) -> [HeteroGraphRef, ntypes, etypes] packed in one List.
DGL_REGISTER_GLOBAL("heterograph_index._CAPI_DGLHeteroCreateFromSharedMem")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    std::string name = args[0];
    HeteroGraphPtr hg;
    std::vector<std::string> ntypes;
    std::vector<std::string> etypes;
    std::tie(hg, ntypes, etypes) = HeteroGraph::CreateFromSharedMem(name);
    // Wrap the type-name strings as FFI Values so they cross the boundary.
    List<Value> ntypes_list;
    List<Value> etypes_list;
    for (const auto &ntype : ntypes)
      ntypes_list.push_back(Value(MakeValue(ntype)));
    for (const auto &etype : etypes)
      etypes_list.push_back(Value(MakeValue(etype)));
    List<ObjectRef> ret;
    ret.push_back(HeteroGraphRef(hg));
    ret.push_back(ntypes_list);
    ret.push_back(etypes_list);
    *rv = ret;
  });
DGL_REGISTER_GLOBAL("heterograph_index._CAPI_DGLHeteroJointUnion")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphRef meta_graph = args[0];
......
/*!
* Copyright (c) 2018 by Contributors
* \file graph/shared_mem_manager.cc
* \brief DGL sampler implementation
*/
#include "shared_mem_manager.h"
#include <dgl/array.h>
#include <dgl/base_heterograph.h>
#include <dgl/immutable_graph.h>
#include <dgl/packed_func_ext.h>
#include <dgl/random.h>
#include <dgl/runtime/container.h>
#include <dgl/sampler.h>
#include <dmlc/io.h>
#include <dmlc/memory_io.h>
#include <algorithm>
#include <array>
#include <cmath>
#include <cstdlib>
#include <numeric>
#include <vector>
#include "../c_api_common.h"
#include "heterograph.h"
using namespace dgl::runtime;
using namespace dgl::aten;
namespace dgl {
template <>
NDArray SharedMemManager::CopyToSharedMem<NDArray>(const NDArray &data,
std::string name) {
DLContext ctx = {kDLCPU, 0};
std::vector<int64_t> shape(data->shape, data->shape + data->ndim);
strm_->Write(data->ndim);
strm_->Write(data->dtype);
int ndim = data->ndim;
strm_->WriteArray(data->shape, ndim);
bool is_null = IsNullArray(data);
strm_->Write(is_null);
if (is_null) {
return data;
} else {
auto nd =
NDArray::EmptyShared(graph_name_ + name, shape, data->dtype, ctx, true);
nd.CopyFrom(data);
return nd;
}
}
// Copy a CSR matrix into shared memory: the three arrays first (indptr,
// indices, data — each appends its metadata to the stream), then the scalar
// fields.  CreateFromSharedMem<CSRMatrix> must read in exactly this order.
template <>
CSRMatrix SharedMemManager::CopyToSharedMem<CSRMatrix>(const CSRMatrix &csr,
                                                       std::string name) {
  auto indptr_shared_mem = CopyToSharedMem(csr.indptr, name + "_indptr");
  auto indices_shared_mem = CopyToSharedMem(csr.indices, name + "_indices");
  auto data_shared_mem = CopyToSharedMem(csr.data, name + "_data");
  strm_->Write(csr.num_rows);
  strm_->Write(csr.num_cols);
  strm_->Write(csr.sorted);
  return CSRMatrix(csr.num_rows, csr.num_cols, indptr_shared_mem,
                   indices_shared_mem, data_shared_mem, csr.sorted);
}
// Copy a COO matrix into shared memory: arrays first (row, col, data), then
// the scalar fields.  CreateFromSharedMem<COOMatrix> reads in the same order.
template <>
COOMatrix SharedMemManager::CopyToSharedMem<COOMatrix>(const COOMatrix &coo,
                                                       std::string name) {
  auto row_shared_mem = CopyToSharedMem(coo.row, name + "_row");
  auto col_shared_mem = CopyToSharedMem(coo.col, name + "_col");
  auto data_shared_mem = CopyToSharedMem(coo.data, name + "_data");
  strm_->Write(coo.num_rows);
  strm_->Write(coo.num_cols);
  strm_->Write(coo.row_sorted);
  strm_->Write(coo.col_sorted);
  return COOMatrix(coo.num_rows, coo.num_cols, row_shared_mem, col_shared_mem,
                   data_shared_mem, coo.row_sorted, coo.col_sorted);
}
// Reconstruct one NDArray written by CopyToSharedMem<NDArray>: read the
// metadata (ndim, dtype, shape, null flag) from the stream in write order,
// then attach to the named shared-memory segment.
template <>
bool SharedMemManager::CreateFromSharedMem<NDArray>(NDArray *nd,
                                                    std::string name) {
  int ndim;
  DLContext ctx = {kDLCPU, 0};
  DLDataType dtype;
  CHECK(this->Read(&ndim)) << "Invalid DLTensor file format";
  CHECK(this->Read(&dtype)) << "Invalid DLTensor file format";
  std::vector<int64_t> shape(ndim);
  if (ndim != 0) {
    CHECK(this->ReadArray(&shape[0], ndim)) << "Invalid DLTensor file format";
  }
  bool is_null;
  this->Read(&is_null);
  if (is_null) {
    // Null array: no shared payload exists; make an ordinary empty array.
    *nd = NDArray::Empty(shape, dtype, ctx);
  } else {
    // Last arg is false here vs true in the writer — presumably it means
    // "create the segment"; attach-only on the reader side.  TODO confirm.
    *nd =
        NDArray::EmptyShared(graph_name_ + name, shape, dtype, ctx, false);
  }
  return true;
}
// Mirror of CopyToSharedMem<COOMatrix>: arrays first (row, col, data), then
// the scalar fields, in exactly the same order the writer emitted them —
// the per-array metadata is consumed sequentially from the stream.
template <>
bool SharedMemManager::CreateFromSharedMem<COOMatrix>(COOMatrix *coo,
                                                      std::string name) {
  CreateFromSharedMem(&coo->row, name + "_row");
  CreateFromSharedMem(&coo->col, name + "_col");
  CreateFromSharedMem(&coo->data, name + "_data");
  strm_->Read(&coo->num_rows);
  strm_->Read(&coo->num_cols);
  strm_->Read(&coo->row_sorted);
  strm_->Read(&coo->col_sorted);
  return true;
}
// Mirror of CopyToSharedMem<CSRMatrix>.  The per-array metadata
// (ndim/dtype/shape) is consumed sequentially from the stream, so the reads
// MUST match the write order: indptr, indices, data.  The original read
// indices before indptr, attaching each array's metadata to the wrong
// array whenever their shapes differ.
template <>
bool SharedMemManager::CreateFromSharedMem<CSRMatrix>(CSRMatrix *csr,
                                                      std::string name) {
  CreateFromSharedMem(&csr->indptr, name + "_indptr");
  CreateFromSharedMem(&csr->indices, name + "_indices");
  CreateFromSharedMem(&csr->data, name + "_data");
  strm_->Read(&csr->num_rows);
  strm_->Read(&csr->num_cols);
  strm_->Read(&csr->sorted);
  return true;
}
} // namespace dgl
/*!
* Copyright (c) 2018 by Contributors
* \file graph/shared_mem_manager.cc
* \brief DGL shared mem manager APIs
*/
#ifndef DGL_GRAPH_SHARED_MEM_MANAGER_H_
#define DGL_GRAPH_SHARED_MEM_MANAGER_H_
#include <dgl/array.h>
#include <dmlc/io.h>
#include <dmlc/memory_io.h>
#include <algorithm>
#include <array>
#include <cmath>
#include <cstdlib>
#include <memory>
#include <numeric>
#include <string>
namespace dgl {
using dgl::runtime::SharedMemory;
const size_t SHARED_MEM_METAINFO_SIZE_MAX = 1024 * 32;
// Utility class to copy objects to shared memory and record metadatas
class SharedMemManager : public dmlc::Stream {
 public:
  // graph_name is prefixed to the name of every shared-memory segment this
  // manager creates; strm holds the serialized metadata that pairs with
  // those segments.
  explicit SharedMemManager(std::string graph_name, dmlc::Stream* strm)
      : graph_name_(graph_name),
        strm_(strm) {}

  // Copy `data` into shared memory under graph_name_ + name, recording its
  // metadata in the stream.  Specialized per type (NDArray/CSR/COO) in the
  // .cc file.
  template <typename T>
  T CopyToSharedMem(const T& data, std::string name);

  // Reconstruct an object previously stored via CopyToSharedMem, reading
  // its metadata from the stream in the same order it was written.
  template <typename T>
  bool CreateFromSharedMem(T* out_data, std::string name);

  // delegate methods to strm_
  virtual size_t Read(void* ptr, size_t size) { return strm_->Read(ptr, size); }
  virtual void Write(const void* ptr, size_t size) { strm_->Write(ptr, size); }

  using dmlc::Stream::Read;
  using dmlc::Stream::Write;

 private:
  std::string graph_name_;  // prefix for all segment names
  dmlc::Stream* strm_;      // metadata stream; not owned
};
} // namespace dgl
#endif // DGL_GRAPH_SHARED_MEM_MANAGER_H_
......@@ -16,25 +16,33 @@
namespace dgl {
namespace runtime {
#ifndef _WIN32
// Construct a handle for a named shared-memory segment.  Only records the
// name; the segment itself is created/attached by CreateNew() or Open().
SharedMemory::SharedMemory(const std::string &name) {
#ifndef _WIN32
  this->name = name;
  // `own` marks whether this process created (and must unlink) the segment.
  this->own = false;
  this->fd = -1;
  this->ptr = nullptr;
  this->size = 0;
#else
  LOG(FATAL) << "Shared memory is not supported on Windows.";
#endif  // _WIN32
}
// Unmap and close unconditionally; shm_unlink only when this process owns
// the segment, so readers do not remove a segment they merely attached to.
SharedMemory::~SharedMemory() {
#ifndef _WIN32
  munmap(ptr, size);
  close(fd);
  if (own) {
    LOG(INFO) << "remove " << name << " for shared memory";
    shm_unlink(name.c_str());
  }
#else
  LOG(FATAL) << "Shared memory is not supported on Windows.";
#endif  // _WIN32
}
void *SharedMemory::CreateNew(size_t size) {
#ifndef _WIN32
this->own = true;
int flag = O_RDWR|O_CREAT;
......@@ -47,9 +55,13 @@ void *SharedMemory::CreateNew(size_t size) {
CHECK_NE(ptr, MAP_FAILED)
<< "Failed to map shared memory. mmap failed with error " << strerror(errno);
return ptr;
#else
LOG(FATAL) << "Shared memory is not supported on Windows.";
#endif // _WIN32
}
void *SharedMemory::Open(size_t size) {
#ifndef _WIN32
int flag = O_RDWR;
fd = shm_open(name.c_str(), flag, S_IRUSR | S_IWUSR);
CHECK_NE(fd, -1) << "fail to open " << name << ": " << strerror(errno);
......@@ -57,9 +69,13 @@ void *SharedMemory::Open(size_t size) {
CHECK_NE(ptr, MAP_FAILED)
<< "Failed to map shared memory. mmap failed with error " << strerror(errno);
return ptr;
#else
LOG(FATAL) << "Shared memory is not supported on Windows.";
#endif // _WIN32
}
bool SharedMemory::Exist(const std::string &name) {
#ifndef _WIN32
int fd = shm_open(name.c_str(), O_RDONLY, S_IRUSR | S_IWUSR);
if (fd >= 0) {
close(fd);
......@@ -67,8 +83,10 @@ bool SharedMemory::Exist(const std::string &name) {
} else {
return false;
}
}
#else
LOG(FATAL) << "Shared memory is not supported on Windows.";
#endif // _WIN32
}
} // namespace runtime
} // namespace dgl
import networkx as nx
import scipy.sparse as ssp
import dgl
import dgl.contrib as contrib
from dgl.frame import Frame, FrameRef, Column
from dgl.graph_index import create_graph_index
from dgl.utils import toindex
import backend as F
import dgl.function as fn
import pickle
import io
import unittest
from utils import parametrize_dtype
import multiprocessing as mp
import os
def create_test_graph(idtype):
    """Build a small 4-relation heterograph (user/game/developer) for the tests."""
    follows = dgl.graph([(0, 1), (1, 2)], 'user', 'follows', idtype=idtype)

    plays_mat = ssp.coo_matrix(([1, 1, 1, 1], ([0, 1, 2, 1], [0, 0, 1, 1])))
    plays = dgl.bipartite(plays_mat, 'user', 'plays', 'game', idtype=idtype)

    wishes_graph = nx.DiGraph()
    wishes_graph.add_nodes_from(['u0', 'u1', 'u2'], bipartite=0)
    wishes_graph.add_nodes_from(['g0', 'g1'], bipartite=1)
    wishes_graph.add_edge('u0', 'g1', id=0)
    wishes_graph.add_edge('u2', 'g0', id=1)
    wishes = dgl.bipartite(wishes_graph, 'user', 'wishes', 'game', idtype=idtype)

    develops = dgl.bipartite([(0, 0), (1, 1)], 'developer', 'develops', 'game', idtype=idtype)
    return dgl.hetero_from_relations([follows, plays, wishes, develops])
def _assert_is_identical_hetero(g, g2):
    """Assert that two heterographs have the same structure, types and edges."""
    assert g.is_readonly == g2.is_readonly
    assert g.ntypes == g2.ntypes
    assert g.canonical_etypes == g2.canonical_etypes

    # the metagraphs must agree edge-for-edge
    g2_meta_edges = g2.metagraph.edges(keys=True)
    for edge_key, feat in g.metagraph.edges(keys=True).items():
        assert g2_meta_edges[edge_key] == feat

    # the node ID spaces must be the same size for every type
    for ntype in g.ntypes:
        assert g.number_of_nodes(ntype) == g2.number_of_nodes(ntype)

    # the edge lists must match in eid order for every type
    for etype in g.canonical_etypes:
        src_a, dst_a = g.all_edges(etype=etype, order='eid')
        src_b, dst_b = g2.all_edges(etype=etype, order='eid')
        assert F.array_equal(src_a, src_b)
        assert F.array_equal(dst_a, dst_b)
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@parametrize_dtype
def test_single_process(idtype):
    # Round-trip within one process: copy to shared memory, rebuild from it,
    # then copy the rebuilt graph back under the same name (the C++ side
    # reuses an existing segment with the same name).
    hg = create_test_graph(idtype=idtype)
    hg_share = hg.shared_memory("hg")  # keep a reference: it owns the segment
    hg_rebuild = dgl.hetero_from_shared_memory('hg')
    hg_save_again = hg_rebuild.shared_memory("hg")
    _assert_is_identical_hetero(hg, hg_share)
    _assert_is_identical_hetero(hg, hg_rebuild)
    _assert_is_identical_hetero(hg, hg_save_again)
def sub_proc(hg_origin, name):
    # Runs in a child process: attach to the shared-memory graph created by
    # the parent and check that it matches the original structure.
    hg_rebuild = dgl.hetero_from_shared_memory(name)
    hg_save_again = hg_rebuild.shared_memory(name)
    _assert_is_identical_hetero(hg_origin, hg_rebuild)
    _assert_is_identical_hetero(hg_origin, hg_save_again)
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@parametrize_dtype
def test_multi_process(idtype):
    # The parent copies the graph to shared memory; a child process attaches
    # to it by name and validates the structure.
    hg = create_test_graph(idtype=idtype)
    hg_share = hg.shared_memory("hg1")  # must stay alive while the child runs
    p = mp.Process(target=sub_proc, args=(hg, "hg1"))
    p.start()
    p.join()
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(F._default_context_str == 'cpu', reason="Need gpu for this test")
def test_copy_from_gpu():
    # Copy a GPU-resident graph into shared memory and verify that a child
    # process can rebuild it and match the original CPU graph.
    hg = create_test_graph(idtype=F.int32)
    hg_gpu = hg.to(F.cuda())
    hg_share = hg_gpu.shared_memory("hg_gpu")  # keep alive for the child
    p = mp.Process(target=sub_proc, args=(hg, "hg_gpu"))
    p.start()
    p.join()
# TODO: Test calling shared_memory with Blocks (a subclass of HeteroGraph)
if __name__ == "__main__":
    # Manual entry point for running the tests outside pytest.
    test_single_process(F.int64)
    test_multi_process(F.int32)
    test_copy_from_gpu()
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment