import argparse
import logging
import os
import platform

import numpy as np
import torch.multiprocessing as mp

from data_shuffle import multi_machine_run, single_machine_run

def log_params(params):
    """Print all the command line arguments for debugging purposes.

    Parameters:
    -----------
    params: argparse.Namespace
        Argument Parser structure listing all the pre-defined parameters
    """
    print('Input Dir: ', params.input_dir)
    print('Graph Name: ', params.graph_name)
    print('Schema File: ', params.schema)
    print('No. partitions: ', params.num_parts)
    print('Output Dir: ', params.output)
    print('WorldSize: ', params.world_size)
    # BUG FIX: the parser defines `--partitions-dir`, so the parsed namespace
    # exposes `partitions_dir`; the original `params.partitions_file` raised
    # AttributeError whenever this helper was called.
    print('Metis partitions: ', params.partitions_dir)

if __name__ == "__main__":
    # Entry point: parse the command-line arguments, configure logging, and
    # hand control to the distributed data-shuffle pipeline.

    # Arguments which are already needed by the existing implementation of
    # convert_partition.py.
    parser = argparse.ArgumentParser(description='Construct graph partitions')
    parser.add_argument('--input-dir', required=True, type=str,
                        help='The directory path that contains the partition results.')
    parser.add_argument('--graph-name', required=True, type=str,
                        help='The graph name')
    parser.add_argument('--schema', required=True, type=str,
                        help='The schema of the graph')
    parser.add_argument('--num-parts', required=True, type=int,
                        help='The number of partitions')
    parser.add_argument('--output', required=True, type=str,
                        help='The output directory of the partitioned results')
    parser.add_argument('--partitions-dir',
                        help='directory of the partition-ids for each node type',
                        default=None, type=str)
    # argparse's default HelpFormatter collapses whitespace, so implicit string
    # concatenation here renders identically to the original backslash-continued
    # literal while keeping the source readable.
    parser.add_argument('--log-level', type=str, default="info",
                        help='To enable log level for debugging purposes. Available options: '
                             '(Critical, Error, Warning, Info, Debug, Notset), default value '
                             'is: Info')

    # Arguments needed for the distributed implementation.
    # NOTE: `default=1` is inert while `required=True` is set; kept only so the
    # CLI definition stays backward compatible.
    parser.add_argument('--world-size', help='no. of processes to spawn',
                        default=1, type=int, required=True)
    parser.add_argument('--process-group-timeout', required=True, type=int,
                        help='timeout[seconds] for operations executed against the process group '
                             '(see torch.distributed.init_process_group)')
    parser.add_argument('--save-orig-nids', action='store_true',
                        help='Save original node IDs into files')
    parser.add_argument('--save-orig-eids', action='store_true',
                        help='Save original edge IDs into files')
    params = parser.parse_args()

    # Fail fast on a bad --log-level: the original code passed the resulting
    # None to basicConfig(level=...), which silently left the level unset.
    numeric_level = getattr(logging, params.log_level.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError(f'Invalid log level: {params.log_level}')
    logging.basicConfig(
        level=numeric_level,
        format=f"[{platform.node()} %(levelname)s %(asctime)s "
               f"PID:%(process)d] %(message)s")

    # Invoke the pipeline.
    multi_machine_run(params)