Unverified Commit aaec3d8a authored by Da Zheng, committed by GitHub

[Distributed] Support hierarchical partitioning (#3000)



* add.

* fix.

* fix.

* fix.

* fix.

* add tests.

* support node split and edge split.

* support 1 partition.

* add tests.

* fix.

* fix test.

* use hierarchical partition.

* add check.
Co-authored-by: Zheng <dzzhen@3c22fba32af5.ant.amazon.com>
Co-authored-by: Ubuntu <ubuntu@ip-172-31-22-57.us-west-2.compute.internal>
Co-authored-by: Ubuntu <ubuntu@ip-172-31-71-112.ec2.internal>
parent 18eaad17
......@@ -20,6 +20,9 @@ if __name__ == '__main__':
help='turn the graph into an undirected graph.')
argparser.add_argument('--balance_edges', action='store_true',
help='balance the number of edges in each partition.')
argparser.add_argument('--num_trainers_per_machine', type=int, default=1,
help='the number of trainers per machine. The trainer ids are stored\
in the node feature \'trainer_id\'')
argparser.add_argument('--output', type=str, default='data',
help='Output path of partitioned graph.')
args = argparser.parse_args()
......@@ -50,4 +53,5 @@ if __name__ == '__main__':
dgl.distributed.partition_graph(g, args.dataset, args.num_parts, args.output,
part_method=args.part_method,
balance_ntypes=balance_ntypes,
balance_edges=args.balance_edges)
balance_edges=args.balance_edges,
num_trainers_per_machine=args.num_trainers_per_machine)
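For reference, a minimal sketch of the equivalent direct API call; the graph object `g`, the graph name, and the output path below are illustrative assumptions, not part of this change:

import dgl

# Partition into 2 machine-level parts with 4 trainers per machine; the per-node
# trainer assignment is stored in the node feature 'trainer_id'.
dgl.distributed.partition_graph(g, 'my_graph', num_parts=2, out_path='data',
                                part_method='metis',
                                num_trainers_per_machine=4)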
......@@ -260,6 +260,14 @@ def main(args):
print('rank:', g.rank())
pb = g.get_partition_book()
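# If the partitioned graph stores per-node trainer assignments (produced with
# num_trainers_per_machine > 1 at partitioning time), pass them to node_split()
# so that each trainer preferentially receives the nodes assigned to it.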
if 'trainer_id' in g.ndata:
train_nid = dgl.distributed.node_split(g.ndata['train_mask'], pb, force_even=True,
node_trainer_ids=g.ndata['trainer_id'])
val_nid = dgl.distributed.node_split(g.ndata['val_mask'], pb, force_even=True,
node_trainer_ids=g.ndata['trainer_id'])
test_nid = dgl.distributed.node_split(g.ndata['test_mask'], pb, force_even=True,
node_trainer_ids=g.ndata['trainer_id'])
else:
train_nid = dgl.distributed.node_split(g.ndata['train_mask'], pb, force_even=True)
val_nid = dgl.distributed.node_split(g.ndata['val_mask'], pb, force_even=True)
test_nid = dgl.distributed.node_split(g.ndata['test_mask'], pb, force_even=True)
......
......@@ -1063,20 +1063,18 @@ def _split_local(partition_book, rank, elements, local_eles):
else:
return local_eles[(size * client_id_in_part):]
def _split_even(partition_book, rank, elements):
''' Split the input element list evenly.
def _even_offset(n, k):
''' Split an array of length n into k segments whose lengths differ by at most 1.
Return the offset of each segment.
'''
num_clients = role.get_num_trainers()
num_client_per_part = num_clients // partition_book.num_partitions()
# all ranks of the clients in the same machine are in a contiguous range.
if rank is None:
rank = role.get_trainer_rank()
assert rank < num_clients, \
'The input rank ({}) is incorrect. #Trainers: {}'.format(rank, num_clients)
# This conversion of rank is to make the new rank aligned with partitioning.
client_id_in_part = rank % num_client_per_part
rank = client_id_in_part + num_client_per_part * partition_book.partid
eles_per_part = n // k
offset = np.array([0] + [eles_per_part] * k, dtype=int)
offset[1 : n - eles_per_part * k + 1] += 1
return np.cumsum(offset)
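# For example (illustrative, not part of the diff): _even_offset(10, 4) returns
# array([0, 3, 6, 8, 10]), i.e. segment lengths of 3, 3, 2, 2; the first n % k
# segments each get one extra element.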
def _split_even_to_part(partition_book, elements):
''' Split the input element list evenly.
'''
if isinstance(elements, DistTensor):
# Here we need to fetch all elements from the kvstore server,
# which may be expensive for very large graphs.
......@@ -1091,25 +1089,78 @@ def _split_even(partition_book, rank, elements):
# Compute the offset of each split and ensure that the partition sizes differ
# by at most 1.
part_size = len(eles) // num_clients
sizes = [part_size] * num_clients
remain = len(eles) - part_size * num_clients
if remain > 0:
for i in range(num_clients):
sizes[i] += 1
remain -= 1
if remain == 0:
break
offsets = np.cumsum(sizes)
offsets = _even_offset(len(eles), partition_book.num_partitions())
assert offsets[-1] == len(eles)
if rank == 0:
return eles[0:offsets[0]]
else:
return eles[offsets[rank-1]:offsets[rank]]
# Get the elements that belong to the partition.
partid = partition_book.partid
part_eles = eles[offsets[partid] : offsets[partid + 1]]
return part_eles
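# Example (illustrative): with 10 masked elements and 2 partitions,
# partition 0 receives eles[0:5] and partition 1 receives eles[5:10].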
def _split_random_within_part(partition_book, rank, part_eles):
# If there is more than one client in a partition, we need to randomly select a subset of
# elements in the partition for each client, and make sure that the sets of elements
# for different clients are disjoint.
def node_split(nodes, partition_book=None, ntype='_N', rank=None, force_even=True):
num_clients = role.get_num_trainers()
num_client_per_part = num_clients // partition_book.num_partitions()
if num_client_per_part == 1:
return part_eles
if rank is None:
rank = role.get_trainer_rank()
assert rank < num_clients, \
'The input rank ({}) is incorrect. #Trainers: {}'.format(rank, num_clients)
client_id_in_part = rank % num_client_per_part
offset = _even_offset(len(part_eles), num_client_per_part)
# We set the random seed per partition, so that every process (client) in a partition
# permutes the partition's elements in the same way and each process thus gets a disjoint
# subset of elements.
np.random.seed(partition_book.partid)
rand_idx = np.random.permutation(len(part_eles))
rand_idx = rand_idx[offset[client_id_in_part] : offset[client_id_in_part + 1]]
idx, _ = F.sort_1d(F.tensor(rand_idx))
return F.gather_row(part_eles, idx)
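# Example (illustrative): with 2 clients in a partition of 10 elements, both
# clients draw the same seeded permutation; client 0 keeps the elements at
# rand_idx[0:5] and client 1 those at rand_idx[5:10], so the subsets are disjoint.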
def _split_by_trainer_id(partition_book, part_eles, trainer_id,
num_client_per_part, client_id_in_part):
# TODO(zhengda): MXNet cannot deal with empty tensors, which makes the implementation
# much more difficult. Let's just use numpy for the computation for now. We just
# perform operations on vectors. It shouldn't be too difficult.
trainer_id = F.asnumpy(trainer_id)
part_eles = F.asnumpy(part_eles)
part_id = trainer_id // num_client_per_part
trainer_id = trainer_id % num_client_per_part
local_eles = part_eles[np.nonzero(part_id[part_eles] == partition_book.partid)[0]]
# These are the global IDs of the elements local to the partition.
remote_eles = part_eles[np.nonzero(part_id[part_eles] != partition_book.partid)[0]]
# These are the global IDs of the remote elements in the partition.
local_eles_idx = np.concatenate(
[np.nonzero(trainer_id[local_eles] == i)[0] for i in range(num_client_per_part)],
# trainer_id[local_eles] gives the trainer IDs of the local elements in the partition;
# we pick out the indices belonging to each trainer i in turn and
# concatenate them.
axis=0
)
# `local_eles_idx` is used to sort `local_eles` according to `trainer_id`. It is a
# permutation of 0...(len(local_eles)-1)
local_eles = local_eles[local_eles_idx]
# evenly split local nodes to trainers
local_offsets = _even_offset(len(local_eles), num_client_per_part)
# evenly split remote nodes to trainers
remote_offsets = _even_offset(len(remote_eles), num_client_per_part)
client_local_eles = local_eles[
local_offsets[client_id_in_part]:local_offsets[client_id_in_part + 1]]
client_remote_eles = remote_eles[
remote_offsets[client_id_in_part]:remote_offsets[client_id_in_part + 1]]
client_eles = np.concatenate([client_local_eles, client_remote_eles], axis=0)
return F.tensor(client_eles)
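# Example (illustrative): with 2 trainers per machine, the partition's local
# elements are grouped by assigned trainer and split at the midpoint, so each
# trainer mostly receives the elements whose 'trainer_id' maps to it; the
# partition's remote elements are simply divided evenly between the trainers.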
def node_split(nodes, partition_book=None, ntype='_N', rank=None, force_even=True,
node_trainer_ids=None):
''' Split nodes and return a subset for the local rank.
This function splits the input nodes based on the partition book and
......@@ -1142,6 +1193,9 @@ def node_split(nodes, partition_book=None, ntype='_N', rank=None, force_even=Tru
The rank of a process. If not given, the rank of the current process is used.
force_even : bool, optional
Force the nodes to be split evenly.
node_trainer_ids : 1D tensor or DistTensor, optional
If not None, split the nodes among the trainers on the same machine according to the
trainer IDs assigned to each node. Otherwise, split randomly.
Returns
-------
......@@ -1155,14 +1209,39 @@ def node_split(nodes, partition_book=None, ntype='_N', rank=None, force_even=Tru
assert len(nodes) == partition_book._num_nodes(ntype), \
'The length of boolean mask vector should be the number of nodes in the graph.'
if rank is None:
rank = role.get_trainer_rank()
if force_even:
return _split_even(partition_book, rank, nodes)
num_clients = role.get_num_trainers()
num_client_per_part = num_clients // partition_book.num_partitions()
assert num_clients % partition_book.num_partitions() == 0, \
'The total number of clients should be a multiple of the number of partitions.'
part_nid = _split_even_to_part(partition_book, nodes)
if num_client_per_part == 1:
return part_nid
elif node_trainer_ids is None:
return _split_random_within_part(partition_book, rank, part_nid)
else:
trainer_id = node_trainer_ids[0:len(node_trainer_ids)]
max_trainer_id = F.as_scalar(F.reduce_max(trainer_id)) + 1
if max_trainer_id > num_clients:
# Allow the trainer_id-based split to be reused when the number of trainers is
# smaller than the `num_trainers_per_machine` that was used during
# partitioning.
assert max_trainer_id % num_clients == 0
trainer_id //= (max_trainer_id // num_clients)
client_id_in_part = rank % num_client_per_part
return _split_by_trainer_id(partition_book, part_nid, trainer_id,
num_client_per_part, client_id_in_part)
else:
# Get all nodes that belong to the rank.
local_nids = partition_book.partid2nids(partition_book.partid)
return _split_local(partition_book, rank, nodes, local_nids)
def edge_split(edges, partition_book=None, etype='_E', rank=None, force_even=True):
def edge_split(edges, partition_book=None, etype='_E', rank=None, force_even=True,
edge_trainer_ids=None):
''' Split edges and return a subset for the local rank.
This function splits the input edges based on the partition book and
......@@ -1195,6 +1274,9 @@ def edge_split(edges, partition_book=None, etype='_E', rank=None, force_even=Tru
The rank of a process. If not given, the rank of the current process is used.
force_even : bool, optional
Force the edges to be split evenly.
edge_trainer_ids : 1D tensor or DistTensor, optional
If not None, split the edges among the trainers on the same machine according to the
trainer IDs assigned to each edge. Otherwise, split randomly.
Returns
-------
......@@ -1207,9 +1289,32 @@ def edge_split(edges, partition_book=None, etype='_E', rank=None, force_even=Tru
partition_book = edges.part_policy.partition_book
assert len(edges) == partition_book._num_edges(etype), \
'The length of boolean mask vector should be the number of edges in the graph.'
if rank is None:
rank = role.get_trainer_rank()
if force_even:
return _split_even(partition_book, rank, edges)
num_clients = role.get_num_trainers()
num_client_per_part = num_clients // partition_book.num_partitions()
assert num_clients % partition_book.num_partitions() == 0, \
'The total number of clients should be a multiple of the number of partitions.'
part_eid = _split_even_to_part(partition_book, edges)
if num_client_per_part == 1:
return part_eid
elif edge_trainer_ids is None:
return _split_random_within_part(partition_book, rank, part_eid)
else:
trainer_id = edge_trainer_ids[0:len(edge_trainer_ids)]
max_trainer_id = F.as_scalar(F.reduce_max(trainer_id)) + 1
if max_trainer_id > num_clients:
# Allow the trainer_id-based split to be reused when the number of trainers is
# smaller than the `num_trainers_per_machine` that was used during
# partitioning.
assert max_trainer_id % num_clients == 0
trainer_id //= (max_trainer_id // num_clients)
client_id_in_part = rank % num_client_per_part
return _split_by_trainer_id(partition_book, part_eid, trainer_id,
num_client_per_part, client_id_in_part)
else:
# Get all edges that belong to the rank.
local_eids = partition_book.partid2eids(partition_book.partid)
......
......@@ -214,7 +214,8 @@ def load_partition_book(part_config, part_id, graph=None):
part_metadata['graph_name'], ntypes, etypes
def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method="metis",
reshuffle=True, balance_ntypes=None, balance_edges=False, return_mapping=False):
reshuffle=True, balance_ntypes=None, balance_edges=False, return_mapping=False,
num_trainers_per_machine=1):
''' Partition a graph for distributed training and store the partitions on files.
The partitioning occurs in three steps: 1) run a partition algorithm (e.g., Metis) to
......@@ -388,6 +389,12 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
return_mapping : bool
If `reshuffle=True`, this indicates to return the mapping between shuffled node/edge IDs
and the original node/edge IDs.
num_trainers_per_machine : int, optional
The number of trainers per machine. If it is greater than 1, the whole graph is first
partitioned into num_parts*num_trainers_per_machine parts, one per trainer, and the
trainer ID of each node is stored in the node feature 'trainer_id'. The partitions of
trainers on the same machine are then coalesced into one larger partition. The final
number of partitions is `num_parts`.
Returns
-------
......@@ -453,6 +460,18 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
if num_parts == 1:
sim_g = to_homogeneous(g)
assert num_trainers_per_machine >= 1
if num_trainers_per_machine > 1:
# First partition the whole graph across all trainers and save the trainer IDs in
# the node feature "trainer_id".
node_parts = metis_partition_assignment(
sim_g, num_parts * num_trainers_per_machine,
balance_ntypes=balance_ntypes,
balance_edges=balance_edges,
mode='k-way')
g.ndata['trainer_id'] = node_parts
g.edata['trainer_id'] = node_parts[g.edges()[1]]
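# g.edges()[1] holds the destination node of each edge, so every edge inherits
# the trainer ID of its destination node.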
node_parts = F.zeros((sim_g.number_of_nodes(),), F.int64, F.cpu())
parts = {}
if reshuffle:
......@@ -470,7 +489,23 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
elif part_method in ('metis', 'random'):
sim_g, balance_ntypes = get_homogeneous(g, balance_ntypes)
if part_method == 'metis':
node_parts = metis_partition_assignment(sim_g, num_parts, balance_ntypes=balance_ntypes,
assert num_trainers_per_machine >= 1
if num_trainers_per_machine > 1:
# First partition the whole graph across all trainers and save the trainer IDs in
# the node feature "trainer_id".
node_parts = metis_partition_assignment(
sim_g, num_parts * num_trainers_per_machine,
balance_ntypes=balance_ntypes,
balance_edges=balance_edges,
mode='k-way')
g.ndata['trainer_id'] = node_parts
# And then coalesce the partitions of trainers on the same machine into one
# larger partition.
node_parts = node_parts // num_trainers_per_machine
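# Example (illustrative): with num_trainers_per_machine=4, trainer partitions
# 0-3 collapse into machine partition 0, partitions 4-7 into machine partition 1,
# and so on.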
else:
node_parts = metis_partition_assignment(sim_g, num_parts,
balance_ntypes=balance_ntypes,
balance_edges=balance_edges)
else:
node_parts = random_choice(num_parts, sim_g.number_of_nodes())
......
......@@ -230,7 +230,7 @@ def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
return subg_dict, None, None
def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False):
def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False, mode="k-way"):
''' This assigns nodes to different partitions with the Metis partitioning algorithm.
When performing Metis partitioning, we can put some constraint on the partitioning.
......@@ -255,12 +255,15 @@ def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False):
Node type of each node
balance_edges : bool
Indicate whether to balance the edges.
mode : str, "k-way" or "recursive"
Whether to use multilevel recursive bisection or multilevel k-way partitioning.
Returns
-------
a 1-D tensor
A vector in which each element indicates the partition ID of the corresponding vertex.
'''
assert mode in ("k-way", "recursive"), "'mode' can only be 'k-way' or 'recursive'"
# METIS works only on symmetric graphs.
# The METIS runs on the symmetric graph to generate the node assignment to partitions.
start = time.time()
......@@ -312,7 +315,7 @@ def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False):
vwgt = F.to_dgl_nd(vwgt)
start = time.time()
node_part = _CAPI_DGLMetisPartition_Hetero(sym_g._graph, k, vwgt)
node_part = _CAPI_DGLMetisPartition_Hetero(sym_g._graph, k, vwgt, mode)
print('Metis partitioning: {:.3f} seconds'.format(time.time() - start))
if len(node_part) == 0:
return None
......@@ -322,7 +325,7 @@ def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False):
def metis_partition(g, k, extra_cached_hops=0, reshuffle=False,
balance_ntypes=None, balance_edges=False):
balance_ntypes=None, balance_edges=False, mode="k-way"):
''' Partition a graph with the Metis partitioning algorithm.
Metis assigns vertices to partitions. This API constructs subgraphs with the vertices assigned
......@@ -364,13 +367,16 @@ def metis_partition(g, k, extra_cached_hops=0, reshuffle=False,
Node type of each node
balance_edges : bool
Indicate whether to balance the edges.
mode : str, "k-way" or "recursive"
Whether to use multilevel recursive bisection or multilevel k-way partitioning.
Returns
--------
a dict of DGLGraphs
The key is the partition ID and the value is the DGLGraph of the partition.
'''
node_part = metis_partition_assignment(g, k, balance_ntypes, balance_edges)
assert mode in ("k-way", "recursive"), "'mode' can only be 'k-way' or 'recursive'"
node_part = metis_partition_assignment(g, k, balance_ntypes, balance_edges, mode)
if node_part is None:
return None
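# Usage sketch (illustrative, not part of this change); the graph `g` and the
# number of parts are assumptions:
#     node_part = metis_partition_assignment(g, 4, mode='recursive')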
......
......@@ -19,7 +19,10 @@ namespace transform {
#if !defined(_WIN32)
IdArray MetisPartition(UnitGraphPtr g, int k, NDArray vwgt_arr) {
IdArray MetisPartition(UnitGraphPtr g, int k, NDArray vwgt_arr, const std::string &mode) {
// Mode can only be "k-way" or "recursive"
CHECK(mode == "k-way" || mode == "recursive")
<< "mode can only be \"k-way\" or \"recursive\"";
// The index type of Metis needs to be compatible with DGL index type.
CHECK_EQ(sizeof(idx_t), sizeof(int64_t))
<< "Metis only supports int64 graph for now";
......@@ -47,6 +50,8 @@ IdArray MetisPartition(UnitGraphPtr g, int k, NDArray vwgt_arr) {
vwgt = static_cast<idx_t *>(vwgt_arr->data);
}
auto partition_func = (mode == "k-way") ? METIS_PartGraphKway : METIS_PartGraphRecursive;
idx_t options[METIS_NOPTIONS];
METIS_SetDefaultOptions(options);
options[METIS_OPTION_ONDISK] = 1;
......@@ -54,7 +59,7 @@ IdArray MetisPartition(UnitGraphPtr g, int k, NDArray vwgt_arr) {
options[METIS_OPTION_NIPARTS] = 1;
options[METIS_OPTION_DROPEDGES] = 1;
int ret = METIS_PartGraphKway(
int ret = partition_func(
&nvtxs, // The number of vertices
&ncon, // The number of balancing constraints.
xadj, // indptr
......@@ -99,8 +104,9 @@ DGL_REGISTER_GLOBAL("partition._CAPI_DGLMetisPartition_Hetero")
auto ugptr = hgptr->relation_graphs()[0];
int k = args[1];
NDArray vwgt = args[2];
std::string mode = args[3];
#if !defined(_WIN32)
*rv = MetisPartition(ugptr, k, vwgt);
*rv = MetisPartition(ugptr, k, vwgt, mode);
#else
LOG(FATAL) << "Metis partition does not support Windows.";
#endif // !defined(_WIN32)
......
......@@ -53,7 +53,7 @@ def create_random_graph(n):
return dgl.from_scipy(arr)
def run_server(graph_name, server_id, server_count, num_clients, shared_mem):
g = DistGraphServer(server_id, "kv_ip_config.txt", num_clients, server_count,
g = DistGraphServer(server_id, "kv_ip_config.txt", server_count, num_clients,
'/tmp/dist_graph/{}.json'.format(graph_name),
disable_shared_mem=not shared_mem)
print('start server', server_id)
......@@ -156,6 +156,20 @@ def run_emb_client(graph_name, part_id, server_count, num_clients, num_nodes, nu
g = DistGraph(graph_name, gpb=gpb)
check_dist_emb(g, num_clients, num_nodes, num_edges)
def run_client_hierarchy(graph_name, part_id, server_count, node_mask, edge_mask, return_dict):
time.sleep(5)
os.environ['DGL_NUM_SERVER'] = str(server_count)
dgl.distributed.initialize("kv_ip_config.txt")
gpb, graph_name, _, _ = load_partition_book('/tmp/dist_graph/{}.json'.format(graph_name),
part_id, None)
g = DistGraph(graph_name, gpb=gpb)
node_mask = F.tensor(node_mask)
edge_mask = F.tensor(edge_mask)
nodes = node_split(node_mask, g.get_partition_book(), node_trainer_ids=g.ndata['trainer_id'])
edges = edge_split(edge_mask, g.get_partition_book(), edge_trainer_ids=g.edata['trainer_id'])
rank = g.rank()
return_dict[rank] = (nodes, edges)
def check_dist_emb(g, num_clients, num_nodes, num_edges):
from dgl.distributed.optim import SparseAdagrad
from dgl.distributed.nn import NodeEmbedding
......@@ -360,6 +374,62 @@ def check_server_client(shared_mem, num_servers, num_clients):
print('clients have terminated')
def check_server_client_hierarchy(shared_mem, num_servers, num_clients):
prepare_dist()
g = create_random_graph(10000)
# Partition the graph
num_parts = 1
graph_name = 'dist_graph_test_2'
g.ndata['features'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
g.edata['features'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
partition_graph(g, graph_name, num_parts, '/tmp/dist_graph', num_trainers_per_machine=num_clients)
# let's just test on one partition for now.
# We cannot run multiple servers and clients on the same machine.
serv_ps = []
ctx = mp.get_context('spawn')
for serv_id in range(num_servers):
p = ctx.Process(target=run_server, args=(graph_name, serv_id, num_servers,
num_clients, shared_mem))
serv_ps.append(p)
p.start()
cli_ps = []
manager = mp.Manager()
return_dict = manager.dict()
node_mask = np.zeros((g.number_of_nodes(),), np.int32)
edge_mask = np.zeros((g.number_of_edges(),), np.int32)
nodes = np.random.choice(g.number_of_nodes(), g.number_of_nodes() // 10, replace=False)
edges = np.random.choice(g.number_of_edges(), g.number_of_edges() // 10, replace=False)
node_mask[nodes] = 1
edge_mask[edges] = 1
nodes = np.sort(nodes)
edges = np.sort(edges)
for cli_id in range(num_clients):
print('start client', cli_id)
p = ctx.Process(target=run_client_hierarchy, args=(graph_name, 0, num_servers,
node_mask, edge_mask, return_dict))
p.start()
cli_ps.append(p)
for p in cli_ps:
p.join()
for p in serv_ps:
p.join()
nodes1 = []
edges1 = []
for n, e in return_dict.values():
nodes1.append(n)
edges1.append(e)
nodes1, _ = F.sort_1d(F.cat(nodes1, 0))
edges1, _ = F.sort_1d(F.cat(edges1, 0))
assert np.all(F.asnumpy(nodes1) == nodes)
assert np.all(F.asnumpy(edges1) == edges)
print('clients have terminated')
def run_client_hetero(graph_name, part_id, server_count, num_clients, num_nodes, num_edges):
time.sleep(5)
......@@ -502,6 +572,7 @@ def check_server_client_hetero(shared_mem, num_servers, num_clients):
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
def test_server_client():
os.environ['DGL_DIST_MODE'] = 'distributed'
check_server_client_hierarchy(False, 1, 4)
check_server_client_empty(True, 1, 1)
check_server_client_hetero(True, 1, 1)
check_server_client_hetero(False, 1, 1)
......@@ -693,9 +764,9 @@ def prepare_dist():
if __name__ == '__main__':
os.makedirs('/tmp/dist_graph', exist_ok=True)
test_server_client()
test_split()
test_split_even()
test_server_client()
test_standalone()
test_standalone_node_emb()