"git@developer.sourcefind.cn:OpenDAS/dgl.git" did not exist on "290b7c25c25eb3f10372c5423d6f12da1031955e"
Commit 314a75f3 authored by Minjie Wang's avatar Minjie Wang
Browse files

WIP: batched graph

parent 79f62400
......@@ -13,6 +13,8 @@ typedef tvm::runtime::NDArray DegreeArray;
typedef tvm::runtime::NDArray BoolArray;
class Graph;
class GraphOp;
struct Subgraph;
/*!
* \brief Base dgl graph class.
......@@ -246,7 +248,7 @@ class Graph {
* \param vids The vertices in the subgraph.
* \return the induced subgraph
*/
Graph Subgraph(IdArray vids) const;
Subgraph VertexSubgraph(IdArray vids) const;
/*!
* \brief Construct the induced edge subgraph of the given edges.
......@@ -264,7 +266,7 @@ class Graph {
* \param vids The edges in the subgraph.
* \return the induced edge subgraph
*/
Graph EdgeSubgraph(IdArray src, IdArray dst) const;
Subgraph EdgeSubgraph(IdArray src, IdArray dst) const;
/*!
* \brief Return a new graph with all the edges reversed.
......@@ -275,24 +277,8 @@ class Graph {
*/
Graph Reverse() const;
// TODO
std::vector<Graph> Split(std::vector<IdArray> vids_array) const;
/*!
* \brief Merge several graphs into one graph.
*
* The new graph will include all the nodes/edges in the given graphs.
* Nodes/Edges will be relabled by adding the cumsum of the previous graph sizes
* in the given sequence order. For example, giving input [g1, g2, g3], where
* they have 5, 6, 7 nodes respectively. Then node#2 of g2 will become node#7
* in the result graph. Edge ids are re-assigned similarly.
*
* \param graphs A list of input graphs to be merged.
* \return the merged graph
*/
static Graph Merge(std::vector<const Graph*> graphs);
private:
protected:
friend class GraphOp;
/*! \brief Internal edge list type */
struct EdgeList {
/*! \brief successor vertex list */
......@@ -318,6 +304,22 @@ class Graph {
uint64_t num_edges_ = 0;
};
/*!
 * \brief Subgraph data structure.
 *
 * Bundles a subgraph together with the arrays that map its relabeled
 * vertex/edge ids back to the ids of the graph it was extracted from.
 */
struct Subgraph {
/*! \brief The graph. */
Graph graph;
/*!
 * \brief The induced vertex ids.
 * \note This is also a map from the new vertex id to the vertex id in the parent graph,
 *       i.e. entry i holds the parent id of vertex i of \c graph.
 */
IdArray induced_vertices;
/*!
 * \brief The induced edge ids.
 * \note This is also a map from the new edge id to the edge id in the parent graph,
 *       i.e. entry i holds the parent id of edge i of \c graph.
 */
IdArray induced_edges;
};
} // namespace dgl
#endif // DGL_DGLGRAPH_H_
// Graph operations
#ifndef DGL_GRAPH_OP_H_
#define DGL_GRAPH_OP_H_
#include "graph.h"
namespace dgl {
/*!
 * \brief Collection of static operations that build new graphs out of existing ones.
 */
class GraphOp {
public:
/*!
 * \brief Return a disjoint union of the input graphs.
 *
 * The new graph will include all the nodes/edges in the given graphs.
 * Nodes/Edges will be relabeled by adding the cumsum of the previous graph sizes
 * in the given sequence order. For example, giving input [g1, g2, g3], where
 * they have 5, 6, 7 nodes respectively. Then node#2 of g2 will become node#7
 * in the result graph. Edge ids are re-assigned similarly.
 *
 * \param graphs A list of input graphs to be unioned.
 * \return the disjoint union of the graphs
 */
static Graph DisjointUnion(std::vector<const Graph*> graphs);
/*!
 * \brief Partition the graph into several subgraphs.
 *
 * The graph will be partitioned by the node ids. Edges between partitions
 * will be ignored. This requires the given number of partitions to evenly
 * divide the number of nodes in the graph.
 *
 * \param graph The graph to be partitioned.
 * \param num The number of partitions.
 * \return a list of partitioned graphs
 */
static std::vector<Graph> PartitionByNum(const Graph* graph, size_t num);
};
} // namespace dgl
#endif // DGL_GRAPH_OP_H_
......@@ -3,57 +3,88 @@ from __future__ import absolute_import
import numpy as np
from .base import ALL, is_all
from .frame import FrameRef
from .graph import DGLGraph
from . import graph_index as gi
from . import backend as F
class BatchedDGLGraph(DGLGraph):
def __init__(self, graph_list, node_attrs=None, edge_attrs=None, **attr):
super(BatchedDGLGraph, self).__init__(**attr)
self.graph_list = graph_list
self.graph_idx = {}
for idx, g in enumerate(self.graph_list):
self.graph_idx[g] = idx
self.num_nodes = [len(g) for g in self.graph_list]
self.num_edges = [g.size() for g in self.graph_list]
# calc index offset
self.node_offset = np.cumsum([0] + self.num_nodes)
self.edge_offset = np.cumsum([0] + self.num_edges)
# in-order add relabeled nodes
self.add_nodes_from(range(self.node_offset[-1]))
# in-order add relabeled edges
self.new_edge_list = [np.array(g.edge_list) + offset
for g, offset in zip(self.graph_list, self.node_offset[:-1])]
self.new_edges = np.concatenate(self.new_edge_list)
self.add_edges_from(self.new_edges)
assert self.size() == self.edge_offset[-1]
# set new node attr
if node_attrs:
attrs = {}
for key in node_attrs:
vals = [g.pop_n_repr(key) for g in self.graph_list]
attrs[key] = F.pack(vals)
self.set_n_repr(attrs)
else:
for g in self.graph_list:
self._node_frame.append(g._node_frame)
# set new edge attr
if edge_attrs:
attrs = {}
for key in edge_attrs:
vals = [g.pop_e_repr(key) for g in self.graph_list]
attrs[key] = F.pack(vals)
self.set_e_repr(attrs)
else:
for g in self.graph_list:
self._edge_frame.append(g._edge_frame)
"""The batched DGL graph.
The batched graph is read-only.
Parameters
----------
graph_list : iterable
A list of DGLGraphs to be batched.
node_attrs : str or iterable
The node attributes to also be batched.
edge_attrs : str or iterable, optional
The edge attributes to also be batched.
"""
def __init__(self, graph_list, node_attrs, edge_attrs):
    """Build the batched graph index and the batched node/edge frames.

    Parameters
    ----------
    graph_list : iterable
        The DGLGraphs to be batched.
    node_attrs : iterable of str
        Names of the node attributes to carry over into the batch.
    edge_attrs : iterable of str
        Names of the edge attributes to carry over into the batch.
    """
    # TODO(minjie): handle the input is again a batched graph.
    # create batched graph index
    batched_index = gi.disjoint_union([g._graph for g in graph_list])
    # create batched node and edge frames
    # NOTE: following code will materialize the columns of the input graphs.
    batched_node_frame = FrameRef()
    for gr in graph_list:
        # Keep the attribute name with each column; a bare set would drop
        # the names that identify the columns being appended.
        cols = {key: gr._node_frame[key] for key in node_attrs}
        batched_node_frame.append(cols)
    batched_edge_frame = FrameRef()
    for gr in graph_list:
        cols = {key: gr._edge_frame[key] for key in edge_attrs}
        batched_edge_frame.append(cols)
    super(BatchedDGLGraph, self).__init__(
        graph_data=batched_index,
        node_frame=batched_node_frame,
        edge_frame=batched_edge_frame)
    # extra members
    self._batch_size = len(graph_list)
    self._batch_num_nodes = [gr.number_of_nodes() for gr in graph_list]
    self._batch_num_edges = [gr.number_of_edges() for gr in graph_list]
