Unverified Commit 0b3a447b authored by Hongzhi (Steve), Chen, committed by GitHub

auto format distributed (#5317)


Co-authored-by: Ubuntu <ubuntu@ip-172-31-28-63.ap-northeast-1.compute.internal>
parent 74c9d27d
......@@ -127,7 +127,7 @@ class CustomPool:
# should be able to take infinite elements to avoid deadlock.
self.queue_size = 0
self.result_queue = ctx.Queue(self.queue_size)
self.results = {} # key is dataloader name, value is fetched batch.
self.results = {} # key is dataloader name, value is fetched batch.
self.task_queues = []
self.process_list = []
self.current_proc_id = 0
......
"""Define distributed graph."""
from collections.abc import MutableMapping
from collections import namedtuple
import gc
import os
import gc
from collections import namedtuple
from collections.abc import MutableMapping
import numpy as np
from ..heterograph import DGLGraph
from ..convert import heterograph as dgl_heterograph
from ..convert import graph as dgl_graph
from ..transforms import compact_graphs
from .. import heterograph_index
from .. import backend as F
from ..base import NID, EID, ETYPE, ALL, is_all, DGLError
from .kvstore import KVServer, get_kvstore
from .. import backend as F, heterograph_index
from .._ffi.ndarray import empty_shared_mem
from ..ndarray import exist_shared_mem_array
from ..base import ALL, DGLError, EID, ETYPE, is_all, NID
from ..convert import graph as dgl_graph, heterograph as dgl_heterograph
from ..frame import infer_scheme
from .partition import load_partition, load_partition_feats, load_partition_book
from .graph_partition_book import PartitionPolicy, get_shared_mem_partition_book
from .graph_partition_book import HeteroDataName, parse_hetero_data_name
from .graph_partition_book import NodePartitionPolicy, EdgePartitionPolicy
from .graph_partition_book import _etype_str_to_tuple
from .shared_mem_utils import _to_shared_mem, _get_ndata_path, _get_edata_path, DTYPE_DICT
from . import rpc
from . import role
from .server_state import ServerState
from .rpc_server import start_server
from . import graph_services
from .graph_services import find_edges as dist_find_edges
from .graph_services import out_degrees as dist_out_degrees
from .graph_services import in_degrees as dist_in_degrees
from ..heterograph import DGLGraph
from ..ndarray import exist_shared_mem_array
from ..transforms import compact_graphs
from . import graph_services, role, rpc
from .dist_tensor import DistTensor
from .partition import RESERVED_FIELD_DTYPE
from .graph_partition_book import (
_etype_str_to_tuple,
EdgePartitionPolicy,
get_shared_mem_partition_book,
HeteroDataName,
NodePartitionPolicy,
parse_hetero_data_name,
PartitionPolicy,
)
from .graph_services import (
find_edges as dist_find_edges,
in_degrees as dist_in_degrees,
out_degrees as dist_out_degrees,
)
from .kvstore import get_kvstore, KVServer
from .partition import (
load_partition,
load_partition_book,
load_partition_feats,
RESERVED_FIELD_DTYPE,
)
from .rpc_server import start_server
from .server_state import ServerState
from .shared_mem_utils import (
_get_edata_path,
_get_ndata_path,
_to_shared_mem,
DTYPE_DICT,
)
INIT_GRAPH = 800001
class InitGraphRequest(rpc.Request):
""" Init graph on the backup servers.
"""Init graph on the backup servers.
When the backup servers start, they don't load the graph structure.
This request tells the backup servers that they can map to the graph structure
with shared memory.
"""
def __init__(self, graph_name):
self._graph_name = graph_name
......@@ -58,9 +74,10 @@ class InitGraphRequest(rpc.Request):
server_state.graph = _get_graph_from_shared_mem(self._graph_name)
return InitGraphResponse(self._graph_name)
class InitGraphResponse(rpc.Response):
""" Ack the init graph request
"""
"""Ack the init graph request"""
def __init__(self, graph_name):
self._graph_name = graph_name
......@@ -70,71 +87,88 @@ class InitGraphResponse(rpc.Response):
def __setstate__(self, state):
self._graph_name = state
def _copy_graph_to_shared_mem(g, graph_name, graph_format):
new_g = g.shared_memory(graph_name, formats=graph_format)
# We should share the node/edge data with the client explicitly instead of putting them
# in the KVStore because some of the node/edge data may be duplicated.
new_g.ndata['inner_node'] = _to_shared_mem(g.ndata['inner_node'],
_get_ndata_path(graph_name, 'inner_node'))
new_g.ndata[NID] = _to_shared_mem(g.ndata[NID], _get_ndata_path(graph_name, NID))
new_g.ndata["inner_node"] = _to_shared_mem(
g.ndata["inner_node"], _get_ndata_path(graph_name, "inner_node")
)
new_g.ndata[NID] = _to_shared_mem(
g.ndata[NID], _get_ndata_path(graph_name, NID)
)
new_g.edata['inner_edge'] = _to_shared_mem(g.edata['inner_edge'],
_get_edata_path(graph_name, 'inner_edge'))
new_g.edata[EID] = _to_shared_mem(g.edata[EID], _get_edata_path(graph_name, EID))
new_g.edata["inner_edge"] = _to_shared_mem(
g.edata["inner_edge"], _get_edata_path(graph_name, "inner_edge")
)
new_g.edata[EID] = _to_shared_mem(
g.edata[EID], _get_edata_path(graph_name, EID)
)
# for a heterogeneous graph, ETYPE also needs to be shared
# for a homogeneous graph, ETYPE does not exist
if ETYPE in g.edata:
new_g.edata[ETYPE] = _to_shared_mem(
g.edata[ETYPE],
_get_edata_path(graph_name, ETYPE),
g.edata[ETYPE],
_get_edata_path(graph_name, ETYPE),
)
return new_g
def _get_shared_mem_ndata(g, graph_name, name):
''' Get shared-memory node data from DistGraph server.
"""Get shared-memory node data from DistGraph server.
This is called by the DistGraph client to access the node data in the DistGraph server
with shared memory.
'''
"""
shape = (g.number_of_nodes(),)
dtype = RESERVED_FIELD_DTYPE[name]
dtype = DTYPE_DICT[dtype]
data = empty_shared_mem(_get_ndata_path(graph_name, name), False, shape, dtype)
data = empty_shared_mem(
_get_ndata_path(graph_name, name), False, shape, dtype
)
dlpack = data.to_dlpack()
return F.zerocopy_from_dlpack(dlpack)
def _get_shared_mem_edata(g, graph_name, name):
''' Get shared-memory edge data from DistGraph server.
"""Get shared-memory edge data from DistGraph server.
This is called by the DistGraph client to access the edge data in the DistGraph server
with shared memory.
'''
"""
shape = (g.number_of_edges(),)
dtype = RESERVED_FIELD_DTYPE[name]
dtype = DTYPE_DICT[dtype]
data = empty_shared_mem(_get_edata_path(graph_name, name), False, shape, dtype)
data = empty_shared_mem(
_get_edata_path(graph_name, name), False, shape, dtype
)
dlpack = data.to_dlpack()
return F.zerocopy_from_dlpack(dlpack)
def _exist_shared_mem_array(graph_name, name):
return exist_shared_mem_array(_get_edata_path(graph_name, name))
def _get_graph_from_shared_mem(graph_name):
''' Get the graph from the DistGraph server.
"""Get the graph from the DistGraph server.
The DistGraph server puts the graph structure of the local partition in the shared memory.
The client can access the graph structure and some metadata on nodes and edges directly
through shared memory to reduce the overhead of data access.
'''
g, ntypes, etypes = heterograph_index.create_heterograph_from_shared_memory(graph_name)
"""
g, ntypes, etypes = heterograph_index.create_heterograph_from_shared_memory(
graph_name
)
if g is None:
return None
g = DGLGraph(g, ntypes, etypes)
g.ndata['inner_node'] = _get_shared_mem_ndata(g, graph_name, 'inner_node')
g.ndata["inner_node"] = _get_shared_mem_ndata(g, graph_name, "inner_node")
g.ndata[NID] = _get_shared_mem_ndata(g, graph_name, NID)
g.edata['inner_edge'] = _get_shared_mem_edata(g, graph_name, 'inner_edge')
g.edata["inner_edge"] = _get_shared_mem_edata(g, graph_name, "inner_edge")
g.edata[EID] = _get_shared_mem_edata(g, graph_name, EID)
# heterogeneous graph has ETYPE
......@@ -142,12 +176,15 @@ def _get_graph_from_shared_mem(graph_name):
g.edata[ETYPE] = _get_shared_mem_edata(g, graph_name, ETYPE)
return g
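# Illustrative pairing of the helpers above (the graph name is hypothetical, not
# taken from this diff): the server process publishes its local partition under
# a name, and a client on the same machine re-attaches to it without copying data.
#
#   # server side, inside DistGraphServer.__init__:
#   client_g = _copy_graph_to_shared_mem(local_part, "demo_graph", ("csc", "coo"))
#   # client side, inside DistGraph._init:
#   g = _get_graph_from_shared_mem("demo_graph")  # returns None if nothing was shared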
NodeSpace = namedtuple('NodeSpace', ['data'])
EdgeSpace = namedtuple('EdgeSpace', ['data'])
NodeSpace = namedtuple("NodeSpace", ["data"])
EdgeSpace = namedtuple("EdgeSpace", ["data"])
class HeteroNodeView(object):
"""A NodeView class to act as G.nodes for a DistGraph."""
__slots__ = ['_graph']
__slots__ = ["_graph"]
def __init__(self, graph):
self._graph = graph
......@@ -156,9 +193,11 @@ class HeteroNodeView(object):
assert isinstance(key, str)
return NodeSpace(data=NodeDataView(self._graph, key))
class HeteroEdgeView(object):
"""An EdgeView class to act as G.edges for a DistGraph."""
__slots__ = ['_graph']
__slots__ = ["_graph"]
def __init__(self, graph):
self._graph = graph
......@@ -169,10 +208,11 @@ class HeteroEdgeView(object):
), f"Expect edge type in string or triplet of string, but got {key}."
return EdgeSpace(data=EdgeDataView(self._graph, key))
class NodeDataView(MutableMapping):
"""The data view class when dist_graph.ndata[...].data is called.
"""
__slots__ = ['_graph', '_data']
"""The data view class when dist_graph.ndata[...].data is called."""
__slots__ = ["_graph", "_data"]
def __init__(self, g, ntype=None):
self._graph = g
......@@ -208,13 +248,16 @@ class NodeDataView(MutableMapping):
for name in self._data:
dtype = F.dtype(self._data[name])
shape = F.shape(self._data[name])
reprs[name] = 'DistTensor(shape={}, dtype={})'.format(str(shape), str(dtype))
reprs[name] = "DistTensor(shape={}, dtype={})".format(
str(shape), str(dtype)
)
return repr(reprs)
class EdgeDataView(MutableMapping):
"""The data view class when G.edges[...].data is called.
"""
__slots__ = ['_graph', '_data']
"""The data view class when G.edges[...].data is called."""
__slots__ = ["_graph", "_data"]
def __init__(self, g, etype=None):
self._graph = g
......@@ -249,12 +292,14 @@ class EdgeDataView(MutableMapping):
for name in self._data:
dtype = F.dtype(self._data[name])
shape = F.shape(self._data[name])
reprs[name] = 'DistTensor(shape={}, dtype={})'.format(str(shape), str(dtype))
reprs[name] = "DistTensor(shape={}, dtype={})".format(
str(shape), str(dtype)
)
return repr(reprs)
class DistGraphServer(KVServer):
''' The DistGraph server.
"""The DistGraph server.
This DistGraph server loads the graph data and sets up a service so that trainers and
samplers can read data of a graph partition (graph structure, node data and edge data)
......@@ -289,15 +334,26 @@ class DistGraphServer(KVServer):
Whether to keep server alive when clients exit
net_type : str
Backend rpc type: ``'socket'`` or ``'tensorpipe'``
'''
def __init__(self, server_id, ip_config, num_servers,
num_clients, part_config, disable_shared_mem=False,
graph_format=('csc', 'coo'), keep_alive=False,
net_type='socket'):
super(DistGraphServer, self).__init__(server_id=server_id,
ip_config=ip_config,
num_servers=num_servers,
num_clients=num_clients)
"""
def __init__(
self,
server_id,
ip_config,
num_servers,
num_clients,
part_config,
disable_shared_mem=False,
graph_format=("csc", "coo"),
keep_alive=False,
net_type="socket",
):
super(DistGraphServer, self).__init__(
server_id=server_id,
ip_config=ip_config,
num_servers=num_servers,
num_clients=num_clients,
)
self.ip_config = ip_config
self.num_servers = num_servers
self.keep_alive = keep_alive
......@@ -305,13 +361,22 @@ class DistGraphServer(KVServer):
# Load graph partition data.
if self.is_backup_server():
# The backup server doesn't load the graph partition. It'll be initialized afterwards.
self.gpb, graph_name, ntypes, etypes = load_partition_book(part_config, self.part_id)
self.gpb, graph_name, ntypes, etypes = load_partition_book(
part_config, self.part_id
)
self.client_g = None
else:
# Loading of node/edge_feats is deferred to lower the peak memory consumption.
self.client_g, _, _, self.gpb, graph_name, \
ntypes, etypes = load_partition(part_config, self.part_id, load_feats=False)
print('load ' + graph_name)
(
self.client_g,
_,
_,
self.gpb,
graph_name,
ntypes,
etypes,
) = load_partition(part_config, self.part_id, load_feats=False)
print("load " + graph_name)
# formatting dtype
# TODO(Rui): Forcibly casting the dtype is not a perfect solution.
# We'd better store all dtypes when mapping to shared memory
......@@ -319,72 +384,97 @@ class DistGraphServer(KVServer):
for k, dtype in RESERVED_FIELD_DTYPE.items():
if k in self.client_g.ndata:
self.client_g.ndata[k] = F.astype(
self.client_g.ndata[k], dtype)
self.client_g.ndata[k], dtype
)
if k in self.client_g.edata:
self.client_g.edata[k] = F.astype(
self.client_g.edata[k], dtype)
self.client_g.edata[k], dtype
)
# Create the graph formats specified by the users.
self.client_g = self.client_g.formats(graph_format)
self.client_g.create_formats_()
if not disable_shared_mem:
self.client_g = _copy_graph_to_shared_mem(self.client_g, graph_name, graph_format)
self.client_g = _copy_graph_to_shared_mem(
self.client_g, graph_name, graph_format
)
if not disable_shared_mem:
self.gpb.shared_memory(graph_name)
assert self.gpb.partid == self.part_id
for ntype in ntypes:
node_name = HeteroDataName(True, ntype, "")
self.add_part_policy(PartitionPolicy(node_name.policy_str, self.gpb))
self.add_part_policy(
PartitionPolicy(node_name.policy_str, self.gpb)
)
for etype in etypes:
edge_name = HeteroDataName(False, etype, "")
self.add_part_policy(PartitionPolicy(edge_name.policy_str, self.gpb))
self.add_part_policy(
PartitionPolicy(edge_name.policy_str, self.gpb)
)
if not self.is_backup_server():
node_feats, _ = load_partition_feats(part_config, self.part_id,
load_nodes=True, load_edges=False)
node_feats, _ = load_partition_feats(
part_config, self.part_id, load_nodes=True, load_edges=False
)
for name in node_feats:
# The feature name has the following format: node_type + "/" + feature_name to avoid
# feature name collision for different node types.
ntype, feat_name = name.split('/')
ntype, feat_name = name.split("/")
data_name = HeteroDataName(True, ntype, feat_name)
self.init_data(name=str(data_name), policy_str=data_name.policy_str,
data_tensor=node_feats[name])
self.init_data(
name=str(data_name),
policy_str=data_name.policy_str,
data_tensor=node_feats[name],
)
self.orig_data.add(str(data_name))
# Let's free once node features are copied to shared memory
del node_feats
gc.collect()
_, edge_feats = load_partition_feats(part_config, self.part_id,
load_nodes=False, load_edges=True)
_, edge_feats = load_partition_feats(
part_config, self.part_id, load_nodes=False, load_edges=True
)
for name in edge_feats:
# The feature name has the following format: edge_type + "/" + feature_name to avoid
# feature name collision for different edge types.
etype, feat_name = name.split('/')
etype, feat_name = name.split("/")
etype = _etype_str_to_tuple(etype)
data_name = HeteroDataName(False, etype, feat_name)
self.init_data(name=str(data_name), policy_str=data_name.policy_str,
data_tensor=edge_feats[name])
self.init_data(
name=str(data_name),
policy_str=data_name.policy_str,
data_tensor=edge_feats[name],
)
self.orig_data.add(str(data_name))
# Let's free once edge features are copied to shared memory
del edge_feats
gc.collect()
def start(self):
""" Start graph store server.
"""
"""Start graph store server."""
# start server
server_state = ServerState(kv_store=self, local_g=self.client_g,
partition_book=self.gpb, keep_alive=self.keep_alive)
print('start graph service on server {} for part {}'.format(
self.server_id, self.part_id))
start_server(server_id=self.server_id,
ip_config=self.ip_config,
num_servers=self.num_servers,
num_clients=self.num_clients,
server_state=server_state,
net_type=self.net_type)
server_state = ServerState(
kv_store=self,
local_g=self.client_g,
partition_book=self.gpb,
keep_alive=self.keep_alive,
)
print(
"start graph service on server {} for part {}".format(
self.server_id, self.part_id
)
)
start_server(
server_id=self.server_id,
ip_config=self.ip_config,
num_servers=self.num_servers,
num_clients=self.num_clients,
server_state=server_state,
net_type=self.net_type,
)
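# Hypothetical launch sketch for a single server process (all argument values
# are examples, not taken from this diff): the constructor loads the partition
# assigned to this machine from part_config, and start() blocks while serving
# the graph structure plus node/edge features over RPC.
#
#   serv = DistGraphServer(
#       server_id=0,
#       ip_config="ip_config.txt",
#       num_servers=1,
#       num_clients=4,
#       part_config="data/mygraph.json",
#   )
#   serv.start()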
class DistGraph:
'''The class for accessing a distributed graph.
"""The class for accessing a distributed graph.
This class provides a subset of DGLGraph APIs for accessing partitioned graph data in
distributed GNN training and inference. Thus, its main use case is to work with
......@@ -455,35 +545,45 @@ class DistGraph:
DGL's distributed training by default runs server processes and trainer processes on the same
set of machines. If users need to run them on different sets of machines, it requires
manually setting up servers and trainers. The setup is not fully tested yet.
'''
"""
def __init__(self, graph_name, gpb=None, part_config=None):
self.graph_name = graph_name
if os.environ.get('DGL_DIST_MODE', 'standalone') == 'standalone':
assert part_config is not None, \
'When running in the standalone mode, the partition config file is required'
if os.environ.get("DGL_DIST_MODE", "standalone") == "standalone":
assert (
part_config is not None
), "When running in the standalone mode, the partition config file is required"
self._client = get_kvstore()
assert self._client is not None, \
'Distributed module is not initialized. Please call dgl.distributed.initialize.'
assert (
self._client is not None
), "Distributed module is not initialized. Please call dgl.distributed.initialize."
# Load graph partition data.
g, node_feats, edge_feats, self._gpb, _, _, _ = load_partition(part_config, 0)
assert self._gpb.num_partitions() == 1, \
'The standalone mode can only work with the graph data with one partition'
g, node_feats, edge_feats, self._gpb, _, _, _ = load_partition(
part_config, 0
)
assert (
self._gpb.num_partitions() == 1
), "The standalone mode can only work with the graph data with one partition"
if self._gpb is None:
self._gpb = gpb
self._g = g
for name in node_feats:
# The feature name has the following format: node_type + "/" + feature_name.
ntype, feat_name = name.split('/')
self._client.add_data(str(HeteroDataName(True, ntype, feat_name)),
node_feats[name],
NodePartitionPolicy(self._gpb, ntype=ntype))
ntype, feat_name = name.split("/")
self._client.add_data(
str(HeteroDataName(True, ntype, feat_name)),
node_feats[name],
NodePartitionPolicy(self._gpb, ntype=ntype),
)
for name in edge_feats:
# The feature name has the following format: edge_type + "/" + feature_name.
etype, feat_name = name.split('/')
etype, feat_name = name.split("/")
etype = _etype_str_to_tuple(etype)
self._client.add_data(str(HeteroDataName(False, etype, feat_name)),
edge_feats[name],
EdgePartitionPolicy(self._gpb, etype=etype))
self._client.add_data(
str(HeteroDataName(False, etype, feat_name)),
edge_feats[name],
EdgePartitionPolicy(self._gpb, etype=etype),
)
self._client.map_shared_data(self._gpb)
rpc.set_num_client(1)
else:
......@@ -501,17 +601,20 @@ class DistGraph:
self._num_nodes = 0
self._num_edges = 0
for part_md in self._gpb.metadata():
self._num_nodes += int(part_md['num_nodes'])
self._num_edges += int(part_md['num_edges'])
self._num_nodes += int(part_md["num_nodes"])
self._num_edges += int(part_md["num_edges"])
# When we store node/edge types in a list, they are stored in the order of type IDs.
self._ntype_map = {ntype:i for i, ntype in enumerate(self.ntypes)}
self._etype_map = {etype:i for i, etype in enumerate(self.canonical_etypes)}
self._ntype_map = {ntype: i for i, ntype in enumerate(self.ntypes)}
self._etype_map = {
etype: i for i, etype in enumerate(self.canonical_etypes)
}
def _init(self, gpb):
self._client = get_kvstore()
assert self._client is not None, \
'Distributed module is not initialized. Please call dgl.distributed.initialize.'
assert (
self._client is not None
), "Distributed module is not initialized. Please call dgl.distributed.initialize."
self._g = _get_graph_from_shared_mem(self.graph_name)
self._gpb = get_shared_mem_partition_book(self.graph_name)
if self._gpb is None:
......@@ -519,20 +622,24 @@ class DistGraph:
self._client.map_shared_data(self._gpb)
def _init_ndata_store(self):
'''Initialize node data store.'''
"""Initialize node data store."""
self._ndata_store = {}
for ntype in self.ntypes:
names = self._get_ndata_names(ntype)
data = {}
for name in names:
assert name.is_node()
policy = PartitionPolicy(name.policy_str,
self.get_partition_book()
policy = PartitionPolicy(
name.policy_str, self.get_partition_book()
)
dtype, shape, _ = self._client.get_data_meta(str(name))
# We create a wrapper on the existing tensor in the kvstore.
data[name.get_name()] = DistTensor(shape, dtype,
name.get_name(), part_policy=policy, attach=False
data[name.get_name()] = DistTensor(
shape,
dtype,
name.get_name(),
part_policy=policy,
attach=False,
)
if len(self.ntypes) == 1:
self._ndata_store = data
......@@ -540,20 +647,24 @@ class DistGraph:
self._ndata_store[ntype] = data
def _init_edata_store(self):
'''Initialize edge data store.'''
"""Initialize edge data store."""
self._edata_store = {}
for etype in self.canonical_etypes:
names = self._get_edata_names(etype)
data = {}
for name in names:
assert name.is_edge()
policy = PartitionPolicy(name.policy_str,
self.get_partition_book()
policy = PartitionPolicy(
name.policy_str, self.get_partition_book()
)
dtype, shape, _ = self._client.get_data_meta(str(name))
# We create a wrapper on the existing tensor in the kvstore.
data[name.get_name()] = DistTensor(shape, dtype,
name.get_name(), part_policy=policy, attach=False
data[name.get_name()] = DistTensor(
shape,
dtype,
name.get_name(),
part_policy=policy,
attach=False,
)
if len(self.canonical_etypes) == 1:
self._edata_store = data
......@@ -572,12 +683,12 @@ class DistGraph:
self._num_nodes = 0
self._num_edges = 0
for part_md in self._gpb.metadata():
self._num_nodes += int(part_md['num_nodes'])
self._num_edges += int(part_md['num_edges'])
self._num_nodes += int(part_md["num_nodes"])
self._num_edges += int(part_md["num_edges"])
@property
def local_partition(self):
''' Return the local partition on the client
"""Return the local partition on the client
DistGraph provides a global view of the distributed graph. Internally,
it may contain a partition of the graph if it is co-located with
......@@ -588,19 +699,17 @@ class DistGraph:
-------
DGLGraph
The local partition
'''
"""
return self._g
@property
def nodes(self):
'''Return a node view
'''
"""Return a node view"""
return HeteroNodeView(self)
@property
def edges(self):
'''Return an edge view
'''
"""Return an edge view"""
return HeteroEdgeView(self)
@property
......@@ -612,7 +721,9 @@ class DistGraph:
NodeDataView
The data view in the distributed graph storage.
"""
assert len(self.ntypes) == 1, "ndata only works for a graph with one node type."
assert (
len(self.ntypes) == 1
), "ndata only works for a graph with one node type."
return NodeDataView(self)
@property
......@@ -624,7 +735,9 @@ class DistGraph:
EdgeDataView
The data view in the distributed graph storage.
"""
assert len(self.etypes) == 1, "edata only works for a graph with one edge type."
assert (
len(self.etypes) == 1
), "edata only works for a graph with one edge type."
return EdgeDataView(self)
@property
......@@ -811,8 +924,10 @@ class DistGraph:
"""
if ntype is None:
if len(self._ntype_map) != 1:
raise DGLError('Node type name must be specified if there is more than one '
'node type.')
raise DGLError(
"Node type name must be specified if there is more than one "
"node type."
)
return 0
return self._ntype_map[ntype]
......@@ -833,8 +948,10 @@ class DistGraph:
"""
if etype is None:
if len(self._etype_map) != 1:
raise DGLError('Edge type name must be specified if there is more than one '
'edge type.')
raise DGLError(
"Edge type name must be specified if there is more than one "
"edge type."
)
return 0
etype = self.to_canonical_etype(etype)
return self._etype_map[etype]
......@@ -871,7 +988,9 @@ class DistGraph:
if len(self.ntypes) == 1:
return self._gpb._num_nodes(self.ntypes[0])
else:
return sum([self._gpb._num_nodes(ntype) for ntype in self.ntypes])
return sum(
[self._gpb._num_nodes(ntype) for ntype in self.ntypes]
)
return self._gpb._num_nodes(ntype)
def num_edges(self, etype=None):
......@@ -901,8 +1020,12 @@ class DistGraph:
123718280
"""
if etype is None:
return sum([self._gpb._num_edges(c_etype)
for c_etype in self.canonical_etypes])
return sum(
[
self._gpb._num_edges(c_etype)
for c_etype in self.canonical_etypes
]
)
return self._gpb._num_edges(etype)
def out_degrees(self, u=ALL):
......@@ -1058,7 +1181,7 @@ class DistGraph:
return schemes
def rank(self):
''' The rank of the current DistGraph.
"""The rank of the current DistGraph.
This returns a unique number to identify the DistGraph object among all of
the client processes.
......@@ -1067,11 +1190,11 @@ class DistGraph:
-------
int
The rank of the current DistGraph.
'''
"""
return role.get_global_rank()
def find_edges(self, edges, etype=None):
""" Given an edge ID array, return the source
"""Given an edge ID array, return the source
and destination node ID array ``s`` and ``d``. ``s[i]`` and ``d[i]``
are source and destination node ID for edge ``eid[i]``.
......@@ -1098,7 +1221,9 @@ class DistGraph:
The destination node ID array.
"""
if etype is None:
assert len(self.etypes) == 1, 'find_edges requires etype for heterogeneous graphs.'
assert (
len(self.etypes) == 1
), "find_edges requires etype for heterogeneous graphs."
gpb = self.get_partition_book()
if len(gpb.etypes) > 1:
......@@ -1151,7 +1276,9 @@ class DistGraph:
for etype, edge in edges.items():
etype = self.to_canonical_etype(etype)
subg[etype] = self.find_edges(edge, etype)
num_nodes = {ntype: self.number_of_nodes(ntype) for ntype in self.ntypes}
num_nodes = {
ntype: self.number_of_nodes(ntype) for ntype in self.ntypes
}
subg = dgl_heterograph(subg, num_nodes_dict=num_nodes)
for etype in edges:
subg.edges[etype].data[EID] = edges[etype]
......@@ -1163,7 +1290,7 @@ class DistGraph:
if relabel_nodes:
subg = compact_graphs(subg)
assert store_ids, 'edge_subgraph always stores original node/edge IDs.'
assert store_ids, "edge_subgraph always stores original node/edge IDs."
return subg
def get_partition_book(self):
......@@ -1220,55 +1347,72 @@ class DistGraph:
return EdgePartitionPolicy(self.get_partition_book(), etype)
def barrier(self):
'''Barrier for all client nodes.
"""Barrier for all client nodes.
This API blocks the current process until all the clients invoke this API.
Please use this API with caution.
'''
"""
self._client.barrier()
def sample_neighbors(self, seed_nodes, fanout, edge_dir='in', prob=None,
exclude_edges=None, replace=False, etype_sorted=True,
output_device=None):
def sample_neighbors(
self,
seed_nodes,
fanout,
edge_dir="in",
prob=None,
exclude_edges=None,
replace=False,
etype_sorted=True,
output_device=None,
):
# pylint: disable=unused-argument
"""Sample neighbors from a distributed graph."""
if len(self.etypes) > 1:
frontier = graph_services.sample_etype_neighbors(
self, seed_nodes, fanout, replace=replace,
etype_sorted=etype_sorted, prob=prob)
self,
seed_nodes,
fanout,
replace=replace,
etype_sorted=etype_sorted,
prob=prob,
)
else:
frontier = graph_services.sample_neighbors(
self, seed_nodes, fanout, replace=replace, prob=prob)
self, seed_nodes, fanout, replace=replace, prob=prob
)
return frontier
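# Hedged usage sketch (seed tensor and fanout are made-up values, PyTorch
# backend assumed): a homogeneous DistGraph goes through
# graph_services.sample_neighbors, while a graph with multiple edge types is
# dispatched to graph_services.sample_etype_neighbors as shown above.
#
#   seeds = torch.tensor([0, 1, 2])
#   frontier = g.sample_neighbors(seeds, fanout=10)  # in-edges by default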
def _get_ndata_names(self, ntype=None):
''' Get the names of all node data.
'''
"""Get the names of all node data."""
names = self._client.gdata_name_list()
ndata_names = []
for name in names:
name = parse_hetero_data_name(name)
right_type = (name.get_type() == ntype) if ntype is not None else True
right_type = (
(name.get_type() == ntype) if ntype is not None else True
)
if name.is_node() and right_type:
ndata_names.append(name)
return ndata_names
def _get_edata_names(self, etype=None):
''' Get the names of all edge data.
'''
"""Get the names of all edge data."""
if etype is not None:
etype = self.to_canonical_etype(etype)
names = self._client.gdata_name_list()
edata_names = []
for name in names:
name = parse_hetero_data_name(name)
right_type = (name.get_type() == etype) if etype is not None else True
right_type = (
(name.get_type() == etype) if etype is not None else True
)
if name.is_edge() and right_type:
edata_names.append(name)
return edata_names
def _get_overlap(mask_arr, ids):
""" Select the IDs given a boolean mask array.
"""Select the IDs given a boolean mask array.
The boolean mask array indicates all of the IDs to be selected. We want to
find the overlap between the IDs selected by the boolean mask array and
......@@ -1293,39 +1437,45 @@ def _get_overlap(mask_arr, ids):
masks = F.gather_row(F.tensor(mask_arr), ids)
return F.boolean_mask(ids, masks)
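# Worked example of the overlap selection above (toy values): with
# mask_arr = [1, 0, 1, 1] and ids = [0, 1, 3], the gathered mask is [1, 0, 1],
# so the function returns [0, 3].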
def _split_local(partition_book, rank, elements, local_eles):
''' Split the input element list with respect to data locality.
'''
"""Split the input element list with respect to data locality."""
num_clients = role.get_num_trainers()
num_client_per_part = num_clients // partition_book.num_partitions()
if rank is None:
rank = role.get_trainer_rank()
assert rank < num_clients, \
'The input rank ({}) is incorrect. #Trainers: {}'.format(rank, num_clients)
assert (
rank < num_clients
), "The input rank ({}) is incorrect. #Trainers: {}".format(
rank, num_clients
)
# all ranks of the clients on the same machine are in a contiguous range.
client_id_in_part = rank % num_client_per_part
client_id_in_part = rank % num_client_per_part
local_eles = _get_overlap(elements, local_eles)
# get a subset for the local client.
size = len(local_eles) // num_client_per_part
# if this isn't the last client in the partition.
if client_id_in_part + 1 < num_client_per_part:
return local_eles[(size * client_id_in_part):(size * (client_id_in_part + 1))]
return local_eles[
(size * client_id_in_part) : (size * (client_id_in_part + 1))
]
else:
return local_eles[(size * client_id_in_part):]
return local_eles[(size * client_id_in_part) :]
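# Worked example of the locality-aware split above: with 2 partitions and
# 4 trainers in total, num_client_per_part = 2. Rank 3 gets
# client_id_in_part = 3 % 2 = 1, i.e. it is the last client of its partition,
# so it takes everything from offset size * 1 to the end of the local elements.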
def _even_offset(n, k):
''' Split an array of length n into k segments such that the difference of their lengths is
at most 1. Return the offset of each segment.
'''
"""Split an array of length n into k segments such that the difference of their lengths is
at most 1. Return the offset of each segment.
"""
eles_per_part = n // k
offset = np.array([0] + [eles_per_part] * k, dtype=int)
offset[1 : n - eles_per_part * k + 1] += 1
return np.cumsum(offset)
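# Quick sanity check of _even_offset (pure numpy, illustrative only):
#
#   >>> _even_offset(10, 3)
#   array([ 0,  4,  7, 10])
#
# i.e. segment sizes 4, 3 and 3 -- the lengths differ by at most one.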
def _split_even_to_part(partition_book, elements):
''' Split the input element list evenly.
'''
"""Split the input element list evenly."""
# here we divide the element list as evenly as possible. If we use range partitioning,
# the split results also respect the data locality. Range partitioning is the default
# strategy.
......@@ -1350,7 +1500,9 @@ def _split_even_to_part(partition_book, elements):
part_eles = None
# compute the nonzero tensor of each partition instead of the whole tensor to save memory
for idx in range(0, num_elements, block_size):
nonzero_block = F.nonzero_1d(elements[idx:min(idx+block_size, num_elements)])
nonzero_block = F.nonzero_1d(
elements[idx : min(idx + block_size, num_elements)]
)
x = y
y += len(nonzero_block)
if y > left and x < right:
......@@ -1366,6 +1518,7 @@ def _split_even_to_part(partition_book, elements):
return part_eles
def _split_random_within_part(partition_book, rank, part_eles):
# If there is more than one client in a partition, we need to randomly select a subset of
# elements in the partition for a client. We have to make sure that the set of elements
......@@ -1377,9 +1530,12 @@ def _split_random_within_part(partition_book, rank, part_eles):
return part_eles
if rank is None:
rank = role.get_trainer_rank()
assert rank < num_clients, \
'The input rank ({}) is incorrect. #Trainers: {}'.format(rank, num_clients)
client_id_in_part = rank % num_client_per_part
assert (
rank < num_clients
), "The input rank ({}) is incorrect. #Trainers: {}".format(
rank, num_clients
)
client_id_in_part = rank % num_client_per_part
offset = _even_offset(len(part_eles), num_client_per_part)
# We set the random seed for each partition, so that each process (client) in a partition
......@@ -1387,12 +1543,20 @@ def _split_random_within_part(partition_book, rank, part_eles):
# of elements.
np.random.seed(partition_book.partid)
rand_idx = np.random.permutation(len(part_eles))
rand_idx = rand_idx[offset[client_id_in_part] : offset[client_id_in_part + 1]]
rand_idx = rand_idx[
offset[client_id_in_part] : offset[client_id_in_part + 1]
]
idx, _ = F.sort_1d(F.tensor(rand_idx))
return F.gather_row(part_eles, idx)
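# Note on determinism (follows from the seeding above): every client of the
# same partition seeds numpy with the partition ID, so they all draw the same
# permutation and simply keep disjoint, contiguous slices of it -- together the
# slices cover every element of the partition exactly once.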
def _split_by_trainer_id(partition_book, part_eles, trainer_id,
num_client_per_part, client_id_in_part):
def _split_by_trainer_id(
partition_book,
part_eles,
trainer_id,
num_client_per_part,
client_id_in_part,
):
# TODO(zhengda): MXNet cannot deal with empty tensors, which makes the implementation
# much more difficult. Let's just use numpy for the computation for now. We just
# perform operations on vectors. It shouldn't be too difficult.
......@@ -1400,16 +1564,23 @@ def _split_by_trainer_id(partition_book, part_eles, trainer_id,
part_eles = F.asnumpy(part_eles)
part_id = trainer_id // num_client_per_part
trainer_id = trainer_id % num_client_per_part
local_eles = part_eles[np.nonzero(part_id[part_eles] == partition_book.partid)[0]]
local_eles = part_eles[
np.nonzero(part_id[part_eles] == partition_book.partid)[0]
]
# these are the IDs of the local elements in the partition. The IDs are global IDs.
remote_eles = part_eles[np.nonzero(part_id[part_eles] != partition_book.partid)[0]]
remote_eles = part_eles[
np.nonzero(part_id[part_eles] != partition_book.partid)[0]
]
# these are the IDs of the remote nodes in the partition. The IDs are global IDs.
local_eles_idx = np.concatenate(
[np.nonzero(trainer_id[local_eles] == i)[0] for i in range(num_client_per_part)],
[
np.nonzero(trainer_id[local_eles] == i)[0]
for i in range(num_client_per_part)
],
# trainer_id[local_eles] gives the trainer IDs of local nodes in the partition and we
# pick out the indices where the node belongs to each trainer i respectively, and
# concatenate them.
axis=0
axis=0,
)
# `local_eles_idx` is used to sort `local_eles` according to `trainer_id`. It is a
# permutation of 0...(len(local_eles)-1)
......@@ -1421,15 +1592,28 @@ def _split_by_trainer_id(partition_book, part_eles, trainer_id,
remote_offsets = _even_offset(len(remote_eles), num_client_per_part)
client_local_eles = local_eles[
local_offsets[client_id_in_part]:local_offsets[client_id_in_part + 1]]
local_offsets[client_id_in_part] : local_offsets[client_id_in_part + 1]
]
client_remote_eles = remote_eles[
remote_offsets[client_id_in_part]:remote_offsets[client_id_in_part + 1]]
client_eles = np.concatenate([client_local_eles, client_remote_eles], axis=0)
remote_offsets[client_id_in_part] : remote_offsets[
client_id_in_part + 1
]
]
client_eles = np.concatenate(
[client_local_eles, client_remote_eles], axis=0
)
return F.tensor(client_eles)
def node_split(nodes, partition_book=None, ntype='_N', rank=None, force_even=True,
node_trainer_ids=None):
''' Split nodes and return a subset for the local rank.
def node_split(
nodes,
partition_book=None,
ntype="_N",
rank=None,
force_even=True,
node_trainer_ids=None,
):
"""Split nodes and return a subset for the local rank.
This function splits the input nodes based on the partition book and
returns a subset of nodes for the local rank. This method is used for
......@@ -1469,28 +1653,32 @@ def node_split(nodes, partition_book=None, ntype='_N', rank=None, force_even=Tru
-------
1D-tensor
The vector of node IDs that belong to the rank.
'''
"""
if not isinstance(nodes, DistTensor):
assert partition_book is not None, 'Regular tensor requires a partition book.'
assert (
partition_book is not None
), "Regular tensor requires a partition book."
elif partition_book is None:
partition_book = nodes.part_policy.partition_book
assert len(nodes) == partition_book._num_nodes(ntype), \
'The length of the boolean mask vector should be the number of nodes in the graph.'
assert len(nodes) == partition_book._num_nodes(
ntype
), "The length of the boolean mask vector should be the number of nodes in the graph."
if rank is None:
rank = role.get_trainer_rank()
if force_even:
num_clients = role.get_num_trainers()
num_client_per_part = num_clients // partition_book.num_partitions()
assert num_clients % partition_book.num_partitions() == 0, \
'The total number of clients should be a multiple of the number of partitions.'
assert (
num_clients % partition_book.num_partitions() == 0
), "The total number of clients should be a multiple of the number of partitions."
part_nid = _split_even_to_part(partition_book, nodes)
if num_client_per_part == 1:
return part_nid
elif node_trainer_ids is None:
return _split_random_within_part(partition_book, rank, part_nid)
else:
trainer_id = node_trainer_ids[0:len(node_trainer_ids)]
trainer_id = node_trainer_ids[0 : len(node_trainer_ids)]
max_trainer_id = F.as_scalar(F.reduce_max(trainer_id)) + 1
if max_trainer_id > num_clients:
......@@ -1498,19 +1686,33 @@ def node_split(nodes, partition_book=None, ntype='_N', rank=None, force_even=Tru
# trainers is less than the `num_trainers_per_machine` previously assigned during
# partitioning.
assert max_trainer_id % num_clients == 0
trainer_id //= (max_trainer_id // num_clients)
trainer_id //= max_trainer_id // num_clients
client_id_in_part = rank % num_client_per_part
return _split_by_trainer_id(partition_book, part_nid, trainer_id,
num_client_per_part, client_id_in_part)
return _split_by_trainer_id(
partition_book,
part_nid,
trainer_id,
num_client_per_part,
client_id_in_part,
)
else:
# Get all nodes that belong to the rank.
local_nids = partition_book.partid2nids(partition_book.partid, ntype=ntype)
local_nids = partition_book.partid2nids(
partition_book.partid, ntype=ntype
)
return _split_local(partition_book, rank, nodes, local_nids)
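# Hedged usage sketch (names are illustrative, not from this diff): node_split
# is typically fed a boolean mask over all nodes, e.g. a training mask stored
# as a DistTensor, and each trainer receives a disjoint subset of the selected
# node IDs.
#
#   train_mask = g.ndata["train_mask"]                       # boolean DistTensor
#   train_nids = node_split(train_mask, g.get_partition_book())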
def edge_split(edges, partition_book=None, etype='_E', rank=None, force_even=True,
edge_trainer_ids=None):
''' Split edges and return a subset for the local rank.
def edge_split(
edges,
partition_book=None,
etype="_E",
rank=None,
force_even=True,
edge_trainer_ids=None,
):
"""Split edges and return a subset for the local rank.
This function splits the input edges based on the partition book and
returns a subset of edges for the local rank. This method is used for
......@@ -1550,27 +1752,31 @@ def edge_split(edges, partition_book=None, etype='_E', rank=None, force_even=Tru
-------
1D-tensor
The vector of edge IDs that belong to the rank.
'''
"""
if not isinstance(edges, DistTensor):
assert partition_book is not None, 'Regular tensor requires a partition book.'
assert (
partition_book is not None
), "Regular tensor requires a partition book."
elif partition_book is None:
partition_book = edges.part_policy.partition_book
assert len(edges) == partition_book._num_edges(etype), \
'The length of the boolean mask vector should be the number of edges in the graph.'
assert len(edges) == partition_book._num_edges(
etype
), "The length of the boolean mask vector should be the number of edges in the graph."
if rank is None:
rank = role.get_trainer_rank()
if force_even:
num_clients = role.get_num_trainers()
num_client_per_part = num_clients // partition_book.num_partitions()
assert num_clients % partition_book.num_partitions() == 0, \
'The total number of clients should be a multiple of the number of partitions.'
assert (
num_clients % partition_book.num_partitions() == 0
), "The total number of clients should be a multiple of the number of partitions."
part_eid = _split_even_to_part(partition_book, edges)
if num_client_per_part == 1:
return part_eid
elif edge_trainer_ids is None:
return _split_random_within_part(partition_book, rank, part_eid)
else:
trainer_id = edge_trainer_ids[0:len(edge_trainer_ids)]
trainer_id = edge_trainer_ids[0 : len(edge_trainer_ids)]
max_trainer_id = F.as_scalar(F.reduce_max(trainer_id)) + 1
if max_trainer_id > num_clients:
......@@ -1578,14 +1784,22 @@ def edge_split(edges, partition_book=None, etype='_E', rank=None, force_even=Tru
# trainers is less than the `num_trainers_per_machine` previously assigned during
# partitioning.
assert max_trainer_id % num_clients == 0
trainer_id //= (max_trainer_id // num_clients)
trainer_id //= max_trainer_id // num_clients
client_id_in_part = rank % num_client_per_part
return _split_by_trainer_id(partition_book, part_eid, trainer_id,
num_client_per_part, client_id_in_part)
return _split_by_trainer_id(
partition_book,
part_eid,
trainer_id,
num_client_per_part,
client_id_in_part,
)
else:
# Get all edges that belong to the rank.
local_eids = partition_book.partid2eids(partition_book.partid, etype=etype)
local_eids = partition_book.partid2eids(
partition_book.partid, etype=etype
)
return _split_local(partition_book, rank, edges, local_eids)
rpc.register_service(INIT_GRAPH, InitGraphRequest, InitGraphResponse)
......@@ -2,21 +2,24 @@
import os
from .. import backend as F, utils
from .dist_context import is_initialized
from .kvstore import get_kvstore
from .role import get_role
from .. import utils
from .. import backend as F
from .rpc import get_group_id
def _default_init_data(shape, dtype):
return F.zeros(shape, dtype, F.cpu())
# These IDs can identify the anonymous distributed tensors.
DIST_TENSOR_ID = 0
class DistTensor:
''' Distributed tensor.
"""Distributed tensor.
``DistTensor`` refers to a distributed tensor sharded and stored in a cluster of machines.
It has the same interface as a PyTorch Tensor for accessing its metadata (e.g., shape and data type).
......@@ -103,12 +106,23 @@ class DistTensor:
The creation of ``DistTensor`` is a synchronized operation. When a trainer process tries to
create a ``DistTensor`` object, the creation succeeds only when all trainer processes
do the same.
'''
def __init__(self, shape, dtype, name=None, init_func=None, part_policy=None,
persistent=False, is_gdata=True, attach=True):
"""
def __init__(
self,
shape,
dtype,
name=None,
init_func=None,
part_policy=None,
persistent=False,
is_gdata=True,
attach=True,
):
self.kvstore = get_kvstore()
assert self.kvstore is not None, \
'Distributed module is not initialized. Please call dgl.distributed.initialize.'
assert (
self.kvstore is not None
), "Distributed module is not initialized. Please call dgl.distributed.initialize."
self._shape = shape
self._dtype = dtype
self._attach = attach
......@@ -124,18 +138,21 @@ class DistTensor:
# If multiple partition policies match the input shape, we cannot
# decide which is the right one automatically. We should ask users
# to provide one.
assert part_policy is None, \
'Multiple partition policies match the input shape. ' \
+ 'Please provide a partition policy explicitly.'
assert part_policy is None, (
"Multiple partition policies match the input shape. "
+ "Please provide a partition policy explicitly."
)
part_policy = policy
assert part_policy is not None, \
'Cannot find a right partition policy. It is either because ' \
+ 'its first dimension does not match the number of nodes or edges ' \
+ 'of a distributed graph or there does not exist a distributed graph.'
assert part_policy is not None, (
"Cannot find a right partition policy. It is either because "
+ "its first dimension does not match the number of nodes or edges "
+ "of a distributed graph or there does not exist a distributed graph."
)
self._part_policy = part_policy
assert part_policy.get_size() == shape[0], \
'The partition policy does not match the input shape.'
assert (
part_policy.get_size() == shape[0]
), "The partition policy does not match the input shape."
if init_func is None:
init_func = _default_init_data
......@@ -143,13 +160,17 @@ class DistTensor:
# If a user doesn't provide a name, we generate a name ourselves.
# We need to generate the name in a deterministic way.
if name is None:
assert not persistent, 'We cannot generate anonymous persistent distributed tensors'
assert (
not persistent
), "We cannot generate anonymous persistent distributed tensors"
global DIST_TENSOR_ID
# All processes of the same role should create DistTensor synchronously.
# Thus, all of them should have the same IDs.
name = 'anonymous-' + get_role() + '-' + str(DIST_TENSOR_ID)
name = "anonymous-" + get_role() + "-" + str(DIST_TENSOR_ID)
DIST_TENSOR_ID += 1
assert isinstance(name, str), 'name {} is type {}'.format(name, type(name))
assert isinstance(name, str), "name {} is type {}".format(
name, type(name)
)
name = self._attach_group_id(name)
self._tensor_name = name
data_name = part_policy.get_data_name(name)
......@@ -157,16 +178,24 @@ class DistTensor:
self._persistent = persistent
if self._name not in exist_names:
self._owner = True
self.kvstore.init_data(self._name, shape, dtype, part_policy, init_func, is_gdata)
self.kvstore.init_data(
self._name, shape, dtype, part_policy, init_func, is_gdata
)
else:
self._owner = False
dtype1, shape1, _ = self.kvstore.get_data_meta(self._name)
assert dtype == dtype1, 'The dtype does not match with the existing tensor'
assert shape == shape1, 'The shape does not match with the existing tensor'
assert (
dtype == dtype1
), "The dtype does not match with the existing tensor"
assert (
shape == shape1
), "The shape does not match with the existing tensor"
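# Illustrative creation sketch (assumes dgl.distributed.initialize() has run
# and `g` is an existing DistGraph; names and shapes are hypothetical): because
# creation is synchronized, every trainer must execute the same line.
#
#   emb = DistTensor((g.num_nodes(), 16), F.float32, name="node_emb")
#   part = emb[some_node_ids]   # pulls the requested rows from the kvstore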
def __del__(self):
initialized = os.environ.get('DGL_DIST_MODE', 'standalone') == 'standalone' \
or is_initialized()
initialized = (
os.environ.get("DGL_DIST_MODE", "standalone") == "standalone"
or is_initialized()
)
if not self._persistent and self._owner and initialized:
self.kvstore.delete_data(self._name)
......@@ -193,12 +222,12 @@ class DistTensor:
def __or__(self, other):
new_dist_tensor = DistTensor(
self._shape,
self._dtype,
part_policy=self._part_policy,
persistent=self._persistent,
is_gdata=self._is_gdata,
attach=self._attach
self._shape,
self._dtype,
part_policy=self._part_policy,
persistent=self._persistent,
is_gdata=self._is_gdata,
attach=self._attach,
)
kvstore = self.kvstore
kvstore.union(self._name, other._name, new_dist_tensor._name)
......@@ -209,67 +238,67 @@ class DistTensor:
@property
def part_policy(self):
'''Return the partition policy
"""Return the partition policy
Returns
-------
PartitionPolicy
The partition policy of the distributed tensor.
'''
"""
return self._part_policy
@property
def shape(self):
'''Return the shape of the distributed tensor.
"""Return the shape of the distributed tensor.
Returns
-------
tuple
The shape of the distributed tensor.
'''
"""
return self._shape
@property
def dtype(self):
'''Return the data type of the distributed tensor.
"""Return the data type of the distributed tensor.
Returns
------
dtype
The data type of the tensor.
'''
"""
return self._dtype
@property
def name(self):
'''Return the name of the distributed tensor
"""Return the name of the distributed tensor
Returns
-------
str
The name of the tensor.
'''
"""
return self._detach_group_id(self._name)
@property
def tensor_name(self):
'''Return the tensor name
"""Return the tensor name
Returns
-------
str
The name of the tensor.
'''
"""
return self._detach_group_id(self._tensor_name)
def count_nonzero(self):
'''Count and return the number of nonzero values
"""Count and return the number of nonzero values
Returns
-------
int
the number of nonzero values
'''
"""
return self.kvstore.count_nonzero(name=self._name)
def _attach_group_id(self, name):
......@@ -295,4 +324,4 @@ class DistTensor:
if not self._attach:
return name
suffix = "_{}".format(get_group_id())
return name[:-len(suffix)]
return name[: -len(suffix)]
......@@ -5,8 +5,7 @@ from abc import ABC
import numpy as np
from .. import backend as F
from .. import utils
from .. import backend as F, utils
from .._ffi.ndarray import empty_shared_mem
from ..base import DGLError
from ..ndarray import exist_shared_mem_array
......@@ -14,16 +13,17 @@ from ..partition import NDArrayPartition
from .constants import DEFAULT_ETYPE, DEFAULT_NTYPE
from .id_map import IdMap
from .shared_mem_utils import (
DTYPE_DICT,
_get_edata_path,
_get_ndata_path,
_to_shared_mem,
DTYPE_DICT,
)
CANONICAL_ETYPE_DELIMITER = ":"
def _etype_tuple_to_str(c_etype):
'''Convert canonical etype from tuple to string.
"""Convert canonical etype from tuple to string.
Examples
--------
......@@ -32,14 +32,16 @@ def _etype_tuple_to_str(c_etype):
>>> print(c_etype_str)
'user:like:item'
'''
assert isinstance(c_etype, tuple) and len(c_etype) == 3, \
"Passed-in canonical etype should be in format of (str, str, str). " \
"""
assert isinstance(c_etype, tuple) and len(c_etype) == 3, (
"Passed-in canonical etype should be in format of (str, str, str). "
f"But got {c_etype}."
)
return CANONICAL_ETYPE_DELIMITER.join(c_etype)
def _etype_str_to_tuple(c_etype):
'''Convert canonical etype from string to tuple.
"""Convert canonical etype from string to tuple.
Examples
--------
......@@ -48,13 +50,15 @@ def _etype_str_to_tuple(c_etype):
>>> print(c_etype)
('user', 'like', 'item')
'''
"""
ret = tuple(c_etype.split(CANONICAL_ETYPE_DELIMITER))
assert len(ret) == 3, \
"Passed-in canonical etype should be in format of 'str:str:str'. " \
assert len(ret) == 3, (
"Passed-in canonical etype should be in format of 'str:str:str'. "
f"But got {c_etype}."
)
return ret
def _move_metadata_to_shared_mem(
graph_name,
num_nodes,
......@@ -533,6 +537,7 @@ class GraphPartitionBook(ABC):
Homogeneous edge IDs.
"""
class RangePartitionBook(GraphPartitionBook):
"""This partition book supports more efficient storage of partition information.
......@@ -582,9 +587,10 @@ class RangePartitionBook(GraphPartitionBook):
ntype is not None for ntype in self._ntypes
), "The node types have invalid IDs."
for c_etype, etype_id in etypes.items():
assert isinstance(c_etype, tuple) and len(c_etype) == 3, \
"Expect canonical edge type in a triplet of string, but got " \
assert isinstance(c_etype, tuple) and len(c_etype) == 3, (
"Expect canonical edge type in a triplet of string, but got "
f"{c_etype}."
)
etype = c_etype[1]
self._etypes[etype_id] = etype
self._canonical_etypes[etype_id] = c_etype
......@@ -660,13 +666,19 @@ class RangePartitionBook(GraphPartitionBook):
# to local heterogenized node/edge IDs. One can do the mapping by binary search
# on these arrays.
self._local_ntype_offset = np.cumsum(
[0] + [
v[self._partid, 1] - v[self._partid, 0]
for v in self._typed_nid_range.values()]).tolist()
[0]
+ [
v[self._partid, 1] - v[self._partid, 0]
for v in self._typed_nid_range.values()
]
).tolist()
self._local_etype_offset = np.cumsum(
[0] + [
v[self._partid, 1] - v[self._partid, 0]
for v in self._typed_eid_range.values()]).tolist()
[0]
+ [
v[self._partid, 1] - v[self._partid, 0]
for v in self._typed_eid_range.values()
]
).tolist()
# Get meta data of the partition book
self._partition_meta_data = []
......@@ -945,7 +957,7 @@ class RangePartitionBook(GraphPartitionBook):
NODE_PART_POLICY = "node"
EDGE_PART_POLICY = "edge"
POLICY_DELIMITER = '~'
POLICY_DELIMITER = "~"
class PartitionPolicy(object):
......@@ -967,11 +979,12 @@ class PartitionPolicy(object):
"""
def __init__(self, policy_str, partition_book):
assert (policy_str.startswith(NODE_PART_POLICY) or
policy_str.startswith(EDGE_PART_POLICY)), (
f"policy_str must start with {NODE_PART_POLICY} or "
f"{EDGE_PART_POLICY}, but got {policy_str}."
)
assert policy_str.startswith(NODE_PART_POLICY) or policy_str.startswith(
EDGE_PART_POLICY
), (
f"policy_str must start with {NODE_PART_POLICY} or "
f"{EDGE_PART_POLICY}, but got {policy_str}."
)
if NODE_PART_POLICY == policy_str:
policy_str = NODE_PART_POLICY + POLICY_DELIMITER + DEFAULT_NTYPE
if EDGE_PART_POLICY == policy_str:
......@@ -1127,11 +1140,12 @@ class EdgePartitionPolicy(PartitionPolicy):
"""Partition policy for edges."""
def __init__(self, partition_book, etype=DEFAULT_ETYPE):
assert isinstance(etype, tuple) and len(etype) == 3, \
f"Expect canonical edge type in a triplet of string, but got {etype}."
assert (
isinstance(etype, tuple) and len(etype) == 3
), f"Expect canonical edge type in a triplet of string, but got {etype}."
super(EdgePartitionPolicy, self).__init__(
EDGE_PART_POLICY + POLICY_DELIMITER + _etype_tuple_to_str(etype),
partition_book
partition_book,
)
......@@ -1156,9 +1170,10 @@ class HeteroDataName(object):
def __init__(self, is_node, entity_type, data_name):
self._policy = NODE_PART_POLICY if is_node else EDGE_PART_POLICY
if not is_node:
assert isinstance(entity_type, tuple) and len(entity_type) == 3, \
"Expect canonical edge type in a triplet of string, but got " \
assert isinstance(entity_type, tuple) and len(entity_type) == 3, (
"Expect canonical edge type in a triplet of string, but got "
f"{entity_type}."
)
self._entity_type = entity_type
self.data_name = data_name
......@@ -1226,6 +1241,4 @@ def parse_hetero_data_name(name):
entity_type = names[1]
if not is_node:
entity_type = _etype_str_to_tuple(entity_type)
return HeteroDataName(
is_node, entity_type, names[2]
)
return HeteroDataName(is_node, entity_type, names[2])
......@@ -6,16 +6,17 @@ import numpy as np
from .. import backend as F
from ..base import EID, NID
from ..convert import graph, heterograph
from ..sampling import sample_etype_neighbors as local_sample_etype_neighbors
from ..sampling import sample_neighbors as local_sample_neighbors
from ..sampling import (
sample_etype_neighbors as local_sample_etype_neighbors,
sample_neighbors as local_sample_neighbors,
)
from ..subgraph import in_subgraph as local_in_subgraph
from ..utils import toindex
from .. import backend as F
from .rpc import (
Request,
Response,
recv_responses,
register_service,
Request,
Response,
send_requests_to_machine,
)
......@@ -207,6 +208,7 @@ def _in_subgraph(local_g, partition_book, seed_nodes):
# This is a limitation of the current DistDGL design. We should improve it
# later.
class SamplingRequest(Request):
"""Sampling Request"""
......@@ -798,9 +800,7 @@ def sample_neighbors(g, nodes, fanout, edge_dir="in", prob=None, replace=False):
def local_access(local_g, partition_book, local_nids):
# See NOTE 1
_prob = (
[g.edata[prob].local_partition] if prob is not None else None
)
_prob = [g.edata[prob].local_partition] if prob is not None else None
return _sample_neighbors(
local_g,
partition_book,
......
"""Module for mapping between node/edge IDs and node/edge types."""
import numpy as np
from .. import backend as F, utils
from .._ffi.function import _init_api
from .. import backend as F
from .. import utils
class IdMap:
'''A map for converting node/edge IDs to their type IDs and type-wise IDs.
"""A map for converting node/edge IDs to their type IDs and type-wise IDs.
For a heterogeneous graph, DGL assigns an integer ID to each node/edge type;
nodes and edges of different types have independent IDs starting from zero.
......@@ -96,7 +97,8 @@ class IdMap:
for a particular node type in a partition. For example, all nodes of type ``"T"`` in
partition ``i`` have the ID range ``id_ranges["T"][i][0]`` to ``id_ranges["T"][i][1]``.
It is the same as the `node_map` argument in `RangePartitionBook`.
'''
"""
def __init__(self, id_ranges):
self.num_parts = list(id_ranges.values())[0].shape[0]
self.num_types = len(id_ranges)
......@@ -105,7 +107,7 @@ class IdMap:
id_ranges = list(id_ranges.values())
id_ranges.sort(key=lambda a: a[0, 0])
for i, id_range in enumerate(id_ranges):
ranges[i::self.num_types] = id_range
ranges[i :: self.num_types] = id_range
map1 = np.cumsum(id_range[:, 1] - id_range[:, 0])
typed_map.append(map1)
......@@ -116,7 +118,7 @@ class IdMap:
self.typed_map = utils.toindex(np.concatenate(typed_map))
def __call__(self, ids):
'''Convert the homogeneous IDs to (type_id, type_wise_id).
"""Convert the homogeneous IDs to (type_id, type_wise_id).
Parameters
----------
......@@ -129,19 +131,23 @@ class IdMap:
Type IDs
per_type_ids : Tensor
Type-wise IDs
'''
"""
if self.num_types == 0:
return F.zeros((len(ids),), F.dtype(ids), F.cpu()), ids
if len(ids) == 0:
return ids, ids
ids = utils.toindex(ids)
ret = _CAPI_DGLHeteroMapIds(ids.todgltensor(),
self.range_start.todgltensor(),
self.range_end.todgltensor(),
self.typed_map.todgltensor(),
self.num_parts, self.num_types)
ret = _CAPI_DGLHeteroMapIds(
ids.todgltensor(),
self.range_start.todgltensor(),
self.range_end.todgltensor(),
self.typed_map.todgltensor(),
self.num_parts,
self.num_types,
)
ret = utils.toindex(ret).tousertensor()
return ret[:len(ids)], ret[len(ids):]
return ret[: len(ids)], ret[len(ids) :]
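# Worked example (hypothetical ranges): suppose two node types are laid out as
#   T0 -> [0, 3), T1 -> [3, 5)   in partition 0, and
#   T0 -> [5, 8), T1 -> [8, 10)  in partition 1.
# The homogeneous ID 9 falls in T1's range of partition 1, so it maps to
# type_id = 1 and per_type_id = 3 (two T1 nodes precede it in partition 0 and
# one more precedes it in partition 1).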
_init_api("dgl.distributed.id_map")
"""Define distributed kvstore"""
import os
import numpy as np
from .. import backend as F, utils
from .._ffi.ndarray import empty_shared_mem
from . import rpc
from .graph_partition_book import NodePartitionPolicy, EdgePartitionPolicy
from .graph_partition_book import EdgePartitionPolicy, NodePartitionPolicy
from .standalone_kvstore import KVClient as SA_KVClient
from .. import backend as F
from .. import utils
from .._ffi.ndarray import empty_shared_mem
############################ Register KVStore Requests and Responses ###############################
KVSTORE_PULL = 901231
class PullResponse(rpc.Response):
"""Send the sliced data tensor back to the client.
......@@ -25,6 +26,7 @@ class PullResponse(rpc.Response):
data_tensor : tensor
sliced data tensor
"""
def __init__(self, server_id, data_tensor):
self.server_id = server_id
self.data_tensor = data_tensor
......@@ -35,6 +37,7 @@ class PullResponse(rpc.Response):
def __setstate__(self, state):
self.server_id, self.data_tensor = state
class PullRequest(rpc.Request):
"""Send ID tensor to server and get target data tensor as response.
......@@ -45,6 +48,7 @@ class PullRequest(rpc.Request):
id_tensor : tensor
a vector storing the data ID
"""
def __init__(self, name, id_tensor):
self.name = name
self.id_tensor = id_tensor
......@@ -58,16 +62,25 @@ class PullRequest(rpc.Request):
def process_request(self, server_state):
kv_store = server_state.kv_store
if self.name not in kv_store.part_policy:
raise RuntimeError("KVServer cannot find partition policy with name: %s" % self.name)
raise RuntimeError(
"KVServer cannot find partition policy with name: %s"
% self.name
)
if self.name not in kv_store.data_store:
raise RuntimeError("KVServer Cannot find data tensor with name: %s" % self.name)
raise RuntimeError(
"KVServer Cannot find data tensor with name: %s" % self.name
)
local_id = kv_store.part_policy[self.name].to_local(self.id_tensor)
data = kv_store.pull_handlers[self.name](kv_store.data_store, self.name, local_id)
data = kv_store.pull_handlers[self.name](
kv_store.data_store, self.name, local_id
)
res = PullResponse(kv_store.server_id, data)
return res
KVSTORE_PUSH = 901232
class PushRequest(rpc.Request):
"""Send ID tensor and data tensor to server and update kvstore's data.
......@@ -82,6 +95,7 @@ class PushRequest(rpc.Request):
data_tensor : tensor
a tensor with the same row size as the data ID
"""
def __init__(self, name, id_tensor, data_tensor):
self.name = name
self.id_tensor = id_tensor
......@@ -96,15 +110,23 @@ class PushRequest(rpc.Request):
def process_request(self, server_state):
kv_store = server_state.kv_store
if self.name not in kv_store.part_policy:
raise RuntimeError("KVServer cannot find partition policy with name: %s" % self.name)
raise RuntimeError(
"KVServer cannot find partition policy with name: %s"
% self.name
)
if self.name not in kv_store.data_store:
raise RuntimeError("KVServer Cannot find data tensor with name: %s" % self.name)
raise RuntimeError(
"KVServer Cannot find data tensor with name: %s" % self.name
)
local_id = kv_store.part_policy[self.name].to_local(self.id_tensor)
kv_store.push_handlers[self.name](kv_store.data_store, self.name,
local_id, self.data_tensor)
kv_store.push_handlers[self.name](
kv_store.data_store, self.name, local_id, self.data_tensor
)
INIT_DATA = 901233
INIT_MSG = 'Init'
INIT_MSG = "Init"
class InitDataResponse(rpc.Response):
"""Send a confirmation response (just a short string message) of
......@@ -115,6 +137,7 @@ class InitDataResponse(rpc.Response):
msg : string
string message
"""
def __init__(self, msg):
self.msg = msg
......@@ -124,6 +147,7 @@ class InitDataResponse(rpc.Response):
def __setstate__(self, state):
self.msg = state
class InitDataRequest(rpc.Request):
"""Send meta data to server and init data tensor
on server using UDF init function.
......@@ -141,6 +165,7 @@ class InitDataRequest(rpc.Request):
init_func : function
UDF init function.
"""
def __init__(self, name, shape, dtype, policy_str, init_func):
self.name = name
self.shape = shape
......@@ -149,10 +174,22 @@ class InitDataRequest(rpc.Request):
self.init_func = init_func
def __getstate__(self):
return self.name, self.shape, self.dtype, self.policy_str, self.init_func
return (
self.name,
self.shape,
self.dtype,
self.policy_str,
self.init_func,
)
def __setstate__(self, state):
self.name, self.shape, self.dtype, self.policy_str, self.init_func = state
(
self.name,
self.shape,
self.dtype,
self.policy_str,
self.init_func,
) = state
def process_request(self, server_state):
kv_store = server_state.kv_store
......@@ -161,22 +198,33 @@ class InitDataRequest(rpc.Request):
# We should see requests from multiple clients. We need to ignore the duplicated
# requests.
if self.name in kv_store.data_store:
assert tuple(F.shape(kv_store.data_store[self.name])) == tuple(self.shape)
assert F.reverse_data_type_dict[F.dtype(kv_store.data_store[self.name])] == self.dtype
assert tuple(F.shape(kv_store.data_store[self.name])) == tuple(
self.shape
)
assert (
F.reverse_data_type_dict[
F.dtype(kv_store.data_store[self.name])
]
== self.dtype
)
assert kv_store.part_policy[self.name].policy_str == self.policy_str
else:
if not kv_store.is_backup_server():
data_tensor = self.init_func(self.shape, dtype)
kv_store.init_data(name=self.name,
policy_str=self.policy_str,
data_tensor=data_tensor)
kv_store.init_data(
name=self.name,
policy_str=self.policy_str,
data_tensor=data_tensor,
)
else:
kv_store.init_data(name=self.name, policy_str=self.policy_str)
res = InitDataResponse(INIT_MSG)
return res
BARRIER = 901234
BARRIER_MSG = 'Barrier'
BARRIER_MSG = "Barrier"
class BarrierResponse(rpc.Response):
"""Send an confimation signal (just a short string message) of
......@@ -187,6 +235,7 @@ class BarrierResponse(rpc.Response):
msg : string
string msg
"""
def __init__(self, msg):
self.msg = msg
......@@ -196,6 +245,7 @@ class BarrierResponse(rpc.Response):
def __setstate__(self, state):
self.msg = state
class BarrierRequest(rpc.Request):
"""Send a barrier signal (just a short string message) to server.
......@@ -204,6 +254,7 @@ class BarrierRequest(rpc.Request):
role : string
client role
"""
def __init__(self, role):
self.role = role
self.group_id = rpc.get_group_id()
......@@ -229,8 +280,10 @@ class BarrierRequest(rpc.Request):
return res_list
return None
REGISTER_PULL = 901235
REGISTER_PULL_MSG = 'Register_Pull'
REGISTER_PULL_MSG = "Register_Pull"
class RegisterPullHandlerResponse(rpc.Response):
"""Send a confirmation signal (just a short string message) of
......@@ -241,6 +294,7 @@ class RegisterPullHandlerResponse(rpc.Response):
msg : string
string message
"""
def __init__(self, msg):
self.msg = msg
......@@ -250,6 +304,7 @@ class RegisterPullHandlerResponse(rpc.Response):
def __setstate__(self, state):
self.msg = state
class RegisterPullHandlerRequest(rpc.Request):
"""Send an UDF and register Pull handler on server.
......@@ -258,6 +313,7 @@ class RegisterPullHandlerRequest(rpc.Request):
pull_func : func
UDF pull handler
"""
def __init__(self, name, pull_func):
self.name = name
self.pull_func = pull_func
......@@ -274,8 +330,10 @@ class RegisterPullHandlerRequest(rpc.Request):
res = RegisterPullHandlerResponse(REGISTER_PULL_MSG)
return res
REGISTER_PUSH = 901236
REGISTER_PUSH_MSG = 'Register_Push'
REGISTER_PUSH_MSG = "Register_Push"
class RegisterPushHandlerResponse(rpc.Response):
"""Send a confirmation signal (just a short string message) of
......@@ -286,6 +344,7 @@ class RegisterPushHandlerResponse(rpc.Response):
msg : string
string message
"""
def __init__(self, msg):
self.msg = msg
......@@ -295,6 +354,7 @@ class RegisterPushHandlerResponse(rpc.Response):
def __setstate__(self, state):
self.msg = state
class RegisterPushHandlerRequest(rpc.Request):
"""Send an UDF to register Push handler on server.
......@@ -303,6 +363,7 @@ class RegisterPushHandlerRequest(rpc.Request):
push_func : func
UDF push handler
"""
def __init__(self, name, push_func):
self.name = name
self.push_func = push_func
......@@ -319,8 +380,10 @@ class RegisterPushHandlerRequest(rpc.Request):
res = RegisterPushHandlerResponse(REGISTER_PUSH_MSG)
return res
GET_SHARED = 901237
GET_SHARED_MSG = 'Get_Shared'
GET_SHARED_MSG = "Get_Shared"
class GetSharedDataResponse(rpc.Response):
"""Send meta data of shared-memory tensor to client.
......@@ -333,6 +396,7 @@ class GetSharedDataResponse(rpc.Response):
{'data_0' : (shape, dtype, policy_str),
'data_1' : (shape, dtype, policy_str)}
"""
def __init__(self, meta):
self.meta = meta
......@@ -342,6 +406,7 @@ class GetSharedDataResponse(rpc.Response):
def __setstate__(self, state):
self.meta = state
class GetSharedDataRequest(rpc.Request):
"""Send a signal (just a short string message) to get the
meta data of shared-tensor from server.
......@@ -351,6 +416,7 @@ class GetSharedDataRequest(rpc.Request):
msg : string
string message
"""
def __init__(self, msg):
self.msg = msg
......@@ -368,14 +434,18 @@ class GetSharedDataRequest(rpc.Request):
if server_state.keep_alive:
if name not in kv_store.orig_data:
continue
meta[name] = (F.shape(data),
F.reverse_data_type_dict[F.dtype(data)],
kv_store.part_policy[name].policy_str)
meta[name] = (
F.shape(data),
F.reverse_data_type_dict[F.dtype(data)],
kv_store.part_policy[name].policy_str,
)
res = GetSharedDataResponse(meta)
return res
GET_PART_SHAPE = 901238
class GetPartShapeResponse(rpc.Response):
"""Send the partitioned data shape back to client.
......@@ -384,6 +454,7 @@ class GetPartShapeResponse(rpc.Response):
shape : tuple
shape of tensor
"""
def __init__(self, shape):
self.shape = shape
......@@ -397,6 +468,7 @@ class GetPartShapeResponse(rpc.Response):
else:
self.shape = state
class GetPartShapeRequest(rpc.Request):
"""Send data name to get the partitioned data shape from server.
......@@ -405,6 +477,7 @@ class GetPartShapeRequest(rpc.Request):
name : str
data name
"""
def __init__(self, name):
self.name = name
......@@ -417,18 +490,23 @@ class GetPartShapeRequest(rpc.Request):
def process_request(self, server_state):
kv_store = server_state.kv_store
if self.name not in kv_store.data_store:
raise RuntimeError("KVServer Cannot find data tensor with name: %s" % self.name)
raise RuntimeError(
"KVServer Cannot find data tensor with name: %s" % self.name
)
data_shape = F.shape(kv_store.data_store[self.name])
res = GetPartShapeResponse(data_shape)
return res
SEND_META_TO_BACKUP = 901239
SEND_META_TO_BACKUP_MSG = "Send_Meta_TO_Backup"
class SendMetaToBackupResponse(rpc.Response):
"""Send a confirmation signal (just a short string message)
of SendMetaToBackupRequest to client.
"""
def __init__(self, msg):
self.msg = msg
......@@ -438,6 +516,7 @@ class SendMetaToBackupResponse(rpc.Response):
def __setstate__(self, state):
self.msg = state
class SendMetaToBackupRequest(rpc.Request):
"""Send meta data to backup server and backup server
will use this meta data to read shared-memory tensor.
......@@ -457,7 +536,10 @@ class SendMetaToBackupRequest(rpc.Request):
push_handler : callable
The callback function when data is pushed to kvstore.
"""
def __init__(self, name, dtype, shape, policy_str, pull_handler, push_handler):
def __init__(
self, name, dtype, shape, policy_str, pull_handler, push_handler
):
self.name = name
self.dtype = dtype
self.shape = shape
......@@ -466,39 +548,65 @@ class SendMetaToBackupRequest(rpc.Request):
self.push_handler = push_handler
def __getstate__(self):
return self.name, self.dtype, self.shape, self.policy_str, self.pull_handler, \
self.push_handler
return (
self.name,
self.dtype,
self.shape,
self.policy_str,
self.pull_handler,
self.push_handler,
)
def __setstate__(self, state):
self.name, self.dtype, self.shape, self.policy_str, self.pull_handler, \
self.push_handler = state
(
self.name,
self.dtype,
self.shape,
self.policy_str,
self.pull_handler,
self.push_handler,
) = state
def process_request(self, server_state):
kv_store = server_state.kv_store
assert kv_store.is_backup_server()
if self.name not in kv_store.data_store:
shared_data = empty_shared_mem(self.name+'-kvdata-', False, self.shape, self.dtype)
shared_data = empty_shared_mem(
self.name + "-kvdata-", False, self.shape, self.dtype
)
dlpack = shared_data.to_dlpack()
kv_store.data_store[self.name] = F.zerocopy_from_dlpack(dlpack)
kv_store.part_policy[self.name] = kv_store.find_policy(self.policy_str)
kv_store.part_policy[self.name] = kv_store.find_policy(
self.policy_str
)
kv_store.pull_handlers[self.name] = self.pull_handler
kv_store.push_handlers[self.name] = self.push_handler
else:
assert tuple(F.shape(kv_store.data_store[self.name])) == tuple(self.shape)
assert F.reverse_data_type_dict[F.dtype(kv_store.data_store[self.name])] == self.dtype
assert tuple(F.shape(kv_store.data_store[self.name])) == tuple(
self.shape
)
assert (
F.reverse_data_type_dict[
F.dtype(kv_store.data_store[self.name])
]
== self.dtype
)
assert kv_store.part_policy[self.name].policy_str == self.policy_str
assert kv_store.pull_handlers[self.name] == self.pull_handler
assert kv_store.push_handlers[self.name] == self.push_handler
res = SendMetaToBackupResponse(SEND_META_TO_BACKUP_MSG)
return res
DELETE_DATA = 901240
DELETE_MSG = "Delete_Data"
class DeleteDataResponse(rpc.Response):
"""Send a confirmation signal (just a short string message)
of DeleteDataRequest to client.
"""
def __init__(self, msg):
self.msg = msg
......@@ -508,6 +616,7 @@ class DeleteDataResponse(rpc.Response):
def __setstate__(self, state):
self.msg = state
class DeleteDataRequest(rpc.Request):
"""Send message to server to delete data tensor
......@@ -516,6 +625,7 @@ class DeleteDataRequest(rpc.Request):
name : str
data name
"""
def __init__(self, name):
self.name = name
......@@ -535,11 +645,13 @@ class DeleteDataRequest(rpc.Request):
res = DeleteDataResponse(DELETE_MSG)
return res
COUNT_LOCAL_NONZERO = 901241
class CountLocalNonzeroResponse(rpc.Response):
"""Send the number of nonzero value in local data
"""
"""Send the number of nonzero value in local data"""
def __init__(self, num_local_nonzero):
self.num_local_nonzero = num_local_nonzero
......@@ -549,6 +661,7 @@ class CountLocalNonzeroResponse(rpc.Response):
def __setstate__(self, state):
self.num_local_nonzero = state
class CountLocalNonzeroRequest(rpc.Request):
"""Send data name to server to count local nonzero value
Parameters
......@@ -556,6 +669,7 @@ class CountLocalNonzeroRequest(rpc.Request):
name : str
data name
"""
def __init__(self, name):
self.name = name
......@@ -571,8 +685,10 @@ class CountLocalNonzeroRequest(rpc.Request):
res = CountLocalNonzeroResponse(num_local_nonzero)
return res
############################ KVServer ###############################
def default_push_handler(target, name, id_tensor, data_tensor):
"""Default handler for PUSH message.
......@@ -592,6 +708,7 @@ def default_push_handler(target, name, id_tensor, data_tensor):
# TODO(chao): support Tensorflow backend
target[name][id_tensor] = data_tensor
def default_pull_handler(target, name, id_tensor):
"""Default handler for PULL operation.
......@@ -614,6 +731,7 @@ def default_pull_handler(target, name, id_tensor):
# TODO(chao): support Tensorflow backend
return target[name][id_tensor]
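To make the two default handlers above concrete, here is a minimal sketch that uses plain NumPy arrays in place of framework tensors (the real handlers operate on backend tensors via fancy indexing in the same way).
import numpy as np

# Toy data store: name -> local partition of a tensor.
target = {"feat": np.zeros((5, 2))}

# What default_push_handler does: write the rows selected by id_tensor.
target["feat"][np.array([1, 3])] = np.array([[1.0, 1.0], [2.0, 2.0]])

# What default_pull_handler does: read the rows selected by id_tensor.
print(target["feat"][np.array([1, 3])])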
class KVServer(object):
"""KVServer is a lightweight key-value store service for DGL distributed training.
......@@ -636,45 +754,50 @@ class KVServer(object):
num_clients : int
Total number of KVClients that will be connected to the KVServer.
"""
def __init__(self, server_id, ip_config, num_servers, num_clients):
assert server_id >= 0, 'server_id (%d) cannot be a negative number.' % server_id
assert num_servers > 0, 'num_servers (%d) must be a positive number.' % num_servers
assert os.path.exists(ip_config), 'Cannot open file: %s' % ip_config
assert num_clients >= 0, 'num_clients (%d) cannot be a negative number.' % num_clients
assert server_id >= 0, (
"server_id (%d) cannot be a negative number." % server_id
)
assert num_servers > 0, (
"num_servers (%d) must be a positive number." % num_servers
)
assert os.path.exists(ip_config), "Cannot open file: %s" % ip_config
assert num_clients >= 0, (
"num_clients (%d) cannot be a negative number." % num_clients
)
# Register services on server
rpc.register_service(KVSTORE_PULL,
PullRequest,
PullResponse)
rpc.register_service(KVSTORE_PUSH,
PushRequest,
None)
rpc.register_service(INIT_DATA,
InitDataRequest,
InitDataResponse)
rpc.register_service(BARRIER,
BarrierRequest,
BarrierResponse)
rpc.register_service(REGISTER_PUSH,
RegisterPushHandlerRequest,
RegisterPushHandlerResponse)
rpc.register_service(REGISTER_PULL,
RegisterPullHandlerRequest,
RegisterPullHandlerResponse)
rpc.register_service(GET_SHARED,
GetSharedDataRequest,
GetSharedDataResponse)
rpc.register_service(GET_PART_SHAPE,
GetPartShapeRequest,
GetPartShapeResponse)
rpc.register_service(SEND_META_TO_BACKUP,
SendMetaToBackupRequest,
SendMetaToBackupResponse)
rpc.register_service(DELETE_DATA,
DeleteDataRequest,
DeleteDataResponse)
rpc.register_service(COUNT_LOCAL_NONZERO,
CountLocalNonzeroRequest,
CountLocalNonzeroResponse)
rpc.register_service(KVSTORE_PULL, PullRequest, PullResponse)
rpc.register_service(KVSTORE_PUSH, PushRequest, None)
rpc.register_service(INIT_DATA, InitDataRequest, InitDataResponse)
rpc.register_service(BARRIER, BarrierRequest, BarrierResponse)
rpc.register_service(
REGISTER_PUSH,
RegisterPushHandlerRequest,
RegisterPushHandlerResponse,
)
rpc.register_service(
REGISTER_PULL,
RegisterPullHandlerRequest,
RegisterPullHandlerResponse,
)
rpc.register_service(
GET_SHARED, GetSharedDataRequest, GetSharedDataResponse
)
rpc.register_service(
GET_PART_SHAPE, GetPartShapeRequest, GetPartShapeResponse
)
rpc.register_service(
SEND_META_TO_BACKUP,
SendMetaToBackupRequest,
SendMetaToBackupResponse,
)
rpc.register_service(DELETE_DATA, DeleteDataRequest, DeleteDataResponse)
rpc.register_service(
COUNT_LOCAL_NONZERO,
CountLocalNonzeroRequest,
CountLocalNonzeroResponse,
)
# Store the tensor data with specified data name
self._data_store = {}
# Store original tensor data names when instantiating DistGraphServer
......@@ -685,9 +808,11 @@ class KVServer(object):
# Basic information
self._server_id = server_id
self._server_namebook = rpc.read_ip_config(ip_config, num_servers)
assert server_id in self._server_namebook, \
'Trying to start server {}, but there are {} servers in the config file'.format(
server_id, len(self._server_namebook))
assert (
server_id in self._server_namebook
), "Trying to start server {}, but there are {} servers in the config file".format(
server_id, len(self._server_namebook)
)
self._machine_id = self._server_namebook[server_id][0]
self._group_count = self._server_namebook[server_id][3]
# We assume partition_id is equal to machine_id
......@@ -749,8 +874,7 @@ class KVServer(object):
return self._pull_handlers
def is_backup_server(self):
"""Return True if current server is a backup server.
"""
"""Return True if current server is a backup server."""
if self._server_id % self._group_count == 0:
return False
return True
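The rule above amounts to: the first server in each group of ``group_count`` servers on a machine is the main server, and the rest are backups. A tiny sketch with made-up IDs:
def is_backup(server_id, group_count):
    # Main servers have IDs 0, group_count, 2 * group_count, ...
    return server_id % group_count != 0

print([is_backup(sid, 3) for sid in range(6)])
# [False, True, True, False, True, True]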
......@@ -778,22 +902,26 @@ class KVServer(object):
If the data_tensor is None, KVServer will
read shared memory when a client invokes get_shared_data().
"""
assert len(name) > 0, 'name cannot be empty.'
assert len(name) > 0, "name cannot be empty."
if name in self._data_store:
raise RuntimeError("Data %s has already exists!" % name)
self._part_policy[name] = self.find_policy(policy_str)
if data_tensor is not None: # Create shared-tensor
if data_tensor is not None: # Create shared-tensor
data_type = F.reverse_data_type_dict[F.dtype(data_tensor)]
shared_data = empty_shared_mem(name+'-kvdata-', True, data_tensor.shape, data_type)
shared_data = empty_shared_mem(
name + "-kvdata-", True, data_tensor.shape, data_type
)
dlpack = shared_data.to_dlpack()
self._data_store[name] = F.zerocopy_from_dlpack(dlpack)
rpc.copy_data_to_shared_memory(self._data_store[name], data_tensor)
assert self._part_policy[name].get_part_size() == data_tensor.shape[0], \
'kvserver expects partition {} for {} to have {} rows, but got {} rows'.format(
self._part_policy[name].part_id,
policy_str,
self._part_policy[name].get_part_size(),
data_tensor.shape[0])
assert (
self._part_policy[name].get_part_size() == data_tensor.shape[0]
), "kvserver expect partition {} for {} has {} rows, but gets {} rows".format(
self._part_policy[name].part_id,
policy_str,
self._part_policy[name].get_part_size(),
data_tensor.shape[0],
)
self._pull_handlers[name] = default_pull_handler
self._push_handlers[name] = default_push_handler
......@@ -808,7 +936,9 @@ class KVServer(object):
for policy in self._policy_set:
if policy_str == policy.policy_str:
return policy
raise RuntimeError("Cannot find policy_str: %s from kvserver." % policy_str)
raise RuntimeError(
"Cannot find policy_str: %s from kvserver." % policy_str
)
def count_local_nonzero(self, name):
"""Count nonzero in local data
......@@ -823,13 +953,15 @@ class KVServer(object):
int
the number of nonzero in local data.
"""
assert len(name) > 0, 'name cannot be empty.'
assert len(name) > 0, "name cannot be empty."
if name not in self._data_store:
raise RuntimeError("Data %s has not be created!" % name)
return F.count_nonzero(self._data_store[name])
############################ KVClient ###############################
class KVClient(object):
"""KVClient is used to push/pull data to/from KVServer. If the
target kvclient and kvserver are on the same machine, they can
......@@ -849,45 +981,47 @@ class KVClient(object):
role : str
We can set a different role for the kvstore.
"""
def __init__(self, ip_config, num_servers, role='default'):
assert rpc.get_rank() != -1, \
'Please invoke rpc.connect_to_server() before creating KVClient.'
assert os.path.exists(ip_config), 'Cannot open file: %s' % ip_config
assert num_servers > 0, 'num_servers (%d) must be a positive number.' % num_servers
def __init__(self, ip_config, num_servers, role="default"):
assert (
rpc.get_rank() != -1
), "Please invoke rpc.connect_to_server() before creating KVClient."
assert os.path.exists(ip_config), "Cannot open file: %s" % ip_config
assert num_servers > 0, (
"num_servers (%d) must be a positive number." % num_servers
)
# Register services on client
rpc.register_service(KVSTORE_PULL,
PullRequest,
PullResponse)
rpc.register_service(KVSTORE_PUSH,
PushRequest,
None)
rpc.register_service(INIT_DATA,
InitDataRequest,
InitDataResponse)
rpc.register_service(BARRIER,
BarrierRequest,
BarrierResponse)
rpc.register_service(REGISTER_PUSH,
RegisterPushHandlerRequest,
RegisterPushHandlerResponse)
rpc.register_service(REGISTER_PULL,
RegisterPullHandlerRequest,
RegisterPullHandlerResponse)
rpc.register_service(GET_SHARED,
GetSharedDataRequest,
GetSharedDataResponse)
rpc.register_service(GET_PART_SHAPE,
GetPartShapeRequest,
GetPartShapeResponse)
rpc.register_service(SEND_META_TO_BACKUP,
SendMetaToBackupRequest,
SendMetaToBackupResponse)
rpc.register_service(DELETE_DATA,
DeleteDataRequest,
DeleteDataResponse)
rpc.register_service(COUNT_LOCAL_NONZERO,
CountLocalNonzeroRequest,
CountLocalNonzeroResponse)
rpc.register_service(KVSTORE_PULL, PullRequest, PullResponse)
rpc.register_service(KVSTORE_PUSH, PushRequest, None)
rpc.register_service(INIT_DATA, InitDataRequest, InitDataResponse)
rpc.register_service(BARRIER, BarrierRequest, BarrierResponse)
rpc.register_service(
REGISTER_PUSH,
RegisterPushHandlerRequest,
RegisterPushHandlerResponse,
)
rpc.register_service(
REGISTER_PULL,
RegisterPullHandlerRequest,
RegisterPullHandlerResponse,
)
rpc.register_service(
GET_SHARED, GetSharedDataRequest, GetSharedDataResponse
)
rpc.register_service(
GET_PART_SHAPE, GetPartShapeRequest, GetPartShapeResponse
)
rpc.register_service(
SEND_META_TO_BACKUP,
SendMetaToBackupRequest,
SendMetaToBackupResponse,
)
rpc.register_service(DELETE_DATA, DeleteDataRequest, DeleteDataResponse)
rpc.register_service(
COUNT_LOCAL_NONZERO,
CountLocalNonzeroRequest,
CountLocalNonzeroResponse,
)
# Store the tensor data with specified data name
self._data_store = {}
# Store the partition information with specified data name
......@@ -1015,7 +1149,9 @@ class KVClient(object):
self._pull_handlers[name] = func
self.barrier()
def init_data(self, name, shape, dtype, part_policy, init_func, is_gdata=True):
def init_data(
self, name, shape, dtype, part_policy, init_func, is_gdata=True
):
"""Send message to kvserver to initialize new data tensor and mapping this
data from server side to client side.
......@@ -1034,9 +1170,11 @@ class KVClient(object):
is_gdata : bool
Whether the created tensor is a ndata/edata or not.
"""
assert len(name) > 0, 'name cannot be empty.'
assert len(shape) > 0, 'shape cannot be empty'
assert name not in self._data_name_list, 'data name: %s already exists.' % name
assert len(name) > 0, "name cannot be empty."
assert len(shape) > 0, "shape cannot be empty"
assert name not in self._data_name_list, (
"data name: %s already exists." % name
)
self.barrier()
shape = list(shape)
......@@ -1044,11 +1182,13 @@ class KVClient(object):
# The servers may handle the duplicated initializations.
part_shape = shape.copy()
part_shape[0] = part_policy.get_part_size()
request = InitDataRequest(name,
tuple(part_shape),
F.reverse_data_type_dict[dtype],
part_policy.policy_str,
init_func)
request = InitDataRequest(
name,
tuple(part_shape),
F.reverse_data_type_dict[dtype],
part_policy.policy_str,
init_func,
)
# The request is sent to the servers in one group, which are on the same machine.
for n in range(self._group_count):
server_id = part_policy.part_id * self._group_count + n
......@@ -1069,8 +1209,12 @@ class KVClient(object):
raise RuntimeError("Data shape %s has already exists!" % name)
self._part_policy[name] = part_policy
self._all_possible_part_policy[part_policy.policy_str] = part_policy
shared_data = empty_shared_mem(name+'-kvdata-', False, \
local_shape, F.reverse_data_type_dict[dtype])
shared_data = empty_shared_mem(
name + "-kvdata-",
False,
local_shape,
F.reverse_data_type_dict[dtype],
)
dlpack = shared_data.to_dlpack()
self._data_store[name] = F.zerocopy_from_dlpack(dlpack)
self._data_name_list.add(name)
......@@ -1081,16 +1225,20 @@ class KVClient(object):
self._push_handlers[name] = default_push_handler
# Now we need to tell the backup server the new tensor.
request = SendMetaToBackupRequest(name, F.reverse_data_type_dict[dtype],
part_shape, part_policy.policy_str,
self._pull_handlers[name],
self._push_handlers[name])
request = SendMetaToBackupRequest(
name,
F.reverse_data_type_dict[dtype],
part_shape,
part_policy.policy_str,
self._pull_handlers[name],
self._push_handlers[name],
)
# send request to all the backup server nodes
for i in range(self._group_count-1):
for i in range(self._group_count - 1):
server_id = self._machine_id * self._group_count + i + 1
rpc.send_request(server_id, request)
# recv response from all the backup server nodes
for _ in range(self._group_count-1):
for _ in range(self._group_count - 1):
response = rpc.recv_response()
assert response.msg == SEND_META_TO_BACKUP_MSG
self.barrier()
......@@ -1103,8 +1251,8 @@ class KVClient(object):
name : str
data name
"""
assert len(name) > 0, 'name cannot be empty.'
assert name in self._data_name_list, 'data name: %s does not exist.' % name
assert len(name) > 0, "name cannot be empty."
assert name in self._data_name_list, "data name: %s not exists." % name
self.barrier()
part_policy = self._part_policy[name]
......@@ -1154,10 +1302,14 @@ class KVClient(object):
if name not in self._data_name_list:
shape, dtype, policy_str = meta
assert policy_str in self._all_possible_part_policy
shared_data = empty_shared_mem(name+'-kvdata-', False, shape, dtype)
shared_data = empty_shared_mem(
name + "-kvdata-", False, shape, dtype
)
dlpack = shared_data.to_dlpack()
self._data_store[name] = F.zerocopy_from_dlpack(dlpack)
self._part_policy[name] = self._all_possible_part_policy[policy_str]
self._part_policy[name] = self._all_possible_part_policy[
policy_str
]
self._pull_handlers[name] = default_pull_handler
self._push_handlers[name] = default_push_handler
# Get full data shape across servers
......@@ -1179,15 +1331,20 @@ class KVClient(object):
# Send meta data to backup servers
for name, meta in response.meta.items():
shape, dtype, policy_str = meta
request = SendMetaToBackupRequest(name, dtype, shape, policy_str,
self._pull_handlers[name],
self._push_handlers[name])
request = SendMetaToBackupRequest(
name,
dtype,
shape,
policy_str,
self._pull_handlers[name],
self._push_handlers[name],
)
# send request to all the backup server nodes
for i in range(self._group_count-1):
for i in range(self._group_count - 1):
server_id = self._machine_id * self._group_count + i + 1
rpc.send_request(server_id, request)
# recv response from all the backup server nodes
for _ in range(self._group_count-1):
for _ in range(self._group_count - 1):
response = rpc.recv_response()
assert response.msg == SEND_META_TO_BACKUP_MSG
self._data_name_list.add(name)
......@@ -1205,9 +1362,8 @@ class KVClient(object):
return list(self._data_name_list)
def get_data_meta(self, name):
"""Get meta data (data_type, data_shape, partition_policy)
"""
assert len(name) > 0, 'name cannot be empty.'
"""Get meta data (data_type, data_shape, partition_policy)"""
assert len(name) > 0, "name cannot be empty."
data_type = F.dtype(self._data_store[name])
data_shape = self._full_data_shape[name]
part_policy = self._part_policy[name]
......@@ -1222,16 +1378,15 @@ class KVClient(object):
id_tensor : tensor
a vector storing the global data ID
"""
assert len(name) > 0, 'name cannot be empty.'
assert len(name) > 0, "name cannot be empty."
id_tensor = utils.toindex(id_tensor)
id_tensor = id_tensor.tousertensor()
assert F.ndim(id_tensor) == 1, 'ID must be a vector.'
assert F.ndim(id_tensor) == 1, "ID must be a vector."
# partition data
machine_id = self._part_policy[name].to_partid(id_tensor)
return machine_id
def push(self, name, id_tensor, data_tensor):
"""Push data to KVServer.
......@@ -1246,12 +1401,13 @@ class KVClient(object):
data_tensor : tensor
a tensor with the same row size of data ID
"""
assert len(name) > 0, 'name cannot be empty.'
assert len(name) > 0, "name cannot be empty."
id_tensor = utils.toindex(id_tensor)
id_tensor = id_tensor.tousertensor()
assert F.ndim(id_tensor) == 1, 'ID must be a vector.'
assert F.shape(id_tensor)[0] == F.shape(data_tensor)[0], \
'The data must have the same row size as the ID.'
assert F.ndim(id_tensor) == 1, "ID must be a vector."
assert (
F.shape(id_tensor)[0] == F.shape(data_tensor)[0]
), "The data must has the same row size with ID."
# partition data
machine_id = self._part_policy[name].to_partid(id_tensor)
# sort index by machine id
......@@ -1265,21 +1421,23 @@ class KVClient(object):
local_data = None
for idx, machine_idx in enumerate(machine):
end = start + count[idx]
if start == end: # No data for target machine
if start == end: # No data for target machine
continue
partial_id = id_tensor[start:end]
partial_data = data_tensor[start:end]
if machine_idx == self._machine_id: # local push
if machine_idx == self._machine_id: # local push
# Note: do not push the local data immediately; deferring it lets the
# remote communication overlap with the local push done after the loop.
local_id = self._part_policy[name].to_local(partial_id)
local_data = partial_data
else: # push data to remote server
else: # push data to remote server
request = PushRequest(name, partial_id, partial_data)
rpc.send_request_to_machine(machine_idx, request)
start += count[idx]
if local_id is not None: # local push
self._push_handlers[name](self._data_store, name, local_id, local_data)
if local_id is not None: # local push
self._push_handlers[name](
self._data_store, name, local_id, local_data
)
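The sort-and-split pattern used by ``push`` (and by ``pull`` below) can be checked in isolation with NumPy; ``machine_id`` here is a made-up per-ID partition assignment.
import numpy as np

id_tensor = np.array([10, 11, 12, 13, 14])
machine_id = np.array([1, 0, 1, 2, 0])      # hypothetical owner machine of each ID

order = np.argsort(machine_id)              # group IDs by destination machine
id_sorted = id_tensor[order]
machines, counts = np.unique(machine_id, return_counts=True)

start = 0
for m, c in zip(machines, counts):
    print("machine", m, "gets IDs", id_sorted[start:start + c])
    start += c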
def pull(self, name, id_tensor):
"""Pull message from KVServer.
......@@ -1296,19 +1454,24 @@ class KVClient(object):
tensor
a data tensor with the same row size as id_tensor.
"""
assert len(name) > 0, 'name cannot be empty.'
assert len(name) > 0, "name cannot be empty."
id_tensor = utils.toindex(id_tensor)
id_tensor = id_tensor.tousertensor()
assert F.ndim(id_tensor) == 1, 'ID must be a vector.'
if self._pull_handlers[name] is default_pull_handler: # Use fast-pull
assert F.ndim(id_tensor) == 1, "ID must be a vector."
if self._pull_handlers[name] is default_pull_handler: # Use fast-pull
part_id = self._part_policy[name].to_partid(id_tensor)
return rpc.fast_pull(name, id_tensor, part_id, KVSTORE_PULL,
self._machine_count,
self._group_count,
self._machine_id,
self._client_id,
self._data_store[name],
self._part_policy[name])
return rpc.fast_pull(
name,
id_tensor,
part_id,
KVSTORE_PULL,
self._machine_count,
self._group_count,
self._machine_id,
self._client_id,
self._data_store[name],
self._part_policy[name],
)
else:
# partition data
machine_id = self._part_policy[name].to_partid(id_tensor)
......@@ -1316,29 +1479,33 @@ class KVClient(object):
sorted_id = F.tensor(np.argsort(F.asnumpy(machine_id)))
back_sorted_id = F.tensor(np.argsort(F.asnumpy(sorted_id)))
id_tensor = id_tensor[sorted_id]
machine, count = np.unique(F.asnumpy(machine_id), return_counts=True)
machine, count = np.unique(
F.asnumpy(machine_id), return_counts=True
)
# pull data from server by order
start = 0
pull_count = 0
local_id = None
for idx, machine_idx in enumerate(machine):
end = start + count[idx]
if start == end: # No data for target machine
if start == end: # No data for target machine
continue
partial_id = id_tensor[start:end]
if machine_idx == self._machine_id: # local pull
if machine_idx == self._machine_id: # local pull
# Note: do not pull the local data immediately; deferring it lets the
# remote communication overlap with the local pull done below.
local_id = self._part_policy[name].to_local(partial_id)
else: # pull data from remote server
else: # pull data from remote server
request = PullRequest(name, partial_id)
rpc.send_request_to_machine(machine_idx, request)
pull_count += 1
start += count[idx]
# recv response
response_list = []
if local_id is not None: # local pull
local_data = self._pull_handlers[name](self._data_store, name, local_id)
if local_id is not None: # local pull
local_data = self._pull_handlers[name](
self._data_store, name, local_id
)
server_id = self._main_server_id
local_response = PullResponse(server_id, local_data)
response_list.append(local_response)
......@@ -1348,21 +1515,22 @@ class KVClient(object):
response_list.append(remote_response)
# sort response by server_id and concat tensor
response_list.sort(key=self._take_id)
data_tensor = F.cat(seq=[response.data_tensor for response in response_list], dim=0)
return data_tensor[back_sorted_id] # return data with original index order
data_tensor = F.cat(
seq=[response.data_tensor for response in response_list], dim=0
)
return data_tensor[
back_sorted_id
] # return data with original index order
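``pull`` must hand rows back in the caller's original order after this per-machine shuffle; the argsort-of-argsort trick can be verified on its own with made-up values.
import numpy as np

machine_id = np.array([2, 0, 1, 0])        # hypothetical owner machine of each requested ID
values = np.array([100, 101, 102, 103])    # rows the caller expects, in request order

sorted_id = np.argsort(machine_id)         # order in which requests are issued
back_sorted_id = np.argsort(sorted_id)     # inverse permutation

returned = values[sorted_id]               # responses arrive grouped by machine
assert (returned[back_sorted_id] == values).all()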
def union(self, operand1_name, operand2_name, output_name):
"""Compute the union of two mask arrays in the KVStore.
"""
"""Compute the union of two mask arrays in the KVStore."""
# Each trainer computes its own result from its local storage.
self._data_store[output_name][:] = (
self._data_store[operand1_name] |
self._data_store[operand2_name]
self._data_store[operand1_name] | self._data_store[operand2_name]
)
def _take_id(self, elem):
"""Used by sort response list
"""
"""Used by sort response list"""
return elem.server_id
def count_nonzero(self, name):
......@@ -1382,8 +1550,11 @@ class KVClient(object):
pull_count = 0
for machine_id in range(self._machine_count):
if machine_id == self._machine_id:
local_id = F.tensor(np.arange(self._part_policy[name].get_part_size(),
dtype=np.int64))
local_id = F.tensor(
np.arange(
self._part_policy[name].get_part_size(), dtype=np.int64
)
)
total += F.count_nonzero(self._data_store[name][local_id])
else:
request = CountLocalNonzeroRequest(name)
......@@ -1405,22 +1576,26 @@ class KVClient(object):
"""
return self._data_store
KVCLIENT = None
def init_kvstore(ip_config, num_servers, role):
"""initialize KVStore"""
global KVCLIENT
if KVCLIENT is None:
if os.environ.get('DGL_DIST_MODE', 'standalone') == 'standalone':
if os.environ.get("DGL_DIST_MODE", "standalone") == "standalone":
KVCLIENT = SA_KVClient()
else:
KVCLIENT = KVClient(ip_config, num_servers, role)
def close_kvstore():
"""Close the current KVClient"""
global KVCLIENT
KVCLIENT = None
def get_kvstore():
"""get the KVClient"""
return KVCLIENT
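A rough usage sketch of the module-level helpers above, assuming the module layout shown here (``dgl.distributed.kvstore``) and standalone mode, where no servers are contacted and the ``ip_config`` path is effectively ignored; treat it as an illustration of the singleton pattern rather than a tested recipe.
import os

from dgl.distributed.kvstore import close_kvstore, get_kvstore, init_kvstore

os.environ["DGL_DIST_MODE"] = "standalone"   # use the in-process KVClient

init_kvstore("ip_config.txt", num_servers=1, role="default")
kv = get_kvstore()    # returns the module-level singleton client
close_kvstore()       # drops the singleton again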
"""Define sparse embedding and optimizer."""
import torch as th
from .... import backend as F
from .... import utils
from .... import backend as F, utils
from ...dist_tensor import DistTensor
class DistEmbedding:
'''Distributed node embeddings.
"""Distributed node embeddings.
DGL provides a distributed embedding to support models that require learnable embeddings.
DGL's distributed embeddings are mainly used for learning node embeddings of graph models.
......@@ -63,11 +64,23 @@ class DistEmbedding:
the forward computation, users have to invoke
:py:meth:`~dgl.distributed.optim.SparseAdagrad.step` afterwards. Otherwise, there will be
a memory leak.
'''
def __init__(self, num_embeddings, embedding_dim, name=None,
init_func=None, part_policy=None):
self._tensor = DistTensor((num_embeddings, embedding_dim), F.float32, name,
init_func=init_func, part_policy=part_policy)
"""
def __init__(
self,
num_embeddings,
embedding_dim,
name=None,
init_func=None,
part_policy=None,
):
self._tensor = DistTensor(
(num_embeddings, embedding_dim),
F.float32,
name,
init_func=init_func,
part_policy=part_policy,
)
self._trace = []
self._name = name
self._num_embeddings = num_embeddings
......@@ -81,10 +94,10 @@ class DistEmbedding:
# actually fails unit test. ???
# else:
# assert 'th.distributed should be initialized'
self._optm_state = None # track optimizer state
self._optm_state = None # track optimizer state
self._part_policy = part_policy
def __call__(self, idx, device=th.device('cpu')):
def __call__(self, idx, device=th.device("cpu")):
"""
idx : th.tensor
Index of the embeddings to collect.
......@@ -104,8 +117,7 @@ class DistEmbedding:
return emb
def reset_trace(self):
'''Reset the traced data.
'''
"""Reset the traced data."""
self._trace = []
@property
......
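A hedged usage sketch of ``DistEmbedding`` together with a sparse optimizer; it assumes a distributed context has already been set up (``dgl.distributed.initialize(...)`` plus a loaded ``DistGraph``), that the class is re-exported as ``dgl.distributed.DistEmbedding``, and that ``dgl.distributed.optim.SparseAdagrad`` is available as referenced in the docstring above. All sizes are made up.
import torch as th
import dgl

# Assumes dgl.distributed.initialize("ip_config.txt") and a DistGraph already exist.
emb = dgl.distributed.DistEmbedding(
    num_embeddings=1000, embedding_dim=16, name="demo_emb"
)
optimizer = dgl.distributed.optim.SparseAdagrad([emb], lr=0.01)

nids = th.arange(0, 32)
feats = emb(nids)    # gathers rows and records the trace for the optimizer
loss = feats.sum()
loss.backward()
optimizer.step()     # must follow every forward pass, per the note above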
......@@ -10,9 +10,9 @@ import dgl
from .... import backend as F
from ...dist_tensor import DistTensor
from ...graph_partition_book import EDGE_PART_POLICY, NODE_PART_POLICY
from ...nn.pytorch import DistEmbedding
from .utils import alltoall_cpu, alltoallv_cpu
from ...graph_partition_book import EDGE_PART_POLICY, NODE_PART_POLICY
EMB_STATES = "emb_states"
WORLD_SIZE = "world_size"
......
......@@ -3,40 +3,41 @@
import json
import os
import time
import numpy as np
from .. import backend as F
from ..base import NID, EID, NTYPE, ETYPE, DGLError
from ..base import DGLError, EID, ETYPE, NID, NTYPE
from ..convert import to_homogeneous
from ..random import choice as random_choice
from ..transforms import sort_csr_by_tag, sort_csc_by_tag
from ..data.utils import load_graphs, save_graphs, load_tensors, save_tensors
from ..data.utils import load_graphs, load_tensors, save_graphs, save_tensors
from ..partition import (
get_peak_mem,
metis_partition_assignment,
partition_graph_with_halo,
get_peak_mem,
)
from ..random import choice as random_choice
from ..transforms import sort_csc_by_tag, sort_csr_by_tag
from .constants import DEFAULT_ETYPE, DEFAULT_NTYPE
from .graph_partition_book import (
RangePartitionBook,
_etype_tuple_to_str,
_etype_str_to_tuple,
_etype_tuple_to_str,
RangePartitionBook,
)
RESERVED_FIELD_DTYPE = {
'inner_node': F.uint8, # A flag indicating whether the node is inside a partition.
'inner_edge': F.uint8, # A flag indicating whether the edge is inside a partition.
"inner_node": F.uint8,  # A flag indicating whether the node is inside a partition.
"inner_edge": F.uint8,  # A flag indicating whether the edge is inside a partition.
NID: F.int64,
EID: F.int64,
NTYPE: F.int16,
# `sort_csr_by_tag` and `sort_csc_by_tag` works on int32/64 only.
ETYPE: F.int32
}
ETYPE: F.int32,
}
def _format_part_metadata(part_metadata, formatter):
'''Format etypes with specified formatter.
'''
for key in ['edge_map', 'etypes']:
"""Format etypes with specified formatter."""
for key in ["edge_map", "etypes"]:
if key not in part_metadata:
continue
orig_data = part_metadata[key]
......@@ -49,32 +50,36 @@ def _format_part_metadata(part_metadata, formatter):
part_metadata[key] = new_data
return part_metadata
def _load_part_config(part_config):
'''Load part config and format.
'''
"""Load part config and format."""
try:
with open(part_config) as f:
part_metadata = _format_part_metadata(json.load(f),
_etype_str_to_tuple)
part_metadata = _format_part_metadata(
json.load(f), _etype_str_to_tuple
)
except AssertionError as e:
raise DGLError(f"Failed to load partition config due to {e}. "
raise DGLError(
f"Failed to load partition config due to {e}. "
"Probably caused by outdated config. If so, please refer to "
"https://github.com/dmlc/dgl/tree/master/tools#change-edge-"
"type-to-canonical-edge-type-for-partition-configuration-json")
"type-to-canonical-edge-type-for-partition-configuration-json"
)
return part_metadata
def _dump_part_config(part_config, part_metadata):
'''Format and dump part config.
'''
"""Format and dump part config."""
part_metadata = _format_part_metadata(part_metadata, _etype_tuple_to_str)
with open(part_config, 'w') as outfile:
with open(part_config, "w") as outfile:
json.dump(part_metadata, outfile, sort_keys=True, indent=4)
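To make the round trip above concrete, here is a self-contained sketch of the assumed string form of canonical edge types in the config JSON (the triplet joined with ``:``); the helpers below are local stand-ins for ``_etype_tuple_to_str`` / ``_etype_str_to_tuple``, and the boundary numbers are invented.
import json

def etype_tuple_to_str(c_etype):
    # ("user", "follows", "user") -> "user:follows:user"  (assumed format)
    return ":".join(c_etype)

def etype_str_to_tuple(s):
    parts = tuple(s.split(":"))
    assert len(parts) == 3, "expected 'srctype:etype:dsttype'"
    return parts

edge_map = {("user", "follows", "user"): [[0, 100], [100, 180]]}
dumped = json.dumps(
    {etype_tuple_to_str(k): v for k, v in edge_map.items()}, indent=4
)
loaded = {etype_str_to_tuple(k): v for k, v in json.loads(dumped).items()}
print(loaded)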
def _save_graphs(filename, g_list, formats=None, sort_etypes=False):
'''Preprocess partitions before saving:
"""Preprocess partitions before saving:
1. format data types.
2. sort csc/csr by tag.
'''
"""
for g in g_list:
for k, dtype in RESERVED_FIELD_DTYPE.items():
if k in g.ndata:
......@@ -84,25 +89,36 @@ def _save_graphs(filename, g_list, formats=None, sort_etypes=False):
for g in g_list:
if (not sort_etypes) or (formats is None):
continue
if 'csr' in formats:
g = sort_csr_by_tag(g, tag=g.edata[ETYPE], tag_type='edge')
if 'csc' in formats:
g = sort_csc_by_tag(g, tag=g.edata[ETYPE], tag_type='edge')
save_graphs(filename , g_list, formats=formats)
if "csr" in formats:
g = sort_csr_by_tag(g, tag=g.edata[ETYPE], tag_type="edge")
if "csc" in formats:
g = sort_csc_by_tag(g, tag=g.edata[ETYPE], tag_type="edge")
save_graphs(filename, g_list, formats=formats)
def _get_inner_node_mask(graph, ntype_id):
if NTYPE in graph.ndata:
dtype = F.dtype(graph.ndata['inner_node'])
return graph.ndata['inner_node'] * F.astype(graph.ndata[NTYPE] == ntype_id, dtype) == 1
dtype = F.dtype(graph.ndata["inner_node"])
return (
graph.ndata["inner_node"]
* F.astype(graph.ndata[NTYPE] == ntype_id, dtype)
== 1
)
else:
return graph.ndata['inner_node'] == 1
return graph.ndata["inner_node"] == 1
def _get_inner_edge_mask(graph, etype_id):
if ETYPE in graph.edata:
dtype = F.dtype(graph.edata['inner_edge'])
return graph.edata['inner_edge'] * F.astype(graph.edata[ETYPE] == etype_id, dtype) == 1
dtype = F.dtype(graph.edata["inner_edge"])
return (
graph.edata["inner_edge"]
* F.astype(graph.edata[ETYPE] == etype_id, dtype)
== 1
)
else:
return graph.edata['inner_edge'] == 1
return graph.edata["inner_edge"] == 1
def _get_part_ranges(id_ranges):
res = {}
......@@ -116,11 +132,14 @@ def _get_part_ranges(id_ranges):
for i, end in enumerate(id_ranges[key]):
id_ranges[key][i] = [start, end]
start = end
res[key] = np.concatenate([np.array(l) for l in id_ranges[key]]).reshape(-1, 2)
res[key] = np.concatenate(
[np.array(l) for l in id_ranges[key]]
).reshape(-1, 2)
return res
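The conversion in ``_get_part_ranges`` (per-partition boundaries to a ``(num_parts, 2)`` array of ``[start, end)`` rows) can be sketched as follows; the boundary values are invented.
import numpy as np

ends = [40, 90, 120]       # hypothetical per-partition end boundaries for one node type
ranges, start = [], 0
for end in ends:
    ranges.append([start, end])
    start = end

arr = np.concatenate([np.array(r) for r in ranges]).reshape(-1, 2)
print(arr)
# [[  0  40]
#  [ 40  90]
#  [ 90 120]]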
def load_partition(part_config, part_id, load_feats=True):
''' Load data of a partition from the data path.
"""Load data of a partition from the data path.
The data of a partition includes the graph structure of the partition, a dict of node tensors,
a dict of edge tensors, and some metadata. The partition may contain HALO nodes,
......@@ -158,60 +177,87 @@ def load_partition(part_config, part_id, load_feats=True):
The node types
List[(str, str, str)]
The edge types
'''
"""
config_path = os.path.dirname(part_config)
relative_to_config = lambda path: os.path.join(config_path, path)
with open(part_config) as conf_f:
part_metadata = json.load(conf_f)
assert 'part-{}'.format(part_id) in part_metadata, "part-{} does not exist".format(part_id)
part_files = part_metadata['part-{}'.format(part_id)]
assert 'part_graph' in part_files, "the partition does not contain graph structure."
graph = load_graphs(relative_to_config(part_files['part_graph']))[0][0]
assert NID in graph.ndata, "the partition graph should contain node mapping to global node ID"
assert EID in graph.edata, "the partition graph should contain edge mapping to global edge ID"
assert (
"part-{}".format(part_id) in part_metadata
), "part-{} does not exist".format(part_id)
part_files = part_metadata["part-{}".format(part_id)]
assert (
"part_graph" in part_files
), "the partition does not contain graph structure."
graph = load_graphs(relative_to_config(part_files["part_graph"]))[0][0]
assert (
NID in graph.ndata
), "the partition graph should contain node mapping to global node ID"
assert (
EID in graph.edata
), "the partition graph should contain edge mapping to global edge ID"
gpb, graph_name, ntypes, etypes = load_partition_book(part_config, part_id)
ntypes_list = list(ntypes.keys())
etypes_list = list(etypes.keys())
if 'DGL_DIST_DEBUG' in os.environ:
if "DGL_DIST_DEBUG" in os.environ:
for ntype in ntypes:
ntype_id = ntypes[ntype]
# graph.ndata[NID] are global homogeneous node IDs.
nids = F.boolean_mask(graph.ndata[NID], _get_inner_node_mask(graph, ntype_id))
nids = F.boolean_mask(
graph.ndata[NID], _get_inner_node_mask(graph, ntype_id)
)
partids1 = gpb.nid2partid(nids)
_, per_type_nids = gpb.map_to_per_ntype(nids)
partids2 = gpb.nid2partid(per_type_nids, ntype)
assert np.all(F.asnumpy(partids1 == part_id)), \
'Unexpected partition IDs are found in the loaded partition ' \
'while querying via global homogeneous node IDs.'
assert np.all(F.asnumpy(partids2 == part_id)), \
'Unexpected partition IDs are found in the loaded partition ' \
'while querying via type-wise node IDs.'
assert np.all(F.asnumpy(partids1 == part_id)), (
"Unexpected partition IDs are found in the loaded partition "
"while querying via global homogeneous node IDs."
)
assert np.all(F.asnumpy(partids2 == part_id)), (
"Unexpected partition IDs are found in the loaded partition "
"while querying via type-wise node IDs."
)
for etype in etypes:
etype_id = etypes[etype]
# graph.edata[EID] are global homogeneous edge IDs.
eids = F.boolean_mask(graph.edata[EID], _get_inner_edge_mask(graph, etype_id))
eids = F.boolean_mask(
graph.edata[EID], _get_inner_edge_mask(graph, etype_id)
)
partids1 = gpb.eid2partid(eids)
_, per_type_eids = gpb.map_to_per_etype(eids)
partids2 = gpb.eid2partid(per_type_eids, etype)
assert np.all(F.asnumpy(partids1 == part_id)), \
'Unexpected partition IDs are found in the loaded partition ' \
'while querying via global homogeneous edge IDs.'
assert np.all(F.asnumpy(partids2 == part_id)), \
'Unexpected partition IDs are found in the loaded partition ' \
'while querying via type-wise edge IDs.'
assert np.all(F.asnumpy(partids1 == part_id)), (
"Unexpected partition IDs are found in the loaded partition "
"while querying via global homogeneous edge IDs."
)
assert np.all(F.asnumpy(partids2 == part_id)), (
"Unexpected partition IDs are found in the loaded partition "
"while querying via type-wise edge IDs."
)
node_feats = {}
edge_feats = {}
if load_feats:
node_feats, edge_feats = load_partition_feats(part_config, part_id)
return graph, node_feats, edge_feats, gpb, graph_name, ntypes_list, etypes_list
return (
graph,
node_feats,
edge_feats,
gpb,
graph_name,
ntypes_list,
etypes_list,
)
def load_partition_feats(part_config, part_id, load_nodes=True, load_edges=True):
'''Load node/edge feature data from a partition.
def load_partition_feats(
part_config, part_id, load_nodes=True, load_edges=True
):
"""Load node/edge feature data from a partition.
Parameters
----------
......@@ -230,45 +276,52 @@ def load_partition_feats(part_config, part_id, load_nodes=True, load_edges=True)
Node features.
Dict[str, Tensor] or None
Edge features.
'''
"""
config_path = os.path.dirname(part_config)
relative_to_config = lambda path: os.path.join(config_path, path)
with open(part_config) as conf_f:
part_metadata = json.load(conf_f)
assert 'part-{}'.format(part_id) in part_metadata, "part-{} does not exist".format(part_id)
part_files = part_metadata['part-{}'.format(part_id)]
assert 'node_feats' in part_files, "the partition does not contain node features."
assert 'edge_feats' in part_files, "the partition does not contain edge features."
assert (
"part-{}".format(part_id) in part_metadata
), "part-{} does not exist".format(part_id)
part_files = part_metadata["part-{}".format(part_id)]
assert (
"node_feats" in part_files
), "the partition does not contain node features."
assert (
"edge_feats" in part_files
), "the partition does not contain edge feature."
node_feats = None
if load_nodes:
node_feats = load_tensors(relative_to_config(part_files['node_feats']))
node_feats = load_tensors(relative_to_config(part_files["node_feats"]))
edge_feats = None
if load_edges:
edge_feats = load_tensors(relative_to_config(part_files['edge_feats']))
edge_feats = load_tensors(relative_to_config(part_files["edge_feats"]))
# In the old format, the feature name doesn't contain node/edge type.
# For compatibility, let's add node/edge types to the feature names.
if node_feats is not None:
new_feats = {}
for name in node_feats:
feat = node_feats[name]
if name.find('/') == -1:
name = DEFAULT_NTYPE + '/' + name
if name.find("/") == -1:
name = DEFAULT_NTYPE + "/" + name
new_feats[name] = feat
node_feats = new_feats
if edge_feats is not None:
new_feats = {}
for name in edge_feats:
feat = edge_feats[name]
if name.find('/') == -1:
name = _etype_tuple_to_str(DEFAULT_ETYPE) + '/' + name
if name.find("/") == -1:
name = _etype_tuple_to_str(DEFAULT_ETYPE) + "/" + name
new_feats[name] = feat
edge_feats = new_feats
return node_feats, edge_feats
def load_partition_book(part_config, part_id):
'''Load a graph partition book from the partition config file.
"""Load a graph partition book from the partition config file.
Parameters
----------
......@@ -287,23 +340,30 @@ def load_partition_book(part_config, part_id):
The node types
dict
The edge types
'''
"""
part_metadata = _load_part_config(part_config)
assert 'num_parts' in part_metadata, 'num_parts does not exist.'
assert part_metadata['num_parts'] > part_id, \
'part {} is out of range (#parts: {})'.format(part_id, part_metadata['num_parts'])
num_parts = part_metadata['num_parts']
assert 'num_nodes' in part_metadata, "cannot get the number of nodes of the global graph."
assert 'num_edges' in part_metadata, "cannot get the number of edges of the global graph."
assert 'node_map' in part_metadata, "cannot get the node map."
assert 'edge_map' in part_metadata, "cannot get the edge map."
assert 'graph_name' in part_metadata, "cannot get the graph name"
assert "num_parts" in part_metadata, "num_parts does not exist."
assert (
part_metadata["num_parts"] > part_id
), "part {} is out of range (#parts: {})".format(
part_id, part_metadata["num_parts"]
)
num_parts = part_metadata["num_parts"]
assert (
"num_nodes" in part_metadata
), "cannot get the number of nodes of the global graph."
assert (
"num_edges" in part_metadata
), "cannot get the number of edges of the global graph."
assert "node_map" in part_metadata, "cannot get the node map."
assert "edge_map" in part_metadata, "cannot get the edge map."
assert "graph_name" in part_metadata, "cannot get the graph name"
# If this is a range partitioning, node_map actually stores a list, whose elements
# indicate the boundary of range partitioning. Otherwise, node_map stores a filename
# that contains node map in a NumPy array.
node_map = part_metadata['node_map']
edge_map = part_metadata['edge_map']
node_map = part_metadata["node_map"]
edge_map = part_metadata["edge_map"]
if isinstance(node_map, dict):
for key in node_map:
is_range_part = isinstance(node_map[key], list)
......@@ -318,28 +378,35 @@ def load_partition_book(part_config, part_id):
ntypes = {DEFAULT_NTYPE: 0}
etypes = {DEFAULT_ETYPE: 0}
if 'ntypes' in part_metadata:
ntypes = part_metadata['ntypes']
if 'etypes' in part_metadata:
etypes = part_metadata['etypes']
if "ntypes" in part_metadata:
ntypes = part_metadata["ntypes"]
if "etypes" in part_metadata:
etypes = part_metadata["etypes"]
if isinstance(node_map, dict):
for key in node_map:
assert key in ntypes, 'The node type {} is invalid'.format(key)
assert key in ntypes, "The node type {} is invalid".format(key)
if isinstance(edge_map, dict):
for key in edge_map:
assert key in etypes, 'The edge type {} is invalid'.format(key)
assert key in etypes, "The edge type {} is invalid".format(key)
if not is_range_part:
raise TypeError("Only RangePartitionBook is supported currently.")
node_map = _get_part_ranges(node_map)
edge_map = _get_part_ranges(edge_map)
return RangePartitionBook(part_id, num_parts, node_map, edge_map, ntypes, etypes), \
part_metadata['graph_name'], ntypes, etypes
return (
RangePartitionBook(
part_id, num_parts, node_map, edge_map, ntypes, etypes
),
part_metadata["graph_name"],
ntypes,
etypes,
)
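A short usage sketch, assuming a partition produced by ``partition_graph`` already exists under the hypothetical config path ``output/test.json``:
from dgl.distributed.partition import load_partition_book, load_partition_feats

gpb, graph_name, ntypes, etypes = load_partition_book("output/test.json", part_id=0)
node_feats, edge_feats = load_partition_feats("output/test.json", part_id=0)
print(graph_name, list(ntypes), list(etypes))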
def _get_orig_ids(g, sim_g, orig_nids, orig_eids):
'''Convert/construct the original node IDs and edge IDs.
"""Convert/construct the original node IDs and edge IDs.
It handles multiple cases:
* If the graph has been reshuffled and it's a homogeneous graph, we just return
......@@ -363,23 +430,32 @@ def _get_orig_ids(g, sim_g, orig_nids, orig_eids):
Returns
-------
tensor or dict of tensors, tensor or dict of tensors
'''
"""
is_hetero = not g.is_homogeneous
if is_hetero:
# Get the type IDs
# Get the type IDs
orig_ntype = F.gather_row(sim_g.ndata[NTYPE], orig_nids)
orig_etype = F.gather_row(sim_g.edata[ETYPE], orig_eids)
# Mapping between shuffled global IDs to original per-type IDs
orig_nids = F.gather_row(sim_g.ndata[NID], orig_nids)
orig_eids = F.gather_row(sim_g.edata[EID], orig_eids)
orig_nids = {ntype: F.boolean_mask(orig_nids, orig_ntype == g.get_ntype_id(ntype)) \
for ntype in g.ntypes}
orig_eids = {etype: F.boolean_mask(orig_eids, orig_etype == g.get_etype_id(etype)) \
for etype in g.canonical_etypes}
orig_nids = {
ntype: F.boolean_mask(
orig_nids, orig_ntype == g.get_ntype_id(ntype)
)
for ntype in g.ntypes
}
orig_eids = {
etype: F.boolean_mask(
orig_eids, orig_etype == g.get_etype_id(etype)
)
for etype in g.canonical_etypes
}
return orig_nids, orig_eids
def _set_trainer_ids(g, sim_g, node_parts):
'''Set the trainer IDs for each node and edge on the input graph.
"""Set the trainer IDs for each node and edge on the input graph.
The trainer IDs will be stored as node data and edge data in the input graph.
......@@ -391,29 +467,44 @@ def _set_trainer_ids(g, sim_g, node_parts):
The homogeneous version of the input graph.
node_parts : tensor
The node partition ID for each node in `sim_g`.
'''
"""
if g.is_homogeneous:
g.ndata['trainer_id'] = node_parts
g.ndata["trainer_id"] = node_parts
# An edge is assigned to a partition based on its destination node.
g.edata['trainer_id'] = F.gather_row(node_parts, g.edges()[1])
g.edata["trainer_id"] = F.gather_row(node_parts, g.edges()[1])
else:
for ntype_id, ntype in enumerate(g.ntypes):
type_idx = sim_g.ndata[NTYPE] == ntype_id
orig_nid = F.boolean_mask(sim_g.ndata[NID], type_idx)
trainer_id = F.zeros((len(orig_nid),), F.dtype(node_parts), F.cpu())
F.scatter_row_inplace(trainer_id, orig_nid, F.boolean_mask(node_parts, type_idx))
g.nodes[ntype].data['trainer_id'] = trainer_id
F.scatter_row_inplace(
trainer_id, orig_nid, F.boolean_mask(node_parts, type_idx)
)
g.nodes[ntype].data["trainer_id"] = trainer_id
for c_etype in g.canonical_etypes:
# An edge is assigned to a partition based on its destination node.
_, _, dst_type = c_etype
trainer_id = F.gather_row(g.nodes[dst_type].data['trainer_id'],
g.edges(etype=c_etype)[1])
g.edges[c_etype].data['trainer_id'] = trainer_id
def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method="metis",
balance_ntypes=None, balance_edges=False, return_mapping=False,
num_trainers_per_machine=1, objtype='cut', graph_formats=None):
''' Partition a graph for distributed training and store the partitions on files.
trainer_id = F.gather_row(
g.nodes[dst_type].data["trainer_id"], g.edges(etype=c_etype)[1]
)
g.edges[c_etype].data["trainer_id"] = trainer_id
def partition_graph(
g,
graph_name,
num_parts,
out_path,
num_hops=1,
part_method="metis",
balance_ntypes=None,
balance_edges=False,
return_mapping=False,
num_trainers_per_machine=1,
objtype="cut",
graph_formats=None,
):
"""Partition a graph for distributed training and store the partitions on files.
The partitioning occurs in three steps: 1) run a partition algorithm (e.g., Metis) to
assign nodes to partitions; 2) construct partition graph structure based on
......@@ -608,10 +699,12 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
>>> (
... g, node_feats, edge_feats, gpb, graph_name, ntypes_list, etypes_list,
... ) = dgl.distributed.load_partition('output/test.json', 0)
'''
"""
# 'coo' is required for partition
assert 'coo' in np.concatenate(list(g.formats().values())), \
"'coo' format should be allowed for partitioning graph."
assert "coo" in np.concatenate(
list(g.formats().values())
), "'coo' format should be allowed for partitioning graph."
def get_homogeneous(g, balance_ntypes):
if g.is_homogeneous:
sim_g = to_homogeneous(g)
......@@ -626,49 +719,65 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
num_ntypes = 0
for key in g.ntypes:
if key in balance_ntypes:
g.nodes[key].data['bal_ntype'] = F.astype(balance_ntypes[key],
F.int32) + num_ntypes
g.nodes[key].data["bal_ntype"] = (
F.astype(balance_ntypes[key], F.int32) + num_ntypes
)
uniq_ntypes = F.unique(balance_ntypes[key])
assert np.all(F.asnumpy(uniq_ntypes) == np.arange(len(uniq_ntypes)))
assert np.all(
F.asnumpy(uniq_ntypes) == np.arange(len(uniq_ntypes))
)
num_ntypes += len(uniq_ntypes)
else:
g.nodes[key].data['bal_ntype'] = F.ones((g.number_of_nodes(key),), F.int32,
F.cpu()) * num_ntypes
g.nodes[key].data["bal_ntype"] = (
F.ones((g.number_of_nodes(key),), F.int32, F.cpu())
* num_ntypes
)
num_ntypes += 1
sim_g = to_homogeneous(g, ndata=['bal_ntype'])
bal_ntypes = sim_g.ndata['bal_ntype']
print('The graph has {} node types and balance among {} types'.format(
len(g.ntypes), len(F.unique(bal_ntypes))))
sim_g = to_homogeneous(g, ndata=["bal_ntype"])
bal_ntypes = sim_g.ndata["bal_ntype"]
print(
"The graph has {} node types and balance among {} types".format(
len(g.ntypes), len(F.unique(bal_ntypes))
)
)
# We now no longer need them.
for key in g.ntypes:
del g.nodes[key].data['bal_ntype']
del sim_g.ndata['bal_ntype']
del g.nodes[key].data["bal_ntype"]
del sim_g.ndata["bal_ntype"]
else:
sim_g = to_homogeneous(g)
bal_ntypes = sim_g.ndata[NTYPE]
return sim_g, bal_ntypes
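# Illustration (not from the source): the offsets above give every
# (node type, balance label) combination its own integer id.  For example, if
# the first node type is balanced over labels {0, 1} and the second type is
# not listed in balance_ntypes, the combined "bal_ntype" values are {0, 1}
# for the first type and {2} for the second, so METIS balances each group
# separately when assigning nodes to partitions.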
if objtype not in ['cut', 'vol']:
if objtype not in ["cut", "vol"]:
raise ValueError
if num_parts == 1:
start = time.time()
sim_g, balance_ntypes = get_homogeneous(g, balance_ntypes)
print('Converting to homogeneous graph takes {:.3f}s, peak mem: {:.3f} GB'.format(
time.time() - start, get_peak_mem()))
print(
"Converting to homogeneous graph takes {:.3f}s, peak mem: {:.3f} GB".format(
time.time() - start, get_peak_mem()
)
)
assert num_trainers_per_machine >= 1
if num_trainers_per_machine > 1:
# First partition the whole graph to each trainer and save the trainer ids in
# the node feature "trainer_id".
start = time.time()
node_parts = metis_partition_assignment(
sim_g, num_parts * num_trainers_per_machine,
sim_g,
num_parts * num_trainers_per_machine,
balance_ntypes=balance_ntypes,
balance_edges=balance_edges,
mode='k-way')
mode="k-way",
)
_set_trainer_ids(g, sim_g, node_parts)
print('Assigning nodes to METIS partitions takes {:.3f}s, peak mem: {:.3f} GB'.format(
time.time() - start, get_peak_mem()))
print(
"Assigning nodes to METIS partitions takes {:.3f}s, peak mem: {:.3f} GB".format(
time.time() - start, get_peak_mem()
)
)
node_parts = F.zeros((sim_g.number_of_nodes(),), F.int64, F.cpu())
parts = {0: sim_g.clone()}
......@@ -676,60 +785,86 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
orig_eids = parts[0].edata[EID] = F.arange(0, sim_g.number_of_edges())
# For one partition, we don't really shuffle nodes and edges. We just need to
# simulate it and store the original IDs in the 'orig_id' node and edge data.
parts[0].ndata['orig_id'] = orig_nids
parts[0].edata['orig_id'] = orig_eids
parts[0].ndata["orig_id"] = orig_nids
parts[0].edata["orig_id"] = orig_eids
if return_mapping:
if g.is_homogeneous:
orig_nids = F.arange(0, sim_g.number_of_nodes())
orig_eids = F.arange(0, sim_g.number_of_edges())
else:
orig_nids = {ntype: F.arange(0, g.number_of_nodes(ntype))
for ntype in g.ntypes}
orig_eids = {etype: F.arange(0, g.number_of_edges(etype))
for etype in g.canonical_etypes}
parts[0].ndata['inner_node'] = F.ones((sim_g.number_of_nodes(),),
RESERVED_FIELD_DTYPE['inner_node'], F.cpu())
parts[0].edata['inner_edge'] = F.ones((sim_g.number_of_edges(),),
RESERVED_FIELD_DTYPE['inner_edge'], F.cpu())
elif part_method in ('metis', 'random'):
orig_nids = {
ntype: F.arange(0, g.number_of_nodes(ntype))
for ntype in g.ntypes
}
orig_eids = {
etype: F.arange(0, g.number_of_edges(etype))
for etype in g.canonical_etypes
}
parts[0].ndata["inner_node"] = F.ones(
(sim_g.number_of_nodes(),),
RESERVED_FIELD_DTYPE["inner_node"],
F.cpu(),
)
parts[0].edata["inner_edge"] = F.ones(
(sim_g.number_of_edges(),),
RESERVED_FIELD_DTYPE["inner_edge"],
F.cpu(),
)
elif part_method in ("metis", "random"):
start = time.time()
sim_g, balance_ntypes = get_homogeneous(g, balance_ntypes)
print('Converting to homogeneous graph takes {:.3f}s, peak mem: {:.3f} GB'.format(
time.time() - start, get_peak_mem()))
if part_method == 'metis':
print(
"Converting to homogeneous graph takes {:.3f}s, peak mem: {:.3f} GB".format(
time.time() - start, get_peak_mem()
)
)
if part_method == "metis":
assert num_trainers_per_machine >= 1
start = time.time()
if num_trainers_per_machine > 1:
# First partition the whole graph to each trainer and save the trainer ids in
# the node feature "trainer_id".
node_parts = metis_partition_assignment(
sim_g, num_parts * num_trainers_per_machine,
sim_g,
num_parts * num_trainers_per_machine,
balance_ntypes=balance_ntypes,
balance_edges=balance_edges,
mode='k-way', objtype=objtype)
mode="k-way",
objtype=objtype,
)
_set_trainer_ids(g, sim_g, node_parts)
# And then coalesce the partitions of trainers on the same machine into one
# larger partition.
node_parts = F.floor_div(node_parts, num_trainers_per_machine)
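# For example (hypothetical sizes): with num_parts=2 and
# num_trainers_per_machine=2, trainer-level assignments [0, 1, 2, 3] collapse
# to machine-level partitions [0, 0, 1, 1] after the integer division.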
else:
node_parts = metis_partition_assignment(sim_g, num_parts,
balance_ntypes=balance_ntypes,
balance_edges=balance_edges,
objtype=objtype)
print('Assigning nodes to METIS partitions takes {:.3f}s, peak mem: {:.3f} GB'.format(
time.time() - start, get_peak_mem()))
node_parts = metis_partition_assignment(
sim_g,
num_parts,
balance_ntypes=balance_ntypes,
balance_edges=balance_edges,
objtype=objtype,
)
print(
"Assigning nodes to METIS partitions takes {:.3f}s, peak mem: {:.3f} GB".format(
time.time() - start, get_peak_mem()
)
)
else:
node_parts = random_choice(num_parts, sim_g.number_of_nodes())
start = time.time()
parts, orig_nids, orig_eids = partition_graph_with_halo(sim_g, node_parts, num_hops,
reshuffle=True)
print('Splitting the graph into partitions takes {:.3f}s, peak mem: {:.3f} GB'.format(
time.time() - start, get_peak_mem()))
parts, orig_nids, orig_eids = partition_graph_with_halo(
sim_g, node_parts, num_hops, reshuffle=True
)
print(
"Splitting the graph into partitions takes {:.3f}s, peak mem: {:.3f} GB".format(
time.time() - start, get_peak_mem()
)
)
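# Note: partition_graph_with_halo keeps, for each partition, its assigned
# nodes plus their num_hops-hop neighborhood; the extra "halo" nodes and
# edges are marked 0 in the inner_node / inner_edge masks used below.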
if return_mapping:
orig_nids, orig_eids = _get_orig_ids(g, sim_g, orig_nids, orig_eids)
else:
raise Exception('Unknown partitioning method: ' + part_method)
raise Exception("Unknown partitioning method: " + part_method)
# If the input is a heterogeneous graph, get the original node types and original node IDs.
# `part' has three types of node data at this point.
......@@ -738,38 +873,56 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
# NID: the global node IDs in the reshuffled homogeneous version of the input graph.
if not g.is_homogeneous:
for name in parts:
orig_ids = parts[name].ndata['orig_id']
orig_ids = parts[name].ndata["orig_id"]
ntype = F.gather_row(sim_g.ndata[NTYPE], orig_ids)
parts[name].ndata[NTYPE] = F.astype(ntype, RESERVED_FIELD_DTYPE[NTYPE])
assert np.all(F.asnumpy(ntype) == F.asnumpy(parts[name].ndata[NTYPE]))
parts[name].ndata[NTYPE] = F.astype(
ntype, RESERVED_FIELD_DTYPE[NTYPE]
)
assert np.all(
F.asnumpy(ntype) == F.asnumpy(parts[name].ndata[NTYPE])
)
# Get the original edge types and original edge IDs.
orig_ids = parts[name].edata['orig_id']
orig_ids = parts[name].edata["orig_id"]
etype = F.gather_row(sim_g.edata[ETYPE], orig_ids)
parts[name].edata[ETYPE] = F.astype(etype, RESERVED_FIELD_DTYPE[ETYPE])
assert np.all(F.asnumpy(etype) == F.asnumpy(parts[name].edata[ETYPE]))
parts[name].edata[ETYPE] = F.astype(
etype, RESERVED_FIELD_DTYPE[ETYPE]
)
assert np.all(
F.asnumpy(etype) == F.asnumpy(parts[name].edata[ETYPE])
)
# Calculate the global node IDs to per-node IDs mapping.
inner_ntype = F.boolean_mask(parts[name].ndata[NTYPE],
parts[name].ndata['inner_node'] == 1)
inner_nids = F.boolean_mask(parts[name].ndata[NID],
parts[name].ndata['inner_node'] == 1)
inner_ntype = F.boolean_mask(
parts[name].ndata[NTYPE], parts[name].ndata["inner_node"] == 1
)
inner_nids = F.boolean_mask(
parts[name].ndata[NID], parts[name].ndata["inner_node"] == 1
)
for ntype in g.ntypes:
inner_ntype_mask = inner_ntype == g.get_ntype_id(ntype)
typed_nids = F.boolean_mask(inner_nids, inner_ntype_mask)
# inner node IDs are in a contiguous ID range.
expected_range = np.arange(int(F.as_scalar(typed_nids[0])),
int(F.as_scalar(typed_nids[-1])) + 1)
expected_range = np.arange(
int(F.as_scalar(typed_nids[0])),
int(F.as_scalar(typed_nids[-1])) + 1,
)
assert np.all(F.asnumpy(typed_nids) == expected_range)
# Calculate the global edge IDs to per-edge IDs mapping.
inner_etype = F.boolean_mask(parts[name].edata[ETYPE],
parts[name].edata['inner_edge'] == 1)
inner_eids = F.boolean_mask(parts[name].edata[EID],
parts[name].edata['inner_edge'] == 1)
inner_etype = F.boolean_mask(
parts[name].edata[ETYPE], parts[name].edata["inner_edge"] == 1
)
inner_eids = F.boolean_mask(
parts[name].edata[EID], parts[name].edata["inner_edge"] == 1
)
for etype in g.canonical_etypes:
inner_etype_mask = inner_etype == g.get_etype_id(etype)
typed_eids = np.sort(F.asnumpy(F.boolean_mask(inner_eids, inner_etype_mask)))
assert np.all(typed_eids == np.arange(int(typed_eids[0]),
int(typed_eids[-1]) + 1))
typed_eids = np.sort(
F.asnumpy(F.boolean_mask(inner_eids, inner_etype_mask))
)
assert np.all(
typed_eids
== np.arange(int(typed_eids[0]), int(typed_eids[-1]) + 1)
)
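# The checks above confirm that, within a partition, the inner (owned) nodes
# and edges of each type occupy one contiguous range of global IDs; that is
# what allows node_map / edge_map below to be stored as simple
# [start, end) pairs instead of full ID lists.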
os.makedirs(out_path, mode=0o775, exist_ok=True)
tot_num_inner_edges = 0
......@@ -786,10 +939,18 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
node_map_val[ntype] = []
for i in parts:
inner_node_mask = _get_inner_node_mask(parts[i], ntype_id)
val.append(F.as_scalar(F.sum(F.astype(inner_node_mask, F.int64), 0)))
inner_nids = F.boolean_mask(parts[i].ndata[NID], inner_node_mask)
node_map_val[ntype].append([int(F.as_scalar(inner_nids[0])),
int(F.as_scalar(inner_nids[-1])) + 1])
val.append(
F.as_scalar(F.sum(F.astype(inner_node_mask, F.int64), 0))
)
inner_nids = F.boolean_mask(
parts[i].ndata[NID], inner_node_mask
)
node_map_val[ntype].append(
[
int(F.as_scalar(inner_nids[0])),
int(F.as_scalar(inner_nids[-1])) + 1,
]
)
val = np.cumsum(val).tolist()
assert val[-1] == g.number_of_nodes(ntype)
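# For illustration (hypothetical names and numbers): with two partitions,
# node_map_val could look like {"user": [[0, 1000], [1000, 2300]]}, i.e. one
# half-open global-ID range [start, end) per partition for each node type;
# edge_map_val built below has the same layout per canonical edge type.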
for etype in g.canonical_etypes:
......@@ -798,10 +959,17 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
edge_map_val[etype] = []
for i in parts:
inner_edge_mask = _get_inner_edge_mask(parts[i], etype_id)
val.append(F.as_scalar(F.sum(F.astype(inner_edge_mask, F.int64), 0)))
inner_eids = np.sort(F.asnumpy(F.boolean_mask(parts[i].edata[EID],
inner_edge_mask)))
edge_map_val[etype].append([int(inner_eids[0]), int(inner_eids[-1]) + 1])
val.append(
F.as_scalar(F.sum(F.astype(inner_edge_mask, F.int64), 0))
)
inner_eids = np.sort(
F.asnumpy(
F.boolean_mask(parts[i].edata[EID], inner_edge_mask)
)
)
edge_map_val[etype].append(
[int(inner_eids[0]), int(inner_eids[-1]) + 1]
)
val = np.cumsum(val).tolist()
assert val[-1] == g.number_of_edges(etype)
else:
......@@ -811,14 +979,22 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
ntype_id = g.get_ntype_id(ntype)
inner_node_mask = _get_inner_node_mask(parts[0], ntype_id)
inner_nids = F.boolean_mask(parts[0].ndata[NID], inner_node_mask)
node_map_val[ntype] = [[int(F.as_scalar(inner_nids[0])),
int(F.as_scalar(inner_nids[-1])) + 1]]
node_map_val[ntype] = [
[
int(F.as_scalar(inner_nids[0])),
int(F.as_scalar(inner_nids[-1])) + 1,
]
]
for etype in g.canonical_etypes:
etype_id = g.get_etype_id(etype)
inner_edge_mask = _get_inner_edge_mask(parts[0], etype_id)
inner_eids = F.boolean_mask(parts[0].edata[EID], inner_edge_mask)
edge_map_val[etype] = [[int(F.as_scalar(inner_eids[0])),
int(F.as_scalar(inner_eids[-1])) + 1]]
edge_map_val[etype] = [
[
int(F.as_scalar(inner_eids[0])),
int(F.as_scalar(inner_eids[-1])) + 1,
]
]
# Double check that the node IDs in the global ID space are sorted.
for ntype in node_map_val:
......@@ -829,18 +1005,20 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
assert np.all(val[:-1] <= val[1:])
start = time.time()
ntypes = {ntype:g.get_ntype_id(ntype) for ntype in g.ntypes}
etypes = {etype:g.get_etype_id(etype) for etype in g.canonical_etypes}
part_metadata = {'graph_name': graph_name,
'num_nodes': g.number_of_nodes(),
'num_edges': g.number_of_edges(),
'part_method': part_method,
'num_parts': num_parts,
'halo_hops': num_hops,
'node_map': node_map_val,
'edge_map': edge_map_val,
'ntypes': ntypes,
'etypes': etypes}
ntypes = {ntype: g.get_ntype_id(ntype) for ntype in g.ntypes}
etypes = {etype: g.get_etype_id(etype) for etype in g.canonical_etypes}
part_metadata = {
"graph_name": graph_name,
"num_nodes": g.number_of_nodes(),
"num_edges": g.number_of_edges(),
"part_method": part_method,
"num_parts": num_parts,
"halo_hops": num_hops,
"node_map": node_map_val,
"edge_map": edge_map_val,
"ntypes": ntypes,
"etypes": etypes,
}
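# This dict is dumped to <out_path>/<graph_name>.json at the end.  A
# hypothetical two-partition config looks roughly like:
#   {"graph_name": "toy", "num_nodes": 2300, "num_edges": 9100,
#    "part_method": "metis", "num_parts": 2, "halo_hops": 1,
#    "node_map": {...}, "edge_map": {...}, "ntypes": {...}, "etypes": {...},
#    "part-0": {"node_feats": "part0/node_feat.dgl", ...},
#    "part-1": {...}}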
for part_id in range(num_parts):
part = parts[part_id]
......@@ -852,107 +1030,150 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
ntype_id = g.get_ntype_id(ntype)
# To get the edges in the input graph, we should use original node IDs.
# Both orig_id and NID store the per-node-type IDs.
ndata_name = 'orig_id'
ndata_name = "orig_id"
inner_node_mask = _get_inner_node_mask(part, ntype_id)
# These are global node IDs.
local_nodes = F.boolean_mask(part.ndata[ndata_name], inner_node_mask)
local_nodes = F.boolean_mask(
part.ndata[ndata_name], inner_node_mask
)
if len(g.ntypes) > 1:
# If the input is a heterogeneous graph.
local_nodes = F.gather_row(sim_g.ndata[NID], local_nodes)
print('part {} has {} nodes of type {} and {} are inside the partition'.format(
part_id, F.as_scalar(F.sum(part.ndata[NTYPE] == ntype_id, 0)),
ntype, len(local_nodes)))
print(
"part {} has {} nodes of type {} and {} are inside the partition".format(
part_id,
F.as_scalar(
F.sum(part.ndata[NTYPE] == ntype_id, 0)
),
ntype,
len(local_nodes),
)
)
else:
print('part {} has {} nodes and {} are inside the partition'.format(
part_id, part.number_of_nodes(), len(local_nodes)))
print(
"part {} has {} nodes and {} are inside the partition".format(
part_id, part.number_of_nodes(), len(local_nodes)
)
)
for name in g.nodes[ntype].data:
if name in [NID, 'inner_node']:
if name in [NID, "inner_node"]:
continue
node_feats[ntype + '/' + name] = F.gather_row(g.nodes[ntype].data[name],
local_nodes)
node_feats[ntype + "/" + name] = F.gather_row(
g.nodes[ntype].data[name], local_nodes
)
for etype in g.canonical_etypes:
etype_id = g.get_etype_id(etype)
edata_name = 'orig_id'
edata_name = "orig_id"
inner_edge_mask = _get_inner_edge_mask(part, etype_id)
# These are global edge IDs.
local_edges = F.boolean_mask(part.edata[edata_name], inner_edge_mask)
local_edges = F.boolean_mask(
part.edata[edata_name], inner_edge_mask
)
if not g.is_homogeneous:
local_edges = F.gather_row(sim_g.edata[EID], local_edges)
print('part {} has {} edges of type {} and {} are inside the partition'.format(
part_id, F.as_scalar(F.sum(part.edata[ETYPE] == etype_id, 0)),
etype, len(local_edges)))
print(
"part {} has {} edges of type {} and {} are inside the partition".format(
part_id,
F.as_scalar(
F.sum(part.edata[ETYPE] == etype_id, 0)
),
etype,
len(local_edges),
)
)
else:
print('part {} has {} edges and {} are inside the partition'.format(
part_id, part.number_of_edges(), len(local_edges)))
print(
"part {} has {} edges and {} are inside the partition".format(
part_id, part.number_of_edges(), len(local_edges)
)
)
tot_num_inner_edges += len(local_edges)
for name in g.edges[etype].data:
if name in [EID, 'inner_edge']:
if name in [EID, "inner_edge"]:
continue
edge_feats[_etype_tuple_to_str(etype) + '/' + name] = F.gather_row(
g.edges[etype].data[name], local_edges)
edge_feats[
_etype_tuple_to_str(etype) + "/" + name
] = F.gather_row(g.edges[etype].data[name], local_edges)
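# Node features are keyed as "<ntype>/<name>" and edge features as
# "<canonical etype string>/<name>" (via _etype_tuple_to_str), e.g. the
# hypothetical keys "user/feat" and "user:follows:user/weight".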
else:
for ntype in g.ntypes:
if len(g.ntypes) > 1:
ndata_name = 'orig_id'
ndata_name = "orig_id"
ntype_id = g.get_ntype_id(ntype)
inner_node_mask = _get_inner_node_mask(part, ntype_id)
# These are global node IDs.
local_nodes = F.boolean_mask(part.ndata[ndata_name], inner_node_mask)
local_nodes = F.boolean_mask(
part.ndata[ndata_name], inner_node_mask
)
local_nodes = F.gather_row(sim_g.ndata[NID], local_nodes)
else:
local_nodes = sim_g.ndata[NID]
for name in g.nodes[ntype].data:
if name in [NID, 'inner_node']:
if name in [NID, "inner_node"]:
continue
node_feats[ntype + '/' + name] = F.gather_row(g.nodes[ntype].data[name],
local_nodes)
node_feats[ntype + "/" + name] = F.gather_row(
g.nodes[ntype].data[name], local_nodes
)
for etype in g.canonical_etypes:
if not g.is_homogeneous:
edata_name = 'orig_id'
edata_name = "orig_id"
etype_id = g.get_etype_id(etype)
inner_edge_mask = _get_inner_edge_mask(part, etype_id)
# These are global edge IDs.
local_edges = F.boolean_mask(part.edata[edata_name], inner_edge_mask)
local_edges = F.boolean_mask(
part.edata[edata_name], inner_edge_mask
)
local_edges = F.gather_row(sim_g.edata[EID], local_edges)
else:
local_edges = sim_g.edata[EID]
for name in g.edges[etype].data:
if name in [EID, 'inner_edge']:
if name in [EID, "inner_edge"]:
continue
edge_feats[_etype_tuple_to_str(etype) + '/' + name] = F.gather_row(
g.edges[etype].data[name], local_edges)
edge_feats[
_etype_tuple_to_str(etype) + "/" + name
] = F.gather_row(g.edges[etype].data[name], local_edges)
# delete `orig_id` from ndata/edata
del part.ndata['orig_id']
del part.edata['orig_id']
del part.ndata["orig_id"]
del part.edata["orig_id"]
part_dir = os.path.join(out_path, "part" + str(part_id))
node_feat_file = os.path.join(part_dir, "node_feat.dgl")
edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
part_graph_file = os.path.join(part_dir, "graph.dgl")
part_metadata['part-{}'.format(part_id)] = {
'node_feats': os.path.relpath(node_feat_file, out_path),
'edge_feats': os.path.relpath(edge_feat_file, out_path),
'part_graph': os.path.relpath(part_graph_file, out_path)}
part_metadata["part-{}".format(part_id)] = {
"node_feats": os.path.relpath(node_feat_file, out_path),
"edge_feats": os.path.relpath(edge_feat_file, out_path),
"part_graph": os.path.relpath(part_graph_file, out_path),
}
os.makedirs(part_dir, mode=0o775, exist_ok=True)
save_tensors(node_feat_file, node_feats)
save_tensors(edge_feat_file, edge_feats)
sort_etypes = len(g.etypes) > 1
_save_graphs(part_graph_file, [part], formats=graph_formats,
sort_etypes=sort_etypes)
print('Save partitions: {:.3f} seconds, peak memory: {:.3f} GB'.format(
time.time() - start, get_peak_mem()))
_dump_part_config(f'{out_path}/{graph_name}.json', part_metadata)
_save_graphs(
part_graph_file,
[part],
formats=graph_formats,
sort_etypes=sort_etypes,
)
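# After this loop each partition directory holds three files (graph.dgl,
# node_feat.dgl and edge_feat.dgl), referenced from the JSON config via the
# relative paths recorded in part_metadata above.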
print(
"Save partitions: {:.3f} seconds, peak memory: {:.3f} GB".format(
time.time() - start, get_peak_mem()
)
)
_dump_part_config(f"{out_path}/{graph_name}.json", part_metadata)
num_cuts = sim_g.number_of_edges() - tot_num_inner_edges
if num_parts == 1:
num_cuts = 0
print('There are {} edges in the graph and {} edge cuts for {} partitions.'.format(
g.number_of_edges(), num_cuts, num_parts))
print(
"There are {} edges in the graph and {} edge cuts for {} partitions.".format(
g.number_of_edges(), num_cuts, num_parts
)
)
if return_mapping:
return orig_nids, orig_eids
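# End-to-end usage sketch (illustrative only; graph sizes, names and paths are
# arbitrary): partition a small random graph into two parts and load the first
# partition back, mirroring the docstring example above.
import dgl
import dgl.distributed
import torch

g = dgl.rand_graph(1000, 10000)                   # toy homogeneous graph
g.ndata["feat"] = torch.randn(g.num_nodes(), 16)  # a node feature to shard
dgl.distributed.partition_graph(
    g, graph_name="toy", num_parts=2, out_path="toy_parts"
)
part_g, node_feats, edge_feats, gpb, _, _, _ = dgl.distributed.load_partition(
    "toy_parts/toy.json", 0
)
print(part_g, list(node_feats.keys()))  # e.g. ['_N/feat'] for a homogeneous graph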
"""Define utility functions for shared memory."""
from .. import backend as F
from .. import ndarray as nd
from .. import backend as F, ndarray as nd
from .._ffi.ndarray import empty_shared_mem
DTYPE_DICT = F.data_type_dict
......
......@@ -5,12 +5,14 @@ This kvstore is used when running in the standalone mode
from .. import backend as F
class KVClient(object):
''' The fake KVStore client.
"""The fake KVStore client.
This is to mimic the distributed KVStore client. It's used for DistGraph
in standalone mode.
'''
"""
def __init__(self):
self._data = {}
self._all_possible_part_policy = {}
......@@ -30,25 +32,27 @@ class KVClient(object):
return 1
def barrier(self):
'''barrier'''
"""barrier"""
def register_push_handler(self, name, func):
'''register push handler'''
"""register push handler"""
self._push_handlers[name] = func
def register_pull_handler(self, name, func):
'''register pull handler'''
"""register pull handler"""
self._pull_handlers[name] = func
def add_data(self, name, tensor, part_policy):
'''add data to the client'''
"""add data to the client"""
self._data[name] = tensor
self._gdata_name_list.add(name)
if part_policy.policy_str not in self._all_possible_part_policy:
self._all_possible_part_policy[part_policy.policy_str] = part_policy
def init_data(self, name, shape, dtype, part_policy, init_func, is_gdata=True):
'''add new data to the client'''
def init_data(
self, name, shape, dtype, part_policy, init_func, is_gdata=True
):
"""add new data to the client"""
self._data[name] = init_func(shape, dtype)
if part_policy.policy_str not in self._all_possible_part_policy:
self._all_possible_part_policy[part_policy.policy_str] = part_policy
......@@ -56,38 +60,38 @@ class KVClient(object):
self._gdata_name_list.add(name)
def delete_data(self, name):
'''delete the data'''
"""delete the data"""
del self._data[name]
self._gdata_name_list.remove(name)
def data_name_list(self):
'''get the names of all data'''
"""get the names of all data"""
return list(self._data.keys())
def gdata_name_list(self):
'''get the names of graph data'''
"""get the names of graph data"""
return list(self._gdata_name_list)
def get_data_meta(self, name):
'''get the metadata of data'''
"""get the metadata of data"""
return F.dtype(self._data[name]), F.shape(self._data[name]), None
def push(self, name, id_tensor, data_tensor):
'''push data to kvstore'''
"""push data to kvstore"""
if name in self._push_handlers:
self._push_handlers[name](self._data, name, id_tensor, data_tensor)
else:
F.scatter_row_inplace(self._data[name], id_tensor, data_tensor)
def pull(self, name, id_tensor):
'''pull data from kvstore'''
"""pull data from kvstore"""
if name in self._pull_handlers:
return self._pull_handlers[name](self._data, name, id_tensor)
else:
return F.gather_row(self._data[name], id_tensor)
def map_shared_data(self, partition_book):
'''Mapping shared-memory tensor from server to client.'''
"""Mapping shared-memory tensor from server to client."""
def count_nonzero(self, name):
"""Count nonzero value by pull request from KVServers.
......@@ -116,8 +120,7 @@ class KVClient(object):
return self._data
def union(self, operand1_name, operand2_name, output_name):
"""Compute the union of two mask arrays in the KVStore.
"""
"""Compute the union of two mask arrays in the KVStore."""
self._data[output_name][:] = (
self._data[operand1_name] | self._data[operand2_name]
)
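# Usage sketch for the fake client above (illustrative; FakePolicy is a
# minimal stand-in exposing only the .policy_str attribute this class reads,
# and the tensor values are arbitrary):
from collections import namedtuple

import torch

FakePolicy = namedtuple("FakePolicy", ["policy_str"])

kv = KVClient()
kv.init_data(
    "mask",
    shape=(4,),
    dtype=torch.int64,
    part_policy=FakePolicy(policy_str="node"),
    init_func=lambda shape, dtype: torch.zeros(shape, dtype=dtype),
)
kv.push("mask", torch.tensor([1, 3]), torch.tensor([1, 1]))
print(kv.pull("mask", torch.tensor([0, 1, 2, 3])))  # tensor([0, 1, 0, 1])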