"examples/mxnet/sampling/dis_sampling/README.md" did not exist on "9b4fb2fb32ec564a8d0636aa2d97a7accd83fb33"
Unverified Commit 56ffb650 authored by peizhou001's avatar peizhou001 Committed by GitHub
Browse files

[API Deprecation]Deprecate contrib module (#5114)

parent 436de3d1
import os
import sys
import time
import scipy
from xmlrpc.server import SimpleXMLRPCServer
import xmlrpc.client
import numpy as np
from functools import partial
from collections.abc import MutableMapping
from ..base import ALL, is_all, DGLError, dgl_warning
from .. import backend as F
from .._deprecate.graph import DGLGraph
from .. import utils
from ..graph_index import GraphIndex, create_graph_index, from_shared_mem_graph_index
from .._ffi.ndarray import empty_shared_mem
from .._ffi.function import _init_api
from .. import ndarray as nd
from ..init import zero_initializer
def _get_ndata_path(graph_name, ndata_name):
return "/" + graph_name + "_node_" + ndata_name
def _get_edata_path(graph_name, edata_name):
return "/" + graph_name + "_edge_" + edata_name
def _get_graph_path(graph_name):
return "/" + graph_name
dtype_dict = F.data_type_dict
dtype_dict = {dtype_dict[key]:key for key in dtype_dict}
def _move_data_to_shared_mem_array(arr, name):
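    # Wrap the backend tensor as a DGL NDArray via DLPack, copy it into a
    # named shared-memory buffer, and return a backend tensor backed by
    # that shared memory.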
dlpack = F.zerocopy_to_dlpack(arr)
dgl_tensor = nd.from_dlpack(dlpack)
new_arr = empty_shared_mem(name, True, F.shape(arr), dtype_dict[F.dtype(arr)])
dgl_tensor.copyto(new_arr)
dlpack = new_arr.to_dlpack()
return F.zerocopy_from_dlpack(dlpack)
class NodeDataView(MutableMapping):
"""The data view class when G.nodes[...].data is called.
See Also
--------
dgl.DGLGraph.nodes
"""
__slots__ = ['_graph', '_nodes', '_graph_name']
def __init__(self, graph, nodes, graph_name):
self._graph = graph
self._nodes = nodes
self._graph_name = graph_name
def __getitem__(self, key):
return self._graph.get_n_repr(self._nodes)[key]
def __setitem__(self, key, val):
# Move the data in val to shared memory.
val = _move_data_to_shared_mem_array(val, _get_ndata_path(self._graph_name, key))
self._graph.set_n_repr({key : val}, self._nodes)
def __delitem__(self, key):
if not is_all(self._nodes):
raise DGLError('Deleting feature data on only a subset of nodes is not'
' supported. Please use `del G.ndata[key]` instead.')
self._graph.pop_n_repr(key)
def __len__(self):
return len(self._graph._node_frame)
def __iter__(self):
return iter(self._graph._node_frame)
def __repr__(self):
data = self._graph.get_n_repr(self._nodes)
return repr({key : data[key] for key in self._graph._node_frame})
class EdgeDataView(MutableMapping):
"""The data view class when G.edges[...].data is called.
See Also
--------
dgl.DGLGraph.edges
"""
__slots__ = ['_graph', '_edges', '_graph_name']
def __init__(self, graph, edges, graph_name):
self._graph = graph
self._edges = edges
self._graph_name = graph_name
def __getitem__(self, key):
return self._graph.get_e_repr(self._edges)[key]
def __setitem__(self, key, val):
# Move the data in val to shared memory.
val = _move_data_to_shared_mem_array(val, _get_edata_path(self._graph_name, key))
self._graph.set_e_repr({key : val}, self._edges)
def __delitem__(self, key):
if not is_all(self._edges):
raise DGLError('Deleting feature data on only a subset of edges is not'
' supported. Please use `del G.edata[key]` instead.')
self._graph.pop_e_repr(key)
def __len__(self):
return len(self._graph._edge_frame)
def __iter__(self):
return iter(self._graph._edge_frame)
def __repr__(self):
data = self._graph.get_e_repr(self._edges)
return repr({key : data[key] for key in self._graph._edge_frame})
class Barrier(object):
""" A barrier in the KVStore server used for one synchronization.
All workers have to enter the barrier before any of them can proceed
with any further computation.
Parameters
----------
num_workers: int
The number of workers that will enter the barrier.
"""
def __init__(self, num_workers):
self.num_enters = 0
self.num_leaves = 0
self.num_workers = num_workers
def enter(self):
""" A worker enters the barrier.
"""
self.num_enters += 1
def leave(self):
""" A worker notifies the server that it's going to leave the barrier.
"""
self.num_leaves += 1
def all_enter(self):
""" Indicate that all workers have entered the barrier.
"""
return self.num_enters == self.num_workers
def all_leave(self):
""" Indicate that all workers have left the barrier.
"""
return self.num_leaves == self.num_workers
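# A minimal usage sketch (not part of the library): one synchronization
# round with two workers. The barrier is released only after every worker
# has entered, and can be garbage-collected after all of them leave.
def _barrier_example():
    barrier = Barrier(num_workers=2)
    barrier.enter()                 # worker 0 arrives
    assert not barrier.all_enter()  # worker 1 hasn't arrived yet
    barrier.enter()                 # worker 1 arrives
    assert barrier.all_enter()      # all workers are in; they may proceed
    barrier.leave()
    barrier.leave()
    assert barrier.all_leave()      # safe to garbage-collect this barrier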
class BarrierManager(object):
""" The manager of barriers
When a worker wants to enter a barrier, it creates the barrier if it doesn't
exist. Otherwise, the worker will enter an existing barrier.
The manager needs to know the number of workers in advance so that it can
keep track of barriers and workers.
Parameters
----------
num_workers: int
The number of workers that need to synchronize with barriers.
"""
def __init__(self, num_workers):
self.num_workers = num_workers
self.barrier_ids = [0] * num_workers
self.barriers = {}
def enter(self, worker_id):
""" A worker enters a barrier.
Parameters
----------
worker_id : int
The worker that wants to enter a barrier.
"""
bid = self.barrier_ids[worker_id]
self.barrier_ids[worker_id] += 1
if bid in self.barriers:
self.barriers[bid].enter()
else:
self.barriers.update({bid : Barrier(self.num_workers)})
self.barriers[bid].enter()
return bid
def all_enter(self, worker_id, barrier_id):
""" Indicate whether all workers have entered a specified barrier.
"""
return self.barriers[barrier_id].all_enter()
def leave(self, worker_id, barrier_id):
""" A worker leaves a barrier.
This is useful for garbage collection of used barriers.
"""
self.barriers[barrier_id].leave()
if self.barriers[barrier_id].all_leave():
del self.barriers[barrier_id]
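# A minimal usage sketch (not part of the library): barrier ids advance
# independently per worker, so each worker's i-th call to enter() joins
# the same i-th barrier as every other worker's i-th call.
def _barrier_manager_example():
    mgr = BarrierManager(num_workers=2)
    bid0 = mgr.enter(worker_id=0)   # worker 0 creates and enters barrier 0
    bid1 = mgr.enter(worker_id=1)   # worker 1 enters the same barrier
    assert bid0 == bid1 == 0
    assert mgr.all_enter(0, bid0)   # both workers have entered
    mgr.leave(0, bid0)
    mgr.leave(1, bid1)              # barrier 0 is garbage-collected here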
def shared_mem_zero_initializer(shape, dtype, name): # pylint: disable=unused-argument
"""Zero feature initializer in shared memory
"""
data = empty_shared_mem(name, True, shape, dtype)
dlpack = data.to_dlpack()
arr = F.zerocopy_from_dlpack(dlpack)
arr[:] = 0
return arr
class InitializerManager(object):
"""Manage initializer.
We need to convert built-in frame initializer to strings
and send them to the graph store server through RPC.
Through the conversion, we need to convert local built-in initializer
to shared-memory initializer.
"""
# Map the built-in initializer functions to strings.
_fun2str = {
zero_initializer: 'zero',
}
# Map the strings to built-in initializer functions.
_str2fun = {
'zero': shared_mem_zero_initializer,
}
def serialize(self, init):
"""Convert the initializer function to string.
Parameters
----------
init : callable
the initializer function.
Returns
-------
string
The name of the built-in initializer function.
"""
if init in self._fun2str:
return self._fun2str[init]
else:
raise Exception("Shared-memory graph store doesn't support user's initializer")
def deserialize(self, init):
"""Convert the string to the initializer function.
Parameters
----------
init : string
the name of the initializer function
Returns
-------
callable
The shared-memory initializer function.
"""
if init in self._str2fun:
return self._str2fun[init]
else:
raise Exception("Shared-memory graph store doesn't support initializer "
+ str(init))
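# A minimal sketch (not part of the library) of the serialize/deserialize
# round trip: a client turns the built-in zero_initializer into the string
# 'zero' before sending it over RPC, and the server maps the string back
# to the shared-memory variant of the same initializer.
def _initializer_round_trip_example():
    mgr = InitializerManager()
    name = mgr.serialize(zero_initializer)  # -> 'zero'
    init = mgr.deserialize(name)            # -> shared_mem_zero_initializer
    assert init is shared_mem_zero_initializer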
class SharedMemoryStoreServer(object):
"""The graph store server.
The server loads the graph structure along with node and edge embeddings
and stores them in shared memory. The loaded graph can be identified by
the graph name given in the input argument.
DGL graph accepts graph data of multiple formats:
* NetworkX graph,
* scipy matrix,
* DGLGraph.
If the input graph data is DGLGraph, the constructed DGLGraph only contains
its graph index.
Parameters
----------
graph_data : graph data
Data to initialize graph.
graph_name : string
Define the name of the graph, so the client can use the name to access the graph.
multigraph : bool, optional
Deprecated (Will be deleted in the future).
Whether the graph would be a multigraph (default: True)
num_workers : int
The number of workers that will connect to the server.
port : int
The port that the server listens to.
"""
def __init__(self, graph_data, graph_name, multigraph, num_workers, port):
self.server = None
if multigraph is not None:
dgl_warning("multigraph will be deprecated." \
"DGL will treat all graphs as multigraph in the future.")
if isinstance(graph_data, GraphIndex):
graph_data = graph_data.copyto_shared_mem(_get_graph_path(graph_name))
elif isinstance(graph_data, DGLGraph):
graph_data = graph_data._graph.copyto_shared_mem(_get_graph_path(graph_name))
else:
graph_data = create_graph_index(graph_data, readonly=True)
graph_data = graph_data.copyto_shared_mem(_get_graph_path(graph_name))
self._graph = DGLGraph(graph_data, readonly=True)
self._num_workers = num_workers
self._graph_name = graph_name
self._registered_nworkers = 0
self._barrier = BarrierManager(num_workers)
self._init_manager = InitializerManager()
# RPC command: register a graph to the graph store server.
def register(graph_name):
if graph_name != self._graph_name:
print("graph store has %s, but the worker wants %s"
% (self._graph_name, graph_name))
return (-1, -1)
worker_id = self._registered_nworkers
self._registered_nworkers += 1
return worker_id, self._num_workers
# RPC command: get the graph information from the graph store server.
def get_graph_info(graph_name):
assert graph_name == self._graph_name
# if the integers are larger than 2^31, xmlrpc can't handle them.
# we convert them to strings to send them to clients.
return str(self._graph.number_of_nodes()), str(self._graph.number_of_edges())
# RPC command: initialize node embedding in the server.
def init_ndata(init, ndata_name, shape, dtype):
if ndata_name in self._graph.ndata:
ndata = self._graph.ndata[ndata_name]
assert np.all(tuple(F.shape(ndata)) == tuple(shape))
return 0
assert self._graph.number_of_nodes() == shape[0]
init = self._init_manager.deserialize(init)
data = init(shape, dtype, _get_ndata_path(graph_name, ndata_name))
self._graph.ndata[ndata_name] = data
F.sync()
return 0
# RPC command: initialize edge embedding in the server.
def init_edata(init, edata_name, shape, dtype):
if edata_name in self._graph.edata:
edata = self._graph.edata[edata_name]
assert np.all(tuple(F.shape(edata)) == tuple(shape))
return 0
assert self._graph.number_of_edges() == shape[0]
init = self._init_manager.deserialize(init)
data = init(shape, dtype, _get_edata_path(graph_name, edata_name))
self._graph.edata[edata_name] = data
F.sync()
return 0
# RPC command: get the names of all node embeddings.
def list_ndata():
ndata = self._graph.ndata
return [[key, tuple(F.shape(ndata[key])), dtype_dict[F.dtype(ndata[key])]] for key in ndata]
# RPC command: get the names of all edge embeddings.
def list_edata():
edata = self._graph.edata
return [[key, tuple(F.shape(edata[key])), dtype_dict[F.dtype(edata[key])]] for key in edata]
# RPC command: notify the server of the termination of the client.
def terminate():
self._num_workers -= 1
return 0
# RPC command: a worker enters a barrier.
def enter_barrier(worker_id):
return self._barrier.enter(worker_id)
# RPC command: a worker leaves a barrier.
def leave_barrier(worker_id, barrier_id):
self._barrier.leave(worker_id, barrier_id)
return 0
# RPC command: test if all workers have entered a barrier.
def all_enter(worker_id, barrier_id):
return self._barrier.all_enter(worker_id, barrier_id)
self.server = SimpleXMLRPCServer(("127.0.0.1", port), logRequests=False)
self.server.register_function(register, "register")
self.server.register_function(get_graph_info, "get_graph_info")
self.server.register_function(init_ndata, "init_ndata")
self.server.register_function(init_edata, "init_edata")
self.server.register_function(terminate, "terminate")
self.server.register_function(list_ndata, "list_ndata")
self.server.register_function(list_edata, "list_edata")
self.server.register_function(enter_barrier, "enter_barrier")
self.server.register_function(leave_barrier, "leave_barrier")
self.server.register_function(all_enter, "all_enter")
def __del__(self):
if self.server is not None:
self.server.server_close()
self._graph = None
@property
def ndata(self):
"""Return the data view of all the nodes.
DGLGraph.ndata is an abbreviation of DGLGraph.nodes[:].data
See Also
--------
dgl.DGLGraph.nodes
"""
return NodeDataView(self._graph, ALL, self._graph_name)
@property
def edata(self):
"""Return the data view of all the edges.
DGLGraph.edata is an abbreviation of DGLGraph.edges[:].data
See Also
--------
dgl.DGLGraph.edges
"""
return EdgeDataView(self._graph, ALL, self._graph_name)
def run(self):
"""Run the graph store server.
The server runs to process RPC requests from clients.
"""
while self._num_workers > 0:
self.server.handle_request()
self._graph = None
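# A minimal launch sketch (not part of the library), assuming `adj` is a
# scipy sparse adjacency matrix: the server process loads the graph into
# shared memory under the name 'demo_graph' and serves RPC requests until
# all registered workers have terminated.
def _run_server_example(adj):
    server = SharedMemoryStoreServer(adj, graph_name='demo_graph',
                                     multigraph=None, num_workers=2,
                                     port=8000)
    server.run()  # blocks until both workers call terminate()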
class BaseGraphStore(DGLGraph):
"""The base class of the graph store.
Shared-memory graph store and distributed graph store inherit from
this base class. The graph stores only support large read-only graphs. Thus, many of
the DGLGraph APIs aren't supported.
Specifically, the graph store doesn't support the following methods:
- ndata
- edata
- incidence_matrix
- line_graph
- reverse
"""
def __init__(self,
graph_data=None,
multigraph=None):
super(BaseGraphStore, self).__init__(graph_data, multigraph=multigraph, readonly=True)
@property
def ndata(self):
"""Return the data view of all the nodes.
DGLGraph.ndata is an abbreviation of DGLGraph.nodes[:].data
"""
raise Exception("Graph store doesn't support access data of all nodes.")
@property
def edata(self):
"""Return the data view of all the edges.
DGLGraph.edata is an abbreviation of DGLGraph.edges[:].data
See Also
--------
dgl.DGLGraph.edges
"""
raise Exception("Graph store doesn't support access data of all edges.")
def incidence_matrix(self, typestr, ctx=F.cpu()):
"""Return the incidence matrix representation of this graph.
Parameters
----------
typestr : str
Can be either ``in``, ``out`` or ``both``
ctx : context, optional (default=cpu)
The context of returned incidence matrix.
Returns
-------
SparseTensor
The incidence matrix.
"""
raise Exception("Graph store doesn't support creating an incidence matrix.")
def line_graph(self, backtracking=True, shared=False):
"""Return the line graph of this graph.
See :func:`~dgl.transforms.line_graph`.
"""
raise Exception("Graph store doesn't support creating an line matrix.")
def reverse(self, share_ndata=False, share_edata=False):
"""Return the reverse of this graph.
See :func:`~dgl.transforms.reverse`.
"""
raise Exception("Graph store doesn't support reversing a matrix.")
class SharedMemoryDGLGraph(BaseGraphStore):
"""Shared-memory DGLGraph.
This is a client to access data in the shared-memory graph store that has loaded
the graph structure and the node and edge embeddings into shared memory.
It provides the DGLGraph interface.
Parameters
----------
graph_name : string
Define the name of the graph.
port : int
The port that the server listens to.
"""
def __init__(self, graph_name, port):
self._graph_name = graph_name
self._pid = os.getpid()
self.proxy = xmlrpc.client.ServerProxy("http://127.0.0.1:" + str(port) + "/")
self._worker_id, self._num_workers = self.proxy.register(graph_name)
if self._worker_id < 0:
raise Exception('Failed to get graph ' + graph_name + ' from the graph store')
num_nodes, num_edges = self.proxy.get_graph_info(graph_name)
num_nodes, num_edges = int(num_nodes), int(num_edges)
graph_idx = from_shared_mem_graph_index(_get_graph_path(graph_name))
super(SharedMemoryDGLGraph, self).__init__(graph_idx)
self._init_manager = InitializerManager()
# map all ndata and edata from the server.
ndata_infos = self.proxy.list_ndata()
for name, shape, dtype in ndata_infos:
self._init_ndata(name, shape, dtype)
edata_infos = self.proxy.list_edata()
for name, shape, dtype in edata_infos:
self._init_edata(name, shape, dtype)
# Set the ndata and edata initializers.
# so that when a new node/edge embedding is created, it'll be created on the server as well.
# These two functions create initialized tensors on the server.
def node_initializer(init, name, shape, dtype, ctx):
init = self._init_manager.serialize(init)
dtype = dtype_dict[dtype]
self.proxy.init_ndata(init, name, tuple(shape), dtype)
data = empty_shared_mem(_get_ndata_path(self._graph_name, name),
False, shape, dtype)
dlpack = data.to_dlpack()
return F.zerocopy_from_dlpack(dlpack)
def edge_initializer(init, name, shape, dtype, ctx):
init = self._init_manager.serialize(init)
dtype = dtype_dict[dtype]
self.proxy.init_edata(init, name, tuple(shape), dtype)
data = empty_shared_mem(_get_edata_path(self._graph_name, name),
False, shape, dtype)
dlpack = data.to_dlpack()
return F.zerocopy_from_dlpack(dlpack)
self._node_frame.set_remote_init_builder(lambda init, name: partial(node_initializer, init, name))
self._edge_frame.set_remote_init_builder(lambda init, name: partial(edge_initializer, init, name))
self._msg_frame.set_remote_init_builder(lambda init, name: partial(edge_initializer, init, name))
def __del__(self):
if self.proxy is not None:
self.proxy.terminate()
def _init_ndata(self, ndata_name, shape, dtype):
assert self.number_of_nodes() == shape[0]
data = empty_shared_mem(_get_ndata_path(self._graph_name, ndata_name), False, shape, dtype)
dlpack = data.to_dlpack()
self.set_n_repr({ndata_name: F.zerocopy_from_dlpack(dlpack)})
def _init_edata(self, edata_name, shape, dtype):
assert self.number_of_edges() == shape[0]
data = empty_shared_mem(_get_edata_path(self._graph_name, edata_name), False, shape, dtype)
dlpack = data.to_dlpack()
self.set_e_repr({edata_name: F.zerocopy_from_dlpack(dlpack)})
@property
def num_workers(self):
""" The number of workers using the graph store.
"""
return self._num_workers
@property
def worker_id(self):
""" The id of the current worker using the graph store.
When a worker connects to a graph store, it is assigned with a worker id.
This is useful for the graph store server to identify who is sending
requests.
The worker id is a unique number between 0 and num_workers - 1.
This is also useful for user code. For example, user code can
use this number to decide how to assign GPUs to workers in multi-process
training.
"""
return self._worker_id
def _sync_barrier(self, timeout=None):
"""This is a sync barrier among all workers.
Parameters
----------
timeout: int
Timeout in seconds.
"""
# Before entering the barrier, we need to make sure all computation in the local
# process has completed.
F.sync()
# Here we manually implement a multi-processing barrier with RPC.
# It busy-waits with RPC. Whenever all_enter is called, there is
# a context switch, so it doesn't burn CPUs too badly.
# If timeout isn't specified, we wait forever.
if timeout is None:
timeout = sys.maxsize
bid = self.proxy.enter_barrier(self._worker_id)
start = time.time()
while not self.proxy.all_enter(self._worker_id, bid) and time.time() - start < timeout:
continue
self.proxy.leave_barrier(self._worker_id, bid)
if time.time() - start >= timeout and not self.proxy.all_enter(self._worker_id, bid):
raise TimeoutError("leave the sync barrier because of timeout.")
def init_ndata(self, ndata_name, shape, dtype, ctx=F.cpu()):
"""Create node embedding.
It first creates the node embedding in the server and maps it to the current process
with shared memory.
Parameters
----------
ndata_name : string
The name of node embedding
shape : tuple
The shape of the node embedding
dtype : string
The data type of the node embedding. The currently supported data types
are "float32" and "int32".
ctx : DGLContext
The column context.
"""
if ctx != F.cpu():
raise Exception("graph store only supports CPU context for node data")
init = self._node_frame.get_initializer(ndata_name)
if init is None:
self._node_frame._frame._set_zero_default_initializer()
init = self._node_frame.get_initializer(ndata_name)
init = self._init_manager.serialize(init)
self.proxy.init_ndata(init, ndata_name, tuple(shape), dtype)
self._init_ndata(ndata_name, shape, dtype)
def init_edata(self, edata_name, shape, dtype, ctx=F.cpu()):
"""Create edge embedding.
It first creates the edge embedding in the server and maps it to the current process
with shared memory.
Parameters
----------
edata_name : string
The name of edge embedding
shape : tuple
The shape of the edge embedding
dtype : string
The data type of the edge embedding. The currently supported data types
are "float32" and "int32".
ctx : DGLContext
The column context.
"""
if ctx != F.cpu():
raise Exception("graph store only supports CPU context for edge data")
init = self._edge_frame.get_initializer(edata_name)
if init is None:
self._edge_frame._frame._set_zero_default_initializer()
init = self._edge_frame.get_initializer(edata_name)
init = self._init_manager.serialize(init)
self.proxy.init_edata(init, edata_name, tuple(shape), dtype)
self._init_edata(edata_name, shape, dtype)
def get_n_repr(self, u=ALL):
"""Get node(s) representation.
The returned feature tensor batches multiple node features on the first dimension.
Parameters
----------
u : node, container or tensor
The node(s).
Returns
-------
dict
Representation dict from feature name to feature tensor.
"""
if len(self.node_attr_schemes()) == 0:
return dict()
if is_all(u):
dgl_warning("It may not be safe to access node data of all nodes."
"It's recommended to node data of a subset of nodes directly.")
return dict(self._node_frame)
else:
u = utils.toindex(u)
return self._node_frame.select_rows(u)
def get_e_repr(self, edges=ALL):
"""Get edge(s) representation.
Parameters
----------
edges : edges
Edges can be a pair of endpoint nodes (u, v), or a
tensor of edge ids. The default value is all the edges.
Returns
-------
dict
Representation dict
"""
if is_all(edges):
dgl_warning("It may not be safe to access edge data of all edges."
"It's recommended to edge data of a subset of edges directly.")
return super(SharedMemoryDGLGraph, self).get_e_repr(edges)
def set_n_repr(self, data, u=ALL, inplace=True):
"""Set node(s) representation.
`data` is a dictionary from the feature name to feature tensor. Each tensor
is of shape (B, D1, D2, ...), where B is the number of nodes to be updated,
and (D1, D2, ...) is the shape of the node representation tensor. The
length of the given node ids must match B (i.e., len(u) == B).
In the graph store, all updates are written inplace.
Parameters
----------
data : dict of tensor
Node representation.
u : node, container or tensor
The node(s).
inplace : bool
The value is always True.
"""
super(BaseGraphStore, self).set_n_repr(data, u, inplace=True)
def set_e_repr(self, data, edges=ALL, inplace=True):
"""Set edge(s) representation.
`data` is a dictionary from the feature name to feature tensor. Each tensor
is of shape (B, D1, D2, ...), where B is the number of edges to be updated,
and (D1, D2, ...) is the shape of the edge representation tensor.
In the graph store, all updates are written inplace.
Parameters
----------
data : tensor or dict of tensor
Edge representation.
edges : edges
Edges can be a pair of endpoint nodes (u, v), or a
tensor of edge ids. The default value is all the edges.
inplace : bool
The value is always True.
"""
super(BaseGraphStore, self).set_e_repr(data, edges, inplace=True)
def apply_nodes(self, func="default", v=ALL, inplace=True):
"""Apply the function on the nodes to update their features.
If None is provided for ``func``, nothing will happen.
In the graph store, all updates are written inplace.
Parameters
----------
func : callable or None, optional
Apply function on the nodes. The function should be
a :mod:`Node UDF <dgl.udf>`.
v : int, iterable of int, tensor, optional
The node (ids) on which to apply ``func``. The default
value is all the nodes.
inplace : bool, optional
The value is always True.
"""
super(BaseGraphStore, self).apply_nodes(func, v, inplace=True)
def apply_edges(self, func="default", edges=ALL, inplace=True):
"""Apply the function on the edges to update their features.
If None is provided for ``func``, nothing will happen.
In the graph store, all updates are written inplace.
Parameters
----------
func : callable, optional
Apply function on the edge. The function should be
an :mod:`Edge UDF <dgl.udf>`.
edges : valid edges type, optional
Edges on which to apply ``func``. See :func:`send` for valid
edges type. Default is all the edges.
inplace: bool, optional
The value is always True.
"""
super(BaseGraphStore, self).apply_edges(func, edges, inplace=True)
def group_apply_edges(self, group_by, func, edges=ALL, inplace=True):
"""Group the edges by nodes and apply the function on the grouped edges to
update their features.
In the graph store, all updates are written inplace.
Parameters
----------
group_by : str
Specify how to group edges. Expected to be either 'src' or 'dst'
func : callable
Apply function on the edge. The function should be
an :mod:`Edge UDF <dgl.udf>`. The input of `Edge UDF` should
be (bucket_size, degrees, *feature_shape), and
return the dict with values of the same shapes.
edges : valid edges type, optional
Edges on which to group and apply ``func``. See :func:`send` for valid
edges type. Default is all the edges.
inplace: bool, optional
The value is always True.
"""
super(BaseGraphStore, self).group_apply_edges(group_by, func, edges, inplace=True)
def recv(self,
v=ALL,
reduce_func="default",
apply_node_func="default",
inplace=True):
"""Receive and reduce incoming messages and update the features of node(s) :math:`v`.
Optionally, apply a function to update the node features after receive.
In the graph store, all updates are written inplace.
* `reduce_func` will be skipped for nodes with no incoming message.
* If all ``v`` have no incoming message, this will downgrade to an :func:`apply_nodes`.
* If some ``v`` have no incoming message, their new feature value will be calculated
by the column initializer (see :func:`set_n_initializer`). The feature shapes and
dtypes will be inferred.
The node features will be updated by the result of the ``reduce_func``.
Messages are consumed once received.
The provided UDF may be called multiple times, so it is recommended to provide
functions with no side effects.
Parameters
----------
v : node, container or tensor, optional
The node to be updated. Default is receiving all the nodes.
reduce_func : callable, optional
Reduce function on the node. The function should be
a :mod:`Node UDF <dgl.udf>`.
apply_node_func : callable
Apply function on the nodes. The function should be
a :mod:`Node UDF <dgl.udf>`.
inplace: bool, optional
The value is always True.
"""
super(BaseGraphStore, self).recv(v, reduce_func, apply_node_func, inplace=True)
def send_and_recv(self,
edges,
message_func="default",
reduce_func="default",
apply_node_func="default",
inplace=True):
"""Send messages along edges and let destinations receive them.
Optionally, apply a function to update the node features after receive.
In the graph store, all updates are written inplace.
This is a convenient combination for performing
``send(self, self.edges, message_func)`` and
``recv(self, dst, reduce_func, apply_node_func)``, where ``dst``
are the destinations of the ``edges``.
Parameters
----------
edges : valid edges type
Edges on which to apply ``func``. See :func:`send` for valid
edges type.
message_func : callable, optional
Message function on the edges. The function should be
an :mod:`Edge UDF <dgl.udf>`.
reduce_func : callable, optional
Reduce function on the node. The function should be
a :mod:`Node UDF <dgl.udf>`.
apply_node_func : callable, optional
Apply function on the nodes. The function should be
a :mod:`Node UDF <dgl.udf>`.
inplace: bool, optional
The value is always True.
"""
super(BaseGraphStore, self).send_and_recv(edges, message_func, reduce_func,
apply_node_func, inplace=True)
def pull(self,
v,
message_func="default",
reduce_func="default",
apply_node_func="default",
inplace=True):
"""Pull messages from the node(s)' predecessors and then update their features.
Optionally, apply a function to update the node features after receive.
In the graph store, all updates are written inplace.
* `reduce_func` will be skipped for nodes with no incoming message.
* If all ``v`` have no incoming message, this will downgrade to an :func:`apply_nodes`.
* If some ``v`` have no incoming message, their new feature value will be calculated
by the column initializer (see :func:`set_n_initializer`). The feature shapes and
dtypes will be inferred.
Parameters
----------
v : int, iterable of int, or tensor
The node(s) to be updated.
message_func : callable, optional
Message function on the edges. The function should be
an :mod:`Edge UDF <dgl.udf>`.
reduce_func : callable, optional
Reduce function on the node. The function should be
a :mod:`Node UDF <dgl.udf>`.
apply_node_func : callable, optional
Apply function on the nodes. The function should be
a :mod:`Node UDF <dgl.udf>`.
inplace: bool, optional
The value is always True.
"""
super(BaseGraphStore, self).pull(v, message_func, reduce_func,
apply_node_func, inplace=True)
def push(self,
u,
message_func="default",
reduce_func="default",
apply_node_func="default",
inplace=True):
"""Send message from the node(s) to their successors and update them.
Optionally, apply a function to update the node features after receive.
In the graph store, all updates are written inplace.
Parameters
----------
u : int, iterable of int, or tensor
The node(s) to push messages out.
message_func : callable, optional
Message function on the edges. The function should be
an :mod:`Edge UDF <dgl.udf>`.
reduce_func : callable, optional
Reduce function on the node. The function should be
a :mod:`Node UDF <dgl.udf>`.
apply_node_func : callable, optional
Apply function on the nodes. The function should be
a :mod:`Node UDF <dgl.udf>`.
inplace: bool, optional
The value is always True.
"""
super(BaseGraphStore, self).push(u, message_func, reduce_func,
apply_node_func, inplace=True)
def update_all(self, message_func="default",
reduce_func="default",
apply_node_func="default"):
""" Distribute the computation in update_all among all pre-defined workers.
update_all requires that all workers invoke this method and will
return only when all workers finish their own portion of computation.
The number of workers are pre-defined. If one of them doesn't invoke the method,
it won't return because some portion of computation isn't finished.
Parameters
----------
message_func : callable, optional
Message function on the edges. The function should be
an :mod:`Edge UDF <dgl.udf>`.
reduce_func : callable, optional
Reduce function on the node. The function should be
a :mod:`Node UDF <dgl.udf>`.
apply_node_func : callable, optional
Apply function on the nodes. The function should be
a :mod:`Node UDF <dgl.udf>`.
"""
num_worker_nodes = int(self.number_of_nodes() / self.num_workers) + 1
start_node = self.worker_id * num_worker_nodes
end_node = min((self.worker_id + 1) * num_worker_nodes, self.number_of_nodes())
worker_nodes = np.arange(start_node, end_node, dtype=np.int64)
self.pull(worker_nodes, message_func, reduce_func, apply_node_func, inplace=True)
self._sync_barrier()
def destroy(self):
"""Destroy the graph store.
This notifies the server that this client has terminated.
"""
if self.proxy is not None:
self.proxy.terminate()
self.proxy = None
def create_graph_store_server(graph_data, graph_name, store_type, num_workers,
multigraph=None, port=8000):
"""Create the graph store server.
The server loads graph structure and node embeddings and edge embeddings.
Currently, only shared-memory graph store server is supported, so `store_type`
can only be "shared_mem".
After the server runs, the graph store clients can access the graph data
with the specified graph name.
DGL graph accepts graph data of multiple formats:
* NetworkX graph,
* scipy matrix,
* DGLGraph.
If the input graph data is DGLGraph, the constructed DGLGraph only contains
its graph index.
Parameters
----------
graph_data : graph data
Data to initialize graph.
graph_name : string
Define the name of the graph.
store_type : string
The type of the graph store. The current option is "shared_mem".
num_workers : int
The number of workers that will connect to the server.
multigraph : bool, optional
Deprecated (Will be deleted in the future).
Whether the graph would be a multigraph (default: True)
port : int
The port that the server listens to.
Returns
-------
SharedMemoryStoreServer
The graph store server
"""
if multigraph is not None:
dgl_warning("multigraph is deprecated." \
"DGL treat all graphs as multigraph by default.")
return SharedMemoryStoreServer(graph_data, graph_name, None,
num_workers, port)
def create_graph_from_store(graph_name, store_type, port=8000):
"""Create a client from the graph store.
The client constructs the graph structure and the node and edge embeddings
that have been loaded by the graph store server.
Currently, only shared-memory graph store server is supported, so `store_type`
can only be "shared_memory".
Parameters
----------
graph_name : string
Define the name of the graph.
store_type : string
The type of the graph store. The current option is "shared_mem".
port : int
The port that the server listens to.
Returns
-------
SharedMemoryDGLGraph
The shared-memory DGLGraph
"""
return SharedMemoryDGLGraph(graph_name, port)
_init_api("dgl.contrib.graph_store")
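# A minimal client-side sketch (not part of the library): each worker
# process connects to a running server by graph name, creates a shared
# node embedding, and runs one round of distributed message passing.
# The graph name 'demo_graph' and feature names are illustrative.
def _client_example():
    import dgl.function as fn
    g = create_graph_from_store('demo_graph', 'shared_mem', port=8000)
    g.init_ndata('h', (g.number_of_nodes(), 16), 'float32')
    # Every worker must call update_all; each one pulls its own node range
    # and the call returns after the internal sync barrier.
    g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='h'))
    g.destroy()  # tell the server this worker is done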
from .sampler import NeighborSampler, LayerSampler, EdgeSampler
from .dis_sampler import SamplerSender, SamplerReceiver
from .dis_sampler import SamplerPool
# This file contains DGL distributed sampler APIs.
from ...network import _send_nodeflow, _recv_nodeflow
from ...network import _create_sender, _create_receiver
from ...network import _finalize_sender, _finalize_receiver
from ...network import _add_receiver_addr, _sender_connect
from ...network import _receiver_wait, _send_sampler_end_signal
from multiprocessing import Pool
from abc import ABCMeta, abstractmethod
class SamplerPool(object):
"""SamplerPool is an abstract class, in which the worker() method
should be implemented by users. SamplerPool will fork() N (N = num_worker)
child processes, and each process will perform worker() method independently.
Note that, the fork() API uses shared memory for N processes and the OS will
perfrom copy-on-write on that only when developers write that piece of memory.
So fork N processes and load N copies of graph will not increase the memory overhead.
For example, users can use this class like this:
class MySamplerPool(SamplerPool):
def worker(self):
# Do anything here #
if __name__ == '__main__':
...
args = parser.parse_args()
pool = MySamplerPool()
pool.start(args.num_sender, args)
"""
__metaclass__ = ABCMeta
def start(self, num_worker, args):
"""Start sampler pool
Parameters
----------
num_worker : int
number of child processes
args : arguments
any arguments passed by the user
"""
p = Pool()
for i in range(num_worker):
print("Start child sampler process %d ..." % i)
p.apply_async(self.worker, args=(args,))
# Wait for all subprocesses to finish.
p.close()
p.join()
@abstractmethod
def worker(self, args):
"""User-defined function for worker
Parameters
----------
args : arguments
any arguments passed by the user
"""
pass
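# A concrete sketch (not part of the library) of the pattern described in
# the class docstring: subclass SamplerPool, implement worker(), and fork
# the desired number of sampler processes. The worker body is illustrative.
class _DemoSamplerPool(SamplerPool):
    def worker(self, args):
        print('sampler process started with args:', args)

# _DemoSamplerPool().start(num_worker=4, args=None)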
class SamplerSender(object):
"""SamplerSender for DGL distributed training.
Users use SamplerSender to send sampled subgraphs (NodeFlow)
to remote SamplerReceivers. Note that a SamplerSender can currently connect
to multiple SamplerReceivers. The underlying implementation
sends different subgraphs to different SamplerReceivers in parallel
via multi-threading.
Parameters
----------
namebook : dict
IP address namebook of SamplerReceivers, where the
key is the receiver's ID (starting from 0) and the value is the receiver's address, e.g.,
{ 0:'168.12.23.45:50051',
1:'168.12.23.21:50051',
2:'168.12.46.12:50051' }
net_type : str
networking type, e.g., 'socket' (default) or 'mpi'.
"""
def __init__(self, namebook, net_type='socket'):
assert len(namebook) > 0, 'namebook cannot be empty.'
assert net_type in ('socket', 'mpi'), 'Unknown network type.'
self._namebook = namebook
self._sender = _create_sender(net_type)
for ID, addr in self._namebook.items():
ip_port = addr.split(':')
assert len(ip_port) == 2, 'Incorrect format of IP address.'
_add_receiver_addr(self._sender, ip_port[0], int(ip_port[1]), ID)
_sender_connect(self._sender)
def __del__(self):
"""Finalize Sender
"""
_finalize_sender(self._sender)
def send(self, nodeflow, recv_id):
"""Send sampled subgraph (NodeFlow) to remote trainer. Note that,
the send() API is non-blocking and it returns immediately if the
underlying message queue is not full.
Parameters
----------
nodeflow : NodeFlow
sampled NodeFlow
recv_id : int
receiver's ID
"""
assert recv_id >= 0, 'recv_id cannot be a negative number.'
_send_nodeflow(self._sender, nodeflow, recv_id)
def batch_send(self, nf_list, id_list):
"""Send a batch of subgraphs (Nodeflow) to remote trainer. Note that,
the batch_send() API is non-blocking and it returns immediately if the
underlying message queue is not full.
Parameters
----------
nf_list : list
a list of NodeFlow objects
id_list : list
a list of recv_id
"""
assert len(nf_list) > 0, 'nf_list cannot be empty.'
assert len(nf_list) == len(id_list), 'The length of nf_list must be equal to that of id_list.'
for i in range(len(nf_list)):
assert id_list[i] >= 0, 'recv_id cannot be a negative number.'
_send_nodeflow(self._sender, nf_list[i], id_list[i])
def signal(self, recv_id):
"""When the samplling of each epoch is finished, users can
invoke this API to tell SamplerReceiver that sampler has finished its job.
Parameters
----------
recv_id : int
receiver's ID
"""
assert recv_id >= 0, 'recv_id cannot be a negative number.'
_send_sampler_end_signal(self._sender, recv_id)
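# A minimal sender-side sketch (not part of the library), assuming `g` is a
# readonly graph and `sampler` is an iterable of NodeFlows (e.g. a
# NeighborSampler from dgl.contrib.sampling): push every NodeFlow of an
# epoch to receiver 0, then signal the end of the epoch.
def _sender_example(g, sampler):
    sender = SamplerSender(namebook={0: '127.0.0.1:50051'})
    for nf in sampler:
        sender.send(nf, recv_id=0)
    sender.signal(recv_id=0)  # tell receiver 0 this epoch is done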
class SamplerReceiver(object):
"""SamplerReceiver for DGL distributed training.
Users use SamplerReceiver to receive sampled subgraphs (NodeFlow)
from remote SamplerSenders. Note that a SamplerReceiver can receive messages
from multiple SamplerSenders concurrently, as given by the num_sender parameter.
The SamplerReceiver can start its job only after all SamplerSenders
have connected to it successfully.
Parameters
----------
graph : DGLGraph
The parent graph
addr : str
address of SamplerReceiver, e.g., '127.0.0.1:50051'
num_sender : int
total number of SamplerSender
net_type : str
networking type, e.g., 'socket' (default) or 'mpi'.
"""
def __init__(self, graph, addr, num_sender, net_type='socket'):
assert num_sender > 0, 'num_sender must be larger than zero.'
assert net_type in ('socket', 'mpi'), 'Unknown network type.'
self._graph = graph
self._addr = addr
self._num_sender = num_sender
self._tmp_count = 0
self._receiver = _create_receiver(net_type)
ip_port = addr.split(':')
assert len(ip_port) == 2, 'Incorrect format of IP address.'
_receiver_wait(self._receiver, ip_port[0], int(ip_port[1]), num_sender)
def __del__(self):
"""Finalize Receiver
"""
_finalize_receiver(self._receiver)
def __iter__(self):
"""Sampler iterator
"""
return self
def __next__(self):
"""Return sampled NodeFlow object
"""
while True:
res = _recv_nodeflow(self._receiver, self._graph)
if isinstance(res, int): # recv an end-signal
self._tmp_count += 1
if self._tmp_count == self._num_sender:
self._tmp_count = 0
raise StopIteration
else:
return res # recv a nodeflow
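# A minimal trainer-side sketch (not part of the library): the constructor
# blocks until num_sender senders have connected; iteration then yields
# NodeFlows until every sender has sent its end-of-epoch signal.
def _receiver_example(g):
    receiver = SamplerReceiver(g, addr='127.0.0.1:50051', num_sender=1)
    for nf in receiver:         # stops after all end signals arrive
        nf.copy_from_parent()   # pull features into the NodeFlow
        # ... train on nf ...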
"""This file contains NodeFlow samplers."""
import sys
import numpy as np
import threading
from numbers import Integral
import traceback
from ..._ffi.function import _init_api
from ..._ffi.object import register_object, ObjectBase
from ..._ffi.ndarray import empty
from ... import utils
from ..._deprecate.nodeflow import NodeFlow
from ... import backend as F
from ..._deprecate.graph import DGLGraph as DGLGraphStale
from ...base import NID, EID, dgl_warning
try:
import Queue as queue
except ImportError:
import queue
__all__ = ['NeighborSampler', 'LayerSampler', 'EdgeSampler']
class SamplerIter(object):
def __init__(self, sampler):
super(SamplerIter, self).__init__()
self._sampler = sampler
self._batches = []
self._batch_idx = 0
def prefetch(self):
batches = self._sampler.fetch(self._batch_idx)
self._batches.extend(batches)
self._batch_idx += len(batches)
def __next__(self):
if len(self._batches) == 0:
self.prefetch()
if len(self._batches) == 0:
raise StopIteration
return self._batches.pop(0)
class PrefetchingWrapper(object):
"""Internal shared prefetcher logic. It can be sub-classed by a Thread-based implementation
or Process-based implementation."""
_dataq = None # Data queue transmits prefetched elements
_controlq = None # Control queue to instruct thread / process shutdown
_errorq = None # Error queue to transmit exceptions from worker to master
_checked_start = False # True once startup has been checked by _check_start
def __init__(self, sampler_iter, num_prefetch):
super(PrefetchingWrapper, self).__init__()
self.sampler_iter = sampler_iter
assert num_prefetch > 0, 'Unbounded Prefetcher is unsupported.'
self.num_prefetch = num_prefetch
def run(self):
"""Method representing the process activity."""
# Startup - Master waits for this
try:
loader_iter = self.sampler_iter
self._errorq.put(None)
except Exception as e: # pylint: disable=broad-except
tb = traceback.format_exc()
self._errorq.put((e, tb))
while True:
try: # Check control queue
c = self._controlq.get(False)
if c is None:
break
else:
raise RuntimeError('Got unexpected control code {}'.format(repr(c)))
except queue.Empty:
pass
except RuntimeError as e:
tb = traceback.format_exc()
self._errorq.put((e, tb))
self._dataq.put(None)
try:
data = next(loader_iter)
error = None
except Exception as e: # pylint: disable=broad-except
tb = traceback.format_exc()
error = (e, tb)
data = None
finally:
self._errorq.put(error)
self._dataq.put(data)
def __next__(self):
next_item = self._dataq.get()
next_error = self._errorq.get()
if next_error is None:
return next_item
else:
self._controlq.put(None)
if isinstance(next_error[0], StopIteration):
raise StopIteration
else:
return self._reraise(*next_error)
def _reraise(self, e, tb):
print('Reraising exception from Prefetcher', file=sys.stderr)
print(tb, file=sys.stderr)
raise e
def _check_start(self):
assert not self._checked_start
self._checked_start = True
next_error = self._errorq.get(block=True)
if next_error is not None:
self._reraise(*next_error)
def next(self):
return self.__next__()
class ThreadPrefetchingWrapper(PrefetchingWrapper, threading.Thread):
"""Internal threaded prefetcher."""
def __init__(self, *args, **kwargs):
super(ThreadPrefetchingWrapper, self).__init__(*args, **kwargs)
self._dataq = queue.Queue(self.num_prefetch)
self._controlq = queue.Queue()
self._errorq = queue.Queue(self.num_prefetch)
self.daemon = True
self.start()
self._check_start()
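# A minimal sketch (not part of the library): the threaded wrapper accepts
# any iterator, prefetches up to num_prefetch items in a background thread,
# and re-raises worker-side exceptions (including StopIteration) in the
# consuming thread.
def _prefetch_example():
    it = ThreadPrefetchingWrapper(iter(range(3)), num_prefetch=2)
    return [next(it), next(it), next(it)]  # [0, 1, 2]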
class NodeFlowSampler(object):
'''Base class that generates NodeFlows from a graph.
Class properties
----------------
immutable_only : bool
Whether the sampler only works on immutable graphs.
Subclasses can override this property.
'''
immutable_only = False
def __init__(
self,
g,
batch_size,
seed_nodes,
shuffle,
num_prefetch,
prefetching_wrapper_class):
self._g = g
if self.immutable_only and not g._graph.is_readonly():
raise NotImplementedError("This loader only support read-only graphs.")
self._batch_size = int(batch_size)
if seed_nodes is None:
self._seed_nodes = F.arange(0, g.number_of_nodes())
else:
self._seed_nodes = seed_nodes
if shuffle:
self._seed_nodes = F.rand_shuffle(self._seed_nodes)
self._seed_nodes = utils.toindex(self._seed_nodes)
if num_prefetch:
self._prefetching_wrapper_class = prefetching_wrapper_class
self._num_prefetch = num_prefetch
def fetch(self, current_nodeflow_index):
'''
Method that returns the next "bunch" of NodeFlows.
Each worker will return a single NodeFlow constructed from a single
batch.
Subclasses of NodeFlowSampler should override this method.
Parameters
----------
current_nodeflow_index : int
How many NodeFlows the sampler has generated so far.
Returns
-------
list[NodeFlow]
Next "bunch" of nodeflows to be processed.
'''
raise NotImplementedError
def __iter__(self):
it = SamplerIter(self)
if self._num_prefetch:
return self._prefetching_wrapper_class(it, self._num_prefetch)
else:
return it
@property
def g(self):
return self._g
@property
def seed_nodes(self):
return self._seed_nodes
@property
def batch_size(self):
return self._batch_size
class NeighborSampler(NodeFlowSampler):
r'''Create a sampler that samples neighborhood.
It returns a generator of :class:`~dgl.NodeFlow`. This can be viewed as
an analogy of *mini-batch training* on graph data -- the given graph represents
the whole dataset and the returned generator produces mini-batches (in the form
of :class:`~dgl.NodeFlow` objects).
A NodeFlow grows from sampled nodes. It first samples a set of nodes from the given
``seed_nodes`` (or all the nodes if not given), then samples their neighbors
and extracts the subgraph. If the number of hops is :math:`k(>1)`, the process is repeated
recursively, with the neighbor nodes just sampled becoming the new seed nodes.
The result is a graph we defined as :class:`~dgl.NodeFlow` that contains :math:`k+1`
layers. The last layer is the initial seed nodes. The sampled neighbor nodes in
layer :math:`i+1` are in layer :math:`i`. All the edges are from nodes
in layer :math:`i` to layer :math:`i+1`.
.. image:: https://data.dgl.ai/tutorial/sampling/NodeFlow.png
As an analogy to mini-batch training, the ``batch_size`` here is equal to the number
of the initial seed nodes (number of nodes in the last layer).
The number of NodeFlow objects (the number of batches) is calculated by
``len(seed_nodes) // batch_size`` (if ``seed_nodes`` is None, it defaults
to all the nodes in the graph).
Note: NeighborSampler currently only supports immutable graphs.
Parameters
----------
g : DGLGraphStale
The DGLGraphStale where we sample NodeFlows.
batch_size : int
The batch size (i.e, the number of nodes in the last layer)
expand_factor : int
The number of neighbors sampled from the neighbor list of a vertex.
Note that no matter how large the expand_factor, the max number of sampled neighbors
is the neighborhood size.
num_hops : int, optional
The number of hops to sample (i.e, the number of layers in the NodeFlow).
Default: 1
neighbor_type: str, optional
Indicates the neighbors on different types of edges.
* "in": the neighbors on the in-edges.
* "out": the neighbors on the out-edges.
Default: "in"
transition_prob : str, optional
A 1D tensor containing the (unnormalized) transition probability.
The probability of a node v being sampled from a neighbor u is proportional to
the edge weight, normalized by the sum over edge weights grouping by the
destination node.
In other words, given a node v, the probability of node u and edge (u, v)
included in the NodeFlow layer preceding that of v is given by:
.. math::
p(u, v) = \frac{w_{u, v}}{\sum_{u', (u', v) \in E} w_{u', v}}
If neighbor type is "out", then the probability is instead normalized by the sum
grouping by source node:
.. math::
p(v, u) = \frac{w_{v, u}}{\sum_{u', (v, u') \in E} w_{v, u'}}
If a str is given, the edge weight will be loaded from the edge feature column with
the same name. The feature column must be a scalar column in this case.
Default: None
seed_nodes : Tensor, optional
A 1D tensor of nodes where we sample NodeFlows from.
If None, the seed vertices are all the vertices in the graph.
Default: None
shuffle : bool, optional
Indicates whether the sampled NodeFlows are shuffled. Default: False
num_workers : int, optional
The number of worker threads that sample NodeFlows in parallel. Default: 1
prefetch : bool, optional
If true, prefetch the samples in the next batch. Default: False
add_self_loop : bool, optional
If true, add self loop to the sampled NodeFlow.
The edge IDs of the self loop edges are -1. Default: False
'''
immutable_only = True
def __init__(
self,
g,
batch_size,
expand_factor=None,
num_hops=1,
neighbor_type='in',
transition_prob=None,
seed_nodes=None,
shuffle=False,
num_workers=1,
prefetch=False,
add_self_loop=False):
super(NeighborSampler, self).__init__(
g, batch_size, seed_nodes, shuffle, num_workers * 2 if prefetch else 0,
ThreadPrefetchingWrapper)
dgl_warning('dgl.contrib.sampling.NeighborSampler is deprecated starting from v0.5.'
' Please read our guide<link> for how to use the new sampling APIs.')
assert g.is_readonly, "NeighborSampler doesn't support mutable graphs. " + \
"Please turn it into an immutable graph with DGLGraphStale.readonly"
assert isinstance(expand_factor, Integral), 'non-int expand_factor not supported'
self._expand_factor = int(expand_factor)
self._num_hops = int(num_hops)
self._add_self_loop = add_self_loop
self._num_workers = int(num_workers)
self._neighbor_type = neighbor_type
self._transition_prob = transition_prob
def fetch(self, current_nodeflow_index):
if self._transition_prob is None:
prob = F.tensor([], F.float32)
elif isinstance(self._transition_prob, str):
prob = self.g.edata[self._transition_prob]
else:
prob = self._transition_prob
nfobjs = _CAPI_NeighborSampling(
self.g._graph,
self.seed_nodes.todgltensor(),
current_nodeflow_index, # start batch id
self.batch_size, # batch size
self._num_workers, # num batches
self._expand_factor,
self._num_hops,
self._neighbor_type,
self._add_self_loop,
F.zerocopy_to_dgl_ndarray(prob))
nflows = [NodeFlow(self.g, obj) for obj in nfobjs]
return nflows
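# A minimal usage sketch (not part of the library), assuming `g` is a
# readonly DGLGraphStale whose node features live in g.ndata: each yielded
# NodeFlow is one mini-batch whose last layer holds the batch's seed nodes.
def _neighbor_sampler_example(g):
    sampler = NeighborSampler(g, batch_size=64, expand_factor=10,
                              num_hops=2, neighbor_type='in', shuffle=True)
    for nf in sampler:
        nf.copy_from_parent()            # materialize features on the NodeFlow
        seeds = nf.layer_parent_nid(-1)  # parent ids of this batch's seed nodes
        # ... train on nf ...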
class LayerSampler(NodeFlowSampler):
'''Create a sampler that samples neighborhood.
This creates a NodeFlow loader that samples subgraphs from the input graph
with layer-wise sampling. This sampling method is implemented in C and can perform
sampling very efficiently.
The NodeFlow loader returns a list of NodeFlows.
The size of the NodeFlow list is the number of workers.
Note: LayerSampler currently only supports immutable graphs.
Parameters
----------
g : DGLGraphStale
The DGLGraphStale where we sample NodeFlows.
batch_size : int
The batch size (i.e, the number of nodes in the last layer)
layer_sizes: list of int
A list of layer sizes, i.e., the number of sampled nodes per layer.
neighbor_type: str, optional
Indicates the neighbors on different types of edges.
* "in": the neighbors on the in-edges.
* "out": the neighbors on the out-edges.
Default: "in"
node_prob : Tensor, optional
A 1D tensor for the probability that a neighbor node is sampled.
None means uniform sampling. Otherwise, the number of elements
should be equal to the number of vertices in the graph.
Non-uniform node sampling is not implemented yet.
Default: None
seed_nodes : Tensor, optional
A 1D tensor of nodes where we sample NodeFlows from.
If None, the seed vertices are all the vertices in the graph.
Default: None
shuffle : bool, optional
Indicates whether the sampled NodeFlows are shuffled. Default: False
num_workers : int, optional
The number of worker threads that sample NodeFlows in parallel. Default: 1
prefetch : bool, optional
If true, prefetch the samples in the next batch. Default: False
'''
immutable_only = True
def __init__(
self,
g,
batch_size,
layer_sizes,
neighbor_type='in',
node_prob=None,
seed_nodes=None,
shuffle=False,
num_workers=1,
prefetch=False):
super(LayerSampler, self).__init__(
g, batch_size, seed_nodes, shuffle, num_workers * 2 if prefetch else 0,
ThreadPrefetchingWrapper)
assert g.is_readonly, "LayerSampler doesn't support mutable graphs. " + \
"Please turn it into an immutable graph with DGLGraphStale.readonly"
assert node_prob is None, 'non-uniform node probability not supported'
self._num_workers = int(num_workers)
self._neighbor_type = neighbor_type
self._layer_sizes = utils.toindex(layer_sizes)
def fetch(self, current_nodeflow_index):
nfobjs = _CAPI_LayerSampling(
self.g._graph,
self.seed_nodes.todgltensor(),
current_nodeflow_index, # start batch id
self.batch_size, # batch size
self._num_workers, # num batches
self._layer_sizes.todgltensor(),
self._neighbor_type)
nflows = [NodeFlow(self.g, obj) for obj in nfobjs]
return nflows
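# A minimal usage sketch (not part of the library): unlike NeighborSampler's
# per-node fanout, layer-wise sampling fixes the total number of nodes
# sampled in each NodeFlow layer, two sampled layers of 1000 and 500
# nodes in this sketch.
def _layer_sampler_example(g):
    sampler = LayerSampler(g, batch_size=64, layer_sizes=[1000, 500])
    for nf in sampler:
        nf.copy_from_parent()
        # ... train on nf ...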
class EdgeSubgraph(DGLGraphStale):
''' The subgraph sampled from an edge sampler.
A user can access the head nodes and tail nodes of the subgraph directly.
'''
def __init__(self, parent, sgi, neg):
super(EdgeSubgraph, self).__init__(graph_data=sgi.graph,
readonly=True,
parent=parent)
self.ndata[NID] = sgi.induced_nodes.tousertensor()
self.edata[EID] = sgi.induced_edges.tousertensor()
self.sgi = sgi
self.neg = neg
self.head = None
self.tail = None
def set_head_tail(self):
if self.head is None or self.tail is None:
if self.neg:
exist = _CAPI_GetEdgeSubgraphHead(self.sgi)
self.head = utils.toindex(exist).tousertensor()
exist = _CAPI_GetEdgeSubgraphTail(self.sgi)
self.tail = utils.toindex(exist).tousertensor()
else:
head, tail = self.all_edges()
self.head = F.unique(head)
self.tail = F.unique(tail)
@property
def head_nid(self):
''' The unique IDs of the head nodes.
'''
self.set_head_tail()
return self.head
@property
def tail_nid(self):
''' The unique IDs of the tail nodes.
'''
self.set_head_tail()
return self.tail
class EdgeSampler(object):
'''Edge sampler for link prediction.
This samples edges from a given graph. The edges sampled for a batch are
placed in a subgraph before returning. In many link prediction tasks,
negative edges are required to train a model. A negative edge is constructed by
corrupting an existing edge in the graph. The current implementation
supports two ways of corrupting an edge: corrupt the head node of
an edge (by randomly selecting a node as the head node), or corrupt
the tail node of an edge. When we corrupt the head node of an edge, we randomly
sample a node from the entire graph as the head node. It's possible the constructed
edge exists in the graph. By default, the implementation doesn't explicitly check
if the sampled negative edge exists in a graph. However, a user can exclude
positive edges from negative edges by specifying 'exclude_positive=True'.
When negative edges are created, a batch of negative edges are also placed
in a subgraph.
Currently, `negative_mode` only supports:
* 'head': the negative edges are generated by corrupting head nodes with uniformly randomly sampled nodes,
* 'tail': the negative edges are generated by corrupting tail nodes with uniformly randomly sampled nodes,
* 'chunk-head': the negative edges are generated for a chunk of positive edges. \
It first groups positive edges into chunks and corrupts a chunk of edges together \
by replacing a set of head nodes with the same set of nodes uniformly randomly sampled \
from the graph.
* 'chunk-tail': the negative edges are generated by corrupting a set \
of tail nodes with the same set of nodes similar to 'chunk-head'.
When we use chunked negative sampling, a chunk size needs to be specified. By default,
the chunk size is the same as the number of negative edges.
The sampler returns EdgeSubgraph, where a user can access the unique head nodes
and tail nodes directly.
This sampler supports non-uniform sampling of both positive edges and negative edges.
For non-uniform sampling of positive edges, users need to provide an array of m
elements (m is the number of edges), i.e. edge_weight, each of which represents
the sampling probability of an edge. For non-uniform sampling of negative edges,
users need to provide an array of n elements, i.e. node_weight, and the sampler
samples nodes based on these probabilities to corrupt a positive edge. If
neither edge_weight nor node_weight is provided, a uniform sampler is used.
If only edge_weight is provided, the sampler samples nodes uniformly when
corrupting positive edges.
When the flag `return_false_neg` is turned on, the sampler will also check
if the generated negative edges are true negative edges and will return
a vector that indicates false negative edges. The vector is stored in
the negative graph as `false_neg` edge data.
When checking false negative edges, a user can provide edge relations
for a knowledge graph. A negative edge is considered as a false negative
edge only if the triple (source node, destination node and relation)
matches one of the edges in the graph.
This sampler samples positive edges without replacement by default, which means
it returns a fixed number of batches (i.e., num_edges/batch_size), and the
positive edges sampled will not be duplicated. However, one can explicitly
specify sampling with replacement (replacement=True), so that the sampler treats
each sampling of a single positive edge as a standalone event.
To control how many samples the sampler can return, a reset parameter can be used.
If it is set to true, the sampler will generate samples infinitely. For the sampler
with replacement, it will reshuffle the seed edges each time it consumes all the
edges and reset the replacement state. If it is set to false, the sampler will only
generate num_edges/batch_size samples.
Note: If node_weight is extremely imbalanced, the sampler will take much
longer to return a minibatch, as sampled negative nodes must not be duplicated for
one corrupted positive edge.
Parameters
----------
g : DGLGraphStale
The DGLGraphStale where we sample edges.
batch_size : int
The batch size (i.e, the number of edges from the graph)
seed_edges : tensor, optional
A list of edges where we sample from.
edge_weight : tensor, optional
The weight of each edge, which decides the chance of that edge being sampled.
node_weight : tensor, optional
The weight of each node, which decides the chance of that node being sampled.
Used in negative sampling. If not provided, uniform node sampling is used.
shuffle : bool, optional
Whether to randomly shuffle the list of edges to sample from.
num_workers : int, optional
The number of workers to sample edges in parallel.
prefetch : bool, optional
If true, prefetch the samples in the next batch. Default: False
replacement: bool, optional
Whether the sampler samples edges with or without replacement. Default: False
reset: bool, optional
If true, the sampler will generate samples infinitely, and for the sampler with
replacement, it will reshuffle the edges each time it consumes all the edges and
reset the replacement state.
If false, the sampler will only generate num_edges/batch_size samples by default.
Default: False.
negative_mode : string, optional
The method used to construct negative edges. Possible values are 'head',
'tail', 'chunk-head' and 'chunk-tail'.
neg_sample_size : int, optional
The number of negative edges to sample for each edge.
chunk_size : int, optional
The chunk size for chunked negative sampling. Defaults to neg_sample_size.
exclude_positive : bool, optional
Whether to exclude positive edges from the sampled negative edges.
return_false_neg: bool, optional
Whether to calculate false negative edges and return them as edge data in negative graphs.
relations: tensor, optional
The relations of the edges if this is a knowledge graph.
Examples
--------
>>> for pos_g, neg_g in EdgeSampler(g, batch_size=10,
>>>                                 negative_mode='head',
>>>                                 neg_sample_size=10,
>>>                                 return_false_neg=True):
>>>     print(pos_g.head_nid, pos_g.tail_nid)
>>>     print(neg_g.head_nid, neg_g.tail_nid)
>>>     print(neg_g.edata['false_neg'])
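A minimal sketch of weighted, chunk-based negative sampling (assuming the
PyTorch backend; the weight tensors below are illustrative placeholders):
>>> edge_weight = torch.rand(g.number_of_edges())
>>> node_weight = torch.rand(g.number_of_nodes())
>>> sampler = EdgeSampler(g, batch_size=10,
>>>                       edge_weight=edge_weight,
>>>                       node_weight=node_weight,
>>>                       negative_mode='chunk-head',
>>>                       neg_sample_size=10,
>>>                       chunk_size=5,
>>>                       return_false_neg=True)
>>> for pos_g, neg_g in sampler:
>>>     print(neg_g.edata['false_neg'])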
Class properties
----------------
immutable_only : bool
Whether the sampler only works on immutable graphs.
Subclasses can override this property.
'''
immutable_only = False
def __init__(
self,
g,
batch_size,
seed_edges=None,
edge_weight=None,
node_weight=None,
shuffle=False,
num_workers=1,
prefetch=False,
replacement=False,
reset=False,
negative_mode="",
neg_sample_size=0,
exclude_positive=False,
return_false_neg=False,
relations=None,
chunk_size=None):
self._g = g
if self.immutable_only and not g._graph.is_readonly():
raise NotImplementedError("This loader only supports read-only graphs.")
if relations is None:
relations = empty((0,), 'int64')
else:
relations = utils.toindex(relations)
relations = relations.todgltensor()
assert g.number_of_edges() == len(relations)
self._relations = relations
if batch_size < 0 or neg_sample_size < 0:
raise Exception('Invalid arguments')
self._return_false_neg = return_false_neg
self._batch_size = int(batch_size)
if seed_edges is None:
self._seed_edges = F.arange(0, g.number_of_edges())
else:
self._seed_edges = seed_edges
if shuffle:
self._seed_edges = F.rand_shuffle(self._seed_edges)
if edge_weight is None:
self._is_uniform = True
else:
self._is_uniform = False
self._edge_weight = F.zerocopy_to_dgl_ndarray(F.gather_row(edge_weight, self._seed_edges))
if node_weight is None:
self._node_weight = empty((0,), 'float32')
else:
self._node_weight = F.zerocopy_to_dgl_ndarray(node_weight)
self._seed_edges = utils.toindex(self._seed_edges)
if prefetch:
self._prefetching_wrapper_class = ThreadPrefetchingWrapper
self._num_prefetch = num_workers * 2 if prefetch else 0
self._replacement = replacement
self._reset = reset
if chunk_size is None and negative_mode in ('chunk-head', 'chunk-tail'):
chunk_size = neg_sample_size
elif chunk_size is None:
chunk_size = -1
assert negative_mode in ('', 'head', 'tail', 'chunk-head', 'chunk-tail')
self._num_workers = int(num_workers)
self._negative_mode = negative_mode
self._chunk_size = chunk_size
self._neg_sample_size = neg_sample_size
self._exclude_positive = exclude_positive
if self._is_uniform:
self._sampler = _CAPI_CreateUniformEdgeSampler(
self.g._graph,
self.seed_edges.todgltensor(),
self._batch_size, # batch size
self._num_workers, # num batches
self._replacement,
self._reset,
self._negative_mode,
self._neg_sample_size,
self._exclude_positive,
self._return_false_neg,
self._relations,
self._chunk_size)
else:
self._sampler = _CAPI_CreateWeightedEdgeSampler(
self.g._graph,
self._seed_edges.todgltensor(),
self._edge_weight,
self._node_weight,
self._batch_size, # batch size
self._num_workers, # num batches
self._replacement,
self._reset,
self._negative_mode,
self._neg_sample_size,
self._exclude_positive,
self._return_false_neg,
self._relations,
self._chunk_size)
def fetch(self, current_index):
'''
It returns a list of subgraphs if it only samples positive edges.
It returns a list of subgraph pairs if it samples both positive edges
and negative edges.
Parameters
----------
current_index : int
Deprecated; not actually used.
Returns
-------
list[GraphIndex] or list[(GraphIndex, GraphIndex)]
Next "bunch" of edges to be processed.
If negative_mode is specified, a list of (pos_subg, neg_subg) pairs i
s returned.
If return_false_neg is specified as True, the true negative edges and
false negative edges in neg_subg is identified in neg_subg.edata['false_neg'].
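Examples
--------
A minimal sketch, assuming the sampler was constructed with a non-empty
negative_mode and return_false_neg=True:
>>> batches = sampler.fetch(0)  # the argument is ignored
>>> pos_subg, neg_subg = batches[0]
>>> neg_subg.edata['false_neg']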
'''
if self._is_uniform:
subgs = _CAPI_FetchUniformEdgeSample(
self._sampler)
else:
subgs = _CAPI_FetchWeightedEdgeSample(
self._sampler)
if len(subgs) == 0:
return []
if self._negative_mode == "":
# If no negative subgraphs.
return [self.g._create_subgraph(subg,
subg.induced_nodes,
subg.induced_edges) for subg in subgs]
else:
rets = []
assert len(subgs) % 2 == 0
num_pos = int(len(subgs) / 2)
for i in range(num_pos):
pos_subg = EdgeSubgraph(self.g, subgs[i], False)
neg_subg = EdgeSubgraph(self.g, subgs[i + num_pos], True)
if self._return_false_neg:
exist = _CAPI_GetNegEdgeExistence(subgs[i + num_pos])
neg_subg.edata['false_neg'] = utils.toindex(exist).tousertensor()
rets.append((pos_subg, neg_subg))
return rets
def __iter__(self):
it = SamplerIter(self)
if self._is_uniform:
_CAPI_ResetUniformEdgeSample(self._sampler)
else:
_CAPI_ResetWeightedEdgeSample(self._sampler)
if self._num_prefetch:
return self._prefetching_wrapper_class(it, self._num_prefetch)
else:
return it
@property
def g(self):
return self._g
@property
def seed_edges(self):
return self._seed_edges
@property
def batch_size(self):
return self._batch_size
def create_full_nodeflow(g, num_layers, add_self_loop=False):
"""Convert a full graph to NodeFlow to run a L-layer GNN model.
Parameters
----------
g : DGLGraphStale
a DGL graph
num_layers : int
The number of layers
add_self_loop : bool, default False
Whether to add self loop to the sampled NodeFlow.
If True, the edge IDs of the self loop edges are -1.
Returns
-------
NodeFlow
a NodeFlow with a specified number of layers.
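Examples
--------
A minimal sketch, assuming ``g`` is a readonly DGLGraphStale. A full
NodeFlow built for an L-layer model has L + 1 node layers:
>>> nf = create_full_nodeflow(g, num_layers=2)
>>> nf.num_layers
3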
"""
batch_size = g.number_of_nodes()
expand_factor = g.number_of_nodes()
sampler = NeighborSampler(g, batch_size, expand_factor,
num_layers, add_self_loop=add_self_loop)
return next(iter(sampler))
_init_api('dgl.sampling', __name__)
"""Unified Tensor."""
from .. import backend as F
from .._ffi.function import _init_api
from .. import utils
class UnifiedTensor:
'''Class for storing unified tensor. Declaration of
UnifiedTensor automatically pins the input tensor.
Upon a successful declaration of UnifiedTensor, the
target GPU device will have the address mapping of the
input CPU tensor for zero-copy (direct) access over
external interconnects (e.g., PCIe).
Parameters
----------
input : Tensor
Tensor which we want to convert into the
unified tensor.
device : device
GPU to create the address mapping of the input CPU tensor.
Examples
--------
With a given CPU tensor ``feats``, a new UnifiedTensor targeting a default
GPU can be created as follows:
>>> feats = torch.rand((128,128))
>>> feats = dgl.contrib.UnifiedTensor(feats, device=torch.device('cuda'))
Now, the elements of the new tensor ``feats`` can be accessed with ``[]``
indexing. The device of the index tensor acts as a switch that triggers the
zero-copy access from GPU. For example, to use the ordinary CPU-based
data access, one can use the following method:
>>> idx = torch.tensor([0, 1, 2])
>>> output = feats[idx]
Now, to use the GPU to do a zero-copy access, do this:
>>> idx = torch.tensor([0, 1, 2]).to('cuda')
>>> output = feats[idx]
For the multi-GPU operation, to allow multiple GPUs to access the original CPU tensor
``feats`` using UnifiedTensor, one can do the following:
>>> feats = torch.rand((128,128))
>>> feats_gpu0 = dgl.contrib.UnifiedTensor(feats, device=torch.device('cuda:0'))
>>> feats_gpu1 = dgl.contrib.UnifiedTensor(feats, device=torch.device('cuda:1'))
>>> feats_gpu2 = dgl.contrib.UnifiedTensor(feats, device=torch.device('cuda:2'))
Now, the ``cuda:0``, ``cuda:1``, and ``cuda:2`` devices will be able to access the
identical tensor located in the CPU memory using ``feats_gpu0``, ``feats_gpu1``, and ``feats_gpu2`` tensors, respectively.
One can simply use the following operations to slice sub-tensors into different
GPU devices directly.
>>> feats_idx_gpu0 = torch.randint(128, (16,), device='cuda:0')
>>> feats_idx_gpu1 = torch.randint(128, (16,), device='cuda:1')
>>> feats_idx_gpu2 = torch.randint(128, (16,), device='cuda:2')
>>> sub_feat_gpu0 = feats_gpu0[feats_idx_gpu0]
>>> sub_feat_gpu1 = feats_gpu1[feats_idx_gpu1]
>>> sub_feat_gpu2 = feats_gpu2[feats_idx_gpu2]
'''
def __init__(self, input, device):
if F.device_type(device) != 'cuda':
raise ValueError("Target device must be a cuda device")
if F.device_type(F.context(input)) != 'cpu':
raise ValueError("Input tensor must be a cpu tensor")
self._input = input
self._array = F.zerocopy_to_dgl_ndarray(self._input)
self._device = device
self._array.pin_memory_()
def __len__(self):
return len(self._array)
def __repr__(self):
return self._input.__repr__()
def __getitem__(self, key):
'''Perform zero-copy access from GPU if the context of
the key is cuda. Otherwise, just safely fallback to the
backend specific indexing scheme.
Parameters
----------
key : Tensor
Tensor which contains the index ids
'''
if F.device_type(F.context(key)) != 'cuda':
return self._input[key]
else:
return F.zerocopy_from_dgl_ndarray(
_CAPI_DGLIndexSelectCPUFromGPU(self._array,
F.zerocopy_to_dgl_ndarray(key)))
def __setitem__(self, key, val):
self._input[key] = val
def __del__(self):
if hasattr(self, '_array') and self._array is not None:
self._array.unpin_memory_()
self._array = None
if hasattr(self, '_input'):
self._input = None
@property
def shape(self):
"""Shape of this tensor"""
return self._array.shape
@property
def dtype(self):
"""Type of this tensor"""
return self._array.dtype
@property
def device(self):
"""Device of this tensor"""
return self._device
_init_api("dgl.ndarray.uvm", __name__)
@@ -5,8 +5,6 @@ import sys
from itertools import product
from .base import BuiltinFunction, TargetCode
from .._deprecate.runtime import ir
from .._deprecate.runtime.ir import var
__all__ = ["copy_u", "copy_e",
@@ -16,13 +14,6 @@ __all__ = ["copy_u", "copy_e",
class MessageFunction(BuiltinFunction):
"""Base builtin message function class."""
def _invoke(self, graph, src_frame, dst_frame, edge_frame, out_size,
src_map, dst_map, edge_map, out_map, reducer="none"):
"""Symbolic computation of this builtin function to create
runtime.executor
"""
raise NotImplementedError
@property
def name(self):
"""Return the name of this builtin function."""
@@ -44,23 +35,6 @@ class BinaryMessageFunction(MessageFunction):
self.rhs_field = rhs_field
self.out_field = out_field
def _invoke(self, graph, src_frame, dst_frame, edge_frame, out_size,
src_map, dst_map, edge_map, out_map, reducer="none"):
"""Symbolic computation of builtin binary message function to create
runtime.executor
"""
graph = var.GRAPH(graph)
in_frames = (src_frame, dst_frame, edge_frame)
in_maps = (src_map, dst_map, edge_map)
lhs_data = ir.READ_COL(in_frames[self.lhs], var.STR(self.lhs_field))
rhs_data = ir.READ_COL(in_frames[self.rhs], var.STR(self.rhs_field))
lhs_map = var.MAP(in_maps[self.lhs])
rhs_map = var.MAP(in_maps[self.rhs])
out_map = var.MAP(out_map)
return ir.BINARY_REDUCE(reducer, self.binary_op, graph, self.lhs,
self.rhs, lhs_data, rhs_data, out_size,
lhs_map, rhs_map, out_map)
@property
def name(self):
lhs = TargetCode.CODE2STR[self.lhs]
@@ -80,20 +54,6 @@ class CopyMessageFunction(MessageFunction):
self.in_field = in_field
self.out_field = out_field
def _invoke(self, graph, src_frame, dst_frame, edge_frame, out_size,
src_map, dst_map, edge_map, out_map, reducer="none"):
"""Symbolic computation of builtin message function to create
runtime.executor
"""
graph = var.GRAPH(graph)
in_frames = (src_frame, dst_frame, edge_frame)
in_maps = (src_map, dst_map, edge_map)
in_data = ir.READ_COL(in_frames[self.target], var.STR(self.in_field))
in_map = var.MAP(in_maps[self.target])
out_map = var.MAP(out_map)
return ir.COPY_REDUCE(reducer, graph, self.target, in_data, out_size,
in_map, out_map)
@property
def name(self):
return "copy_{}".format(TargetCode.CODE2STR[self.target])
@@ -4,20 +4,12 @@ from __future__ import absolute_import
import sys
from .._deprecate.runtime import ir
from .._deprecate.runtime.ir import var
from .base import BuiltinFunction, TargetCode
from .base import BuiltinFunction
class ReduceFunction(BuiltinFunction):
"""Base builtin reduce function class."""
def _invoke(self, graph, edge_frame, out_size, edge_map=None, out_map=None):
"""Symbolic computation of this builtin function to create
runtime.executor
"""
raise NotImplementedError
@property
def name(self):
"""Return the name of this builtin function."""
@@ -33,23 +25,6 @@ class SimpleReduceFunction(ReduceFunction):
self.msg_field = msg_field
self.out_field = out_field
def _invoke(self, graph, edge_frame, out_size, edge_map=None, out_map=None):
"""Symbolic execution of this builtin function"""
reducer = self._name
graph = var.GRAPH(graph)
edge_map = var.MAP(edge_map)
out_map = var.MAP(out_map)
edge_data = ir.READ_COL(edge_frame, var.STR(self.msg_field))
return ir.COPY_REDUCE(
reducer,
graph,
TargetCode.EDGE,
edge_data,
out_size,
edge_map,
out_map,
)
@property
def name(self):
return self._name
@@ -7,8 +7,6 @@ from enum import Enum
import dgl.backend as F
from . import utils
from ._deprecate.nodeflow import NodeFlow
from ._ffi.function import _init_api
_init_api("dgl.network")
@@ -118,35 +116,6 @@ def _receiver_wait(receiver, ip_addr, port, num_sender):
################################ Distributed Sampler Components ################################
def _send_nodeflow(sender, nodeflow, recv_id):
"""Send sampled subgraph (Nodeflow) to remote Receiver.
Parameters
----------
sender : ctypes.c_void_p
C Sender handle
nodeflow : NodeFlow
NodeFlow object
recv_id : int
Receiver ID
"""
assert recv_id >= 0, "recv_id cannot be a negative number."
gidx = nodeflow._graph
node_mapping = nodeflow._node_mapping.todgltensor()
edge_mapping = nodeflow._edge_mapping.todgltensor()
layers_offsets = utils.toindex(nodeflow._layer_offsets).todgltensor()
flows_offsets = utils.toindex(nodeflow._block_offsets).todgltensor()
_CAPI_SenderSendNodeFlow(
sender,
int(recv_id),
gidx,
node_mapping,
edge_mapping,
layers_offsets,
flows_offsets,
)
def _send_sampler_end_signal(sender, recv_id):
"""Send an epoch-end signal to remote Receiver.
@@ -161,27 +130,6 @@ def _send_sampler_end_signal(sender, recv_id):
_CAPI_SenderSendSamplerEndSignal(sender, int(recv_id))
def _recv_nodeflow(receiver, graph):
"""Receive sampled subgraph (NodeFlow) from remote sampler.
Parameters
----------
receiver : ctypes.c_void_p
C Receiver handle
graph : DGLGraph
The parent graph
Returns
-------
NodeFlow or an end-signal
"""
res = _CAPI_ReceiverRecvNodeFlow(receiver)
if isinstance(res, int):
return res
else:
return NodeFlow(graph, res)
################################ Distributed KVStore Components ################################
@@ -43,16 +43,12 @@ from .. import subgraph
from .. import function
from ..sampling.neighbor import sample_neighbors
# TO BE DEPRECATED
from .._deprecate.graph import DGLGraph as DGLGraphStale
__all__ = [
'line_graph',
'khop_adj',
'khop_graph',
'reverse',
'to_bidirected',
'to_bidirected_stale',
'add_reverse_edges',
'laplacian_lambda_max',
'knn_graph',
@@ -1319,59 +1315,6 @@ def to_simple_graph(g):
dgl_warning('dgl.to_simple_graph is renamed to dgl.to_simple in v0.5.')
return to_simple(g)
def to_bidirected_stale(g, readonly=True):
"""NOTE: this function only works on the deprecated
:class:`dgl.DGLGraphStale` object.
Convert the graph to a bidirected graph.
The function generates a new graph with no node/edge feature.
If g has an edge for ``(u, v)`` but no edge for ``(v, u)``, then the
returned graph will have both ``(u, v)`` and ``(v, u)``.
If the input graph is a multigraph (there are multiple edges from node u to node v),
the returned graph isn't well defined.
Parameters
----------
g : DGLGraphStale
The input graph.
readonly : bool
Whether the returned bidirected graph is readonly or not.
(Default: True)
Notes
-----
Please make sure g is a simple graph, otherwise the return value is undefined.
This function discards the batch information. Please use
:func:`dgl.DGLGraph.set_batch_num_nodes`
and :func:`dgl.DGLGraph.set_batch_num_edges` on the transformed graph
to maintain the information.
Returns
-------
DGLGraph
Examples
--------
The following example uses the PyTorch backend:
>>> g = dgl._deprecate.graph.DGLGraph()
>>> g.add_nodes(2)
>>> g.add_edges([0, 0], [0, 1])
>>> bg1 = dgl.to_bidirected_stale(g)
>>> bg1.edges()
(tensor([0, 1, 0]), tensor([0, 0, 1]))
"""
if readonly:
newgidx = _CAPI_DGLToBidirectedImmutableGraph(g._graph)
else:
newgidx = _CAPI_DGLToBidirectedMutableGraph(g._graph)
return DGLGraphStale(newgidx)
def laplacian_lambda_max(g):
"""Return the largest eigenvalue of the normalized symmetric Laplacian of a graph.
import backend as F
import numpy as np
import scipy as sp
import dgl
from dgl import utils
import os
import time
def generate_rand_graph(n):
arr = (sp.sparse.random(n, n, density=0.1, format='coo') != 0).astype(np.int64)
return dgl.DGLGraph(arr, readonly=True)
def start_trainer():
g = generate_rand_graph(100)
sampler = dgl.contrib.sampling.SamplerReceiver(graph=g, addr='127.0.0.1:50051', num_sender=1)
for subg in sampler:
seed_ids = subg.layer_parent_nid(-1)
assert len(seed_ids) == 1
src, dst, eid = g.in_edges(seed_ids, form='all')
assert subg.number_of_nodes() == len(src) + 1
assert subg.number_of_edges() == len(src)
assert seed_ids == subg.layer_parent_nid(-1)
child_src, child_dst, child_eid = subg.in_edges(subg.layer_nid(-1), form='all')
assert F.array_equal(child_src, subg.layer_nid(0))
src1 = subg.map_to_parent_nid(child_src)
assert F.array_equal(src1, src)
def start_sampler():
g = generate_rand_graph(100)
namebook = { 0:'127.0.0.1:50051' }
sender = dgl.contrib.sampling.SamplerSender(namebook)
for i, subg in enumerate(dgl.contrib.sampling.NeighborSampler(
g, 1, 100, neighbor_type='in', num_workers=4)):
sender.send(subg, 0)
sender.signal(0)
if __name__ == '__main__':
pid = os.fork()
if pid == 0:
start_trainer()
else:
time.sleep(2) # wait for the trainer to start
start_sampler()
import os
import time
import backend as F
import numpy as np
import scipy as sp
from numpy.testing import assert_array_equal
import dgl
from dgl import utils
from dgl.contrib import KVClient, KVServer
num_entries = 10
dim_size = 3
server_namebook = {0: [0, "127.0.0.1", 30070, 1]}
data_0 = F.zeros((num_entries, dim_size), F.float32, F.cpu())
g2l_0 = F.arange(0, num_entries)
partition_0 = F.zeros(num_entries, F.int64, F.cpu())
data_1 = F.zeros((num_entries * 2, dim_size), F.float32, F.cpu())
g2l_1 = F.arange(0, num_entries * 2)
partition_1 = F.zeros(num_entries * 2, F.int64, F.cpu())
data_3 = F.zeros((num_entries, dim_size), F.int64, F.cpu())
data_4 = F.zeros((num_entries, dim_size), F.float64, F.cpu())
data_5 = F.zeros((num_entries, dim_size), F.int32, F.cpu())
def start_server():
my_server = KVServer(
server_id=0, server_namebook=server_namebook, num_client=1
)
my_server.set_global2local(name="data_0", global2local=g2l_0)
my_server.set_global2local(name="data_1", global2local=g2l_1)
my_server.set_global2local(name="data_3", global2local=g2l_0)
my_server.set_global2local(name="data_4", global2local=g2l_0)
my_server.set_global2local(name="data_5", global2local=g2l_0)
my_server.set_partition_book(name="data_0", partition_book=partition_0)
my_server.set_partition_book(name="data_1", partition_book=partition_1)
my_server.set_partition_book(name="data_3", partition_book=partition_0)
my_server.set_partition_book(name="data_4", partition_book=partition_0)
my_server.set_partition_book(name="data_5", partition_book=partition_0)
my_server.init_data(name="data_0", data_tensor=data_0)
my_server.init_data(name="data_1", data_tensor=data_1)
my_server.init_data(name="data_3", data_tensor=data_3)
my_server.init_data(name="data_4", data_tensor=data_4)
my_server.init_data(name="data_5", data_tensor=data_5)
my_server.start()
def start_client():
my_client = KVClient(server_namebook=server_namebook)
my_client.connect()
my_client.init_data(
name="data_2",
shape=(num_entries, dim_size),
dtype=F.float32,
target_name="data_0",
)
print("Init data from client..")
name_list = my_client.get_data_name_list()
assert len(name_list) == 6
assert "data_0" in name_list
assert "data_1" in name_list
assert "data_2" in name_list
assert "data_3" in name_list
assert "data_4" in name_list
assert "data_5" in name_list
meta_0 = my_client.get_data_meta("data_0")
assert meta_0[0] == F.float32
assert meta_0[1] == tuple(F.shape(data_0))
assert_array_equal(meta_0[2], partition_0)
meta_1 = my_client.get_data_meta("data_1")
assert meta_1[0] == F.float32
assert meta_1[1] == tuple(F.shape(data_1))
assert_array_equal(meta_1[2], partition_1)
meta_2 = my_client.get_data_meta("data_2")
assert meta_2[0] == F.float32
assert meta_2[1] == tuple(F.shape(data_0))
assert_array_equal(meta_2[2], partition_0)
meta_3 = my_client.get_data_meta("data_3")
assert meta_3[0] == F.int64
assert meta_3[1] == tuple(F.shape(data_3))
assert_array_equal(meta_3[2], partition_0)
meta_4 = my_client.get_data_meta("data_4")
assert meta_4[0] == F.float64
assert meta_4[1] == tuple(F.shape(data_4))
assert_array_equal(meta_4[2], partition_0)
meta_5 = my_client.get_data_meta("data_5")
assert meta_5[0] == F.int32
assert meta_5[1] == tuple(F.shape(data_5))
assert_array_equal(meta_5[2], partition_0)
my_client.push(
name="data_0",
id_tensor=F.tensor([0, 1, 2]),
data_tensor=F.tensor(
[[1.0, 1.0, 1.0], [2.0, 2.0, 2.0], [3.0, 3.0, 3.0]]
),
)
my_client.push(
name="data_2",
id_tensor=F.tensor([0, 1, 2]),
data_tensor=F.tensor(
[[1.0, 1.0, 1.0], [2.0, 2.0, 2.0], [3.0, 3.0, 3.0]]
),
)
my_client.push(
name="data_3",
id_tensor=F.tensor([0, 1, 2]),
data_tensor=F.tensor([[1, 1, 1], [2, 2, 2], [3, 3, 3]]),
)
my_client.push(
name="data_4",
id_tensor=F.tensor([0, 1, 2]),
data_tensor=F.tensor(
[[1.0, 1.0, 1.0], [2.0, 2.0, 2.0], [3.0, 3.0, 3.0]], F.float64
),
)
my_client.push(
name="data_5",
id_tensor=F.tensor([0, 1, 2]),
data_tensor=F.tensor([[1, 1, 1], [2, 2, 2], [3, 3, 3]], F.int32),
)
target = F.tensor([[1.0, 1.0, 1.0], [2.0, 2.0, 2.0], [3.0, 3.0, 3.0]])
res = my_client.pull(name="data_0", id_tensor=F.tensor([0, 1, 2]))
assert_array_equal(res, target)
res = my_client.pull(name="data_2", id_tensor=F.tensor([0, 1, 2]))
assert_array_equal(res, target)
target = F.tensor([[1, 1, 1], [2, 2, 2], [3, 3, 3]])
res = my_client.pull(name="data_3", id_tensor=F.tensor([0, 1, 2]))
assert_array_equal(res, target)
target = F.tensor(
[[1.0, 1.0, 1.0], [2.0, 2.0, 2.0], [3.0, 3.0, 3.0]], F.float64
)
res = my_client.pull(name="data_4", id_tensor=F.tensor([0, 1, 2]))
assert_array_equal(res, target)
target = F.tensor([[1, 1, 1], [2, 2, 2], [3, 3, 3]], F.int32)
res = my_client.pull(name="data_5", id_tensor=F.tensor([0, 1, 2]))
assert_array_equal(res, target)
my_client.shut_down()
if __name__ == "__main__":
pid = os.fork()
if pid == 0:
start_server()
else:
time.sleep(2) # wait for the server to start
start_client()
import networkx as nx
import scipy.sparse as ssp
import dgl
import dgl.contrib as contrib
from dgl.graph_index import create_graph_index
from dgl.utils import toindex
import backend as F
import backend as F
import numpy as np
import scipy as sp
import dgl
from dgl import utils
import unittest
from numpy.testing import assert_array_equal
np.random.seed(42)
def generate_rand_graph(n):
arr = (sp.sparse.random(n, n, density=0.1, format='coo') != 0).astype(np.int64)
return dgl.DGLGraphStale(arr, readonly=True)
def test_create_full():
g = generate_rand_graph(100)
full_nf = dgl.contrib.sampling.sampler.create_full_nodeflow(g, 5)
assert full_nf.number_of_nodes() == g.number_of_nodes() * 6
assert full_nf.number_of_edges() == g.number_of_edges() * 5
def test_1neighbor_sampler_all():
g = generate_rand_graph(100)
# In this case, NeighborSampling simply gets the neighborhood of a single vertex.
for i, subg in enumerate(dgl.contrib.sampling.NeighborSampler(
g, 1, g.number_of_nodes(), neighbor_type='in', num_workers=4)):
seed_ids = subg.layer_parent_nid(-1)
assert len(seed_ids) == 1
src, dst, eid = g.in_edges(seed_ids, form='all')
assert subg.number_of_nodes() == len(src) + 1
assert subg.number_of_edges() == len(src)
assert seed_ids == subg.layer_parent_nid(-1)
child_src, child_dst, child_eid = subg.in_edges(subg.layer_nid(-1), form='all')
assert F.array_equal(child_src, subg.layer_nid(0))
src1 = subg.map_to_parent_nid(child_src)
assert F.array_equal(src1, src)
def is_sorted(arr):
return np.sum(np.sort(arr) == arr, 0) == len(arr)
def verify_subgraph(g, subg, seed_id):
seed_id = F.asnumpy(seed_id)
seeds = F.asnumpy(subg.map_to_parent_nid(subg.layer_nid(-1)))
assert seed_id in seeds
child_seed = F.asnumpy(subg.layer_nid(-1))[seeds == seed_id]
src, dst, eid = g.in_edges(seed_id, form='all')
child_src, child_dst, child_eid = subg.in_edges(child_seed, form='all')
child_src = F.asnumpy(child_src)
# We don't allow duplicate elements in the neighbor list.
assert(len(np.unique(child_src)) == len(child_src))
# The neighbor list also needs to be sorted.
assert(is_sorted(child_src))
# a neighbor in the subgraph must also exist in parent graph.
src = F.asnumpy(src)
for i in subg.map_to_parent_nid(child_src):
assert F.asnumpy(i) in src
def test_1neighbor_sampler():
g = generate_rand_graph(100)
# In this case, NeighborSampling simply gets the neighborhood of a single vertex.
for subg in dgl.contrib.sampling.NeighborSampler(g, 1, 5, neighbor_type='in',
num_workers=4):
seed_ids = subg.layer_parent_nid(-1)
assert len(seed_ids) == 1
assert subg.number_of_nodes() <= 6
assert subg.number_of_edges() <= 5
verify_subgraph(g, subg, seed_ids)
def test_prefetch_neighbor_sampler():
g = generate_rand_graph(100)
# In this case, NeighborSampling simply gets the neighborhood of a single vertex.
for subg in dgl.contrib.sampling.NeighborSampler(g, 1, 5, neighbor_type='in',
num_workers=4, prefetch=True):
seed_ids = subg.layer_parent_nid(-1)
assert len(seed_ids) == 1
assert subg.number_of_nodes() <= 6
assert subg.number_of_edges() <= 5
verify_subgraph(g, subg, seed_ids)
def test_10neighbor_sampler_all():
g = generate_rand_graph(100)
# In this case, NeighborSampler gets the full neighborhood of each of the 10 seed vertices.
for subg in dgl.contrib.sampling.NeighborSampler(g, 10, g.number_of_nodes(),
neighbor_type='in', num_workers=4):
seed_ids = subg.layer_parent_nid(-1)
assert F.array_equal(seed_ids, subg.map_to_parent_nid(subg.layer_nid(-1)))
src, dst, eid = g.in_edges(seed_ids, form='all')
child_src, child_dst, child_eid = subg.in_edges(subg.layer_nid(-1), form='all')
src1 = subg.map_to_parent_nid(child_src)
assert F.array_equal(src1, src)
def check_10neighbor_sampler(g, seeds):
# In this case, NeighborSampler samples at most 5 in-neighbors for each of the 10 seed vertices.
for subg in dgl.contrib.sampling.NeighborSampler(g, 10, 5, neighbor_type='in',
num_workers=4, seed_nodes=seeds):
seed_ids = subg.layer_parent_nid(-1)
assert subg.number_of_nodes() <= 6 * len(seed_ids)
assert subg.number_of_edges() <= 5 * len(seed_ids)
for seed_id in seed_ids:
verify_subgraph(g, subg, seed_id)
def test_10neighbor_sampler():
g = generate_rand_graph(100)
check_10neighbor_sampler(g, None)
check_10neighbor_sampler(g, seeds=np.unique(np.random.randint(0, g.number_of_nodes(),
size=int(g.number_of_nodes() / 10))))
def _test_layer_sampler(prefetch=False):
g = generate_rand_graph(100)
nid = g.nodes()
src, dst, eid = g.all_edges(form='all', order='eid')
n_batches = 5
batch_size = 50
seed_batches = [np.sort(np.random.choice(F.asnumpy(nid), batch_size, replace=False))
for i in range(n_batches)]
seed_nodes = np.hstack(seed_batches)
layer_sizes = [50] * 3
LayerSampler = getattr(dgl.contrib.sampling, 'LayerSampler')
sampler = LayerSampler(g, batch_size, layer_sizes, 'in',
seed_nodes=seed_nodes, num_workers=4, prefetch=prefetch)
for sub_g in sampler:
assert all(sub_g.layer_size(i) < size for i, size in enumerate(layer_sizes))
sub_nid = F.arange(0, sub_g.number_of_nodes())
assert all(np.all(np.isin(F.asnumpy(sub_g.layer_nid(i)), F.asnumpy(sub_nid)))
for i in range(sub_g.num_layers))
assert np.all(np.isin(F.asnumpy(sub_g.map_to_parent_nid(sub_nid)),
F.asnumpy(nid)))
sub_eid = F.arange(0, sub_g.number_of_edges())
assert np.all(np.isin(F.asnumpy(sub_g.map_to_parent_eid(sub_eid)),
F.asnumpy(eid)))
assert any(np.all(np.sort(F.asnumpy(sub_g.layer_parent_nid(-1))) == seed_batch)
for seed_batch in seed_batches)
sub_src, sub_dst = sub_g.all_edges(order='eid')
for i in range(sub_g.num_blocks):
block_eid = sub_g.block_eid(i)
block_src = sub_g.map_to_parent_nid(F.gather_row(sub_src, block_eid))
block_dst = sub_g.map_to_parent_nid(F.gather_row(sub_dst, block_eid))
block_parent_eid = sub_g.block_parent_eid(i)
block_parent_src = F.gather_row(src, block_parent_eid)
block_parent_dst = F.gather_row(dst, block_parent_eid)
assert np.all(F.asnumpy(block_src == block_parent_src))
assert np.all(F.asnumpy(block_dst == block_parent_dst))
n_layers = sub_g.num_layers
sub_n = sub_g.number_of_nodes()
assert sum(F.shape(sub_g.layer_nid(i))[0] for i in range(n_layers)) == sub_n
n_blocks = sub_g.num_blocks
sub_m = sub_g.number_of_edges()
assert sum(F.shape(sub_g.block_eid(i))[0] for i in range(n_blocks)) == sub_m
def test_layer_sampler():
_test_layer_sampler()
_test_layer_sampler(prefetch=True)
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="Error occurred when multiprocessing")
def test_nonuniform_neighbor_sampler():
# Construct a graph with
# (1) A path (0, 1, ..., 99) with weight 1
# (2) A bunch of random edges with weight 0.
edges = []
for i in range(99):
edges.append((i, i + 1))
for i in range(1000):
edge = (np.random.randint(100), np.random.randint(100))
if edge not in edges:
edges.append(edge)
src, dst = zip(*edges)
g = dgl.DGLGraphStale()
g.add_nodes(100)
g.add_edges(src, dst)
g.readonly()
g.edata['w'] = F.cat([
F.ones((99,), F.float64, F.cpu()),
F.zeros((len(edges) - 99,), F.float64, F.cpu())], 0)
# Test 1-neighbor NodeFlow with 99 as target node.
# The generated NodeFlow should only contain node i on layer i.
sampler = dgl.contrib.sampling.NeighborSampler(
g, 1, 1, 99, 'in', transition_prob='w', seed_nodes=[99])
nf = next(iter(sampler))
assert nf.num_layers == 100
for i in range(nf.num_layers):
assert nf.layer_size(i) == 1
assert F.asnumpy(nf.layer_parent_nid(i)[0]) == i
# Test the reverse direction
sampler = dgl.contrib.sampling.NeighborSampler(
g, 1, 1, 99, 'out', transition_prob='w', seed_nodes=[0])
nf = next(iter(sampler))
assert nf.num_layers == 100
for i in range(nf.num_layers):
assert nf.layer_size(i) == 1
assert F.asnumpy(nf.layer_parent_nid(i)[0]) == 99 - i
def test_setseed():
g = generate_rand_graph(100)
nids = []
dgl.random.seed(42)
for subg in dgl.contrib.sampling.NeighborSampler(
g, 5, 3, num_hops=2, neighbor_type='in', num_workers=1):
nids.append(
tuple(tuple(F.asnumpy(subg.layer_parent_nid(i))) for i in range(3)))
# reinitialize
dgl.random.seed(42)
for i, subg in enumerate(dgl.contrib.sampling.NeighborSampler(
g, 5, 3, num_hops=2, neighbor_type='in', num_workers=1)):
item = tuple(tuple(F.asnumpy(subg.layer_parent_nid(i))) for i in range(3))
assert item == nids[i]
for i, subg in enumerate(dgl.contrib.sampling.NeighborSampler(
g, 5, 3, num_hops=2, neighbor_type='in', num_workers=4)):
pass
def check_head_tail(g):
lsrc, ldst, leid = g.all_edges(form='all', order='eid')
lsrc = np.unique(F.asnumpy(lsrc))
head_nid = np.unique(F.asnumpy(g.head_nid))
np.testing.assert_equal(lsrc, head_nid)
ldst = np.unique(F.asnumpy(ldst))
tail_nid = np.unique(F.asnumpy(g.tail_nid))
np.testing.assert_equal(tail_nid, ldst)
def check_negative_sampler(mode, exclude_positive, neg_size):
g = generate_rand_graph(100)
num_edges = g.number_of_edges()
etype = np.random.randint(0, 10, size=g.number_of_edges(), dtype=np.int64)
g.edata['etype'] = F.copy_to(F.tensor(etype), F.cpu())
pos_gsrc, pos_gdst, pos_geid = g.all_edges(form='all', order='eid')
pos_map = {}
for i in range(len(pos_geid)):
pos_d = int(F.asnumpy(pos_gdst[i]))
pos_e = int(F.asnumpy(pos_geid[i]))
pos_map[(pos_d, pos_e)] = int(F.asnumpy(pos_gsrc[i]))
EdgeSampler = getattr(dgl.contrib.sampling, 'EdgeSampler')
# Test the homogeneous graph.
batch_size = 50
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
negative_mode=mode,
reset=False,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
return_false_neg=True):
pos_lsrc, pos_ldst, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert_array_equal(F.asnumpy(F.gather_row(pos_edges.parent_eid, pos_leid)),
F.asnumpy(g.edge_ids(F.gather_row(pos_edges.parent_nid, pos_lsrc),
F.gather_row(pos_edges.parent_nid, pos_ldst))))
neg_lsrc, neg_ldst, neg_leid = neg_edges.all_edges(form='all', order='eid')
neg_src = F.gather_row(neg_edges.parent_nid, neg_lsrc)
neg_dst = F.gather_row(neg_edges.parent_nid, neg_ldst)
neg_eid = F.gather_row(neg_edges.parent_eid, neg_leid)
for i in range(len(neg_eid)):
neg_d = int(F.asnumpy(neg_dst)[i])
neg_e = int(F.asnumpy(neg_eid)[i])
assert (neg_d, neg_e) in pos_map
if exclude_positive:
assert int(F.asnumpy(neg_src[i])) != pos_map[(neg_d, neg_e)]
check_head_tail(neg_edges)
pos_tails = F.gather_row(pos_edges.parent_nid, pos_edges.tail_nid)
neg_tails = F.gather_row(neg_edges.parent_nid, neg_edges.tail_nid)
pos_tails = np.sort(F.asnumpy(pos_tails))
neg_tails = np.sort(F.asnumpy(neg_tails))
np.testing.assert_equal(pos_tails, neg_tails)
exist = neg_edges.edata['false_neg']
if exclude_positive:
assert np.sum(F.asnumpy(exist) == 0) == len(exist)
else:
assert F.array_equal(g.has_edges_between(neg_src, neg_dst), exist)
total_samples += batch_size
assert total_samples <= num_edges
# check replacement = True
# with reset = False (default setting)
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=True,
reset=False,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert len(pos_leid) == batch_size
total_samples += len(pos_leid)
assert total_samples == num_edges
# check replacement = False
# with reset = False (default setting)
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=False,
reset=False,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert len(pos_leid) == batch_size
total_samples += len(pos_leid)
assert total_samples == num_edges
# check replacement = True
# with reset = True
total_samples = 0
max_samples = 2 * num_edges
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=True,
reset=True,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert len(pos_leid) <= batch_size
total_samples += len(pos_leid)
if (total_samples >= max_samples):
break
assert total_samples >= max_samples
# check replacement = False
# with reset = True
total_samples = 0
max_samples = 2 * num_edges
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=False,
reset=True,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert len(pos_leid) <= batch_size
total_samples += len(pos_leid)
if (total_samples >= max_samples):
break
assert total_samples >= max_samples
# Test the knowledge graph.
total_samples = 0
for _, neg_edges in EdgeSampler(g, batch_size,
negative_mode=mode,
reset=False,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
relations=g.edata['etype'],
return_false_neg=True):
neg_lsrc, neg_ldst, neg_leid = neg_edges.all_edges(form='all', order='eid')
neg_src = F.gather_row(neg_edges.parent_nid, neg_lsrc)
neg_dst = F.gather_row(neg_edges.parent_nid, neg_ldst)
neg_eid = F.gather_row(neg_edges.parent_eid, neg_leid)
exists = neg_edges.edata['false_neg']
neg_edges.edata['etype'] = F.gather_row(g.edata['etype'], neg_eid)
for i in range(len(neg_eid)):
u, v = F.asnumpy(neg_src[i]), F.asnumpy(neg_dst[i])
if g.has_edge_between(u, v):
eid = g.edge_ids(u, v)
etype = g.edata['etype'][eid]
exist = neg_edges.edata['etype'][i] == etype
assert F.asnumpy(exists[i]) == F.asnumpy(exist)
total_samples += batch_size
assert total_samples <= num_edges
def check_weighted_negative_sampler(mode, exclude_positive, neg_size):
g = generate_rand_graph(100)
num_edges = g.number_of_edges()
num_nodes = g.number_of_nodes()
edge_weight = F.copy_to(F.tensor(np.full((num_edges,), 1, dtype=np.float32)), F.cpu())
node_weight = F.copy_to(F.tensor(np.full((num_nodes,), 1, dtype=np.float32)), F.cpu())
etype = np.random.randint(0, 10, size=num_edges, dtype=np.int64)
g.edata['etype'] = F.copy_to(F.tensor(etype), F.cpu())
pos_gsrc, pos_gdst, pos_geid = g.all_edges(form='all', order='eid')
pos_map = {}
for i in range(len(pos_geid)):
pos_d = int(F.asnumpy(pos_gdst[i]))
pos_e = int(F.asnumpy(pos_geid[i]))
pos_map[(pos_d, pos_e)] = int(F.asnumpy(pos_gsrc[i]))
EdgeSampler = getattr(dgl.contrib.sampling, 'EdgeSampler')
# Correctness check
# Test the homogeneous graph.
batch_size = 50
# Test the homogeneous graph with edge weight provided.
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
reset=False,
edge_weight=edge_weight,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
return_false_neg=True):
pos_lsrc, pos_ldst, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert_array_equal(F.asnumpy(F.gather_row(pos_edges.parent_eid, pos_leid)),
F.asnumpy(g.edge_ids(F.gather_row(pos_edges.parent_nid, pos_lsrc),
F.gather_row(pos_edges.parent_nid, pos_ldst))))
neg_lsrc, neg_ldst, neg_leid = neg_edges.all_edges(form='all', order='eid')
neg_src = F.gather_row(neg_edges.parent_nid, neg_lsrc)
neg_dst = F.gather_row(neg_edges.parent_nid, neg_ldst)
neg_eid = F.gather_row(neg_edges.parent_eid, neg_leid)
for i in range(len(neg_eid)):
neg_d = int(F.asnumpy(neg_dst[i]))
neg_e = int(F.asnumpy(neg_eid[i]))
assert (neg_d, neg_e) in pos_map
if exclude_positive:
assert int(F.asnumpy(neg_src[i])) != pos_map[(neg_d, neg_e)]
check_head_tail(neg_edges)
pos_tails = F.gather_row(pos_edges.parent_nid, pos_edges.tail_nid)
neg_tails = F.gather_row(neg_edges.parent_nid, neg_edges.tail_nid)
pos_tails = np.sort(F.asnumpy(pos_tails))
neg_tails = np.sort(F.asnumpy(neg_tails))
np.testing.assert_equal(pos_tails, neg_tails)
exist = neg_edges.edata['false_neg']
if exclude_positive:
assert np.sum(F.asnumpy(exist) == 0) == len(exist)
else:
assert F.array_equal(g.has_edges_between(neg_src, neg_dst), exist)
total_samples += batch_size
assert total_samples <= num_edges
# Test the knowledge graph with edge weight provided.
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
reset=False,
edge_weight=edge_weight,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
relations=g.edata['etype'],
return_false_neg=True):
neg_lsrc, neg_ldst, neg_leid = neg_edges.all_edges(form='all', order='eid')
neg_src = F.gather_row(neg_edges.parent_nid, neg_lsrc)
neg_dst = F.gather_row(neg_edges.parent_nid, neg_ldst)
neg_eid = F.gather_row(neg_edges.parent_eid, neg_leid)
exists = neg_edges.edata['false_neg']
neg_edges.edata['etype'] = F.gather_row(g.edata['etype'], neg_eid)
for i in range(len(neg_eid)):
u, v = F.asnumpy(neg_src[i]), F.asnumpy(neg_dst[i])
if g.has_edge_between(u, v):
eid = g.edge_ids(u, v)
etype = g.edata['etype'][eid]
exist = neg_edges.edata['etype'][i] == etype
assert F.asnumpy(exists[i]) == F.asnumpy(exist)
total_samples += batch_size
assert total_samples <= num_edges
# Test the knowledge graph with edge/node weight provided.
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
reset=False,
edge_weight=edge_weight,
node_weight=node_weight,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
relations=g.edata['etype'],
return_false_neg=True):
neg_lsrc, neg_ldst, neg_leid = neg_edges.all_edges(form='all', order='eid')
neg_src = F.gather_row(neg_edges.parent_nid, neg_lsrc)
neg_dst = F.gather_row(neg_edges.parent_nid, neg_ldst)
neg_eid = F.gather_row(neg_edges.parent_eid, neg_leid)
exists = neg_edges.edata['false_neg']
neg_edges.edata['etype'] = F.gather_row(g.edata['etype'], neg_eid)
for i in range(len(neg_eid)):
u, v = F.asnumpy(neg_src[i]), F.asnumpy(neg_dst[i])
if g.has_edge_between(u, v):
eid = g.edge_ids(u, v)
etype = g.edata['etype'][eid]
exist = neg_edges.edata['etype'][i] == etype
assert F.asnumpy(exists[i]) == F.asnumpy(exist)
total_samples += batch_size
assert total_samples <= num_edges
# check replacement = True with non-uniform positive edge sampling
# with reset = False
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=True,
reset=False,
edge_weight=edge_weight,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert len(pos_leid) == batch_size
total_samples += len(pos_leid)
assert total_samples == num_edges
# check replacement = True with non-uniform positive edge sampling
# with reset = True
total_samples = 0
max_samples = 4 * num_edges
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=True,
reset=True,
edge_weight=edge_weight,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert len(pos_leid) == batch_size
total_samples += len(pos_leid)
if total_samples >= max_samples:
break
assert total_samples == max_samples
# check replacement = False with non-uniform positive/negative edge sampling
# reset = False
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=False,
reset=False,
edge_weight=edge_weight,
node_weight=node_weight,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
relations=g.edata['etype'],
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert len(pos_leid) == batch_size
total_samples += len(pos_leid)
assert total_samples == num_edges
# check replacement = False with non-uniform positive/negative edge sampling
# reset = True
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=False,
reset=True,
edge_weight=edge_weight,
node_weight=node_weight,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=exclude_positive,
relations=g.edata['etype'],
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
assert len(pos_leid) == batch_size
total_samples += len(pos_leid)
if total_samples >= max_samples:
break
assert total_samples == max_samples
# Check Rate
dgl.random.seed(0)
g = generate_rand_graph(1000)
num_edges = g.number_of_edges()
num_nodes = g.number_of_nodes()
edge_weight = F.copy_to(F.tensor(np.full((num_edges,), 1, dtype=np.float32)), F.cpu())
edge_weight[0] = F.sum(edge_weight, dim=0)
node_weight = F.copy_to(F.tensor(np.full((num_nodes,), 1, dtype=np.float32)), F.cpu())
node_weight[-1] = F.sum(node_weight, dim=0) / 200
etype = np.random.randint(0, 20, size=num_edges, dtype=np.int64)
g.edata['etype'] = F.copy_to(F.tensor(etype), F.cpu())
# Test w/o node weight.
max_samples = num_edges // 5
total_samples = 0
# Test the knowledge graph with edge weight provided.
edge_sampled = np.full((num_edges,), 0, dtype=np.int32)
node_sampled = np.full((num_nodes,), 0, dtype=np.int32)
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=True,
edge_weight=edge_weight,
shuffle=True,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=False,
relations=g.edata['etype'],
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
neg_lsrc, neg_ldst, _ = neg_edges.all_edges(form='all', order='eid')
if 'head' in mode:
neg_src = neg_edges.parent_nid[neg_lsrc]
np.add.at(node_sampled, F.asnumpy(neg_src), 1)
else:
neg_dst = neg_edges.parent_nid[neg_ldst]
np.add.at(node_sampled, F.asnumpy(neg_dst), 1)
np.add.at(edge_sampled, F.asnumpy(pos_edges.parent_eid[pos_leid]), 1)
total_samples += batch_size
if total_samples > max_samples:
break
# Check rate here
edge_rate_0 = edge_sampled[0] / edge_sampled.sum()
edge_tail_half_cnt = edge_sampled[edge_sampled.shape[0] // 2:-1].sum()
edge_rate_tail_half = edge_tail_half_cnt / edge_sampled.sum()
assert np.allclose(edge_rate_0, 0.5, atol=0.05)
assert np.allclose(edge_rate_tail_half, 0.25, atol=0.05)
node_rate_0 = node_sampled[0] / node_sampled.sum()
node_tail_half_cnt = node_sampled[node_sampled.shape[0] // 2:-1].sum()
node_rate_tail_half = node_tail_half_cnt / node_sampled.sum()
assert node_rate_0 < 0.02
assert np.allclose(node_rate_tail_half, 0.5, atol=0.02)
# Test the knowledge graph with edge/node weight provided.
edge_sampled = np.full((num_edges,), 0, dtype=np.int32)
node_sampled = np.full((num_nodes,), 0, dtype=np.int32)
total_samples = 0
for pos_edges, neg_edges in EdgeSampler(g, batch_size,
replacement=True,
edge_weight=edge_weight,
node_weight=node_weight,
shuffle=True,
negative_mode=mode,
neg_sample_size=neg_size,
exclude_positive=False,
relations=g.edata['etype'],
return_false_neg=True):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
neg_lsrc, neg_ldst, _ = neg_edges.all_edges(form='all', order='eid')
if 'head' in mode:
neg_src = F.gather_row(neg_edges.parent_nid, neg_lsrc)
np.add.at(node_sampled, F.asnumpy(neg_src), 1)
else:
neg_dst = F.gather_row(neg_edges.parent_nid, neg_ldst)
np.add.at(node_sampled, F.asnumpy(neg_dst), 1)
np.add.at(edge_sampled, F.asnumpy(pos_edges.parent_eid[pos_leid]), 1)
total_samples += batch_size
if total_samples > max_samples:
break
# Check rate here
edge_rate_0 = edge_sampled[0] / edge_sampled.sum()
edge_tail_half_cnt = edge_sampled[edge_sampled.shape[0] // 2:-1].sum()
edge_rate_tail_half = edge_tail_half_cnt / edge_sampled.sum()
assert np.allclose(edge_rate_0, 0.5, atol=0.05)
assert np.allclose(edge_rate_tail_half, 0.25, atol=0.05)
node_rate = node_sampled[-1] / node_sampled.sum()
node_rate_a = np.average(node_sampled[:50]) / node_sampled.sum()
node_rate_b = np.average(node_sampled[50:100]) / node_sampled.sum()
# As neg sampling does not contain duplicate nodes,
# this test takes some acceptable variation on the sample rate.
assert np.allclose(node_rate, node_rate_a * 5, atol=0.002)
assert np.allclose(node_rate_a, node_rate_b, atol=0.0002)
def check_positive_edge_sampler():
g = generate_rand_graph(1000)
num_edges = g.number_of_edges()
edge_weight = F.copy_to(F.tensor(np.full((num_edges,), 0.1, dtype=np.float32)), F.cpu())
edge_weight[num_edges-1] = num_edges ** 2
EdgeSampler = getattr(dgl.contrib.sampling, 'EdgeSampler')
# Correctness check
# Test the homogeneous graph.
batch_size = 128
edge_sampled = np.full((num_edges,), 0, dtype=np.int32)
for pos_edges in EdgeSampler(g, batch_size,
reset=False,
edge_weight=edge_weight):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
np.add.at(edge_sampled, F.asnumpy(pos_edges.parent_eid[pos_leid]), 1)
truth = np.full((num_edges,), 1, dtype=np.int32)
edge_sampled = edge_sampled[:num_edges]
assert np.array_equal(truth, edge_sampled)
edge_sampled = np.full((num_edges,), 0, dtype=np.int32)
for pos_edges in EdgeSampler(g, batch_size,
reset=False,
shuffle=True,
edge_weight=edge_weight):
_, _, pos_leid = pos_edges.all_edges(form='all', order='eid')
np.add.at(edge_sampled, F.asnumpy(pos_edges.parent_eid[pos_leid]), 1)
truth = np.full((num_edges,), 1, dtype=np.int32)
edge_sampled = edge_sampled[:num_edges]
assert np.array_equal(truth, edge_sampled)
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support item assignment")
def test_negative_sampler():
check_negative_sampler('chunk-head', False, 10)
check_negative_sampler('head', True, 10)
check_negative_sampler('head', False, 10)
check_weighted_negative_sampler('chunk-head', False, 10)
check_weighted_negative_sampler('head', True, 10)
check_weighted_negative_sampler('head', False, 10)
check_positive_edge_sampler()
# Disable this check for now. It might take too long.
# check_negative_sampler('head', False, 100)
if __name__ == '__main__':
test_create_full()
test_1neighbor_sampler_all()
test_10neighbor_sampler_all()
test_1neighbor_sampler()
test_10neighbor_sampler()
test_layer_sampler()
test_nonuniform_neighbor_sampler()
test_setseed()
test_negative_sampler()
import networkx as nx
import scipy.sparse as ssp
import dgl
import dgl.contrib as contrib
from dgl.graph_index import create_graph_index
from dgl.utils import toindex
import backend as F
@@ -519,14 +519,6 @@ def create_large_graph(num_nodes, idtype=F.int64):
return dgl.from_scipy(spm, idtype=idtype)
def get_nodeflow(g, node_ids, num_layers):
batch_size = len(node_ids)
expand_factor = g.number_of_nodes()
sampler = dgl.contrib.sampling.NeighborSampler(g, batch_size,
expand_factor=expand_factor, num_hops=num_layers,
seed_nodes=node_ids)
return next(iter(sampler))
# Disabled since everything will be on heterogeneous graphs
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU not implemented")
def test_partition_with_halo():
""" NOTE(zihao) The unittest on shared memory store is temporally disabled because we
have not fixed the bug described in https://github.com/dmlc/dgl/issues/755 yet.
The bug causes CI failures occasionally but does not affect other parts of DGL.
As a result, we decide to disable this test until we fixed the bug.
"""
import dgl
import sys
import os
import random
import time
import numpy as np
from numpy.testing import assert_array_equal
from multiprocessing import Process, Manager, Condition, Value
from scipy import sparse as spsp
import backend as F
import unittest
import dgl.function as fn
import traceback
from numpy.testing import assert_almost_equal
num_nodes = 100
num_edges = int(num_nodes * num_nodes * 0.1)
rand_port = random.randint(5000, 8000)
print('run graph store with port ' + str(rand_port), file=sys.stderr)
def check_array_shared_memory(g, worker_id, arrays):
if worker_id == 0:
for i, arr in enumerate(arrays):
arr[0] = i + 10
g._sync_barrier(60)
else:
g._sync_barrier(60)
for i, arr in enumerate(arrays):
assert_almost_equal(F.asnumpy(arr[0]), i + 10)
def create_graph_store(graph_name):
for _ in range(10):
try:
g = dgl.contrib.graph_store.create_graph_from_store(graph_name, "shared_mem",
port=rand_port)
return g
except ConnectionError as e:
traceback.print_exc()
time.sleep(1)
return None
def check_init_func(worker_id, graph_name, return_dict):
np.random.seed(0)
csr = (spsp.random(num_nodes, num_nodes, density=0.1, format='csr') != 0).astype(np.int64)
tmp_g = dgl.DGLGraph(csr, readonly=True, multigraph=False)
# Verify the graph structure loaded from the shared memory.
try:
g = create_graph_store(graph_name)
if g is None:
return_dict[worker_id] = -1
return
src, dst = g.all_edges(order='srcdst')
src1, dst1 = tmp_g.all_edges(order='srcdst')
assert_array_equal(F.asnumpy(dst), F.asnumpy(dst1))
assert_array_equal(F.asnumpy(src), F.asnumpy(src1))
feat = F.asnumpy(g.nodes[0].data['feat'])
assert_array_equal(np.squeeze(feat), np.arange(10, dtype=feat.dtype))
feat = F.asnumpy(g.edges[0].data['feat'])
assert_array_equal(np.squeeze(feat), np.arange(10, dtype=feat.dtype))
g.init_ndata('test4', (g.number_of_nodes(), 10), 'float32')
g.init_edata('test4', (g.number_of_edges(), 10), 'float32')
g._sync_barrier(60)
check_array_shared_memory(g, worker_id, [g.nodes[:].data['test4'], g.edges[:].data['test4']])
g._sync_barrier(60)
data = g.nodes[:].data['test4']
g.set_n_repr({'test4': F.ones((1, 10)) * 10}, u=[0])
assert_almost_equal(F.asnumpy(data[0]), np.squeeze(F.asnumpy(g.nodes[0].data['test4'])))
data = g.edges[:].data['test4']
g.set_e_repr({'test4': F.ones((1, 10)) * 20}, edges=[0])
assert_almost_equal(F.asnumpy(data[0]), np.squeeze(F.asnumpy(g.edges[0].data['test4'])))
g.destroy()
return_dict[worker_id] = 0
except Exception as e:
return_dict[worker_id] = -1
g.destroy()
print(e, file=sys.stderr)
traceback.print_exc()
def server_func(num_workers, graph_name, server_init):
np.random.seed(0)
csr = (spsp.random(num_nodes, num_nodes, density=0.1, format='csr') != 0).astype(np.int64)
g = dgl.contrib.graph_store.create_graph_store_server(csr, graph_name, "shared_mem", num_workers,
False, port=rand_port)
assert num_nodes == g._graph.number_of_nodes()
assert num_edges == g._graph.number_of_edges()
nfeat = np.arange(0, num_nodes * 10).astype('float32').reshape((num_nodes, 10))
efeat = np.arange(0, num_edges * 10).astype('float32').reshape((num_edges, 10))
g.ndata['feat'] = F.tensor(nfeat)
g.edata['feat'] = F.tensor(efeat)
server_init.value = 1
g.run()
@unittest.skipIf(True, reason="skip this test")
def test_init():
manager = Manager()
return_dict = manager.dict()
# make sure the server initializes before the workers
server_init = Value('i', False)
serv_p = Process(target=server_func, args=(2, 'test_graph1', server_init))
serv_p.start()
while server_init.value == 0:
time.sleep(1)
work_p1 = Process(target=check_init_func, args=(0, 'test_graph1', return_dict))
work_p2 = Process(target=check_init_func, args=(1, 'test_graph1', return_dict))
work_p1.start()
work_p2.start()
serv_p.join()
work_p1.join()
work_p2.join()
for worker_id in return_dict.keys():
assert return_dict[worker_id] == 0, "worker %d fails" % worker_id
def check_compute_func(worker_id, graph_name, return_dict):
try:
g = create_graph_store(graph_name)
if g is None:
return_dict[worker_id] = -1
return
g._sync_barrier(60)
in_feats = g.nodes[0].data['feat'].shape[1]
# Test update all.
g.update_all(fn.copy_u(u='feat', out='m'), fn.sum(msg='m', out='preprocess'))
adj = g.adjacency_matrix(transpose=True)
tmp = F.spmm(adj, g.nodes[:].data['feat'])
assert_almost_equal(F.asnumpy(g.nodes[:].data['preprocess']), F.asnumpy(tmp))
g._sync_barrier(60)
check_array_shared_memory(g, worker_id, [g.nodes[:].data['preprocess']])
g._sync_barrier(60)
# Test apply nodes.
data = g.nodes[:].data['feat']
g.apply_nodes(func=lambda nodes: {'feat': F.ones((1, in_feats)) * 10}, v=0)
assert_almost_equal(F.asnumpy(data[0]), np.squeeze(F.asnumpy(g.nodes[0].data['feat'])))
# Test apply edges.
data = g.edges[:].data['feat']
g.apply_edges(func=lambda edges: {'feat': F.ones((1, in_feats)) * 10}, edges=0)
assert_almost_equal(F.asnumpy(data[0]), np.squeeze(F.asnumpy(g.edges[0].data['feat'])))
g.init_ndata('tmp', (g.number_of_nodes(), 10), 'float32')
data = g.nodes[:].data['tmp']
# Test pull
g.pull(1, fn.copy_u(u='feat', out='m'), fn.sum(msg='m', out='tmp'))
assert_almost_equal(F.asnumpy(data[1]), np.squeeze(F.asnumpy(g.nodes[1].data['preprocess'])))
# Test send_and_recv
in_edges = g.in_edges(v=2)
g.send_and_recv(in_edges, fn.copy_u(u='feat', out='m'), fn.sum(msg='m', out='tmp'))
assert_almost_equal(F.asnumpy(data[2]), np.squeeze(F.asnumpy(g.nodes[2].data['preprocess'])))
g.destroy()
return_dict[worker_id] = 0
except Exception as e:
return_dict[worker_id] = -1
g.destroy()
print(e, file=sys.stderr)
traceback.print_exc()
@unittest.skipIf(True, reason="skip this test")
def test_compute():
manager = Manager()
return_dict = manager.dict()
# make sure the server initializes before the workers
server_init = Value('i', 0)
serv_p = Process(target=server_func, args=(2, 'test_graph3', server_init))
serv_p.start()
while server_init.value == 0:
time.sleep(1)
work_p1 = Process(target=check_compute_func, args=(0, 'test_graph3', return_dict))
work_p2 = Process(target=check_compute_func, args=(1, 'test_graph3', return_dict))
work_p1.start()
work_p2.start()
serv_p.join()
work_p1.join()
work_p2.join()
for worker_id in return_dict.keys():
assert return_dict[worker_id] == 0, "worker %d fails" % worker_id
def check_sync_barrier(worker_id, graph_name, return_dict):
try:
g = create_graph_store(graph_name)
if g is None:
return_dict[worker_id] = -1
return
if worker_id == 1:
g.destroy()
return_dict[worker_id] = 0
return
start = time.time()
try:
g._sync_barrier(10)
except TimeoutError as e:
# this is very loose.
print("timeout: " + str(abs(time.time() - start)), file=sys.stderr)
assert 5 < abs(time.time() - start) < 15
g.destroy()
return_dict[worker_id] = 0
except Exception as e:
return_dict[worker_id] = -1
g.destroy()
print(e, file=sys.stderr)
traceback.print_exc()
@unittest.skipIf(True, reason="skip this test")
def test_sync_barrier():
manager = Manager()
return_dict = manager.dict()
# make sure the server initializes before the workers start
server_init = Value('i', 0)
serv_p = Process(target=server_func, args=(2, 'test_graph4', server_init))
serv_p.start()
while server_init.value == 0:
time.sleep(1)
work_p1 = Process(target=check_sync_barrier, args=(0, 'test_graph4', return_dict))
work_p2 = Process(target=check_sync_barrier, args=(1, 'test_graph4', return_dict))
work_p1.start()
work_p2.start()
serv_p.join()
work_p1.join()
work_p2.join()
for worker_id in return_dict.keys():
assert return_dict[worker_id] == 0, "worker %d fails" % worker_id
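# The two helpers below run in separate processes and verify that a graph
# index copied into shared memory (copyto_shared_mem) exposes identical
# in/out CSR structures when re-attached via from_shared_mem_graph_index.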
def create_mem(gidx, cond_v, shared_v):
# serialize create_mem before check_mem
cond_v.acquire()
gidx1 = gidx.copyto_shared_mem("test_graph5")
shared_v.value = 1
cond_v.notify()
cond_v.release()
# sync for exit
cond_v.acquire()
while shared_v.value == 1:
cond_v.wait()
cond_v.release()
def check_mem(gidx, cond_v, shared_v):
# check_mem should run after create_mem
cond_v.acquire()
while shared_v.value == 0:
cond_v.wait()
cond_v.release()
gidx1 = dgl.graph_index.from_shared_mem_graph_index("test_graph5")
in_csr = gidx.adjacency_matrix_scipy(True, "csr")
out_csr = gidx.adjacency_matrix_scipy(False, "csr")
in_csr1 = gidx1.adjacency_matrix_scipy(True, "csr")
assert_array_equal(in_csr.indptr, in_csr1.indptr)
assert_array_equal(in_csr.indices, in_csr1.indices)
out_csr1 = gidx1.adjacency_matrix_scipy(False, "csr")
assert_array_equal(out_csr.indptr, out_csr1.indptr)
assert_array_equal(out_csr.indices, out_csr1.indices)
gidx1 = gidx1.copyto_shared_mem("test_graph5")
# sync for exit
cond_v.acquire()
shared_v.value = 0
cond_v.notify()
cond_v.release()
@unittest.skipIf(True, reason="skip this test")
def test_copy_shared_mem():
csr = (spsp.random(num_nodes, num_nodes, density=0.1, format='csr') != 0).astype(np.int64)
gidx = dgl.graph_index.create_graph_index(csr, True)
cond_v = Condition()
shared_v = Value('i', 0)
p1 = Process(target=create_mem, args=(gidx, cond_v, shared_v))
p2 = Process(target=check_mem, args=(gidx, cond_v, shared_v))
p1.start()
p2.start()
p1.join()
p2.join()
# Skip the tests in this file (entry point kept for manual runs).
#if __name__ == '__main__':
# test_copy_shared_mem()
# test_init()
# test_sync_barrier()
# test_compute()
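# The script below is a separate standalone test for the (deprecated)
# dgl.contrib distributed KVStore on the MXNet backend: four server
# processes each host one shard of 'ndata'/'edata', and four client
# processes push local updates and then pull the full tensors back.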
import dgl
import argparse
import mxnet as mx
import time
import backend as F
from multiprocessing import Process
ID = []
ID.append(mx.nd.array([0,1], dtype='int64'))
ID.append(mx.nd.array([2,3], dtype='int64'))
ID.append(mx.nd.array([4,5], dtype='int64'))
ID.append(mx.nd.array([6,7], dtype='int64'))
DATA = []
DATA.append(mx.nd.array([[1.,1.,1.,],[1.,1.,1.,]]))
DATA.append(mx.nd.array([[2.,2.,2.,],[2.,2.,2.,]]))
DATA.append(mx.nd.array([[3.,3.,3.,],[3.,3.,3.,]]))
DATA.append(mx.nd.array([[4.,4.,4.,],[4.,4.,4.,]]))
edata_partition_book = {'edata':mx.nd.array([0,0,1,1,2,2,3,3], dtype='int64')}
ndata_partition_book = {'ndata':mx.nd.array([0,0,1,1,2,2,3,3], dtype='int64')}
ndata_g2l = []
edata_g2l = []
ndata_g2l.append({'ndata':mx.nd.array([0,1,0,0,0,0,0,0], dtype='int64')})
ndata_g2l.append({'ndata':mx.nd.array([0,0,0,1,0,0,0,0], dtype='int64')})
ndata_g2l.append({'ndata':mx.nd.array([0,0,0,0,0,1,0,0], dtype='int64')})
ndata_g2l.append({'ndata':mx.nd.array([0,0,0,0,0,0,0,1], dtype='int64')})
edata_g2l.append({'edata':mx.nd.array([0,1,0,0,0,0,0,0], dtype='int64')})
edata_g2l.append({'edata':mx.nd.array([0,0,0,1,0,0,0,0], dtype='int64')})
edata_g2l.append({'edata':mx.nd.array([0,0,0,0,0,1,0,0], dtype='int64')})
edata_g2l.append({'edata':mx.nd.array([0,0,0,0,0,0,0,1], dtype='int64')})
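# Layout (inferred from the tensors above): global IDs 0-7 are sharded two
# per server; *_partition_book maps a global ID to its owning server, and
# *_g2l maps a global ID to its local row on that server.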
def start_client(flag):
time.sleep(3)  # give the server processes time to start before connecting
client = dgl.contrib.start_client(ip_config='ip_config.txt',
ndata_partition_book=ndata_partition_book,
edata_partition_book=edata_partition_book,
close_shared_mem=flag)
client.push(name='edata', id_tensor=ID[client.get_id()], data_tensor=DATA[client.get_id()])
client.push(name='ndata', id_tensor=ID[client.get_id()], data_tensor=DATA[client.get_id()])
client.barrier()
tensor_edata = client.pull(name='edata', id_tensor=mx.nd.array([0,1,2,3,4,5,6,7], dtype='int64'))
tensor_ndata = client.pull(name='ndata', id_tensor=mx.nd.array([0,1,2,3,4,5,6,7], dtype='int64'))
target_tensor = mx.nd.array([[1., 1., 1.],
[1., 1., 1.],
[2., 2., 2.],
[2., 2., 2.],
[3., 3., 3.],
[3., 3., 3.],
[4., 4., 4.],
[4., 4., 4.]])
assert F.array_equal(tensor_edata, target_tensor)
assert F.array_equal(tensor_ndata, target_tensor)
client.barrier()
if client.get_id() == 0:
client.shut_down()
def start_server(server_id, num_client):
dgl.contrib.start_server(
server_id=server_id,
ip_config='ip_config.txt',
num_client=num_client,
ndata={'ndata':mx.nd.array([[0.,0.,0.],[0.,0.,0.]])},
edata={'edata':mx.nd.array([[0.,0.,0.],[0.,0.,0.]])},
ndata_g2l=ndata_g2l[server_id],
edata_g2l=edata_g2l[server_id])
if __name__ == '__main__':
# server process
p0 = Process(target=start_server, args=(0, 4))
p1 = Process(target=start_server, args=(1, 4))
p2 = Process(target=start_server, args=(2, 4))
p3 = Process(target=start_server, args=(3, 4))
# client process
p4 = Process(target=start_client, args=(True,))
p5 = Process(target=start_client, args=(True,))
p6 = Process(target=start_client, args=(False,))
p7 = Process(target=start_client, args=(False,))
# start server process
p0.start()
p1.start()
p2.start()
p3.start()
# start client process
p4.start()
p5.start()
p6.start()
p7.start()
p0.join()
p1.join()
p2.join()
p3.join()
p4.join()
p5.join()
p6.join()
p7.join()
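# The script below is the PyTorch-backend counterpart of the KVStore test
# above; the logic is identical, only the tensor library differs.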
import dgl
import argparse
import torch as th
import time
import backend as F
from multiprocessing import Process
ID = []
ID.append(th.tensor([0,1]))
ID.append(th.tensor([2,3]))
ID.append(th.tensor([4,5]))
ID.append(th.tensor([6,7]))
DATA = []
DATA.append(th.tensor([[1.,1.,1.,],[1.,1.,1.,]]))
DATA.append(th.tensor([[2.,2.,2.,],[2.,2.,2.,]]))
DATA.append(th.tensor([[3.,3.,3.,],[3.,3.,3.,]]))
DATA.append(th.tensor([[4.,4.,4.,],[4.,4.,4.,]]))
edata_partition_book = {'edata':th.tensor([0,0,1,1,2,2,3,3])}
ndata_partition_book = {'ndata':th.tensor([0,0,1,1,2,2,3,3])}
ndata_g2l = []
edata_g2l = []
ndata_g2l.append({'ndata':th.tensor([0,1,0,0,0,0,0,0])})
ndata_g2l.append({'ndata':th.tensor([0,0,0,1,0,0,0,0])})
ndata_g2l.append({'ndata':th.tensor([0,0,0,0,0,1,0,0])})
ndata_g2l.append({'ndata':th.tensor([0,0,0,0,0,0,0,1])})
edata_g2l.append({'edata':th.tensor([0,1,0,0,0,0,0,0])})
edata_g2l.append({'edata':th.tensor([0,0,0,1,0,0,0,0])})
edata_g2l.append({'edata':th.tensor([0,0,0,0,0,1,0,0])})
edata_g2l.append({'edata':th.tensor([0,0,0,0,0,0,0,1])})
def start_client(flag):
time.sleep(3)  # give the server processes time to start before connecting
client = dgl.contrib.start_client(ip_config='ip_config.txt',
ndata_partition_book=ndata_partition_book,
edata_partition_book=edata_partition_book,
close_shared_mem=flag)
client.push(name='edata', id_tensor=ID[client.get_id()], data_tensor=DATA[client.get_id()])
client.push(name='ndata', id_tensor=ID[client.get_id()], data_tensor=DATA[client.get_id()])
client.barrier()
tensor_edata = client.pull(name='edata', id_tensor=th.tensor([0,1,2,3,4,5,6,7]))
tensor_ndata = client.pull(name='ndata', id_tensor=th.tensor([0,1,2,3,4,5,6,7]))
target_tensor = th.tensor([[1., 1., 1.],
[1., 1., 1.],
[2., 2., 2.],
[2., 2., 2.],
[3., 3., 3.],
[3., 3., 3.],
[4., 4., 4.],
[4., 4., 4.]])
assert F.array_equal(tensor_edata, target_tensor)
assert F.array_equal(tensor_ndata, target_tensor)
client.barrier()
if client.get_id() == 0:
client.shut_down()
def start_server(server_id, num_client):
dgl.contrib.start_server(
server_id=server_id,
ip_config='ip_config.txt',
num_client=num_client,
ndata={'ndata':th.tensor([[0.,0.,0.],[0.,0.,0.]])},
edata={'edata':th.tensor([[0.,0.,0.],[0.,0.,0.]])},
ndata_g2l=ndata_g2l[server_id],
edata_g2l=edata_g2l[server_id])
if __name__ == '__main__':
# server process
p0 = Process(target=start_server, args=(0, 4))
p1 = Process(target=start_server, args=(1, 4))
p2 = Process(target=start_server, args=(2, 4))
p3 = Process(target=start_server, args=(3, 4))
# client process
p4 = Process(target=start_client, args=(True,))
p5 = Process(target=start_client, args=(True,))
p6 = Process(target=start_client, args=(False,))
p7 = Process(target=start_client, args=(False,))
# start server process
p0.start()
p1.start()
p2.start()
p3.start()
# start client process
p4.start()
p5.start()
p6.start()
p7.start()
p0.join()
p1.join()
p2.join()
p3.join()
p4.join()
p5.join()
p6.join()
p7.join()
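# The script below is a separate standalone test for dgl.contrib.UnifiedTensor,
# covering single-GPU indexing and multi-worker (spawned-process) access.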
import dgl.multiprocessing as mp
import unittest, os
import pytest
import torch as th
import dgl
import backend as F
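# A UnifiedTensor (as exercised here) keeps its storage in pinned host memory
# while exposing it to CUDA kernels for zero-copy access, so it can be indexed
# with either CPU- or GPU-resident index tensors; the tests below compare both
# paths against plain CPU indexing.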
def start_unified_tensor_worker(dev_id, input, seq_idx, rand_idx, output_seq, output_rand):
device = th.device('cuda:'+str(dev_id))
th.cuda.set_device(device)
input_unified = dgl.contrib.UnifiedTensor(input, device=device)
output_seq.copy_(input_unified[seq_idx.to(device)].cpu())
output_rand.copy_(input_unified[rand_idx.to(device)].cpu())
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(F.ctx().type == 'cpu', reason='gpu only test')
def test_unified_tensor():
test_row_size = 65536
test_col_size = 128
rand_test_size = 8192
device = th.device('cuda:0')
input = th.rand((test_row_size, test_col_size))
input_unified = dgl.contrib.UnifiedTensor(input, device=device)
seq_idx = th.arange(0, test_row_size)
# CPU indexing
assert th.all(th.eq(input[seq_idx], input_unified[seq_idx]))
# GPU indexing
assert th.all(th.eq(input[seq_idx].to(device), input_unified[seq_idx.to(device)]))
rand_idx = th.randint(0, test_row_size, (rand_test_size,))
# CPU indexing
assert th.all(th.eq(input[rand_idx], input_unified[rand_idx]))
# GPU indexing
assert th.all(th.eq(input[rand_idx].to(device), input_unified[rand_idx.to(device)]))
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(F.ctx().type == 'cpu', reason='gpu only test')
@pytest.mark.parametrize("num_workers", [1, 2])
def test_multi_gpu_unified_tensor(num_workers):
if F.ctx().type == 'cuda' and th.cuda.device_count() < num_workers:
pytest.skip("Not enough number of GPUs to do this test, skip multi-gpu test.")
test_row_size = 65536
test_col_size = 128
rand_test_size = 8192
input = th.rand((test_row_size, test_col_size)).share_memory_()
seq_idx = th.arange(0, test_row_size).share_memory_()
rand_idx = th.randint(0, test_row_size, (rand_test_size,)).share_memory_()
output_seq = []
output_rand = []
output_seq_cpu = input[seq_idx]
output_rand_cpu = input[rand_idx]
worker_list = []
ctx = mp.get_context('spawn')
for i in range(num_workers):
output_seq.append(th.zeros((test_row_size, test_col_size)).share_memory_())
output_rand.append(th.zeros((rand_test_size, test_col_size)).share_memory_())
p = ctx.Process(target=start_unified_tensor_worker,
args=(i, input, seq_idx, rand_idx, output_seq[i], output_rand[i],))
p.start()
worker_list.append(p)
for p in worker_list:
p.join()
for p in worker_list:
assert p.exitcode == 0
for i in range(num_workers):
assert th.all(th.eq(output_seq_cpu, output_seq[i]))
assert th.all(th.eq(output_rand_cpu, output_rand[i]))
if __name__ == '__main__':
test_unified_tensor()
test_multi_gpu_unified_tensor(1)
test_multi_gpu_unified_tensor(2)
@@ -24,4 +24,4 @@ export LC_ALL=C.UTF-8
 export LANG=C.UTF-8
 python -m pip install psutil || fail "pip install"
-python3 -m pytest -v --junitxml=pytest_go.xml --durations=100 tests/go || fail "go"
+python3 -m pytest -v --junitxml=pytest_go.xml --durations=100 tests/go/test_model.py || fail "go"