Unverified Commit aa42aaeb authored by kylasa, committed by GitHub

[DistDGL][Lintrunner]Lintrunner for tools directory (#5261)

* lintrunner patch for gloo_wrapper.py

* lintrunner changes to the tools directory.
parent 8b47bad5
......@@ -4,9 +4,9 @@ import logging
import os
import time
import torch
import dgl
import torch
from dgl._ffi.base import DGLError
from dgl.data.utils import load_graphs
from dgl.utils import toindex
......
......@@ -5,11 +5,12 @@ import os
import pathlib
from contextlib import contextmanager
import dgl
import torch
from distpartitioning import array_readwriter
from files import setdir
import dgl
def chunk_numpy_array(arr, fmt_meta, chunk_sizes, path_fmt):
paths = []
......@@ -26,7 +27,9 @@ def chunk_numpy_array(arr, fmt_meta, chunk_sizes, path_fmt):
return paths
def _chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path, data_fmt):
def _chunk_graph(
g, name, ndata_paths, edata_paths, num_chunks, output_path, data_fmt
):
# First deal with ndata and edata that are homogeneous (i.e. not a dict-of-dict)
if len(g.ntypes) == 1 and not isinstance(
next(iter(ndata_paths.values())), dict
......@@ -96,7 +99,7 @@ def _chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path, dat
# Chunk node data
reader_fmt_meta, writer_fmt_meta = {"name": "numpy"}, {"name": data_fmt}
file_suffix = 'npy' if data_fmt == 'numpy' else 'parquet'
file_suffix = "npy" if data_fmt == "numpy" else "parquet"
metadata["node_data"] = {}
with setdir("node_data"):
for ntype, ndata_per_type in ndata_paths.items():
......@@ -154,7 +157,9 @@ def _chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path, dat
logging.info("Saved metadata in %s" % os.path.abspath(metadata_path))
def chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path, data_fmt='numpy'):
def chunk_graph(
g, name, ndata_paths, edata_paths, num_chunks, output_path, data_fmt="numpy"
):
"""
Split the graph into multiple chunks.
......@@ -185,7 +190,9 @@ def chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path, data
for key in edata.keys():
edata[key] = os.path.abspath(edata[key])
with setdir(output_path):
_chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path, data_fmt)
_chunk_graph(
g, name, ndata_paths, edata_paths, num_chunks, output_path, data_fmt
)
if __name__ == "__main__":
......
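# A minimal usage sketch of the chunk_graph function above, assuming a toy
# homogeneous graph; the feature-file names and output path are hypothetical.
import dgl
import numpy as np

g = dgl.rand_graph(100, 500)                       # toy homogeneous graph
np.save("node_feat.npy", np.random.rand(100, 16))  # hypothetical node features
np.save("edge_feat.npy", np.random.rand(500, 4))   # hypothetical edge features
chunk_graph(
    g,
    "toy_graph",
    ndata_paths={"feat": "node_feat.npy"},
    edata_paths={"feat": "edge_feat.npy"},
    num_chunks=4,
    output_path="./chunked",
    data_fmt="numpy",
)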
......@@ -64,7 +64,9 @@ def submit_jobs(args) -> str:
argslist = ""
argslist += "--world-size {} ".format(num_ips)
argslist += "--partitions-dir {} ".format(os.path.abspath(args.partitions_dir))
argslist += "--partitions-dir {} ".format(
os.path.abspath(args.partitions_dir)
)
argslist += "--input-dir {} ".format(os.path.abspath(args.in_dir))
argslist += "--graph-name {} ".format(graph_name)
argslist += "--schema {} ".format(schema_path)
......@@ -74,7 +76,9 @@ def submit_jobs(args) -> str:
argslist += "--log-level {} ".format(args.log_level)
argslist += "--save-orig-nids " if args.save_orig_nids else ""
argslist += "--save-orig-eids " if args.save_orig_eids else ""
argslist += f"--graph-formats {args.graph_formats} " if args.graph_formats else ""
argslist += (
f"--graph-formats {args.graph_formats} " if args.graph_formats else ""
)
# (BarclayII) Is it safe to assume all the workers have the Python executable at the same path?
pipeline_cmd = os.path.join(INSTALL_DIR, PIPELINE_SCRIPT)
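# For illustration (hypothetical paths, num_ips = 2), the argument string
# assembled above looks roughly like:
#   --world-size 2 --partitions-dir /abs/partitions --input-dir /abs/chunked
#   --graph-name toy_graph --schema /abs/metadata.json --log-level info
#   --save-orig-nids --save-orig-eids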
......@@ -153,9 +157,9 @@ def main():
type=str,
default=None,
help="Save partitions in specified formats. It could be any combination(joined with ``,``) "
"of ``coo``, ``csc`` and ``csr``. If not specified, save one format only according to "
"what format is available. If multiple formats are available, selection priority "
"from high to low is ``coo``, ``csc``, ``csr``.",
"of ``coo``, ``csc`` and ``csr``. If not specified, save one format only according to "
"what format is available. If multiple formats are available, selection priority "
"from high to low is ``coo``, ``csc``, ``csr``.",
)
args, udf_command = parser.parse_known_args()
......
"""Launching tool for DGL distributed training"""
import os
import stat
import sys
import subprocess
import argparse
import signal
import logging
import time
import json
import logging
import multiprocessing
import os
import re
import signal
import stat
import subprocess
import sys
import time
from functools import partial
from threading import Thread
from typing import Optional
DEFAULT_PORT = 30050
def cleanup_proc(get_all_remote_pids, conn):
'''This process tries to clean up the remote training tasks.
'''
print('cleanup process runs')
"""This process tries to clean up the remote training tasks."""
print("cleanupu process runs")
# This process should not handle SIGINT.
signal.signal(signal.SIGINT, signal.SIG_IGN)
data = conn.recv()
# If the launch process exits normally, this process doesn't need to do anything.
if data == 'exit':
if data == "exit":
sys.exit(0)
else:
remote_pids = get_all_remote_pids()
# Otherwise, we need to ssh to each machine and kill the training jobs.
for (ip, port), pids in remote_pids.items():
kill_process(ip, port, pids)
print('cleanup process exits')
print("cleanup process exits")
def kill_process(ip, port, pids):
'''ssh to a remote machine and kill the specified processes.
'''
"""ssh to a remote machine and kill the specified processes."""
curr_pid = os.getpid()
killed_pids = []
# If we kill child processes first, the parent process may create more again. This happens
......@@ -44,8 +44,14 @@ def kill_process(ip, port, pids):
pids.sort()
for pid in pids:
assert curr_pid != pid
print('kill process {} on {}:{}'.format(pid, ip, port), flush=True)
kill_cmd = 'ssh -o StrictHostKeyChecking=no -p ' + str(port) + ' ' + ip + ' \'kill {}\''.format(pid)
print("kill process {} on {}:{}".format(pid, ip, port), flush=True)
kill_cmd = (
"ssh -o StrictHostKeyChecking=no -p "
+ str(port)
+ " "
+ ip
+ " 'kill {}'".format(pid)
)
subprocess.run(kill_cmd, shell=True)
killed_pids.append(pid)
# It's possible that some of the processes are not killed. Let's try again.
......@@ -56,29 +62,41 @@ def kill_process(ip, port, pids):
else:
killed_pids.sort()
for pid in killed_pids:
print('kill process {} on {}:{}'.format(pid, ip, port), flush=True)
kill_cmd = 'ssh -o StrictHostKeyChecking=no -p ' + str(port) + ' ' + ip + ' \'kill -9 {}\''.format(pid)
print(
"kill process {} on {}:{}".format(pid, ip, port), flush=True
)
kill_cmd = (
"ssh -o StrictHostKeyChecking=no -p "
+ str(port)
+ " "
+ ip
+ " 'kill -9 {}'".format(pid)
)
subprocess.run(kill_cmd, shell=True)
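# With hypothetical values ip="192.168.1.10", port=22 and pid=12345, the command
# assembled above is:
#   ssh -o StrictHostKeyChecking=no -p 22 192.168.1.10 'kill -9 12345'
# (the first pass earlier issues the same command with a plain `kill`).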
def get_killed_pids(ip, port, killed_pids):
'''Get the process IDs that we want to kill but are still alive.
'''
"""Get the process IDs that we want to kill but are still alive."""
killed_pids = [str(pid) for pid in killed_pids]
killed_pids = ','.join(killed_pids)
ps_cmd = 'ssh -o StrictHostKeyChecking=no -p ' + str(port) + ' ' + ip + ' \'ps -p {} -h\''.format(killed_pids)
killed_pids = ",".join(killed_pids)
ps_cmd = (
"ssh -o StrictHostKeyChecking=no -p "
+ str(port)
+ " "
+ ip
+ " 'ps -p {} -h'".format(killed_pids)
)
res = subprocess.run(ps_cmd, shell=True, stdout=subprocess.PIPE)
pids = []
for p in res.stdout.decode('utf-8').split('\n'):
for p in res.stdout.decode("utf-8").split("\n"):
l = p.split()
if len(l) > 0:
pids.append(int(l[0]))
return pids
def execute_remote(
cmd: str,
ip: str,
port: int,
username: Optional[str] = ""
cmd: str, ip: str, port: int, username: Optional[str] = ""
) -> Thread:
"""Execute command line on remote machine via ssh.
......@@ -115,15 +133,21 @@ def execute_remote(
thread.start()
return thread
def get_remote_pids(ip, port, cmd_regex):
"""Get the process IDs that run the command in the remote machine.
"""
"""Get the process IDs that run the command in the remote machine."""
pids = []
curr_pid = os.getpid()
# Here we want to get the python processes. We may get some ssh processes, so we should filter them out.
ps_cmd = 'ssh -o StrictHostKeyChecking=no -p ' + str(port) + ' ' + ip + ' \'ps -aux | grep python | grep -v StrictHostKeyChecking\''
ps_cmd = (
"ssh -o StrictHostKeyChecking=no -p "
+ str(port)
+ " "
+ ip
+ " 'ps -aux | grep python | grep -v StrictHostKeyChecking'"
)
res = subprocess.run(ps_cmd, shell=True, stdout=subprocess.PIPE)
for p in res.stdout.decode('utf-8').split('\n'):
for p in res.stdout.decode("utf-8").split("\n"):
l = p.split()
if len(l) < 2:
continue
......@@ -132,28 +156,34 @@ def get_remote_pids(ip, port, cmd_regex):
if res is not None and int(l[1]) != curr_pid:
pids.append(l[1])
pid_str = ','.join([str(pid) for pid in pids])
ps_cmd = 'ssh -o StrictHostKeyChecking=no -p ' + str(port) + ' ' + ip + ' \'pgrep -P {}\''.format(pid_str)
pid_str = ",".join([str(pid) for pid in pids])
ps_cmd = (
"ssh -o StrictHostKeyChecking=no -p "
+ str(port)
+ " "
+ ip
+ " 'pgrep -P {}'".format(pid_str)
)
res = subprocess.run(ps_cmd, shell=True, stdout=subprocess.PIPE)
pids1 = res.stdout.decode('utf-8').split('\n')
pids1 = res.stdout.decode("utf-8").split("\n")
all_pids = []
for pid in set(pids + pids1):
if pid == '' or int(pid) == curr_pid:
if pid == "" or int(pid) == curr_pid:
continue
all_pids.append(int(pid))
all_pids.sort()
return all_pids
def get_all_remote_pids(hosts, ssh_port, udf_command):
'''Get all remote processes.
'''
"""Get all remote processes."""
remote_pids = {}
for node_id, host in enumerate(hosts):
ip, _ = host
# When creating training processes in remote machines, we may insert some arguments
# in the commands. We need to use regular expressions to match the modified command.
cmds = udf_command.split()
new_udf_command = ' .*'.join(cmds)
new_udf_command = " .*".join(cmds)
pids = get_remote_pids(ip, ssh_port, new_udf_command)
remote_pids[(ip, ssh_port)] = pids
return remote_pids
......@@ -164,7 +194,7 @@ def construct_torch_dist_launcher_cmd(
num_nodes: int,
node_rank: int,
master_addr: str,
master_port: int
master_port: int,
) -> str:
"""Constructs the torch distributed launcher command.
Helper function.
......@@ -179,18 +209,20 @@ def construct_torch_dist_launcher_cmd(
Returns:
cmd_str.
"""
torch_cmd_template = "-m torch.distributed.launch " \
"--nproc_per_node={nproc_per_node} " \
"--nnodes={nnodes} " \
"--node_rank={node_rank} " \
"--master_addr={master_addr} " \
"--master_port={master_port}"
torch_cmd_template = (
"-m torch.distributed.launch "
"--nproc_per_node={nproc_per_node} "
"--nnodes={nnodes} "
"--node_rank={node_rank} "
"--master_addr={master_addr} "
"--master_port={master_port}"
)
return torch_cmd_template.format(
nproc_per_node=num_trainers,
nnodes=num_nodes,
node_rank=node_rank,
master_addr=master_addr,
master_port=master_port
master_port=master_port,
)
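# For example (hypothetical values),
#   construct_torch_dist_launcher_cmd(num_trainers=4, num_nodes=2, node_rank=0,
#                                     master_addr="10.0.0.1", master_port=1234)
# returns:
#   "-m torch.distributed.launch --nproc_per_node=4 --nnodes=2 --node_rank=0 "
#   "--master_addr=10.0.0.1 --master_port=1234"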
......@@ -233,7 +265,7 @@ def wrap_udf_in_torch_dist_launcher(
num_nodes=num_nodes,
node_rank=node_rank,
master_addr=master_addr,
master_port=master_port
master_port=master_port,
)
# Auto-detect the python binary that kicks off the distributed trainer code.
# Note: This allowlist order matters, this will match with the FIRST matching entry. Thus, please add names to this
......@@ -241,9 +273,14 @@ def wrap_udf_in_torch_dist_launcher(
# (python3.7, python3.8) -> (python3)
# The allowed python versions are from this: https://www.dgl.ai/pages/start.html
python_bin_allowlist = (
"python3.6", "python3.7", "python3.8", "python3.9", "python3",
"python3.6",
"python3.7",
"python3.8",
"python3.9",
"python3",
# for backwards compatibility, accept python2 but technically DGL is a py3 library, so this is not recommended
"python2.7", "python2",
"python2.7",
"python2",
)
# If none of the candidate python bins match, then we go with the default `python`
python_bin = "python"
......@@ -258,7 +295,9 @@ def wrap_udf_in_torch_dist_launcher(
# python -m torch.distributed.launch [DIST TORCH ARGS] path/to/dist_trainer.py arg0 arg1
# Note: if there are multiple python commands in `udf_command`, this may do the Wrong Thing, eg launch each
# python command within the torch distributed launcher.
new_udf_command = udf_command.replace(python_bin, f"{python_bin} {torch_dist_cmd}")
new_udf_command = udf_command.replace(
python_bin, f"{python_bin} {torch_dist_cmd}"
)
return new_udf_command
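# For example, with a hypothetical udf_command "python3 train.py --epochs 5" and
# python_bin "python3", the string replacement above yields:
#   "python3 -m torch.distributed.launch <DIST TORCH ARGS> train.py --epochs 5"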
......@@ -322,6 +361,7 @@ def wrap_cmd_with_local_envvars(cmd: str, env_vars: str) -> str:
# https://stackoverflow.com/a/45993803
return f"(export {env_vars}; {cmd})"
def wrap_cmd_with_extra_envvars(cmd: str, env_vars: list) -> str:
"""Wraps a CLI command with extra env vars
......@@ -341,6 +381,7 @@ def wrap_cmd_with_extra_envvars(cmd: str, env_vars: list) -> str:
env_vars = " ".join(env_vars)
return wrap_cmd_with_local_envvars(cmd, env_vars)
def submit_jobs(args, udf_command):
"""Submit distributed jobs (server and client processes) via ssh"""
hosts = []
......@@ -348,7 +389,7 @@ def submit_jobs(args, udf_command):
server_count_per_machine = 0
# Get the IP addresses of the cluster.
#ip_config = os.path.join(args.workspace, args.ip_config)
# ip_config = os.path.join(args.workspace, args.ip_config)
ip_config = args.ip_config
with open(ip_config) as f:
for line in f:
......@@ -376,58 +417,76 @@ def submit_jobs(args, udf_command):
server_env_vars_cur = f"{server_env_vars} RANK={i} MASTER_ADDR={hosts[0][0]} MASTER_PORT={args.master_port}"
cmd = wrap_cmd_with_local_envvars(udf_command, server_env_vars_cur)
print(cmd)
thread_list.append(execute_remote(cmd, ip, args.ssh_port, username=args.ssh_username))
thread_list.append(
execute_remote(cmd, ip, args.ssh_port, username=args.ssh_username)
)
# Start a cleanup process dedicated for cleaning up remote training jobs.
conn1,conn2 = multiprocessing.Pipe()
conn1, conn2 = multiprocessing.Pipe()
func = partial(get_all_remote_pids, hosts, args.ssh_port, udf_command)
process = multiprocessing.Process(target=cleanup_proc, args=(func, conn1))
process.start()
def signal_handler(signal, frame):
logging.info('Stop launcher')
logging.info("Stop launcher")
# We need to tell the cleanup process to kill remote training jobs.
conn2.send('cleanup')
conn2.send("cleanup")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
for thread in thread_list:
thread.join()
# The training processes complete. We should tell the cleanup process to exit.
conn2.send('exit')
conn2.send("exit")
process.join()
def main():
parser = argparse.ArgumentParser(description='Launch a distributed job')
parser.add_argument('--ssh_port', type=int, default=22, help='SSH Port.')
parser = argparse.ArgumentParser(description="Launch a distributed job")
parser.add_argument("--ssh_port", type=int, default=22, help="SSH Port.")
parser.add_argument(
"--ssh_username", default="",
"--ssh_username",
default="",
help="Optional. When issuing commands (via ssh) to cluster, use the provided username in the ssh cmd. "
"Example: If you provide --ssh_username=bob, then the ssh command will be like: 'ssh bob@1.2.3.4 CMD' "
"instead of 'ssh 1.2.3.4 CMD'"
"Example: If you provide --ssh_username=bob, then the ssh command will be like: 'ssh bob@1.2.3.4 CMD' "
"instead of 'ssh 1.2.3.4 CMD'",
)
parser.add_argument(
"--num_proc_per_machine",
type=int,
help="The number of server processes per machine",
)
parser.add_argument(
"--master_port",
type=int,
help="This port is used to form gloo group (randevouz server)",
)
parser.add_argument(
"--ip_config",
type=str,
help="The file (in workspace) of IP configuration for server processes",
)
parser.add_argument('--num_proc_per_machine', type=int,
help='The number of server processes per machine')
parser.add_argument('--master_port', type=int,
help='This port is used to form gloo group (rendezvous server)')
parser.add_argument('--ip_config', type=str,
help='The file (in workspace) of IP configuration for server processes')
args, udf_command = parser.parse_known_args()
assert len(udf_command) == 1, 'Please provide user command line.'
assert args.num_proc_per_machine is not None and args.num_proc_per_machine > 0, \
'--num_proc_per_machine must be a positive number.'
assert args.ip_config is not None, \
'A user has to specify an IP configuration file with --ip_config.'
assert len(udf_command) == 1, "Please provide user command line."
assert (
args.num_proc_per_machine is not None and args.num_proc_per_machine > 0
), "--num_proc_per_machine must be a positive number."
assert (
args.ip_config is not None
), "A user has to specify an IP configuration file with --ip_config."
udf_command = str(udf_command[0])
if 'python' not in udf_command:
raise RuntimeError("DGL launching script can only support Python executable file.")
if "python" not in udf_command:
raise RuntimeError(
"DGL launching script can only support Python executable file."
)
submit_jobs(args, udf_command)
if __name__ == '__main__':
fmt = '%(asctime)s %(levelname)s %(message)s'
if __name__ == "__main__":
fmt = "%(asctime)s %(levelname)s %(message)s"
logging.basicConfig(format=fmt, level=logging.INFO)
main()
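# A hedged invocation sketch of this launching tool; the script name, paths and
# the trailing user command are hypothetical, the flags are the ones defined above:
#   python3 launch.py \
#       --ssh_port 22 \
#       --num_proc_per_machine 1 \
#       --master_port 30050 \
#       --ip_config ip_config.txt \
#       "python3 pipeline.py --some-arg value"
# The user command must be a single quoted string containing "python", as
# enforced by the checks in main().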
......@@ -30,7 +30,9 @@ class ParquetArrayParser(object):
# Spark ML feature processing produces single-column parquet files where each row is a vector object
if len(data_types) == 1 and isinstance(data_types[0], pyarrow.ListType):
arr = np.array(table.to_pandas().iloc[:, 0].to_list())
logging.debug(f"Parquet data under {path} converted from single vector per row to ndarray")
logging.debug(
f"Parquet data under {path} converted from single vector per row to ndarray"
)
else:
arr = table.to_pandas().to_numpy()
if not shape:
......@@ -49,8 +51,8 @@ class ParquetArrayParser(object):
array = array.reshape(shape[0], -1)
if vector_rows:
table = pyarrow.table(
[pyarrow.array(array.tolist())],
names=["vector"])
[pyarrow.array(array.tolist())], names=["vector"]
)
logging.info("Writing to %s using single-vector rows..." % path)
else:
table = pyarrow.Table.from_pandas(pd.DataFrame(array))
......
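# A small self-contained sketch of the single-vector-per-row layout handled
# above: an (N, D) ndarray is written as one "vector" column whose rows are
# length-D lists, and read back through the single-column path shown earlier.
import numpy as np
import pyarrow

arr = np.arange(6).reshape(3, 2)
table = pyarrow.table([pyarrow.array(arr.tolist())], names=["vector"])
restored = np.array(table.to_pandas().iloc[:, 0].to_list())
assert restored.shape == (3, 2)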
......@@ -37,4 +37,4 @@ STR_NAME = "name"
STR_GRAPH_NAME = "graph_name"
STR_NODE_FEATURES = "node_features"
STR_EDGE_FEATURES = "edge_features"
\ No newline at end of file
STR_EDGE_FEATURES = "edge_features"
......@@ -5,43 +5,50 @@ import logging
import os
import time
import constants
import dgl
import numpy as np
import pandas as pd
import pyarrow
import torch as th
from pyarrow import csv
import constants
from utils import get_idranges, memory_snapshot, read_json
from dgl.distributed.partition import (
RESERVED_FIELD_DTYPE,
_etype_str_to_tuple,
_etype_tuple_to_str,
RESERVED_FIELD_DTYPE,
)
from pyarrow import csv
from utils import get_idranges, memory_snapshot, read_json
def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
return_orig_nids=False, return_orig_eids=False):
def create_dgl_object(
schema,
part_id,
node_data,
edge_data,
edgeid_offset,
return_orig_nids=False,
return_orig_eids=False,
):
"""
This function creates dgl objects for a given graph partition, as in function
arguments.
arguments.
The "schema" argument is a dictionary, which contains the metadata related to node ids
and edge ids. It contains two keys: "nid" and "eid", whose value is also a dictionary
with the following structure.
with the following structure.
1. The key-value pairs in the "nid" dictionary have the following format.
"ntype-name" is the user assigned name to this node type. "format" describes the
"ntype-name" is the user assigned name to this node type. "format" describes the
format of the contents of the files, and "data" is a list of lists, each list has
3 elements: file-name, start_id and end_id. File-name can be either absolute or
relative path to this file and starting and ending ids are type ids of the nodes
relative path to this file and starting and ending ids are type ids of the nodes
which are contained in this file. These type ids are later used to compute global ids
of these nodes which are used throughout the processing of this pipeline.
of these nodes which are used throughout the processing of this pipeline.
"ntype-name" : {
"format" : "csv",
"format" : "csv",
"data" : [
[ <path-to-file>/ntype0-name-0.csv, start_id0, end_id0],
[ <path-to-file>/ntype0-name-0.csv, start_id0, end_id0],
[ <path-to-file>/ntype0-name-1.csv, start_id1, end_id1],
...
[ <path-to-file>/ntype0-name-<p-1>.csv, start_id<p-1>, end_id<p-1>],
......@@ -50,11 +57,11 @@ def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
2. The key-value pairs in the "eid" dictionary have the following format.
As described for the "nid" dictionary the "eid" dictionary is similarly structured
except that these entries are for edges.
except that these entries are for edges.
"etype-name" : {
"format" : "csv",
"format" : "csv",
"data" : [
[ <path-to-file>/etype0-name-0, start_id0, end_id0],
[ <path-to-file>/etype0-name-0, start_id0, end_id0],
[ <path-to-file>/etype0-name-1 start_id1, end_id1],
...
[ <path-to-file>/etype0-name-1 start_id<p-1>, end_id<p-1>]
......@@ -62,8 +69,8 @@ def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
}
In "nid" dictionary, the type_nids are specified that
should be assigned to nodes which are read from the corresponding nodes file.
Along the same lines dictionary for the key "eid" is used for edges in the
should be assigned to nodes which are read from the corresponding nodes file.
Along the same lines dictionary for the key "eid" is used for edges in the
input graph.
These type ids, for nodes and edges, are used to compute global ids for nodes
......@@ -79,14 +86,14 @@ def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
node_data, where each row is of the following format:
<global_nid> <ntype_id> <global_type_nid>
edge_data : numpy ndarray
edge_data, where each row is of the following format:
edge_data, where each row is of the following format:
<global_src_id> <global_dst_id> <etype_id> <global_type_eid>
edgeid_offset : int
offset to be used when assigning edge global ids in the current partition
return_orig_ids : bool, optional
Indicates whether to return original node/edge IDs.
Returns:
Returns:
--------
dgl object
dgl object created for the current graph partition
......@@ -107,12 +114,16 @@ def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
and value is a 1D tensor mapping between shuffled edge IDs and the original edge
IDs for each edge type. Otherwise, ``None`` is returned.
"""
#create auxiliary data structures from the schema object
# create auxiliary data structures from the schema object
memory_snapshot("CreateDGLObj_Begin", part_id)
_, global_nid_ranges = get_idranges(schema[constants.STR_NODE_TYPE],
schema[constants.STR_NUM_NODES_PER_CHUNK])
_, global_eid_ranges = get_idranges(schema[constants.STR_EDGE_TYPE],
schema[constants.STR_NUM_EDGES_PER_CHUNK])
_, global_nid_ranges = get_idranges(
schema[constants.STR_NODE_TYPE],
schema[constants.STR_NUM_NODES_PER_CHUNK],
)
_, global_eid_ranges = get_idranges(
schema[constants.STR_EDGE_TYPE],
schema[constants.STR_NUM_EDGES_PER_CHUNK],
)
id_map = dgl.distributed.id_map.IdMap(global_nid_ranges)
......@@ -147,15 +158,15 @@ def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
assert np.all(shuffle_global_nids[1:] - shuffle_global_nids[:-1] == 1)
shuffle_global_nid_range = (shuffle_global_nids[0], shuffle_global_nids[-1])
# Determine the node ID ranges of different node types.
for ntype_name in global_nid_ranges:
ntype_id = ntypes_map[ntype_name]
type_nids = shuffle_global_nids[ntype_ids == ntype_id]
node_map_val[ntype_name].append(
[int(type_nids[0]), int(type_nids[-1]) + 1])
[int(type_nids[0]), int(type_nids[-1]) + 1]
)
#process edges
# process edges
memory_snapshot("CreateDGLObj_AssignEdgeData: ", part_id)
shuffle_global_src_id = edge_data[constants.SHUFFLE_GLOBAL_SRC_ID]
edge_data.pop(constants.SHUFFLE_GLOBAL_SRC_ID)
......@@ -180,28 +191,43 @@ def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
etype_ids = edge_data[constants.ETYPE_ID]
edge_data.pop(constants.ETYPE_ID)
edge_data = None
gc.collect()
logging.info(f'There are {len(shuffle_global_src_id)} edges in partition {part_id}')
gc.collect()
logging.info(
f"There are {len(shuffle_global_src_id)} edges in partition {part_id}"
)
# It's not guaranteed that the edges are sorted based on edge type.
# Let's sort edges and all attributes on the edges.
if not np.all(np.diff(etype_ids) >= 0):
sort_idx = np.argsort(etype_ids)
shuffle_global_src_id, shuffle_global_dst_id, global_src_id, global_dst_id, global_edge_id, etype_ids = \
shuffle_global_src_id[sort_idx], shuffle_global_dst_id[sort_idx], global_src_id[sort_idx], \
global_dst_id[sort_idx], global_edge_id[sort_idx], etype_ids[sort_idx]
(
shuffle_global_src_id,
shuffle_global_dst_id,
global_src_id,
global_dst_id,
global_edge_id,
etype_ids,
) = (
shuffle_global_src_id[sort_idx],
shuffle_global_dst_id[sort_idx],
global_src_id[sort_idx],
global_dst_id[sort_idx],
global_edge_id[sort_idx],
etype_ids[sort_idx],
)
assert np.all(np.diff(etype_ids) >= 0)
else:
print(f'[Rank: {part_id}] Edge data is already sorted !!!')
print(f"[Rank: {part_id}] Edge data is already sorted !!!")
# Determine the edge ID range of different edge types.
edge_id_start = edgeid_offset
edge_id_start = edgeid_offset
for etype_name in global_eid_ranges:
etype = _etype_str_to_tuple(etype_name)
assert len(etype) == 3
etype_id = etypes_map[etype]
edge_map_val[etype].append([edge_id_start,
edge_id_start + np.sum(etype_ids == etype_id)])
edge_map_val[etype].append(
[edge_id_start, edge_id_start + np.sum(etype_ids == etype_id)]
)
edge_id_start += np.sum(etype_ids == etype_id)
memory_snapshot("CreateDGLObj_UniqueNodeIds: ", part_id)
......@@ -209,25 +235,38 @@ def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
# Here the order of nodes is defined by the `np.unique` function
# node order is as listed in the uniq_ids array
ids = np.concatenate(
[shuffle_global_src_id, shuffle_global_dst_id,
np.arange(shuffle_global_nid_range[0], shuffle_global_nid_range[1] + 1)])
[
shuffle_global_src_id,
shuffle_global_dst_id,
np.arange(
shuffle_global_nid_range[0], shuffle_global_nid_range[1] + 1
),
]
)
uniq_ids, idx, inverse_idx = np.unique(
ids, return_index=True, return_inverse=True)
ids, return_index=True, return_inverse=True
)
assert len(uniq_ids) == len(idx)
# We get the edge list with their node IDs mapped to a contiguous ID range.
part_local_src_id, part_local_dst_id = np.split(inverse_idx[:len(shuffle_global_src_id) * 2], 2)
inner_nodes = th.as_tensor(np.logical_and(
part_local_src_id, part_local_dst_id = np.split(
inverse_idx[: len(shuffle_global_src_id) * 2], 2
)
inner_nodes = th.as_tensor(
np.logical_and(
uniq_ids >= shuffle_global_nid_range[0],
uniq_ids <= shuffle_global_nid_range[1]))
uniq_ids <= shuffle_global_nid_range[1],
)
)
#get the list of indices, from inner_nodes, which will sort inner_nodes as [True, True, ...., False, False, ...]
#essentially local nodes will be placed before non-local nodes.
# get the list of indices, from inner_nodes, which will sort inner_nodes as [True, True, ...., False, False, ...]
# essentially local nodes will be placed before non-local nodes.
reshuffle_nodes = th.arange(len(uniq_ids))
reshuffle_nodes = th.cat([reshuffle_nodes[inner_nodes.bool()],
reshuffle_nodes[inner_nodes == 0]])
reshuffle_nodes = th.cat(
[reshuffle_nodes[inner_nodes.bool()], reshuffle_nodes[inner_nodes == 0]]
)
'''
"""
The following procedure is used to map the part_local_src_id, part_local_dst_id to account for
reshuffling of nodes (to order locally owned nodes prior to non-local nodes in a partition)
1. Form a node_map, in this case a numpy array, which will be used to map old node-ids (pre-reshuffling)
......@@ -261,59 +300,100 @@ def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
Since the edges are not re-ordered in any way, there is no reordering required for edge related data
during the DGL object creation.
'''
#create the mappings to generate mapped part_local_src_id and part_local_dst_id
#This map will map from unshuffled node-ids to reshuffled-node-ids (which are ordered to prioritize
#locally owned nodes).
nid_map = np.zeros((len(reshuffle_nodes,)))
"""
# create the mappings to generate mapped part_local_src_id and part_local_dst_id
# This map will map from unshuffled node-ids to reshuffled-node-ids (which are ordered to prioritize
# locally owned nodes).
nid_map = np.zeros(
(
len(
reshuffle_nodes,
)
)
)
nid_map[reshuffle_nodes] = np.arange(len(reshuffle_nodes))
#Now map the edge end points to reshuffled_values.
part_local_src_id, part_local_dst_id = nid_map[part_local_src_id], nid_map[part_local_dst_id]
# Now map the edge end points to reshuffled_values.
part_local_src_id, part_local_dst_id = (
nid_map[part_local_src_id],
nid_map[part_local_dst_id],
)
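# Tiny worked example of the mapping above: if reshuffle_nodes = [2, 0, 3, 1]
# (locally owned nodes first), then nid_map = [1, 3, 0, 2], so an edge endpoint
# that previously pointed at node 2 now points at reshuffled position 0.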
#create the graph here now.
part_graph = dgl.graph(data=(part_local_src_id, part_local_dst_id), num_nodes=len(uniq_ids))
# create the graph here now.
part_graph = dgl.graph(
data=(part_local_src_id, part_local_dst_id), num_nodes=len(uniq_ids)
)
part_graph.edata[dgl.EID] = th.arange(
edgeid_offset, edgeid_offset + part_graph.number_of_edges(), dtype=th.int64)
part_graph.edata[dgl.ETYPE] = th.as_tensor(etype_ids, dtype=RESERVED_FIELD_DTYPE[dgl.ETYPE])
part_graph.edata['inner_edge'] = th.ones(part_graph.number_of_edges(),
dtype=RESERVED_FIELD_DTYPE['inner_edge'])
#compute per_type_ids and ntype for all the nodes in the graph.
global_ids = np.concatenate(
[global_src_id, global_dst_id, global_homo_nid])
edgeid_offset,
edgeid_offset + part_graph.number_of_edges(),
dtype=th.int64,
)
part_graph.edata[dgl.ETYPE] = th.as_tensor(
etype_ids, dtype=RESERVED_FIELD_DTYPE[dgl.ETYPE]
)
part_graph.edata["inner_edge"] = th.ones(
part_graph.number_of_edges(), dtype=RESERVED_FIELD_DTYPE["inner_edge"]
)
# compute per_type_ids and ntype for all the nodes in the graph.
global_ids = np.concatenate([global_src_id, global_dst_id, global_homo_nid])
part_global_ids = global_ids[idx]
part_global_ids = part_global_ids[reshuffle_nodes]
ntype, per_type_ids = id_map(part_global_ids)
#continue with the graph creation
part_graph.ndata[dgl.NTYPE] = th.as_tensor(ntype, dtype=RESERVED_FIELD_DTYPE[dgl.NTYPE])
# continue with the graph creation
part_graph.ndata[dgl.NTYPE] = th.as_tensor(
ntype, dtype=RESERVED_FIELD_DTYPE[dgl.NTYPE]
)
part_graph.ndata[dgl.NID] = th.as_tensor(uniq_ids[reshuffle_nodes])
part_graph.ndata['inner_node'] = th.as_tensor(inner_nodes[reshuffle_nodes],
dtype=RESERVED_FIELD_DTYPE['inner_node'])
part_graph.ndata["inner_node"] = th.as_tensor(
inner_nodes[reshuffle_nodes], dtype=RESERVED_FIELD_DTYPE["inner_node"]
)
orig_nids = None
orig_eids = None
if return_orig_nids:
orig_nids = {}
for ntype, ntype_id in ntypes_map.items():
mask = th.logical_and(part_graph.ndata[dgl.NTYPE] == ntype_id,
part_graph.ndata['inner_node'])
mask = th.logical_and(
part_graph.ndata[dgl.NTYPE] == ntype_id,
part_graph.ndata["inner_node"],
)
orig_nids[ntype] = th.as_tensor(per_type_ids[mask])
if return_orig_eids:
orig_eids = {}
for etype, etype_id in etypes_map.items():
mask = th.logical_and(part_graph.edata[dgl.ETYPE] == etype_id,
part_graph.edata['inner_edge'])
orig_eids[_etype_tuple_to_str(etype)] = th.as_tensor(global_edge_id[mask])
return part_graph, node_map_val, edge_map_val, ntypes_map, etypes_map, \
orig_nids, orig_eids
def create_metadata_json(graph_name, num_nodes, num_edges, part_id, num_parts, node_map_val, \
edge_map_val, ntypes_map, etypes_map, output_dir ):
mask = th.logical_and(
part_graph.edata[dgl.ETYPE] == etype_id,
part_graph.edata["inner_edge"],
)
orig_eids[_etype_tuple_to_str(etype)] = th.as_tensor(
global_edge_id[mask]
)
return (
part_graph,
node_map_val,
edge_map_val,
ntypes_map,
etypes_map,
orig_nids,
orig_eids,
)
def create_metadata_json(
graph_name,
num_nodes,
num_edges,
part_id,
num_parts,
node_map_val,
edge_map_val,
ntypes_map,
etypes_map,
output_dir,
):
"""
Auxiliary function to create json file for the graph partition metadata
......@@ -338,7 +418,7 @@ def create_metadata_json(graph_name, num_nodes, num_edges, part_id, num_parts, n
etypes_map : dictionary
map between edge type(string) and edge_type_id(int)
output_dir : string
directory where the output files are to be stored
directory where the output files are to be stored
Returns:
--------
......@@ -346,22 +426,26 @@ def create_metadata_json(graph_name, num_nodes, num_edges, part_id, num_parts, n
map describing the graph information
"""
part_metadata = {'graph_name': graph_name,
'num_nodes': num_nodes,
'num_edges': num_edges,
'part_method': 'metis',
'num_parts': num_parts,
'halo_hops': 1,
'node_map': node_map_val,
'edge_map': edge_map_val,
'ntypes': ntypes_map,
'etypes': etypes_map}
part_dir = 'part' + str(part_id)
part_metadata = {
"graph_name": graph_name,
"num_nodes": num_nodes,
"num_edges": num_edges,
"part_method": "metis",
"num_parts": num_parts,
"halo_hops": 1,
"node_map": node_map_val,
"edge_map": edge_map_val,
"ntypes": ntypes_map,
"etypes": etypes_map,
}
part_dir = "part" + str(part_id)
node_feat_file = os.path.join(part_dir, "node_feat.dgl")
edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
part_graph_file = os.path.join(part_dir, "graph.dgl")
part_metadata['part-{}'.format(part_id)] = {'node_feats': node_feat_file,
'edge_feats': edge_feat_file,
'part_graph': part_graph_file}
part_metadata["part-{}".format(part_id)] = {
"node_feats": node_feat_file,
"edge_feats": edge_feat_file,
"part_graph": part_graph_file,
}
return part_metadata
......@@ -8,61 +8,104 @@ import torch.multiprocessing as mp
from data_shuffle import multi_machine_run, single_machine_run
def log_params(params):
""" Print all the command line arguments for debugging purposes.
def log_params(params):
"""Print all the command line arguments for debugging purposes.
Parameters:
-----------
params: argparse object
Argument Parser structure listing all the pre-defined parameters
"""
print('Input Dir: ', params.input_dir)
print('Graph Name: ', params.graph_name)
print('Schema File: ', params.schema)
print('No. partitions: ', params.num_parts)
print('Output Dir: ', params.output)
print('WorldSize: ', params.world_size)
print('Metis partitions: ', params.partitions_file)
print("Input Dir: ", params.input_dir)
print("Graph Name: ", params.graph_name)
print("Schema File: ", params.schema)
print("No. partitions: ", params.num_parts)
print("Output Dir: ", params.output)
print("WorldSize: ", params.world_size)
print("Metis partitions: ", params.partitions_file)
if __name__ == "__main__":
"""
Start of execution from this point.
"""
Start of execution from this point.
Invoke the appropriate function to begin execution
"""
#arguments which are already needed by the existing implementation of convert_partition.py
parser = argparse.ArgumentParser(description='Construct graph partitions')
parser.add_argument('--input-dir', required=True, type=str,
help='The directory path that contains the partition results.')
parser.add_argument('--graph-name', required=True, type=str,
help='The graph name')
parser.add_argument('--schema', required=True, type=str,
help='The schema of the graph')
parser.add_argument('--num-parts', required=True, type=int,
help='The number of partitions')
parser.add_argument('--output', required=True, type=str,
help='The output directory of the partitioned results')
parser.add_argument('--partitions-dir', help='directory of the partition-ids for each node type',
default=None, type=str)
parser.add_argument('--log-level', type=str, default="info",
help='To enable log level for debugging purposes. Available options: \
# arguments which are already needed by the existing implementation of convert_partition.py
parser = argparse.ArgumentParser(description="Construct graph partitions")
parser.add_argument(
"--input-dir",
required=True,
type=str,
help="The directory path that contains the partition results.",
)
parser.add_argument(
"--graph-name", required=True, type=str, help="The graph name"
)
parser.add_argument(
"--schema", required=True, type=str, help="The schema of the graph"
)
parser.add_argument(
"--num-parts", required=True, type=int, help="The number of partitions"
)
parser.add_argument(
"--output",
required=True,
type=str,
help="The output directory of the partitioned results",
)
parser.add_argument(
"--partitions-dir",
help="directory of the partition-ids for each node type",
default=None,
type=str,
)
parser.add_argument(
"--log-level",
type=str,
default="info",
help="To enable log level for debugging purposes. Available options: \
(Critical, Error, Warning, Info, Debug, Notset), default value \
is: Info')
is: Info",
)
#arguments needed for the distributed implementation
parser.add_argument('--world-size', help='no. of processes to spawn',
default=1, type=int, required=True)
parser.add_argument('--process-group-timeout', required=True, type=int,
help='timeout[seconds] for operations executed against the process group '
'(see torch.distributed.init_process_group)')
parser.add_argument('--save-orig-nids', action='store_true',
help='Save original node IDs into files')
parser.add_argument('--save-orig-eids', action='store_true',
help='Save original edge IDs into files')
parser.add_argument('--graph-formats', default=None, type=str,
help='Save partitions in specified formats.')
# arguments needed for the distributed implementation
parser.add_argument(
"--world-size",
help="no. of processes to spawn",
default=1,
type=int,
required=True,
)
parser.add_argument(
"--process-group-timeout",
required=True,
type=int,
help="timeout[seconds] for operations executed against the process group "
"(see torch.distributed.init_process_group)",
)
parser.add_argument(
"--save-orig-nids",
action="store_true",
help="Save original node IDs into files",
)
parser.add_argument(
"--save-orig-eids",
action="store_true",
help="Save original edge IDs into files",
)
parser.add_argument(
"--graph-formats",
default=None,
type=str,
help="Save partitions in specified formats.",
)
params = parser.parse_args()
#invoke the pipeline function
# invoke the pipeline function
numeric_level = getattr(logging, params.log_level.upper(), None)
logging.basicConfig(level=numeric_level, format=f"[{platform.node()} %(levelname)s %(asctime)s PID:%(process)d] %(message)s")
logging.basicConfig(
level=numeric_level,
format=f"[{platform.node()} %(levelname)s %(asctime)s PID:%(process)d] %(message)s",
)
multi_machine_run(params)
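# A hedged invocation sketch of this pipeline script; the script name and all
# paths are hypothetical, the flags are the ones defined above:
#   python3 data_proc_pipeline.py \
#       --input-dir ./chunked --graph-name toy_graph --schema metadata.json \
#       --num-parts 4 --output ./partitioned --partitions-dir ./partitions \
#       --world-size 4 --process-group-timeout 1800 --log-level info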
import copy
import logging
import os
import numpy as np
import pyarrow
import torch
import copy
from gloo_wrapper import alltoallv_cpu
from pyarrow import csv
from gloo_wrapper import alltoallv_cpu
from utils import map_partid_rank
class DistLookupService:
'''
"""
This is an implementation of a Distributed Lookup Service to provide the following
services to its users. Map 1) global node-ids to partition-ids, and 2) global node-ids
to shuffle global node-ids (contiguous, within each node for a given node_type and across
to shuffle global node-ids (contiguous, within each node for a given node_type and across
all the partitions)
This service initializes itself with the node-id to partition-id mappings, which are inputs
......@@ -44,7 +45,7 @@ class DistLookupService:
integer indicating the rank of a given process
world_size : integer
integer indicating the total no. of processes
'''
"""
def __init__(self, input_dir, ntype_names, id_map, rank, world_size):
assert os.path.isdir(input_dir)
......@@ -60,19 +61,28 @@ class DistLookupService:
# Iterate over the node types and extract the partition id mappings.
for ntype in ntype_names:
filename = f'{ntype}.txt'
logging.info(f'[Rank: {rank}] Reading file: {os.path.join(input_dir, filename)}')
read_options=pyarrow.csv.ReadOptions(use_threads=True, block_size=4096, autogenerate_column_names=True)
parse_options=pyarrow.csv.ParseOptions(delimiter=' ')
filename = f"{ntype}.txt"
logging.info(
f"[Rank: {rank}] Reading file: {os.path.join(input_dir, filename)}"
)
read_options = pyarrow.csv.ReadOptions(
use_threads=True,
block_size=4096,
autogenerate_column_names=True,
)
parse_options = pyarrow.csv.ParseOptions(delimiter=" ")
ntype_partids = []
with pyarrow.csv.open_csv(os.path.join(input_dir, '{}.txt'.format(ntype)),
read_options=read_options, parse_options=parse_options) as reader:
with pyarrow.csv.open_csv(
os.path.join(input_dir, "{}.txt".format(ntype)),
read_options=read_options,
parse_options=parse_options,
) as reader:
for next_chunk in reader:
if next_chunk is None:
break
next_table = pyarrow.Table.from_batches([next_chunk])
ntype_partids.append(next_table['f0'].to_numpy())
ntype_partids.append(next_table["f0"].to_numpy())
ntype_partids = np.concatenate(ntype_partids)
count = len(ntype_partids)
......@@ -80,9 +90,12 @@ class DistLookupService:
# Each rank assumes a contiguous set of partition-ids which are equally split
# across all the processes.
split_size = np.ceil(count/np.int64(world_size)).astype(np.int64)
start, end = np.int64(rank)*split_size, np.int64(rank+1)*split_size
if rank == (world_size-1):
split_size = np.ceil(count / np.int64(world_size)).astype(np.int64)
start, end = (
np.int64(rank) * split_size,
np.int64(rank + 1) * split_size,
)
if rank == (world_size - 1):
end = count
type_nid_begin.append(start)
type_nid_end.append(end)
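# Worked example of the split above: with count = 10 node-ids of this ntype and
# world_size = 4, split_size = ceil(10 / 4) = 3, so ranks 0..3 own the mapping
# ranges [0, 3), [3, 6), [6, 9) and [9, 10) (the last rank absorbs the remainder).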
......@@ -102,14 +115,13 @@ class DistLookupService:
self.rank = rank
self.world_size = world_size
def get_partition_ids(self, global_nids):
'''
"""
This function is used to get the partition-ids for a given set of global node ids
global_nids <-> partition-ids mappings are deterministically distributed across
global_nids <-> partition-ids mappings are deterministically distributed across
all the participating processes, within the service. A contiguous global-nids
(ntype-ids, per-type-nids) are stored within each process and this is determined
(ntype-ids, per-type-nids) are stored within each process and this is determined
by the total no. of nodes of a given ntype-id and the rank of the process.
Process, where the global_nid <-> partition-id mapping is stored can be easily computed
......@@ -118,7 +130,7 @@ class DistLookupService:
partition-ids using locally stored lookup tables. It builds responses to all the other
processes and performs alltoallv.
Once the response, partition-ids, is received, they are re-ordered corresponding to the
Once the response, partition-ids, is received, they are re-ordered corresponding to the
incoming global-nids order and returns to the caller.
Parameters:
......@@ -126,33 +138,35 @@ class DistLookupService:
self : instance of this class
instance of this class, which is passed by the runtime implicitly
global_nids : numpy array
an array of global node-ids for which partition-ids are to be retrieved by
an array of global node-ids for which partition-ids are to be retrieved by
the distributed lookup service.
Returns:
--------
list of integers :
list of integers :
list of integers, which are the partition-ids of the global-node-ids (which is the
function argument)
'''
"""
# Find the process where global_nid --> partition-id(owner) is stored.
# Find the process where global_nid --> partition-id(owner) is stored.
ntype_ids, type_nids = self.id_map(global_nids)
ntype_ids, type_nids = ntype_ids.numpy(), type_nids.numpy()
assert len(ntype_ids) == len(global_nids)
# For each node-type, the per-type-node-id <-> partition-id mappings are
# stored as contiguous chunks by this lookup service.
# stored as contiguous chunks by this lookup service.
# The no. of these mappings stored by each process, in the lookup service, are
# equally split among all the processes in the lookup service, deterministically.
typeid_counts = self.ntype_count[ntype_ids]
chunk_sizes = np.ceil(typeid_counts/self.world_size).astype(np.int64)
service_owners = np.floor_divide(type_nids, chunk_sizes).astype(np.int64)
chunk_sizes = np.ceil(typeid_counts / self.world_size).astype(np.int64)
service_owners = np.floor_divide(type_nids, chunk_sizes).astype(
np.int64
)
# Now `service_owners` is a list of ranks (process-ids) which own the corresponding
# global-nid <-> partition-id mapping.
# Split the input global_nids into a list of lists where each list will be
# Split the input global_nids into a list of lists where each list will be
# sent to the respective rank/process
# We also need to store the indices, in the indices_list, so that we can re-order
# the final result (partition-ids) in the same order as the global-nids (function argument)
......@@ -164,12 +178,14 @@ class DistLookupService:
send_list.append(torch.from_numpy(ll))
indices_list.append(idxes[0])
assert len(np.concatenate(indices_list)) == len(global_nids)
assert np.all(np.sort(np.concatenate(indices_list)) == np.arange(len(global_nids)))
assert np.all(
np.sort(np.concatenate(indices_list)) == np.arange(len(global_nids))
)
# Send the request to everyone else.
# As a result of this operation, the current process also receives a list of lists
# from all the other processes.
# These lists are global-node-ids whose global-node-ids <-> partition-id mappings
# from all the other processes.
# These lists are global-node-ids whose global-node-ids <-> partition-id mappings
# are owned/stored by the current process
owner_req_list = alltoallv_cpu(self.rank, self.world_size, send_list)
......@@ -201,12 +217,15 @@ class DistLookupService:
local_type_nids = global_type_nids - self.type_nid_begin[tid]
assert np.all(local_type_nids >= 0)
assert np.all(local_type_nids <= (self.type_nid_end[tid] + 1 - self.type_nid_begin[tid]))
assert np.all(
local_type_nids
<= (self.type_nid_end[tid] + 1 - self.type_nid_begin[tid])
)
cur_owners = self.partid_list[tid][local_type_nids]
type_id_lookups.append(cur_owners)
# Reorder the partition-ids, so that it agrees with the input order --
# Reorder the partition-ids, so that it agrees with the input order --
# which is the order in which the incoming message is received.
if len(type_id_lookups) <= 0:
out_list.append(torch.empty((0,), dtype=torch.int64))
......@@ -219,14 +238,16 @@ class DistLookupService:
# Send the partition-ids to their respective requesting processes.
owner_resp_list = alltoallv_cpu(self.rank, self.world_size, out_list)
# Owner_resp_list, is a list of lists of numpy arrays where each list
# Owner_resp_list, is a list of lists of numpy arrays where each list
# is a list of partition-ids which the current process requested
# Now we need to re-order so that the partition-ids correspond to the
# Now we need to re-order so that the partition-ids correspond to the
# global_nids which are passed into this function.
# Order according to the requesting order.
# Order according to the requesting order.
# Owner_resp_list is the list of owner-ids for global_nids (function argument).
owner_ids = torch.cat([x for x in owner_resp_list if x is not None]).numpy()
owner_ids = torch.cat(
[x for x in owner_resp_list if x is not None]
).numpy()
assert len(owner_ids) == len(global_nids)
global_nids_order = np.concatenate(indices_list)
......@@ -238,16 +259,18 @@ class DistLookupService:
# Now the owner_ids (partition-ids) which corresponding to the global_nids.
return owner_ids
def get_shuffle_nids(self, global_nids, my_global_nids, my_shuffle_global_nids, world_size):
'''
def get_shuffle_nids(
self, global_nids, my_global_nids, my_shuffle_global_nids, world_size
):
"""
This function is used to retrieve shuffle_global_nids for a given set of incoming
global_nids. Note that global_nids are of random order and will contain duplicates
This function first retrieves the partition-ids of the incoming global_nids.
These partition-ids which are also the ranks of processes which own the respective
global-nids as well as shuffle-global-nids. alltoallv is performed to send the
global-nids to respective ranks/partition-ids where the mapping
global-nids <-> shuffle-global-nid is located.
global-nids as well as shuffle-global-nids. alltoallv is performed to send the
global-nids to respective ranks/partition-ids where the mapping
global-nids <-> shuffle-global-nid is located.
On the receiving side, once the global-nids are received associated shuffle-global-nids
are retrieved and an alltoallv is performed to send the responses to all the other
......@@ -261,7 +284,7 @@ class DistLookupService:
self : instance of this class
instance of this class, which is passed by the runtime implicitly
global_nids : numpy array
an array of global node-ids for which partition-ids are to be retrieved by
an array of global node-ids for which partition-ids are to be retrieved by
the distributed lookup service.
my_global_nids: numpy ndarray
array of global_nids which are owned by the current partition/rank/process
......@@ -276,17 +299,17 @@ class DistLookupService:
list of integers:
list of shuffle_global_nids which correspond to the incoming node-ids in the
global_nids.
'''
"""
# Get the owner_ids (partition-ids or rank).
owner_ids = self.get_partition_ids(global_nids)
# These owner_ids, which are also partition ids of the nodes in the
# These owner_ids, which are also partition ids of the nodes in the
# input graph, are in the range 0 - (num_partitions - 1).
# These ids are generated using some kind of graph partitioning method.
# Distributed lookup service, as used by the graph partitioning
# pipeline, is used to store ntype-ids (also type_nids) and their
# mapping to the associated partition-id.
# Distributed lookup service, as used by the graph partitioning
# pipeline, is used to store ntype-ids (also type_nids) and their
# mapping to the associated partition-id.
# These ids are split into `num_process` chunks and processes in the
# dist. lookup service are assigned the ownership of these chunks.
# The pipeline also enforces the following constraint among the
......@@ -318,8 +341,15 @@ class DistLookupService:
shuffle_nids_list.append(torch.empty((0,), dtype=torch.int64))
continue
uniq_ids, inverse_idx = np.unique(cur_global_nids[idx], return_inverse=True)
common, idx1, idx2 = np.intersect1d(uniq_ids, my_global_nids, assume_unique=True, return_indices=True)
uniq_ids, inverse_idx = np.unique(
cur_global_nids[idx], return_inverse=True
)
common, idx1, idx2 = np.intersect1d(
uniq_ids,
my_global_nids,
assume_unique=True,
return_indices=True,
)
assert len(common) == len(uniq_ids)
req_shuffle_global_nids = my_shuffle_global_nids[idx2][inverse_idx]
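# Tiny worked example of the lookup pattern above (hypothetical ids):
#   cur_global_nids[idx]   = [7, 5, 7]    -> uniq_ids = [5, 7], inverse_idx = [1, 0, 1]
#   my_global_nids         = [5, 6, 7, 8] -> idx2 = [0, 2] (positions of 5 and 7)
#   my_shuffle_global_nids = [50, 60, 70, 80]
#   my_shuffle_global_nids[idx2][inverse_idx] = [70, 50, 70]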
......@@ -327,7 +357,9 @@ class DistLookupService:
shuffle_nids_list.append(torch.from_numpy(req_shuffle_global_nids))
# Send the shuffle-global-nids to their respective ranks.
mapped_global_nids = alltoallv_cpu(self.rank, self.world_size, shuffle_nids_list)
mapped_global_nids = alltoallv_cpu(
self.rank, self.world_size, shuffle_nids_list
)
for idx in range(len(mapped_global_nids)):
if mapped_global_nids[idx] is None:
mapped_global_nids[idx] = torch.empty((0,), dtype=torch.int64)
......@@ -338,7 +370,7 @@ class DistLookupService:
assert len(shuffle_global_nids) == len(global_nids)
sorted_idx = np.argsort(global_nids_order)
shuffle_global_nids = shuffle_global_nids[ sorted_idx ]
shuffle_global_nids = shuffle_global_nids[sorted_idx]
global_nids_ordered = global_nids_order[sorted_idx]
assert np.all(global_nids_ordered == np.arange(len(global_nids)))
......
import itertools
import operator
import constants
import numpy as np
import torch
import constants
from dist_lookup import DistLookupService
from gloo_wrapper import allgather_sizes, alltoallv_cpu
from utils import memory_snapshot
def get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data):
"""
"""
For nodes which are not owned by the current rank, whose global_nid <-> shuffle_global-nid mapping
is not present at the current rank, this function retrieves their shuffle_global_ids from the owner rank
Parameters:
Parameters:
-----------
rank : integer
rank of the process
......@@ -23,7 +23,7 @@ def get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data):
total no. of ranks configured
global_nids_ranks : list
list of numpy arrays (of global_nids), index of the list is the rank of the process
where global_nid <-> shuffle_global_nid mapping is located.
where global_nid <-> shuffle_global_nid mapping is located.
node_data : dictionary
node_data is a dictionary with keys as column names and values as numpy arrays
......@@ -31,36 +31,51 @@ def get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data):
--------
numpy ndarray
where the column-0 are global_nids and column-1 are shuffle_global_nids which are retrieved
from other processes.
from other processes.
"""
#build a list of sizes (lengths of lists)
# build a list of sizes (lengths of lists)
global_nids_ranks = [torch.from_numpy(x) for x in global_nids_ranks]
recv_nodes = alltoallv_cpu(rank, world_size, global_nids_ranks)
# Use node_data to lookup global id to send over.
send_nodes = []
for proc_i_nodes in recv_nodes:
#list of node-ids to lookup
if proc_i_nodes is not None:
# list of node-ids to lookup
if proc_i_nodes is not None:
global_nids = proc_i_nodes.numpy()
if(len(global_nids) != 0):
common, ind1, ind2 = np.intersect1d(node_data[constants.GLOBAL_NID], global_nids, return_indices=True)
shuffle_global_nids = node_data[constants.SHUFFLE_GLOBAL_NID][ind1]
send_nodes.append(torch.from_numpy(shuffle_global_nids).type(dtype=torch.int64))
if len(global_nids) != 0:
common, ind1, ind2 = np.intersect1d(
node_data[constants.GLOBAL_NID],
global_nids,
return_indices=True,
)
shuffle_global_nids = node_data[constants.SHUFFLE_GLOBAL_NID][
ind1
]
send_nodes.append(
torch.from_numpy(shuffle_global_nids).type(
dtype=torch.int64
)
)
else:
send_nodes.append(torch.empty((0), dtype=torch.int64))
else:
send_nodes.append(torch.empty((0), dtype=torch.int64))
#send receive global-ids
# send receive global-ids
recv_shuffle_global_nids = alltoallv_cpu(rank, world_size, send_nodes)
shuffle_global_nids = np.concatenate([x.numpy() if x is not None else [] for x in recv_shuffle_global_nids])
shuffle_global_nids = np.concatenate(
[x.numpy() if x is not None else [] for x in recv_shuffle_global_nids]
)
global_nids = np.concatenate([x for x in global_nids_ranks])
ret_val = np.column_stack([global_nids, shuffle_global_nids])
return ret_val
def lookup_shuffle_global_nids_edges(rank, world_size, num_parts, edge_data, id_lookup, node_data):
'''
def lookup_shuffle_global_nids_edges(
rank, world_size, num_parts, edge_data, id_lookup, node_data
):
"""
This function is a helper function used to lookup shuffle-global-nids for a given set of
global-nids using a distributed lookup service.
......@@ -87,56 +102,87 @@ def lookup_shuffle_global_nids_edges(rank, world_size, num_parts, edge_data, id_
dictionary :
dictionary where keys are column names and values are numpy arrays representing all the
edges present in the current graph partition
'''
# Make sure that the outgoing message size does not exceed 2GB in size.
"""
# Make sure that the outgoing message size does not exceed 2GB in size.
# Even though gloo can handle up to 10GB of data in the outgoing messages,
# it needs additional memory to store temporary information into the buffers which will increase
# the memory needs of the process.
# the memory needs of the process.
MILLION = 1000 * 1000
BATCH_SIZE = 250 * MILLION
memory_snapshot("GlobalToShuffleIDMapBegin: ", rank)
local_nids = []
local_shuffle_nids = []
for local_part_id in range(num_parts//world_size):
local_nids.append(node_data[constants.GLOBAL_NID+"/"+str(local_part_id)])
local_shuffle_nids.append(node_data[constants.SHUFFLE_GLOBAL_NID+"/"+str(local_part_id)])
for local_part_id in range(num_parts // world_size):
local_nids.append(
node_data[constants.GLOBAL_NID + "/" + str(local_part_id)]
)
local_shuffle_nids.append(
node_data[constants.SHUFFLE_GLOBAL_NID + "/" + str(local_part_id)]
)
local_nids = np.concatenate(local_nids)
local_shuffle_nids = np.concatenate(local_shuffle_nids)
for local_part_id in range(num_parts//world_size):
node_list = edge_data[constants.GLOBAL_SRC_ID+"/"+str(local_part_id)]
for local_part_id in range(num_parts // world_size):
node_list = edge_data[
constants.GLOBAL_SRC_ID + "/" + str(local_part_id)
]
# Determine the no. of times each process has to send alltoall messages.
all_sizes = allgather_sizes([node_list.shape[0]], world_size, num_parts, return_sizes=True)
all_sizes = allgather_sizes(
[node_list.shape[0]], world_size, num_parts, return_sizes=True
)
max_count = np.amax(all_sizes)
num_splits = max_count // BATCH_SIZE + 1
num_splits = max_count // BATCH_SIZE + 1
# Split the message into batches and send.
splits = np.array_split(node_list, num_splits)
shuffle_mappings = []
for item in splits:
shuffle_ids = id_lookup.get_shuffle_nids(item, local_nids, local_shuffle_nids, world_size)
shuffle_ids = id_lookup.get_shuffle_nids(
item, local_nids, local_shuffle_nids, world_size
)
shuffle_mappings.append(shuffle_ids)
shuffle_ids = np.concatenate(shuffle_mappings)
assert shuffle_ids.shape[0] == node_list.shape[0]
edge_data[constants.SHUFFLE_GLOBAL_SRC_ID+"/"+str(local_part_id)] = shuffle_ids
edge_data[
constants.SHUFFLE_GLOBAL_SRC_ID + "/" + str(local_part_id)
] = shuffle_ids
# Destination end points of edges are owned by the current node and therefore
# should have corresponding SHUFFLE_GLOBAL_NODE_IDs.
# should have corresponding SHUFFLE_GLOBAL_NODE_IDs.
# Here retrieve SHUFFLE_GLOBAL_NODE_IDs for the destination end points of local edges.
uniq_ids, inverse_idx = np.unique(edge_data[constants.GLOBAL_DST_ID+"/"+str(local_part_id)], return_inverse=True)
common, idx1, idx2 = np.intersect1d(uniq_ids, node_data[constants.GLOBAL_NID+"/"+str(local_part_id)], assume_unique=True, return_indices=True)
uniq_ids, inverse_idx = np.unique(
edge_data[constants.GLOBAL_DST_ID + "/" + str(local_part_id)],
return_inverse=True,
)
common, idx1, idx2 = np.intersect1d(
uniq_ids,
node_data[constants.GLOBAL_NID + "/" + str(local_part_id)],
assume_unique=True,
return_indices=True,
)
assert len(common) == len(uniq_ids)
edge_data[constants.SHUFFLE_GLOBAL_DST_ID+"/"+str(local_part_id)] = node_data[constants.SHUFFLE_GLOBAL_NID+"/"+str(local_part_id)][idx2][inverse_idx]
assert len(edge_data[constants.SHUFFLE_GLOBAL_DST_ID+"/"+str(local_part_id)]) == len(edge_data[constants.GLOBAL_DST_ID+"/"+str(local_part_id)])
edge_data[
constants.SHUFFLE_GLOBAL_DST_ID + "/" + str(local_part_id)
] = node_data[constants.SHUFFLE_GLOBAL_NID + "/" + str(local_part_id)][
idx2
][
inverse_idx
]
assert len(
edge_data[
constants.SHUFFLE_GLOBAL_DST_ID + "/" + str(local_part_id)
]
) == len(edge_data[constants.GLOBAL_DST_ID + "/" + str(local_part_id)])
memory_snapshot("GlobalToShuffleIDMap_AfterLookupServiceCalls: ", rank)
return edge_data
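A minimal sketch of the unique/intersect1d/inverse-index lookup used for the destination end points above (all arrays are illustrative): each distinct global destination id is resolved once against the locally owned node table and then broadcast back to every duplicate edge.
import numpy as np
global_dst = np.array([7, 3, 7, 5])             # per-edge global dst ids, with repeats
owned_global_nid = np.array([5, 7, 3])          # global ids owned by this partition
owned_shuffle_nid = np.array([101, 102, 100])   # their shuffle-global ids
uniq, inverse = np.unique(global_dst, return_inverse=True)      # uniq == [3, 5, 7]
common, idx1, idx2 = np.intersect1d(
    uniq, owned_global_nid, assume_unique=True, return_indices=True
)
shuffle_dst = owned_shuffle_nid[idx2][inverse]  # -> [102, 100, 102, 101]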
def assign_shuffle_global_nids_nodes(rank, world_size, num_parts, node_data):
"""
Utility function to assign shuffle global ids to nodes at a given rank
......@@ -145,10 +191,10 @@ def assign_shuffle_global_nids_nodes(rank, world_size, num_parts, node_data):
where shuffle_global_nid : global id of the node after data shuffle
ntype : node-type as read from xxx_nodes.txt
global_type_nid : node-type-id as read from xxx_nodes.txt
global_nid : node-id as read from xxx_nodes.txt, implicitly
global_nid : node-id as read from xxx_nodes.txt, implicitly
this is the line no. in the file
part_local_type_nid : type_nid assigned by the current rank within its scope
Parameters:
-----------
rank : integer
......@@ -162,17 +208,27 @@ def assign_shuffle_global_nids_nodes(rank, world_size, num_parts, node_data):
"""
# Compute prefix sum to determine node-id offsets
local_row_counts = []
for local_part_id in range(num_parts//world_size):
local_row_counts.append(node_data[constants.GLOBAL_NID+"/"+str(local_part_id)].shape[0])
for local_part_id in range(num_parts // world_size):
local_row_counts.append(
node_data[constants.GLOBAL_NID + "/" + str(local_part_id)].shape[0]
)
# Perform allgather to compute the local offsets.
prefix_sum_nodes = allgather_sizes(local_row_counts, world_size, num_parts)
for local_part_id in range(num_parts//world_size):
shuffle_global_nid_start = prefix_sum_nodes[rank + (local_part_id*world_size)]
shuffle_global_nid_end = prefix_sum_nodes[rank + 1 + (local_part_id*world_size)]
shuffle_global_nids = np.arange(shuffle_global_nid_start, shuffle_global_nid_end, dtype=np.int64)
node_data[constants.SHUFFLE_GLOBAL_NID+"/"+str(local_part_id)] = shuffle_global_nids
for local_part_id in range(num_parts // world_size):
shuffle_global_nid_start = prefix_sum_nodes[
rank + (local_part_id * world_size)
]
shuffle_global_nid_end = prefix_sum_nodes[
rank + 1 + (local_part_id * world_size)
]
shuffle_global_nids = np.arange(
shuffle_global_nid_start, shuffle_global_nid_end, dtype=np.int64
)
node_data[
constants.SHUFFLE_GLOBAL_NID + "/" + str(local_part_id)
] = shuffle_global_nids
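A small worked example of the offset arithmetic above (illustrative values, not from this patch): with world_size = 2 and num_parts = 4, suppose allgather_sizes returned [0, 10, 40, 60, 100]; rank 1 then owns ids [10, 40) for its local partition 0 and [60, 100) for its local partition 1.
import numpy as np
prefix_sum_nodes = np.array([0, 10, 40, 60, 100])  # assumed output of allgather_sizes
rank, world_size, num_parts = 1, 2, 4
for local_part_id in range(num_parts // world_size):
    start = prefix_sum_nodes[rank + local_part_id * world_size]
    end = prefix_sum_nodes[rank + 1 + local_part_id * world_size]
    print(local_part_id, np.arange(start, end, dtype=np.int64))
    # local_part_id 0 -> ids 10..39, local_part_id 1 -> ids 60..99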
def assign_shuffle_global_nids_edges(rank, world_size, num_parts, edge_data):
......@@ -198,19 +254,31 @@ def assign_shuffle_global_nids_edges(rank, world_size, num_parts, edge_data):
shuffle_global_eid_start, which indicates the starting value from which shuffle_global-ids are assigned to edges
on this rank
"""
#get prefix sum of edge counts per rank to locate the starting point
#from which global-ids to edges are assigned in the current rank
# get prefix sum of edge counts per rank to locate the starting point
# from which global-ids to edges are assigned in the current rank
local_row_counts = []
for local_part_id in range(num_parts//world_size):
local_row_counts.append(edge_data[constants.GLOBAL_SRC_ID+"/"+str(local_part_id)].shape[0])
for local_part_id in range(num_parts // world_size):
local_row_counts.append(
edge_data[constants.GLOBAL_SRC_ID + "/" + str(local_part_id)].shape[
0
]
)
shuffle_global_eid_offset = []
prefix_sum_edges = allgather_sizes(local_row_counts, world_size, num_parts)
for local_part_id in range(num_parts//world_size):
shuffle_global_eid_start = prefix_sum_edges[rank + (local_part_id*world_size)]
shuffle_global_eid_end = prefix_sum_edges[rank + 1 + (local_part_id*world_size)]
shuffle_global_eids = np.arange(shuffle_global_eid_start, shuffle_global_eid_end, dtype=np.int64)
edge_data[constants.SHUFFLE_GLOBAL_EID+"/"+str(local_part_id)] = shuffle_global_eids
for local_part_id in range(num_parts // world_size):
shuffle_global_eid_start = prefix_sum_edges[
rank + (local_part_id * world_size)
]
shuffle_global_eid_end = prefix_sum_edges[
rank + 1 + (local_part_id * world_size)
]
shuffle_global_eids = np.arange(
shuffle_global_eid_start, shuffle_global_eid_end, dtype=np.int64
)
edge_data[
constants.SHUFFLE_GLOBAL_EID + "/" + str(local_part_id)
] = shuffle_global_eids
shuffle_global_eid_offset.append(shuffle_global_eid_start)
return shuffle_global_eid_offset
......@@ -2,15 +2,16 @@ import numpy as np
import torch
import torch.distributed as dist
def allgather_sizes(send_data, world_size, num_parts, return_sizes=False):
"""
"""
Perform allgather on list lengths, used to compute prefix sums
to determine the offsets on each rank. This is used to allocate
global ids for edges/nodes on each rank.
Parameters
----------
send_data : numpy array
send_data : numpy array
Data on which allgather is performed.
world_size : integer
No. of processes configured for execution
......@@ -20,7 +21,7 @@ def allgather_sizes(send_data, world_size, num_parts, return_sizes=False):
Boolean flag to indicate whether to return raw sizes from each process
or perform prefix sum on the raw sizes.
Returns :
Returns :
---------
numpy array
array with the prefix sum
......@@ -29,33 +30,35 @@ def allgather_sizes(send_data, world_size, num_parts, return_sizes=False):
# Assert on the world_size, num_parts
assert (num_parts % world_size) == 0
#compute the length of the local data
# compute the length of the local data
send_length = len(send_data)
out_tensor = torch.as_tensor(send_data, dtype=torch.int64)
in_tensor = [torch.zeros(send_length, dtype=torch.int64)
for _ in range(world_size)]
in_tensor = [
torch.zeros(send_length, dtype=torch.int64) for _ in range(world_size)
]
#all_gather message
# all_gather message
dist.all_gather(in_tensor, out_tensor)
# Return the raw sizes from each process
if return_sizes:
return torch.cat(in_tensor).numpy()
#gather sizes into one array to return to the invoking function
# gather sizes into one array to return to the invoking function
rank_sizes = np.zeros(num_parts + 1, dtype=np.int64)
part_counts = torch.cat(in_tensor).numpy()
count = rank_sizes[0]
idx = 1
for local_part_id in range(num_parts//world_size):
for local_part_id in range(num_parts // world_size):
for r in range(world_size):
count += part_counts[r*(num_parts//world_size) + local_part_id]
count += part_counts[r * (num_parts // world_size) + local_part_id]
rank_sizes[idx] = count
idx += 1
return rank_sizes
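A worked example of the interleaving above (illustrative counts): with world_size = 2 and num_parts = 4, each rank contributes two counts, one per local partition. If rank 0 gathered [10, 20] and rank 1 gathered [30, 40], the prefix sums are laid out partition-major, i.e. (rank 0 part 0, rank 1 part 0, rank 0 part 1, rank 1 part 1).
import numpy as np
world_size_toy, num_parts_toy = 2, 4
part_counts_toy = np.array([10, 20, 30, 40])   # rank 0's two counts, then rank 1's
rank_sizes_toy = np.zeros(num_parts_toy + 1, dtype=np.int64)
count, idx = 0, 1
for local_part_id in range(num_parts_toy // world_size_toy):
    for r in range(world_size_toy):
        count += part_counts_toy[r * (num_parts_toy // world_size_toy) + local_part_id]
        rank_sizes_toy[idx] = count
        idx += 1
# rank_sizes_toy == [0, 10, 40, 60, 100]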
def __alltoall_cpu(rank, world_size, output_tensor_list, input_tensor_list):
"""
Each process scatters list of input tensors to all processes in a cluster
......@@ -72,36 +75,41 @@ def __alltoall_cpu(rank, world_size, output_tensor_list, input_tensor_list):
input_tensor_list : List of tensor
The tensors to exchange
"""
input_tensor_list = [tensor.to(torch.device('cpu')) for tensor in input_tensor_list]
input_tensor_list = [
tensor.to(torch.device("cpu")) for tensor in input_tensor_list
]
# TODO(#5002): As Boolean data is not supported in
# ``torch.distributed.scatter()``, we convert boolean into uint8 before
# scatter and convert it back afterwards.
dtypes = [ t.dtype for t in input_tensor_list]
dtypes = [t.dtype for t in input_tensor_list]
for i, dtype in enumerate(dtypes):
if dtype == torch.bool:
input_tensor_list[i] = input_tensor_list[i].to(torch.int8)
output_tensor_list[i] = output_tensor_list[i].to(torch.int8)
for i in range(world_size):
dist.scatter(output_tensor_list[i], input_tensor_list if i == rank else [], src=i)
dist.scatter(
output_tensor_list[i], input_tensor_list if i == rank else [], src=i
)
# Convert back to original dtype
for i, dtype in enumerate(dtypes):
if dtype == torch.bool:
input_tensor_list[i] = input_tensor_list[i].to(dtype)
output_tensor_list[i] = output_tensor_list[i].to(dtype)
def alltoallv_cpu(rank, world_size, input_tensor_list, retain_nones=True):
"""
Wrapper function providing alltoallv functionality using the underlying alltoall
messaging primitive. This function, in its current implementation, supports exchanging
messaging primitive. This function, in its current implementation, supports exchanging
messages of arbitrary dimensions and is not tied to any particular use of this function.
This function pads all input tensors, except the largest one, so that all the messages are of the same
size. Once the messages are padded, it first sends a vector whose first two elements are
1) the actual message size along the first dimension, and 2) the padded message size along the first dimension,
size. Once the messages are padded, it first sends a vector whose first two elements are
1) the actual message size along the first dimension, and 2) the padded message size along the first dimension,
which is used for communication. The remaining dimensions are assumed to be the same across
all the input tensors. After receiving the message sizes, the receiving end creates buffers
of appropriate sizes, then slices the received messages to remove the added padding, if any,
and returns them to the caller.
of appropriate sizes, then slices the received messages to remove the added padding, if any,
and returns them to the caller.
Parameters:
-----------
......@@ -116,81 +124,99 @@ def alltoallv_cpu(rank, world_size, input_tensor_list, retain_nones=True):
Returns:
--------
list :
list :
list of tensors received from other processes during alltoall message
"""
#ensure len of input_tensor_list is same as the world_size.
# ensure len of input_tensor_list is same as the world_size.
assert input_tensor_list != None
assert len(input_tensor_list) == world_size
#ensure that all the tensors in the input_tensor_list are of same size.
# ensure that all the tensors in the input_tensor_list are of same size.
sizes = [list(x.size()) for x in input_tensor_list]
for idx in range(1,len(sizes)):
assert len(sizes[idx-1]) == len(sizes[idx]) #no. of dimensions should be same
assert input_tensor_list[idx-1].dtype == input_tensor_list[idx].dtype # dtype should be same
assert sizes[idx-1][1:] == sizes[idx][1:] #except first dimension remaining dimensions should all be the same
#decide how much to pad.
#always use the first-dimension for padding.
ll = [ x[0] for x in sizes ]
#dims of the padding needed, if any
#these dims are used for padding purposes.
diff_dims = [ [np.amax(ll) - l[0]] + l[1:] for l in sizes ]
#pad the actual message
input_tensor_list = [torch.cat((x, torch.zeros(diff_dims[idx]).type(x.dtype))) for idx, x in enumerate(input_tensor_list)]
#send useful message sizes to all
for idx in range(1, len(sizes)):
assert len(sizes[idx - 1]) == len(
sizes[idx]
) # no. of dimensions should be same
assert (
input_tensor_list[idx - 1].dtype == input_tensor_list[idx].dtype
) # dtype should be same
assert (
sizes[idx - 1][1:] == sizes[idx][1:]
) # except first dimension remaining dimensions should all be the same
# decide how much to pad.
# always use the first-dimension for padding.
ll = [x[0] for x in sizes]
# dims of the padding needed, if any
# these dims are used for padding purposes.
diff_dims = [[np.amax(ll) - l[0]] + l[1:] for l in sizes]
# pad the actual message
input_tensor_list = [
torch.cat((x, torch.zeros(diff_dims[idx]).type(x.dtype)))
for idx, x in enumerate(input_tensor_list)
]
# send useful message sizes to all
send_counts = []
recv_counts = []
for idx in range(world_size):
#send a vector, of at least 3 elements, [a, b, ....] where
#a = useful message size along the first dimension, b = padded outgoing message size along the first dimension,
#and remaining elements are the remaining dimensions of the tensor
send_counts.append(torch.from_numpy(np.array([sizes[idx][0]] + [np.amax(ll)] + sizes[idx][1:] )).type(torch.int64))
recv_counts.append(torch.zeros((1 + len(sizes[idx])), dtype=torch.int64))
# send a vector, of at least 3 elements, [a, b, ....] where
# a = useful message size along the first dimension, b = padded outgoing message size along the first dimension,
# and remaining elements are the remaining dimensions of the tensor
send_counts.append(
torch.from_numpy(
np.array([sizes[idx][0]] + [np.amax(ll)] + sizes[idx][1:])
).type(torch.int64)
)
recv_counts.append(
torch.zeros((1 + len(sizes[idx])), dtype=torch.int64)
)
__alltoall_cpu(rank, world_size, recv_counts, send_counts)
#allocate buffers for receiving message
# allocate buffers for receiving message
output_tensor_list = []
recv_counts = [ tsize.numpy() for tsize in recv_counts]
recv_counts = [tsize.numpy() for tsize in recv_counts]
for idx, tsize in enumerate(recv_counts):
output_tensor_list.append(torch.zeros(tuple(tsize[1:])).type(input_tensor_list[idx].dtype))
output_tensor_list.append(
torch.zeros(tuple(tsize[1:])).type(input_tensor_list[idx].dtype)
)
#send actual message itself.
# send actual message itself.
__alltoall_cpu(rank, world_size, output_tensor_list, input_tensor_list)
#extract un-padded message from the output_tensor_list and return it
# extract un-padded message from the output_tensor_list and return it
return_vals = []
for s, t in zip(recv_counts, output_tensor_list):
if s[0] == 0:
if retain_nones:
return_vals.append(None)
else:
return_vals.append(t[0:s[0]])
return_vals.append(t[0 : s[0]])
return return_vals
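A sketch of the size header exchanged before the real payload (illustrative shapes): for two outgoing tensors of shapes (5, 3) and (2, 3), both are padded to 5 rows and each destination receives [actual_rows, padded_rows, trailing dims...].
import numpy as np
shapes = [[5, 3], [2, 3]]                        # shapes of the two outgoing tensors
max_rows = int(np.amax([s[0] for s in shapes]))  # 5
headers = [[s[0], max_rows] + s[1:] for s in shapes]
# headers == [[5, 5, 3], [2, 5, 3]]; the receiver allocates a (5, 3) buffer per message
# and slices it back to the first s[0] rows after the payload alltoall.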
def gather_metadata_json(metadata, rank, world_size):
"""
def gather_metadata_json(metadata, rank, world_size):
"""
Gather an object (json schema) from all ranks onto rank 0.
Parameters:
-----------
metadata : json dictionary object
json schema formed on each rank with graph level data.
json schema formed on each rank with graph level data.
This will be used as input to the distributed training in the later steps.
Returns:
--------
list : list of json dictionary objects
The result of the gather operation, which is the list of json dictionary
The result of the gather operation, which is the list of json dictionary
objects from each rank in the world
"""
#Populate input obj and output obj list on rank-0 and non-rank-0 machines
# Populate input obj and output obj list on rank-0 and non-rank-0 machines
input_obj = None if rank == 0 else metadata
output_objs = [None for _ in range(world_size)] if rank == 0 else None
#invoke the gloo method to perform gather on rank-0
# invoke the gloo method to perform gather on rank-0
dist.gather_object(input_obj, output_objs, dst=0)
return output_objs
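A minimal usage sketch (assumes a gloo process group is already initialized and `metadata` is this rank's schema dict; the output file name is made up): only rank 0 receives the populated list, and its own slot stays None because rank 0 passes None as its input object.
import json
metadata_list = gather_metadata_json(metadata, rank, world_size)
if rank == 0:
    merged = [m for m in metadata_list if m is not None]
    with open("gathered_metadata.json", "w") as f:
        json.dump(merged, f, indent=4)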
......@@ -5,13 +5,13 @@ import platform
import sys
from pathlib import Path
import constants
import numpy as np
import pyarrow
import pyarrow.csv as csv
import constants
from partition_algo.base import dump_partition_meta, PartitionMeta
from utils import get_idranges, get_node_types, read_json
from partition_algo.base import PartitionMeta, dump_partition_meta
def post_process(params):
......
......@@ -4,14 +4,14 @@ import os
import sys
from pathlib import Path
import constants
import numpy as np
import pyarrow
import pyarrow.csv as csv
import pyarrow.parquet as pq
import torch
import torch.distributed as dist
import constants
from utils import get_idranges, get_node_types, read_json
import array_readwriter
......@@ -33,12 +33,13 @@ def get_proc_info():
# mpich
if "PMI_RANK" in env_variables:
return int(env_variables["PMI_RANK"])
#openmpi
# openmpi
elif "OMPI_COMM_WORLD_RANK" in env_variables:
return int(env_variables["OMPI_COMM_WORLD_RANK"])
else:
return 0
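For illustration (assuming env_variables reads os.environ): a launcher such as mpirun sets PMI_RANK (mpich) or OMPI_COMM_WORLD_RANK (openmpi) in each worker's environment, so the rank is known before any torch.distributed setup; without either variable the function falls back to rank 0.
import os
os.environ["OMPI_COMM_WORLD_RANK"] = "2"  # simulate an openmpi launch for this sketch
rank = get_proc_info()                    # expected to return 2 under that assumption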
def gen_edge_files(schema_map, output):
"""Function to create edges files to be consumed by ParMETIS
for partitioning purposes.
......@@ -106,12 +107,16 @@ def gen_edge_files(schema_map, output):
options = csv.WriteOptions(include_header=False, delimiter=" ")
options.delimiter = " "
csv.write_csv(
pyarrow.Table.from_arrays(cols, names=col_names), out_file, options
pyarrow.Table.from_arrays(cols, names=col_names),
out_file,
options,
)
return out_file
if edges_format == constants.STR_CSV:
delimiter = etype_info[constants.STR_FORMAT][constants.STR_FORMAT_DELIMITER]
delimiter = etype_info[constants.STR_FORMAT][
constants.STR_FORMAT_DELIMITER
]
data_df = csv.read_csv(
edge_data_files[rank],
read_options=pyarrow.csv.ReadOptions(
......@@ -309,16 +314,22 @@ def gen_parmetis_input_args(params, schema_map):
)
# Check if <graph-name>_stats.txt exists, if not create one using metadata.
# Here stats file will be created in the current directory.
# Here stats file will be created in the current directory.
# No. of constraints, third column in the stats file is computed as follows:
# num_constraints = no. of node types + train_mask + test_mask + val_mask
# Here, (train/test/val) masks will be set to 1 if these masks exist for
# all the node types in the graph, otherwise these flags will be set to 0
assert constants.STR_GRAPH_NAME in schema_map, "Graph name is not present in the json file"
assert (
constants.STR_GRAPH_NAME in schema_map
), "Graph name is not present in the json file"
graph_name = schema_map[constants.STR_GRAPH_NAME]
if not os.path.isfile(f'{graph_name}_stats.txt'):
num_nodes = np.sum(np.concatenate(schema_map[constants.STR_NUM_NODES_PER_CHUNK]))
num_edges = np.sum(np.concatenate(schema_map[constants.STR_NUM_EDGES_PER_CHUNK]))
if not os.path.isfile(f"{graph_name}_stats.txt"):
num_nodes = np.sum(
np.concatenate(schema_map[constants.STR_NUM_NODES_PER_CHUNK])
)
num_edges = np.sum(
np.concatenate(schema_map[constants.STR_NUM_EDGES_PER_CHUNK])
)
num_ntypes = len(schema_map[constants.STR_NODE_TYPE])
train_mask = test_mask = val_mask = 0
......@@ -335,8 +346,8 @@ def gen_parmetis_input_args(params, schema_map):
val_mask = val_mask // num_ntypes
num_constraints = num_ntypes + train_mask + test_mask + val_mask
with open(f'{graph_name}_stats.txt', 'w') as sf:
sf.write(f'{num_nodes} {num_edges} {num_constraints}')
with open(f"{graph_name}_stats.txt", "w") as sf:
sf.write(f"{num_nodes} {num_edges} {num_constraints}")
node_files = []
outdir = Path(params.output_dir)
......
import json
from typing import Optional
import pydantic as dt
import json
from dgl import DGLError
class PartitionMeta(dt.BaseModel):
""" Metadata that describes the partition assignment results.
"""Metadata that describes the partition assignment results.
Regardless of the choice of partitioning algorithm, a metadata JSON file
will be created in the output directory which includes the meta information
......@@ -22,15 +24,17 @@ class PartitionMeta(dt.BaseModel):
... part_meta = PartitionMeta(**(json.load(f)))
"""
# version of metadata JSON.
version: Optional[str] = '1.0.0'
version: Optional[str] = "1.0.0"
# number of partitions.
num_parts: int
# name of partition algorithm.
algo_name: str
def dump_partition_meta(part_meta, meta_file):
""" Dump partition metadata into json file.
"""Dump partition metadata into json file.
Parameters
----------
......@@ -39,11 +43,12 @@ def dump_partition_meta(part_meta, meta_file):
meta_file : str
The target file to save data.
"""
with open(meta_file, 'w') as f:
with open(meta_file, "w") as f:
json.dump(part_meta.dict(), f, sort_keys=True, indent=4)
def load_partition_meta(meta_file):
""" Load partition metadata and do sanity check.
"""Load partition metadata and do sanity check.
Parameters
----------
......@@ -60,14 +65,18 @@ def load_partition_meta(meta_file):
part_meta = PartitionMeta(**(json.load(f)))
except dt.ValidationError as e:
raise DGLError(
f"Invalid partition metadata JSON. Error details: {e.json()}")
if part_meta.version != '1.0.0':
f"Invalid partition metadata JSON. Error details: {e.json()}"
)
if part_meta.version != "1.0.0":
raise DGLError(
f"Invalid version[{part_meta.version}]. Supported versions: '1.0.0'")
f"Invalid version[{part_meta.version}]. Supported versions: '1.0.0'"
)
if part_meta.num_parts <= 0:
raise DGLError(
f"num_parts[{part_meta.num_parts}] should be greater than 0.")
if part_meta.algo_name not in ['random', 'metis']:
f"num_parts[{part_meta.num_parts}] should be greater than 0."
)
if part_meta.algo_name not in ["random", "metis"]:
raise DGLError(
f"algo_name[{part_meta.num_parts}] is not supported.")
f"algo_name[{part_meta.num_parts}] is not supported."
)
return part_meta
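A minimal sketch of the metadata file this validates and how it might be loaded (file name and values are illustrative):
# partition_meta.json:
# {
#     "algo_name": "random",
#     "num_parts": 4,
#     "version": "1.0.0"
# }
part_meta = load_partition_meta("partition_meta.json")
assert part_meta.num_parts == 4 and part_meta.algo_name == "random"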
......@@ -6,10 +6,11 @@ import os
import sys
import numpy as np
from base import PartitionMeta, dump_partition_meta
from base import dump_partition_meta, PartitionMeta
from distpartitioning import array_readwriter
from files import setdir
def _random_partition(metadata, num_parts):
num_nodes_per_type = [sum(_) for _ in metadata["num_nodes_per_chunk"]]
ntypes = metadata["node_type"]
......