Commit 067cd744 authored by Quan (Andy) Gan, committed by GitHub

[Distributed] Graph chunking UX (#4365)

* first commit

* update

* huh

* fix

* update

* revert core

* fix

* update

* rewrite

* oops

* address comments

* add graph name

* address comments

* remove sample metadata file

* address comments

* fix

* remove

* add docs
# DGL Utility Scripts
This folder contains utilities that do not belong to the DGL core package and are provided as
standalone executable scripts.
## Graph Chunking
`chunk_graph.py` provides an example of chunking an existing DGLGraph object into the on-disk
[chunked graph format](http://13.231.216.217/guide/distributed-preprocessing.html#chunked-graph-format).
<!-- TODO: change the link of documentation once it's merged to master -->
An example of chunking the OGB MAG240M dataset:
```python
import ogb.lsc
import dgl
from chunk_graph import chunk_graph  # provided by this folder

dataset = ogb.lsc.MAG240MDataset('.')
etypes = [
    ('paper', 'cites', 'paper'),
    ('author', 'writes', 'paper'),
    ('author', 'affiliated_with', 'institution')]
g = dgl.heterograph({k: tuple(dataset.edge_index(*k)) for k in etypes})
chunk_graph(
    g,
    'mag240m',
    {'paper': {
        'feat': 'mag240m_kddcup2021/processed/paper/node_feat.npy',
        'label': 'mag240m_kddcup2021/processed/paper/node_label.npy',
        'year': 'mag240m_kddcup2021/processed/paper/node_year.npy'}},
    {},
    4,
    'output')
```
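Assuming the call above, the chunked graph is written under `output/` with roughly the following
layout (the file names come from the metadata shown below; an `edge_data/` directory is also
created but stays empty since no edge data was given):
```
output/
|-- metadata.json
|-- edge_index/
|   |-- author:affiliated_with:institution0.txt ... author:affiliated_with:institution3.txt
|   |-- author:writes:paper0.txt ... author:writes:paper3.txt
|   `-- paper:cites:paper0.txt ... paper:cites:paper3.txt
|-- node_data/
|   `-- paper/
|       |-- feat-0.npy ... feat-3.npy
|       |-- label-0.npy ... label-3.npy
|       `-- year-0.npy ... year-3.npy
`-- edge_data/
```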
The output chunked graph metadata (`output/metadata.json`) will look as follows (assuming the
current directory is `/home/user`):
```json
{
    "graph_name": "mag240m",
    "node_type": ["author", "institution", "paper"],
    "num_nodes_per_chunk": [
        [30595778, 30595778, 30595778, 30595778],
        [6431, 6430, 6430, 6430],
        [30437917, 30437917, 30437916, 30437916]
    ],
    "edge_type": [
        "author:affiliated_with:institution",
        "author:writes:paper",
        "paper:cites:paper"
    ],
    "num_edges_per_chunk": [
        [11148147, 11148147, 11148146, 11148146],
        [96505680, 96505680, 96505680, 96505680],
        [324437232, 324437232, 324437231, 324437231]
    ],
    "edges": {
        "author:affiliated_with:institution": {
            "format": {"name": "csv", "delimiter": " "},
            "data": [
                "/home/user/output/edge_index/author:affiliated_with:institution0.txt",
                "/home/user/output/edge_index/author:affiliated_with:institution1.txt",
                "/home/user/output/edge_index/author:affiliated_with:institution2.txt",
                "/home/user/output/edge_index/author:affiliated_with:institution3.txt"
            ]
        },
        "author:writes:paper": {
            "format": {"name": "csv", "delimiter": " "},
            "data": [
                "/home/user/output/edge_index/author:writes:paper0.txt",
                "/home/user/output/edge_index/author:writes:paper1.txt",
                "/home/user/output/edge_index/author:writes:paper2.txt",
                "/home/user/output/edge_index/author:writes:paper3.txt"
            ]
        },
        "paper:cites:paper": {
            "format": {"name": "csv", "delimiter": " "},
            "data": [
                "/home/user/output/edge_index/paper:cites:paper0.txt",
                "/home/user/output/edge_index/paper:cites:paper1.txt",
                "/home/user/output/edge_index/paper:cites:paper2.txt",
                "/home/user/output/edge_index/paper:cites:paper3.txt"
            ]
        }
    },
    "node_data": {
        "paper": {
            "feat": {
                "format": {"name": "numpy"},
                "data": [
                    "/home/user/output/node_data/paper/feat-0.npy",
                    "/home/user/output/node_data/paper/feat-1.npy",
                    "/home/user/output/node_data/paper/feat-2.npy",
                    "/home/user/output/node_data/paper/feat-3.npy"
                ]
            },
            "label": {
                "format": {"name": "numpy"},
                "data": [
                    "/home/user/output/node_data/paper/label-0.npy",
                    "/home/user/output/node_data/paper/label-1.npy",
                    "/home/user/output/node_data/paper/label-2.npy",
                    "/home/user/output/node_data/paper/label-3.npy"
                ]
            },
            "year": {
                "format": {"name": "numpy"},
                "data": [
                    "/home/user/output/node_data/paper/year-0.npy",
                    "/home/user/output/node_data/paper/year-1.npy",
                    "/home/user/output/node_data/paper/year-2.npy",
                    "/home/user/output/node_data/paper/year-3.npy"
                ]
            }
        }
    },
    "edge_data": {}
}
```
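After chunking, one way to sanity-check the result is to compare the metadata against the original
graph. Below is a minimal sketch, assuming the `g` object from the chunking example above and the
metadata file at `output/metadata.json`; it is illustrative only and not part of `chunk_graph.py`:
```python
import json
import os

with open('output/metadata.json') as f:
    metadata = json.load(f)

# The per-chunk counts should add up to the original number of nodes/edges.
for ntype, counts in zip(metadata['node_type'], metadata['num_nodes_per_chunk']):
    assert sum(counts) == g.num_nodes(ntype), ntype
for etype, counts in zip(metadata['edge_type'], metadata['num_edges_per_chunk']):
    assert sum(counts) == g.num_edges(tuple(etype.split(':'))), etype

# Every chunked edge index file listed in the metadata should exist on disk.
for edge_meta in metadata['edges'].values():
    for path in edge_meta['data']:
        assert os.path.exists(path), path
```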
# See the __main__ block for usage of chunk_graph().
import pathlib
import json
from contextlib import contextmanager
import logging
import os
import torch
import dgl
from utils import setdir
from utils import array_readwriter
def chunk_numpy_array(arr, fmt_meta, chunk_sizes, path_fmt):
    """Split ``arr`` into consecutive chunks of the given sizes and write chunk ``j`` to
    ``path_fmt % j`` in the format described by ``fmt_meta``.  Returns the list of absolute
    paths of the written chunks."""
    paths = []
    offset = 0
    for j, n in enumerate(chunk_sizes):
        path = os.path.abspath(path_fmt % j)
        arr_chunk = arr[offset:offset + n]
        logging.info('Chunking %d-%d' % (offset, offset + n))
        array_readwriter.get_array_parser(**fmt_meta).write(path, arr_chunk)
        offset += n
        paths.append(path)
    return paths
def _chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
    # First deal with ndata and edata that are homogeneous (i.e. not a dict-of-dict).
    if len(g.ntypes) == 1 and not isinstance(next(iter(ndata_paths.values())), dict):
        ndata_paths = {g.ntypes[0]: ndata_paths}
    if len(g.etypes) == 1 and not isinstance(next(iter(edata_paths.values())), dict):
        edata_paths = {g.etypes[0]: edata_paths}
    # Then convert all edge types to canonical edge types.
    etypestrs = {etype: ':'.join(etype) for etype in g.canonical_etypes}
    edata_paths = {':'.join(g.to_canonical_etype(k)): v for k, v in edata_paths.items()}

    metadata = {}
    metadata['graph_name'] = name
    metadata['node_type'] = g.ntypes

    # Compute the number of nodes per chunk per node type.
    metadata['num_nodes_per_chunk'] = num_nodes_per_chunk = []
    for ntype in g.ntypes:
        num_nodes = g.num_nodes(ntype)
        num_nodes_list = []
        for i in range(num_chunks):
            # Distribute the remainder over the first (num_nodes % num_chunks) chunks.
            n = num_nodes // num_chunks + (i < num_nodes % num_chunks)
            num_nodes_list.append(n)
        num_nodes_per_chunk.append(num_nodes_list)
    num_nodes_per_chunk_dict = {k: v for k, v in zip(g.ntypes, num_nodes_per_chunk)}

    metadata['edge_type'] = [etypestrs[etype] for etype in g.canonical_etypes]

    # Compute the number of edges per chunk per edge type.
    metadata['num_edges_per_chunk'] = num_edges_per_chunk = []
    for etype in g.canonical_etypes:
        num_edges = g.num_edges(etype)
        num_edges_list = []
        for i in range(num_chunks):
            n = num_edges // num_chunks + (i < num_edges % num_chunks)
            num_edges_list.append(n)
        num_edges_per_chunk.append(num_edges_list)
    num_edges_per_chunk_dict = {k: v for k, v in zip(g.canonical_etypes, num_edges_per_chunk)}

    # Split the edge index.
    metadata['edges'] = {}
    with setdir('edge_index'):
        for etype in g.canonical_etypes:
            etypestr = etypestrs[etype]
            logging.info('Chunking edge index for %s' % etypestr)
            edges_meta = {}
            fmt_meta = {"name": "csv", "delimiter": " "}
            edges_meta['format'] = fmt_meta
            srcdst = torch.stack(g.edges(etype=etype), 1)
            edges_meta['data'] = chunk_numpy_array(
                srcdst.numpy(), fmt_meta, num_edges_per_chunk_dict[etype],
                etypestr + '%d.txt')
            metadata['edges'][etypestr] = edges_meta

    # Chunk node data.
    metadata['node_data'] = {}
    with setdir('node_data'):
        for ntype, ndata_per_type in ndata_paths.items():
            ndata_meta = {}
            with setdir(ntype):
                for key, path in ndata_per_type.items():
                    logging.info('Chunking node data for type %s key %s' % (ntype, key))
                    ndata_key_meta = {}
                    reader_fmt_meta = writer_fmt_meta = {"name": "numpy"}
                    arr = array_readwriter.get_array_parser(**reader_fmt_meta).read(path)
                    ndata_key_meta['format'] = writer_fmt_meta
                    ndata_key_meta['data'] = chunk_numpy_array(
                        arr, writer_fmt_meta, num_nodes_per_chunk_dict[ntype],
                        key + '-%d.npy')
                    ndata_meta[key] = ndata_key_meta
            metadata['node_data'][ntype] = ndata_meta

    # Chunk edge data.
    metadata['edge_data'] = {}
    with setdir('edge_data'):
        for etypestr, edata_per_type in edata_paths.items():
            edata_meta = {}
            # The chunk sizes are keyed by canonical edge type tuples.
            etype = tuple(etypestr.split(':'))
            with setdir(etypestr):
                for key, path in edata_per_type.items():
                    logging.info('Chunking edge data for type %s key %s' % (etypestr, key))
                    edata_key_meta = {}
                    reader_fmt_meta = writer_fmt_meta = {"name": "numpy"}
                    arr = array_readwriter.get_array_parser(**reader_fmt_meta).read(path)
                    edata_key_meta['format'] = writer_fmt_meta
                    edata_key_meta['data'] = chunk_numpy_array(
                        arr, writer_fmt_meta, num_edges_per_chunk_dict[etype],
                        key + '-%d.npy')
                    edata_meta[key] = edata_key_meta
            metadata['edge_data'][etypestr] = edata_meta

    metadata_path = 'metadata.json'
    with open(metadata_path, 'w') as f:
        json.dump(metadata, f)
    logging.info('Saved metadata in %s' % os.path.abspath(metadata_path))
def chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
    """
    Split the graph into multiple chunks.

    A directory will be created at :attr:`output_path` with the metadata and the chunked
    edge lists as well as the node/edge data.

    Parameters
    ----------
    g : DGLGraph
        The graph.
    name : str
        The name of the graph, to be used later in DistDGL training.
    ndata_paths : dict[str, pathlike] or dict[ntype, dict[str, pathlike]]
        The dictionary of paths pointing to the corresponding numpy array file for each
        node data key.
    edata_paths : dict[str, pathlike] or dict[etype, dict[str, pathlike]]
        The dictionary of paths pointing to the corresponding numpy array file for each
        edge data key.
    num_chunks : int
        The number of chunks.
    output_path : pathlike
        The output directory where the chunked graph is saved.
    """
    # Convert all data paths to absolute paths since the working directory changes below.
    for ntype, ndata in ndata_paths.items():
        for key in ndata.keys():
            ndata[key] = os.path.abspath(ndata[key])
    for etype, edata in edata_paths.items():
        for key in edata.keys():
            edata[key] = os.path.abspath(edata[key])
    with setdir(output_path):
        _chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path)
if __name__ == '__main__':
    logging.basicConfig(level='INFO')
    input_dir = '/data'
    output_dir = '/chunked-data'
    (g,), _ = dgl.load_graphs(os.path.join(input_dir, 'graph.dgl'))
    chunk_graph(
        g,
        'mag240m',
        {'paper': {
            'feat': os.path.join(input_dir, 'paper/feat.npy'),
            'label': os.path.join(input_dir, 'paper/label.npy'),
            'year': os.path.join(input_dir, 'paper/year.npy')}},
        {'cites': {'count': os.path.join(input_dir, 'cites/count.npy')},
         'writes': {'year': os.path.join(input_dir, 'writes/year.npy')},
         # you can put the same data file if they indeed share the features.
         'rev_writes': {'year': os.path.join(input_dir, 'writes/year.npy')}},
        4,
        output_dir)
    # The generated metadata goes as in tools/sample-config/mag240m-metadata.json.
# Requires setting PYTHONPATH=${GITROOT}/tools
import json
import logging
import sys
import os
import numpy as np
import argparse
from utils import setdir
from utils import array_readwriter


def _random_partition(metadata, num_parts):
    num_nodes_per_type = [sum(_) for _ in metadata['num_nodes_per_chunk']]
    ntypes = metadata['node_type']
    for ntype, n in zip(ntypes, num_nodes_per_type):
        logging.info('Generating partition for node type %s' % ntype)
        parts = np.random.randint(0, num_parts, (n,))
        array_readwriter.get_array_parser(name='csv').write(ntype + '.txt', parts)


def random_partition(metadata, num_parts, output_path):
    """
    Randomly partition the graph described in metadata and generate partition ID mapping
    in :attr:`output_path`.

    A directory will be created at :attr:`output_path` containing the partition ID
    mapping files named "<node-type>.txt" (e.g. "author.txt", "paper.txt" and
    "institution.txt" for OGB-MAG240M). Each file contains one line per node representing
    the partition ID the node belongs to.
    """
    with setdir(output_path):
        _random_partition(metadata, num_parts)
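

# Illustrative only, not part of the original script: a hypothetical helper showing how a
# partition file written above could be read back.  It assumes the default CSV settings used
# by _random_partition, i.e. one integer partition ID per line.
def _summarize_partition(path, num_parts):
    parts = np.loadtxt(path, dtype='int64')
    # Count how many nodes were assigned to each partition.
    return np.bincount(parts, minlength=num_parts)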
# Run with PYTHONPATH=${GIT_ROOT_DIR}/tools
# where ${GIT_ROOT_DIR} is the root directory of the DGL git repository.
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'metadata', type=str, help='input metadata file of the chunked graph format')
    parser.add_argument(
        'output_path', type=str, help='output directory')
    parser.add_argument(
        'num_partitions', type=int, help='number of partitions')
    logging.basicConfig(level='INFO')
    args = parser.parse_args()

    with open(args.metadata) as f:
        metadata = json.load(f)
    output_path = args.output_path
    num_parts = args.num_partitions
    random_partition(metadata, num_parts, output_path)
from .files import *
from . import array_readwriter
from .registry import register_array_parser, get_array_parser
from . import csv
from . import numpy_array
import logging
import pandas as pd
import pyarrow
import pyarrow.csv
from .registry import register_array_parser
@register_array_parser("csv")
class CSVArrayParser(object):
def __init__(self, delimiter=','):
self.delimiter = delimiter
def read(self, path):
logging.info('Reading from %s using CSV format with configuration %s' % (
path, self.__dict__))
# do not read the first line as header
read_options = pyarrow.csv.ReadOptions(autogenerate_column_names=True)
parse_options = pyarrow.csv.ParseOptions(delimiter=self.delimiter)
arr = pyarrow.csv.read_csv(path, read_options=read_options, parse_options=parse_options)
logging.info('Done reading from %s' % path)
return arr.to_pandas().to_numpy()
def write(self, path, arr):
logging.info('Writing to %s using CSV format with configuration %s' % (
path, self.__dict__))
write_options = pyarrow.csv.WriteOptions(include_header=False, delimiter=self.delimiter)
arr = pyarrow.Table.from_pandas(pd.DataFrame(arr))
pyarrow.csv.write_csv(arr, path, write_options=write_options)
logging.info('Done writing to %s' % path)
import logging
import numpy as np
from numpy.lib.format import open_memmap
from .registry import register_array_parser
@register_array_parser("numpy")
class NumpyArrayParser(object):
def __init__(self):
pass
def read(self, path):
logging.info('Reading from %s using numpy format' % path)
arr = np.load(path, mmap_mode='r')
logging.info('Done reading from %s' % path)
return arr
def write(self, path, arr):
logging.info('Writing to %s using numpy format' % path)
# np.save would load the entire memmap array up into CPU. So we manually open
# an empty npy file with memmap mode and manually flush it instead.
new_arr = open_memmap(path, mode='w+', dtype=arr.dtype, shape=arr.shape)
new_arr[:] = arr[:]
logging.info('Done writing to %s' % path)
REGISTRY = {}


def register_array_parser(name):
    """Return a class decorator that registers the decorated array parser class under ``name``."""
    def _deco(cls):
        REGISTRY[name] = cls
        return cls
    return _deco


def get_array_parser(**fmt_meta):
    """Instantiate the parser registered under ``fmt_meta['name']``, passing the remaining
    keyword arguments to its constructor (e.g. ``delimiter`` for the CSV parser)."""
    cls = REGISTRY[fmt_meta.pop('name')]
    return cls(**fmt_meta)
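

# Illustrative usage, not part of this module: chunk_graph.py resolves parsers through this
# registry from a format dictionary, e.g.
#
#     parser = get_array_parser(**{"name": "csv", "delimiter": " "})
#     parser.write(path, arr)  # path and arr are supplied by the caller
#
# where "csv" was registered via @register_array_parser("csv") on CSVArrayParser.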
import os
from contextlib import contextmanager
import logging
from numpy.lib.format import open_memmap
@contextmanager
def setdir(path):
    """Context manager that creates ``path`` if necessary, changes the current working directory
    into it, and restores the previous working directory on exit."""
    cwd = os.getcwd()
    try:
        os.makedirs(path, exist_ok=True)
        logging.info('Changing directory to %s' % path)
        logging.info('Previously: %s' % cwd)
        os.chdir(path)
        yield
    finally:
        logging.info('Restoring directory to %s' % cwd)
        os.chdir(cwd)