@property
def batch_size(self):
    """Number of graphs in this batch.

    Returns
    -------
    int
        The length of the graph list this batch was built from.
    """
    return self._batch_size
@property
def batch_num_nodes(self):
    """Number of nodes of each graph in this batch.

    Returns
    -------
    list of int
        One entry per input graph, in the order the graphs were given.
    """
    return self._batch_num_nodes
@property
def batch_num_edges(self):
    """Number of edges of each graph in this batch.

    Returns
    -------
    list of int
        One entry per input graph, in the order the graphs were given.
    """
    return self._batch_num_edges
# override APIs
def add_nodes(self, num, reprs=None):
    """Add nodes.

    Always fails: the batched graph is read-only.

    Raises
    ------
    RuntimeError
        Unconditionally, regardless of the arguments.
    """
    raise RuntimeError('Readonly graph. Mutation is not allowed.')
def add_edge(self, u, v, reprs=None):
    """Add one edge.

    Always fails: the batched graph is read-only.

    Raises
    ------
    RuntimeError
        Unconditionally, regardless of the arguments.
    """
    raise RuntimeError('Readonly graph. Mutation is not allowed.')
def add_edges(self, u, v, reprs=None):
    """Add many edges.

    Always fails: the batched graph is read-only.

    Raises
    ------
    RuntimeError
        Unconditionally, regardless of the arguments.
    """
    raise RuntimeError('Readonly graph. Mutation is not allowed.')
