Unverified Commit ace76327 authored by kylasa, committed by GitHub

Garbage Collection and memory snapshot code for debugging the partitioning pipeline (targeting the master branch) (#4598)

* Squashed commit of the following:

commit e605a550b3783dd5f24eb39b6873a2e0e79be9c7
Author: kylasa <kylasa@gmail.com>
Date:   Thu Sep 15 14:45:39 2022 -0700

    Delete pyproject.toml

commit f2db9e700d817212b67b5227f6472d218f0c74f2
Author: kylasa <kylasa@gmail.com>
Date:   Thu Sep 15 14:44:40 2022 -0700

    Changes suggested by isort program to sort imports.

commit 5a6078beac6218a4f1fb378c169f04dda7396425
Author: kylasa <kylasa@gmail.com>
Date:   Thu Sep 15 14:39:50 2022 -0700

    addressing code review comments from the CI process.

commit c8e92decb7aebeb32c7467108e16f058491443ab
Author: kylasa <kylasa@gmail.com>
Date:   Wed Sep 14 18:23:59 2022 -0700

    Corrected a typo in the import statement

commit 14ddb0e9b553d5be3ed2c50d82dee671e84ad8c9
Author: kylasa <kylasa@gmail.com>
Date:   Tue Sep 13 18:47:34 2022 -0700

    Memory snapshot code for debugging memory footprint of the graph partitioning pipeline

Squashed commit done

* Addressing code review comments.

* Update utils.py

* dummy change to trigger CI tests
Co-authored-by: Rhett Ying <85214957+Rhett-Ying@users.noreply.github.com>
parent 533afa85
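For orientation before the diff: this change combines two techniques. A `memory_snapshot(tag, rank)` helper (added to utils.py further down) logs system memory usage at DEBUG level so successive pipeline stages can be compared, and large intermediate arrays are released eagerly with `dict.pop()` followed by `gc.collect()`. Below is a minimal, self-contained sketch of the bookmark idea only; the helper name and stage tags are hypothetical, it relies on psutil just as the real helper does, and it omits the DGL peak-memory call used in the actual implementation.

```python
import logging

import psutil


def simple_memory_snapshot(tag, rank):
    """Log current system memory usage, tagged so pipeline stages can be compared."""
    MB = 1024 * 1024
    mem = psutil.virtual_memory()
    logging.debug(
        f"[Rank: {rank} MEMORY_SNAPSHOT] "
        f"{mem.total / MB:.0f} (MB) total, {mem.used / MB:.0f} (MB) used, "
        f"{mem.available / MB:.0f} (MB) avail - {tag}"
    )


if __name__ == "__main__":
    # Snapshots are emitted at DEBUG level, so they only show up when debug
    # logging is enabled (exposed in this change through a new --log-level flag).
    logging.basicConfig(level=logging.DEBUG)
    simple_memory_snapshot("Pipeline Begin", rank=0)
    # ... run a pipeline stage ...
    simple_memory_snapshot("StageComplete", rank=0)
```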
@@ -52,6 +52,7 @@ def submit_jobs(args) -> str:
argslist += "--num-parts {} ".format(num_parts)
argslist += "--output {} ".format(os.path.abspath(args.out_dir))
argslist += "--process-group-timeout {} ".format(args.process_group_timeout)
argslist += "--log-level {} ".format(args.log_level)
# (BarclayII) Is it safe to assume all the workers have the Python executable at the same path?
pipeline_cmd = os.path.join(INSTALL_DIR, PIPELINE_SCRIPT)
@@ -71,6 +72,7 @@ def main():
parser.add_argument('--out-dir', type=str, help='Location of the output directory where the graph partitions will be created by this pipeline')
parser.add_argument('--ip-config', type=str, help='File location of IP configuration for server processes')
parser.add_argument('--master-port', type=int, default=12345, help='Port used by the gloo group to create a rendezvous point')
parser.add_argument('--log-level', type=str, default="info", help='Log level to use for debugging purposes. Available options: (Critical, Error, Warning, Info, Debug, Notset)')
parser.add_argument('--python-path', type=str, default=sys.executable, help='Path to the Python executable on all workers')
parser.add_argument('--ssh-port', type=int, default=22, help='SSH Port.')
parser.add_argument('--process-group-timeout', type=int, default=1800,
@@ -81,6 +83,7 @@ def main():
assert os.path.isdir(args.in_dir)
assert os.path.isdir(args.partitions_dir)
assert os.path.isfile(args.ip_config)
assert isinstance(args.log_level, str)
assert isinstance(args.master_port, int)
tokens = sys.executable.split(os.sep)
...
import os
import argparse
import gc
import json
import logging
import os
import time
import argparse
import numpy as np
import dgl
import torch as th
import pyarrow
import numpy as np
import pandas as pd
import constants
import pyarrow
import torch as th
from pyarrow import csv
from utils import read_json, get_idranges
import constants
from utils import get_idranges, memory_snapshot, read_json
def create_dgl_object(graph_name, num_parts, \
schema, part_id, node_data, \
@@ -95,6 +100,7 @@ def create_dgl_object(graph_name, num_parts, \
map between edge type(string) and edge_type_id(int)
"""
#create auxiliary data structures from the schema object
memory_snapshot("CreateDGLObj_Begin", part_id)
ntid_dict, global_nid_ranges = get_idranges(schema[constants.STR_NODE_TYPE],
schema[constants.STR_NUM_NODES_PER_CHUNK])
@@ -117,8 +123,19 @@ def create_dgl_object(graph_name, num_parts, \
node_map_val = {ntype: [] for ntype in ntypes}
edge_map_val = {etype.split(":")[1]: [] for etype in etypes}
shuffle_global_nids, ntype_ids, global_type_nid = node_data[constants.SHUFFLE_GLOBAL_NID], \
node_data[constants.NTYPE_ID], node_data[constants.GLOBAL_TYPE_NID]
memory_snapshot("CreateDGLObj_AssignNodeData", part_id)
shuffle_global_nids = node_data[constants.SHUFFLE_GLOBAL_NID]
node_data.pop(constants.SHUFFLE_GLOBAL_NID)
gc.collect()
ntype_ids = node_data[constants.NTYPE_ID]
node_data.pop(constants.NTYPE_ID)
gc.collect()
global_type_nid = node_data[constants.GLOBAL_TYPE_NID]
node_data.pop(constants.GLOBAL_TYPE_NID)
node_data = None
gc.collect()
global_homo_nid = ntype_offset_np[ntype_ids] + global_type_nid
assert np.all(shuffle_global_nids[1:] - shuffle_global_nids[:-1] == 1)
@@ -133,11 +150,32 @@ def create_dgl_object(graph_name, num_parts, \
[int(type_nids[0]), int(type_nids[-1]) + 1])
#process edges
shuffle_global_src_id, shuffle_global_dst_id, global_src_id, global_dst_id, global_edge_id, etype_ids = \
edge_data[constants.SHUFFLE_GLOBAL_SRC_ID], edge_data[constants.SHUFFLE_GLOBAL_DST_ID], \
edge_data[constants.GLOBAL_SRC_ID], edge_data[constants.GLOBAL_DST_ID], \
edge_data[constants.GLOBAL_TYPE_EID], edge_data[constants.ETYPE_ID]
print('There are {} edges in partition {}'.format(len(shuffle_global_src_id), part_id))
memory_snapshot("CreateDGLObj_AssignEdgeData: ", part_id)
shuffle_global_src_id = edge_data[constants.SHUFFLE_GLOBAL_SRC_ID]
edge_data.pop(constants.SHUFFLE_GLOBAL_SRC_ID)
gc.collect()
shuffle_global_dst_id = edge_data[constants.SHUFFLE_GLOBAL_DST_ID]
edge_data.pop(constants.SHUFFLE_GLOBAL_DST_ID)
gc.collect()
global_src_id = edge_data[constants.GLOBAL_SRC_ID]
edge_data.pop(constants.GLOBAL_SRC_ID)
gc.collect()
global_dst_id = edge_data[constants.GLOBAL_DST_ID]
edge_data.pop(constants.GLOBAL_DST_ID)
gc.collect()
global_edge_id = edge_data[constants.GLOBAL_TYPE_EID]
edge_data.pop(constants.GLOBAL_TYPE_EID)
gc.collect()
etype_ids = edge_data[constants.ETYPE_ID]
edge_data.pop(constants.ETYPE_ID)
edge_data = None
gc.collect()
logging.info(f'There are {len(shuffle_global_src_id)} edges in partition {part_id}')
# It's not guaranteed that the edges are sorted based on edge type.
# Let's sort edges and all attributes on the edges.
@@ -156,6 +194,7 @@ def create_dgl_object(graph_name, num_parts, \
edge_map_val[tokens[1]].append([edge_id_start,
edge_id_start + np.sum(etype_ids == etype_id)])
edge_id_start += np.sum(etype_ids == etype_id)
memory_snapshot("CreateDGLObj_UniqueNodeIds: ", part_id)
# get the edge list in some order and then reshuffle.
# Here the order of nodes is defined by the `np.unique` function
...
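The `create_dgl_object` hunks above replace one large tuple-unpacking assignment with a column-at-a-time `pop()` from the node_data/edge_data dictionaries, calling `gc.collect()` after each step so that no stale reference keeps an array alive before the next large allocation. Below is a self-contained sketch of that idiom on a hypothetical dictionary of NumPy arrays; the column names and the `rss_mb` read-out are illustrative additions, not part of the diff.

```python
import gc

import numpy as np
import psutil


def rss_mb():
    """Resident set size of the current process, in MB."""
    return psutil.Process().memory_info().rss / (1024 * 1024)


# Hypothetical per-partition columns, standing in for node_data/edge_data.
data = {
    "global_src_id": np.arange(20_000_000, dtype=np.int64),
    "global_dst_id": np.arange(20_000_000, dtype=np.int64),
}
print(f"with both columns held: {rss_mb():.0f} MB")

# Consume one column, then drop every reference to it (the dict entry and the
# local name) before the next large allocation is made.
src = data.pop("global_src_id")
degree_hist = np.bincount(src % 1000)  # placeholder for the real processing
src = None
gc.collect()
print(f"after releasing one column: {rss_mb():.0f} MB")
```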
import argparse
import logging
import os
import platform
from data_shuffle import single_machine_run, multi_machine_run
import numpy as np
import torch.multiprocessing as mp
from data_shuffle import multi_machine_run, single_machine_run
def log_params(params):
""" Print all the command line arguments for debugging purposes.
@@ -38,6 +43,10 @@ if __name__ == "__main__":
help='The output directory of the partitioned results')
parser.add_argument('--partitions-dir', help='directory of the partition-ids for each node type',
default=None, type=str)
parser.add_argument('--log-level', type=str, default="info",
help='Log level to use for debugging purposes. Available options: \
(Critical, Error, Warning, Info, Debug, Notset); default: Info')
#arguments needed for the distributed implementation
parser.add_argument('--world-size', help='no. of processes to spawn',
@@ -48,5 +57,6 @@ if __name__ == "__main__":
params = parser.parse_args()
#invoke the pipeline function
logging.basicConfig(level='INFO', format=f"[{platform.node()} %(levelname)s %(asctime)s PID:%(process)d] %(message)s")
numeric_level = getattr(logging, params.log_level.upper(), None)
logging.basicConfig(level=numeric_level, format=f"[{platform.node()} %(levelname)s %(asctime)s PID:%(process)d] %(message)s")
multi_machine_run(params)
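The hunk above converts the --log-level string into a numeric logging level with `getattr(logging, params.log_level.upper(), None)`. Below is a small sketch of the same idiom; the `configure_logging` wrapper and the explicit guard against unrecognized level names are illustrative additions rather than part of this change.

```python
import argparse
import logging
import platform


def configure_logging(log_level: str) -> None:
    """Translate a level name such as 'info' or 'DEBUG' into a logging config."""
    numeric_level = getattr(logging, log_level.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError(f"Unknown log level: {log_level}")
    logging.basicConfig(
        level=numeric_level,
        format=f"[{platform.node()} %(levelname)s %(asctime)s PID:%(process)d] %(message)s",
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--log-level", type=str, default="info")
    args = parser.parse_args()
    configure_logging(args.log_level)
    logging.debug("debug output (including memory snapshots) appears only at this level")
    logging.info("pipeline starting")
```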
import gc
import logging
import math
import os
import sys
import constants
from datetime import timedelta
from timeit import default_timer as timer
import dgl
import numpy as np
import math
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import dgl
import logging
from timeit import default_timer as timer
from datetime import timedelta
from dataset_utils import get_dataset
from utils import read_ntype_partition_files, read_json, get_node_types, \
augment_edge_data, get_gnid_range_map, \
write_dgl_objects, write_metadata_json, get_ntype_featnames, \
get_idranges
from gloo_wrapper import allgather_sizes, gather_metadata_json,\
alltoallv_cpu
from globalids import assign_shuffle_global_nids_nodes, \
assign_shuffle_global_nids_edges, \
lookup_shuffle_global_nids_edges
import constants
from convert_partition import create_dgl_object, create_metadata_json
from dataset_utils import get_dataset
from dist_lookup import DistLookupService
from globalids import (assign_shuffle_global_nids_edges,
assign_shuffle_global_nids_nodes,
lookup_shuffle_global_nids_edges)
from gloo_wrapper import allgather_sizes, alltoallv_cpu, gather_metadata_json
from utils import (augment_edge_data, get_gnid_range_map, get_idranges,
get_node_types, get_ntype_featnames, memory_snapshot,
read_json, read_ntype_partition_files, write_dgl_objects,
write_metadata_json)
def gen_node_data(rank, world_size, id_lookup, ntid_ntype_map, schema_map):
'''
@@ -336,12 +338,16 @@ def exchange_graph_data(rank, world_size, node_features, node_feat_tids, edge_da
the input argument, edge_data dictionary, is updated with the edge data received from other processes
in the world. The edge data is received by each rank in the process of data shuffling.
"""
memory_snapshot("ShuffleNodeFeaturesBegin: ", rank)
rcvd_node_features, rcvd_global_nids = exchange_node_features(rank, world_size, node_feat_tids, \
ntypes_gnid_range_map, id_lookup, node_features)
memory_snapshot("ShuffleNodeFeaturesComplete: ", rank)
logging.info(f'[Rank: {rank}] Done with node features exchange.')
node_data = gen_node_data(rank, world_size, id_lookup, ntid_ntype_map, schema_map)
memory_snapshot("NodeDataGenerationComplete: ", rank)
edge_data = exchange_edge_data(rank, world_size, edge_data)
memory_snapshot("ShuffleEdgeDataComplete: ", rank)
return node_data, rcvd_node_features, rcvd_global_nids, edge_data
def read_dataset(rank, world_size, id_lookup, params, schema_map):
@@ -522,7 +528,7 @@ def gen_dist_partitions(rank, world_size, params):
"""
global_start = timer()
logging.info(f'[Rank: {rank}] Starting distributed data processing pipeline...')
memory_snapshot("Pipeline Begin: ", rank)
#init processing
schema_map = read_json(os.path.join(params.input_dir, params.schema))
@@ -543,6 +549,7 @@ def gen_dist_partitions(rank, world_size, params):
node_tids, node_features, node_feat_tids, edge_data, edge_features = \
read_dataset(rank, world_size, id_lookup, params, schema_map)
logging.info(f'[Rank: {rank}] Done augmenting file input data with auxiliary columns')
memory_snapshot("DatasetReadComplete: ", rank)
#send out node and edge data --- and appropriate features.
#this function will also stitch the data recvd from other processes
@@ -552,17 +559,23 @@ def gen_dist_partitions(rank, world_size, params):
exchange_graph_data(rank, world_size, node_features, node_feat_tids, \
edge_data, id_lookup, ntypes_ntypeid_map, ntypes_gnid_range_map, \
ntypeid_ntypes_map, schema_map)
gc.collect()
logging.info(f'[Rank: {rank}] Done with data shuffling...')
memory_snapshot("DataShuffleComplete: ", rank)
#sort node_data by ntype
idx = node_data[constants.NTYPE_ID].argsort()
for k, v in node_data.items():
node_data[k] = v[idx]
idx = None
gc.collect()
logging.info(f'[Rank: {rank}] Sorted node_data by node_type')
#resolve global_ids for nodes
assign_shuffle_global_nids_nodes(rank, world_size, node_data)
logging.info(f'[Rank: {rank}] Done assigning global-ids to nodes...')
memory_snapshot("ShuffleGlobalID_Nodes_Complete: ", rank)
#shuffle node feature according to the node order on each rank.
for ntype_name in ntypes:
@@ -577,32 +590,42 @@ def gen_dist_partitions(rank, world_size, params):
shuffle_global_ids = node_data[constants.SHUFFLE_GLOBAL_NID][idx1]
feature_idx = shuffle_global_ids.argsort()
rcvd_node_features[ntype_name+'/'+featname] = rcvd_node_features[ntype_name+'/'+featname][feature_idx]
memory_snapshot("ReorderNodeFeaturesComplete: ", rank)
#sort edge_data by etype
sorted_idx = edge_data[constants.ETYPE_ID].argsort()
for k, v in edge_data.items():
edge_data[k] = v[sorted_idx]
sorted_idx = None
gc.collect()
shuffle_global_eid_start = assign_shuffle_global_nids_edges(rank, world_size, edge_data)
logging.info(f'[Rank: {rank}] Done assigning global_ids to edges ...')
memory_snapshot("ShuffleGlobalID_Edges_Complete: ", rank)
#determine global-ids for edge end-points
edge_data = lookup_shuffle_global_nids_edges(rank, world_size, edge_data, id_lookup, node_data)
logging.info(f'[Rank: {rank}] Done resolving orig_node_id for local node_ids...')
memory_snapshot("ShuffleGlobalID_Lookup_Complete: ", rank)
#create dgl objects here
start = timer()
num_nodes = 0
num_edges = shuffle_global_eid_start
node_count = len(node_data[constants.NTYPE_ID])
edge_count = len(edge_data[constants.ETYPE_ID])
graph_obj, ntypes_map_val, etypes_map_val, ntypes_ntypeid_map, etypes_map = create_dgl_object(\
params.graph_name, params.num_parts, \
schema_map, rank, node_data, edge_data, num_nodes, num_edges)
memory_snapshot("CreateDGLObjectsComplete: ", rank)
write_dgl_objects(graph_obj, rcvd_node_features, edge_features, params.output, rank)
memory_snapshot("DiskWriteDGLObjectsComplete: ", rank)
#get the meta-data
json_metadata = create_metadata_json(params.graph_name, len(node_data[constants.NTYPE_ID]), len(edge_data[constants.ETYPE_ID]), \
json_metadata = create_metadata_json(params.graph_name, node_count, edge_count, \
rank, world_size, ntypes_map_val, \
etypes_map_val, ntypes_ntypeid_map, etypes_map, params.output)
memory_snapshot("MetadataCreateComplete: ", rank)
if (rank == 0):
#get meta-data from all partitions and merge them on rank-0
@@ -614,9 +637,11 @@ def gen_dist_partitions(rank, world_size, params):
gather_metadata_json(json_metadata, rank, world_size)
end = timer()
logging.info(f'[Rank: {rank}] Time to create dgl objects: {timedelta(seconds = end - start)}')
memory_snapshot("MetadataWriteComplete: ", rank)
global_end = timer()
logging.info(f'[Rank: {rank}] Total execution time of the program: {timedelta(seconds = global_end - global_start)}')
memory_snapshot("PipelineComplete: ", rank)
def single_machine_run(params):
""" Main function for distributed implementation on a single machine
...
import os
import numpy as np
import constants
import torch
import logging
import os
import numpy as np
import pyarrow
import torch
from pyarrow import csv
import constants
from utils import get_idranges
def get_dataset(input_dir, graph_name, rank, world_size, schema_map):
"""
Function to read the multiple file formatted dataset.
...
import numpy as np
import logging
import os
import numpy as np
import pyarrow
import torch
from pyarrow import csv
from gloo_wrapper import alltoallv_cpu
class DistLookupService:
'''
This is an implementation of a Distributed Lookup Service to provide the following
@@ -55,7 +58,8 @@ class DistLookupService:
# Iterate over the node types and extract the partition id mappings.
for ntype in ntype_names:
print('[Rank: ', rank, '] Reading file: ', os.path.join(input_dir, '{}.txt'.format(ntype)))
fname = f'{ntype}.txt'
logging.info(f'[Rank: {rank}] Reading file: {os.path.join(input_dir, fname)}')
df = csv.read_csv(os.path.join(input_dir, '{}.txt'.format(ntype)), \
read_options=pyarrow.csv.ReadOptions(autogenerate_column_names=True), \
parse_options=pyarrow.csv.ParseOptions(delimiter=' '))
...
import itertools
import operator
import numpy as np
import torch
import operator
import itertools
import constants
from dist_lookup import DistLookupService
from gloo_wrapper import allgather_sizes, alltoallv_cpu
from utils import memory_snapshot
from dist_lookup import DistLookupService
def get_shuffle_global_nids(rank, world_size, global_nids_ranks, node_data):
"""
@@ -83,13 +86,14 @@ def lookup_shuffle_global_nids_edges(rank, world_size, edge_data, id_lookup, nod
dictionary where keys are column names and values are numpy arrays representing all the
edges present in the current graph partition
'''
memory_snapshot("GlobalToShuffleIDMapBegin: ", rank)
node_list = np.concatenate([edge_data[constants.GLOBAL_SRC_ID], edge_data[constants.GLOBAL_DST_ID]])
shuffle_ids = id_lookup.get_shuffle_nids(node_list,
node_data[constants.GLOBAL_NID],
node_data[constants.SHUFFLE_GLOBAL_NID])
edge_data[constants.SHUFFLE_GLOBAL_SRC_ID], edge_data[constants.SHUFFLE_GLOBAL_DST_ID] = np.split(shuffle_ids, 2)
memory_snapshot("GlobalToShuffleIDMap_AfterLookupServiceCalls: ", rank)
return edge_data
def assign_shuffle_global_nids_nodes(rank, world_size, node_data):
...
import os
import torch
import numpy as np
import json
import dgl
import constants
import logging
import os
import dgl
import numpy as np
import psutil
import pyarrow
import torch
from pyarrow import csv
import constants
def read_ntype_partition_files(schema_map, input_dir):
"""
Utility method to read the partition id mapping for each node.
@@ -420,3 +424,29 @@ def get_idranges(names, counts):
return tid_dict, gid_dict
def memory_snapshot(tag, rank):
"""
Utility function to take a snapshot of the usage of system resources
at a given point in time.
Parameters:
-----------
tag : string
string provided by the user for bookmarking purposes
rank : integer
rank of the participating process
"""
GB = 1024 * 1024 * 1024
MB = 1024 * 1024
KB = 1024
peak = dgl.partition.get_peak_mem()*KB
mem = psutil.virtual_memory()
avail = mem.available / MB
used = mem.used / MB
total = mem.total / MB
mem_string = f'{total:.0f} (MB) total, {peak:.0f} (MB) peak, {used:.0f} (MB) used, {avail:.0f} (MB) avail'
logging.debug(f'[Rank: {rank} MEMORY_SNAPSHOT] {mem_string} - {tag}')
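A usage note on the helper above: the snapshot is written with `logging.debug`, so it stays silent unless the pipeline is started with --log-level debug (or an equivalent `basicConfig` call). Below is a short sketch of how the bookmarks are meant to bracket a stage, assuming utils.py, dgl, and psutil are importable from the working directory.

```python
import gc
import logging

from utils import memory_snapshot  # the helper defined above

logging.basicConfig(level=logging.DEBUG)  # or wire this up to --log-level debug

rank = 0
memory_snapshot("StageBegin: ", rank)
# ... build or shuffle large per-partition arrays here ...
gc.collect()
memory_snapshot("StageComplete: ", rank)
```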