# data_proc_pipeline.py
import argparse
import numpy as np
import torch.multiprocessing as mp
import logging
import platform
import os
from data_shuffle import single_machine_run, multi_machine_run 

def log_params(params):
    """ Print all the command line arguments for debugging purposes.

    Parameters:
    -----------
    params: argparse.Namespace
        Parsed command-line arguments. The attributes input_dir,
        graph_name, schema, num_parts, output and world_size are read
        directly; the partitions location is read from partitions_dir
        (falling back to a legacy partitions_file attribute).
    """
    print('Input Dir: ', params.input_dir)
    print('Graph Name: ', params.graph_name)
    print('Schema File: ', params.schema)
    print('No. partitions: ', params.num_parts)
    print('Output Dir: ', params.output)
    print('WorldSize: ', params.world_size)
    # The CLI option is ``--partitions-dir`` (attribute ``partitions_dir``);
    # the previous unconditional ``params.partitions_file`` lookup raised
    # AttributeError on the script's own parsed arguments. Fall back to the
    # old attribute so any caller that built its own namespace still works.
    partitions = getattr(params, 'partitions_dir', None)
    if partitions is None:
        partitions = getattr(params, 'partitions_file', None)
    print('Metis partitions: ', partitions)

def build_arg_parser():
    """ Build the command-line parser for the graph partition pipeline.

    Returns:
    --------
    argparse.ArgumentParser
        Parser defining every option that is passed on to
        ``multi_machine_run``.
    """
    parser = argparse.ArgumentParser(description='Construct graph partitions')

    # Arguments which are already needed by the existing implementation of
    # convert_partition.py.
    parser.add_argument('--input-dir', required=True, type=str,
                        help='The directory path that contains the partition results.')
    parser.add_argument('--graph-name', required=True, type=str,
                        help='The graph name')
    parser.add_argument('--schema', required=True, type=str,
                        help='The schema of the graph')
    parser.add_argument('--num-parts', required=True, type=int,
                        help='The number of partitions')
    parser.add_argument('--output', required=True, type=str,
                        help='The output directory of the partitioned results')
    parser.add_argument('--partitions-dir', default=None, type=str,
                        help='directory of the partition-ids for each node type')

    # Arguments needed for the distributed implementation.
    # This option used to be declared with both required=True and default=1;
    # ``required`` made the default unreachable. It is now optional and
    # falls back to a single process, which every existing invocation
    # (always passing --world-size) still satisfies.
    parser.add_argument('--world-size', default=1, type=int,
                        help='no. of processes to spawn')
    return parser


def parse_cli_args(argv=None):
    """ Parse and sanity-check the command-line arguments.

    Parameters:
    -----------
    argv: list of str or None
        Arguments to parse; ``None`` means ``sys.argv[1:]`` (argparse default).

    Returns:
    --------
    argparse.Namespace
        The parsed arguments.

    Raises:
    -------
    SystemExit
        Via ``parser.error`` (exit status 2) when required options are
        missing or when --num-parts / --world-size is not positive.
    """
    parser = build_arg_parser()
    params = parser.parse_args(argv)
    # Zero or negative partition / process counts cannot produce a valid
    # run; fail fast with a usage message instead of deep in the pipeline.
    if params.num_parts < 1:
        parser.error('--num-parts must be a positive integer')
    if params.world_size < 1:
        parser.error('--world-size must be a positive integer')
    return params


if __name__ == "__main__":
    # Start of execution from this point: parse the arguments, configure
    # logging, then invoke the appropriate function to begin execution.
    params = parse_cli_args()

    logging.basicConfig(
        level='INFO',
        format=f"[{platform.node()} %(levelname)s %(asctime)s PID:%(process)d] %(message)s",
    )

    # Invoke the pipeline function.
    multi_machine_run(params)