# new APIs
def __getitem__(self, idx):
    """Slice the batch and return the batch of graphs specified by the idx.

    Not implemented yet: the body is a placeholder, so this currently
    returns None for any idx.
    """
    pass
def __setitem__(self, idx, val):
    """Set the value of the slice. The graph size cannot be changed.

    Not implemented yet: the body is a placeholder, so assignments are
    currently ignored.
    """
    pass
'''
def query_new_node(self, g, u):
idx = self.graph_idx[g]
offset = self.node_offset[idx]
......@@ -76,7 +107,12 @@ class BatchedDGLGraph(DGLGraph):
def query_edge_start_offset(self):
return self.edge_offset[:-1].copy()
'''
def split(graph_batch, num_or_size_splits):
    """Split the batch.

    Not implemented yet: the body is a placeholder, so this currently
    returns None.

    Parameters
    ----------
    graph_batch : the batched graph to split.
    num_or_size_splits : how to split the batch (semantics not decided yet,
        see the TODO below).
    """
    # TODO(minjie): could follow torch.split syntax
    pass
def unbatch(graph_batch):
"""Unbatch the graph and return a list of subgraphs.
......@@ -86,6 +122,7 @@ def unbatch(graph_batch):
graph_batch : DGLGraph
The batched graph.
"""
assert False, "disabled for now"
graph_list = graph_batch.graph_list
num_graphs = len(graph_list)
# split and set node attrs
......@@ -108,29 +145,36 @@ def unbatch(graph_batch):
return graph_list
# FIXME (lingfan): Do we really need the batch API?
# Can't we let user call BatchedDGLGraph(graph_list) directly
# and make unbatch a member function of BatchedDGLGraph
def batch(graph_list, node_attrs=ALL, edge_attrs=ALL):
    """Batch a list of DGLGraphs into one single graph.

    Once batch is called, the structure of both merged graph and graphs in graph_list
    must not be mutated, or unbatch's behavior will be undefined.

    Parameters
    ----------
    graph_list : iterable
        A list of DGLGraphs to be batched.
    node_attrs : str or iterable, optional
        The node attributes to also be batched. Specify None to not batch any attributes.
        By default (ALL), the attributes reported by the first graph are used.
    edge_attrs : str or iterable, optional
        The edge attributes to also be batched. Specify None to not batch any attributes.
        By default (ALL), the attributes reported by the first graph are used.

    Returns
    -------
    BatchedDGLGraph
        one single batched graph
    """
    # Normalize both attribute arguments to a collection of names:
    # None -> nothing, ALL -> whatever the first graph has, str -> one name.
    if node_attrs is None:
        node_attrs = []
    elif is_all(node_attrs):
        node_attrs = graph_list[0].node_attr_schemes()
    elif isinstance(node_attrs, str):
        node_attrs = [node_attrs]
    if edge_attrs is None:
        edge_attrs = []
    elif is_all(edge_attrs):
        edge_attrs = graph_list[0].edge_attr_schemes()
    elif isinstance(edge_attrs, str):
        edge_attrs = [edge_attrs]
    return BatchedDGLGraph(graph_list, node_attrs, edge_attrs)
......@@ -367,6 +367,27 @@ class GraphIndex(object):
v_array = v.todgltensor()
return utils.toindex(_CAPI_DGLGraphOutDegrees(self._handle, v_array))
def node_subgraph(self, v):
    """Extract the subgraph induced by a set of nodes.

    Parameters
    ----------
    v : utils.Index
        The nodes to keep.

    Returns
    -------
    GraphIndex
        Index of the induced subgraph.
    utils.Index
        Induced edge ids; position i holds the parent-graph id of edge i
        of the subgraph.
    """
    packed = _CAPI_DGLGraphVertexSubgraph(self._handle, v.todgltensor())
    # The C API hands back an accessor: slot 0 is the subgraph handle and
    # slot 2 the induced edge ids.
    subgraph_index = GraphIndex(packed(0))
    edge_map = utils.toindex(packed(2))
    return subgraph_index, edge_map
def adjacency_matrix(self):
"""Return the adjacency matrix representation of this graph.
......@@ -438,30 +459,29 @@ class GraphIndex(object):
dst = utils.toindex(dst)
self.add_edges(src, dst)
@staticmethod
def merge(graphs):
"""Merge a list of graphs into one graph.
def disjoint_union(graphs):
"""Return a disjoint union of the input graphs.
The new graph will include all the nodes/edges in the given graphs.
Nodes/Edges will be relabled by adding the cumsum of the previous graph sizes
in the given sequence order. For example, giving input [g1, g2, g3], where
they have 5, 6, 7 nodes respectively. Then node#2 of g2 will become node#7
in the result graph. Edge ids are re-assigned similarly.
The new graph will include all the nodes/edges in the given graphs.
Nodes/Edges will be relabled by adding the cumsum of the previous graph sizes
in the given sequence order. For example, giving input [g1, g2, g3], where
they have 5, 6, 7 nodes respectively. Then node#2 of g2 will become node#7
in the result graph. Edge ids are re-assigned similarly.
Parameters
----------
graphs : iterable of GraphIndex
The input graphs
Parameters
----------
graphs : iterable of GraphIndex
The input graphs
Returns
-------
GraphIndex
The merged graph
"""
inputs = c_array(GraphIndexHandle, [gr._handle for gr in graphs])
inputs = ctypes.cast(inputs, ctypes.c_void_p)
handle = _CAPI_DGLGraphMerge(inputs, len(graphs))
return GraphIndex(handle)
Returns
-------
GraphIndex
The disjoint union
"""
inputs = c_array(GraphIndexHandle, [gr._handle for gr in graphs])
inputs = ctypes.cast(inputs, ctypes.c_void_p)
handle = _CAPI_DGLDisjointUnion(inputs, len(graphs))
return GraphIndex(handle)
def create_graph_index(graph_data=None):
"""Create a graph index object.
......@@ -471,6 +491,8 @@ def create_graph_index(graph_data=None):
graph_data : graph data, optional
Data to initialize graph. Same as networkx's semantics.
"""
if isinstance(graph_data, GraphIndex):
return graph_data
handle = _CAPI_DGLGraphCreate()
gi = GraphIndex(handle)
if graph_data is not None:
......
// Graph class implementation
#include <algorithm>
#include <unordered_map>
#include <dgl/graph.h>
namespace dgl {
......@@ -345,14 +346,38 @@ DegreeArray Graph::OutDegrees(IdArray vids) const {
return rst;
}
Graph Graph::Subgraph(IdArray vids) const {
LOG(FATAL) << "not implemented";
return *this;
/*!
 * Build the subgraph induced by the given vertices.
 *
 * Vertex i of the result corresponds to vids[i]; an edge of this graph is kept
 * when both of its endpoints appear in vids. The ids of the kept edges are
 * recorded in induced_edges in the order the edges are added to the result.
 * Every id in vids must refer to an existing vertex; otherwise the call aborts
 * through CHECK instead of reading past the adjacency list.
 */
Subgraph Graph::VertexSubgraph(IdArray vids) const {
  CHECK(IsValidIdArray(vids)) << "Invalid vertex id array.";
  const auto len = vids->shape[0];
  const int64_t* vid_data = static_cast<int64_t*>(vids->data);
  const auto num_vertices = NumVertices();
  // Map from vertex id in this graph to vertex id in the subgraph.
  std::unordered_map<dgl_id_t, dgl_id_t> oldv2newv;
  oldv2newv.reserve(len);
  for (int64_t i = 0; i < len; ++i) {
    const dgl_id_t oldvid = vid_data[i];
    // A negative id wraps to a huge unsigned value and is rejected here too.
    CHECK(oldvid < num_vertices) << "Invalid vertex id: " << vid_data[i];
    oldv2newv[oldvid] = i;
  }
  Subgraph rst;
  rst.induced_vertices = vids;
  rst.graph.AddVertices(len);
  std::vector<dgl_id_t> edges;
  for (int64_t i = 0; i < len; ++i) {
    const dgl_id_t oldvid = vid_data[i];
    const dgl_id_t newvid = i;
    const auto& out = adjlist_[oldvid];
    for (size_t j = 0; j < out.succ.size(); ++j) {
      // Single lookup: keep the edge only if its destination was selected as well.
      const auto it = oldv2newv.find(out.succ[j]);
      if (it != oldv2newv.end()) {
        edges.push_back(out.edge_id[j]);
        rst.graph.AddEdge(newvid, it->second);
      }
    }
  }
  rst.induced_edges = IdArray::Empty({static_cast<int64_t>(edges.size())}, vids->dtype, vids->ctx);
  std::copy(edges.begin(), edges.end(), static_cast<int64_t*>(rst.induced_edges->data));
  return rst;
}
Graph Graph::EdgeSubgraph(IdArray src, IdArray dst) const {
Subgraph Graph::EdgeSubgraph(IdArray src, IdArray dst) const {
LOG(FATAL) << "not implemented";
return *this;
return Subgraph();
}
Graph Graph::Reverse() const {
......@@ -360,17 +385,4 @@ Graph Graph::Reverse() const {
return *this;
}
Graph Graph::Merge(std::vector<const Graph*> graphs) {
Graph rst;
uint64_t cumsum = 0;
for (const Graph* gr : graphs) {
rst.AddVertices(gr->NumVertices());
for (uint64_t i = 0; i < gr->NumEdges(); ++i) {
rst.AddEdge(gr->all_edges_src_[i] + cumsum, gr->all_edges_dst_[i] + cumsum);
}
cumsum += gr->NumVertices();
}
return rst;
}
} // namespace dgl
#include <dgl/runtime/packed_func.h>
#include <dgl/runtime/registry.h>
#include <dgl/graph.h>
#include <dgl/graph_op.h>
using tvm::runtime::TVMArgs;
using tvm::runtime::TVMArgValue;
......@@ -8,17 +9,41 @@ using tvm::runtime::TVMRetValue;
using tvm::runtime::PackedFunc;
namespace dgl {
// Graph handler type
typedef void* GraphHandle;
namespace {
/*!\brief Convert EdgeArray structure to PackedFunc */
// Convert EdgeArray structure to PackedFunc.
PackedFunc ConvertEdgeArrayToPackedFunc(const Graph::EdgeArray& ea) {
  // The closure keeps its own copy of `ea`. Because the lambda is not mutable,
  // the captured members are const, so they are handed out by copy on every
  // call (a std::move here would silently degrade to the same copy).
  auto body = [ea] (TVMArgs args, TVMRetValue* rv) {
      // Selector: 0 -> src, 1 -> dst, 2 -> id.
      const int which = args[0];
      switch (which) {
        case 0:
          *rv = ea.src;
          break;
        case 1:
          *rv = ea.dst;
          break;
        case 2:
          *rv = ea.id;
          break;
        default:
          LOG(FATAL) << "invalid choice";
      }
    };
  return PackedFunc(body);
}
// Convert Subgraph structure to PackedFunc.
PackedFunc ConvertSubgraphToPackedFunc(const Subgraph& sg) {
  // The closure keeps its own copy of `sg`. Because the lambda is not mutable,
  // the captured members are const and are handed out by copy on every call.
  auto body = [sg] (TVMArgs args, TVMRetValue* rv) {
      // Selector: 0 -> graph handle, 1 -> induced vertices, 2 -> induced edges.
      const int which = args[0];
      if (which == 0) {
        // Each request gets a fresh heap-allocated copy of the graph behind an
        // opaque handle. NOTE(review): presumably the receiver of the handle is
        // responsible for freeing it - confirm against the handle-free C API.
        Graph* gptr = new Graph(sg.graph);
        GraphHandle ghandle = gptr;
        *rv = ghandle;
      } else if (which == 1) {
        *rv = sg.induced_vertices;
      } else if (which == 2) {
        *rv = sg.induced_edges;
      } else {
        LOG(FATAL) << "invalid choice";
      }
    };
  return PackedFunc(body);
}
// Convert the given DLTensor to a temporary DLManagedTensor that does not own memory.
DLManagedTensor* CreateTmpDLManagedTensor(const TVMArgValue& arg) {
const DLTensor* dl_tensor = arg;
DLManagedTensor* ret = new DLManagedTensor();
......@@ -37,9 +63,6 @@ DLManagedTensor* CreateTmpDLManagedTensor(const TVMArgValue& arg) {
} // namespace
// Graph handler type
typedef void* GraphHandle;
TVM_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphCreate")
.set_body([] (TVMArgs args, TVMRetValue* rv) {
GraphHandle ghandle = new Graph();
......@@ -242,7 +265,15 @@ TVM_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphOutDegrees")
*rv = gptr->OutDegrees(vids);
});
TVM_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphMerge")
// C API entry: args[0] is the graph handle and args[1] the vertex id tensor.
// The result is the accessor produced by ConvertSubgraphToPackedFunc.
TVM_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphVertexSubgraph")
.set_body([] (TVMArgs args, TVMRetValue* rv) {
    GraphHandle handle = args[0];
    const Graph* graph = static_cast<Graph*>(handle);
    auto* tmp_tensor = CreateTmpDLManagedTensor(args[1]);
    const IdArray vertex_ids = IdArray::FromDLPack(tmp_tensor);
    const auto subgraph = graph->VertexSubgraph(vertex_ids);
    *rv = ConvertSubgraphToPackedFunc(subgraph);
  });
TVM_REGISTER_GLOBAL("graph_index._CAPI_DGLDisjointUnion")
.set_body([] (TVMArgs args, TVMRetValue* rv) {
void* list = args[0];
GraphHandle* inhandles = static_cast<GraphHandle*>(list);
......@@ -253,7 +284,7 @@ TVM_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphMerge")
graphs.push_back(gr);
}
Graph* gptr = new Graph();
*gptr = Graph::Merge(std::move(graphs));
*gptr = GraphOp::DisjointUnion(std::move(graphs));
GraphHandle ghandle = gptr;
*rv = ghandle;
});
......
// Graph operation implementation
#include <dgl/graph_op.h>
namespace dgl {
Graph GraphOp::DisjointUnion(std::vector<const Graph*> graphs) {
  Graph merged;
  // Number of vertices already placed in `merged`; added to the endpoints of
  // every edge of the graph currently being appended.
  uint64_t vertex_offset = 0;
  for (size_t g = 0; g < graphs.size(); ++g) {
    const Graph* part = graphs[g];
    const auto part_vertices = part->NumVertices();
    const auto part_edges = part->NumEdges();
    merged.AddVertices(part_vertices);
    for (uint64_t e = 0; e < part_edges; ++e) {
      const auto shifted_src = part->all_edges_src_[e] + vertex_offset;
      const auto shifted_dst = part->all_edges_dst_[e] + vertex_offset;
      merged.AddEdge(shifted_src, shifted_dst);
    }
    vertex_offset += part_vertices;
  }
  return merged;
}
} // namespace dgl
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment