import argparse
import multiprocessing
import time
from collections import defaultdict
from functools import partial, reduce, wraps

import networkx as nx
import numpy as np
import torch
from gensim.models.keyedvectors import Vocab
from six import iteritems
from sklearn.metrics import auc, f1_score, precision_recall_curve, roc_auc_score


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input", type=str, default="data/amazon", help="Input dataset path"
    )
    parser.add_argument(
        "--features", type=str, default=None, help="Input node features"
    )
    parser.add_argument(
        "--epoch", type=int, default=100, help="Number of epochs. Default is 100."
    )
    parser.add_argument(
        "--batch-size", type=int, default=64, help="Batch size. Default is 64."
    )
    parser.add_argument(
        "--eval-type", type=str, default="all", help="The edge type(s) for evaluation."
    )
    parser.add_argument(
        "--schema",
        type=str,
        default=None,
        help="The metapath schema (e.g., U-I-U,I-U-I).",
    )
    parser.add_argument(
        "--dimensions",
        type=int,
        default=200,
        help="Number of dimensions. Default is 200.",
    )
    parser.add_argument(
        "--edge-dim",
        type=int,
        default=10,
        help="Number of edge embedding dimensions. Default is 10.",
    )
    parser.add_argument(
        "--att-dim",
        type=int,
        default=20,
        help="Number of attention dimensions. Default is 20.",
    )
    parser.add_argument(
        "--walk-length",
        type=int,
        default=10,
        help="Length of walk per source. Default is 10.",
    )
    parser.add_argument(
        "--num-walks",
        type=int,
        default=20,
        help="Number of walks per source. Default is 20.",
    )
    parser.add_argument(
        "--window-size",
        type=int,
        default=5,
        help="Context size for optimization. Default is 5.",
    )
    parser.add_argument(
        "--negative-samples",
        type=int,
        default=5,
        help="Negative samples for optimization. Default is 5.",
    )
    parser.add_argument(
        "--neighbor-samples",
        type=int,
        default=10,
        help="Neighbor samples for aggregation. Default is 10.",
    )
    parser.add_argument(
        "--patience", type=int, default=5, help="Early stopping patience. Default is 5."
    )
    parser.add_argument(
        "--gpu", type=str, default=None, help="Comma separated list of GPU device IDs."
    )
    parser.add_argument(
        "--workers", type=int, default=4, help="Number of workers."
    )
    return parser.parse_args()
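
# ---------------------------------------------------------------------------
# A minimal sketch of consuming the parsed arguments for device selection. The
# function name and the single-device policy are illustrative assumptions, not
# part of the original pipeline; `--gpu` is only documented above as a
# comma-separated list of device IDs.
def _example_select_device(args):
    if args.gpu is not None and torch.cuda.is_available():
        first_id = args.gpu.split(",")[0]  # e.g. "0,1" -> "0"
        return torch.device("cuda:{}".format(first_id))
    return torch.device("cpu")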


# for each line, the data is [edge_type, node, node]
def load_training_data(f_name):
    print("We are loading data from:", f_name)
    edge_data_by_type = dict()
    all_nodes = list()
    with open(f_name, "r") as f:
        for line in f:
            words = line[:-1].split(" ")  # line[-1] == '\n'
            if words[0] not in edge_data_by_type:
                edge_data_by_type[words[0]] = list()
            x, y = words[1], words[2]
            edge_data_by_type[words[0]].append((x, y))
            all_nodes.append(x)
            all_nodes.append(y)
    all_nodes = list(set(all_nodes))
    print("Total training nodes: " + str(len(all_nodes)))
    return edge_data_by_type


# for each line, the data is [edge_type, node, node, true_or_false]
def load_testing_data(f_name):
    print("We are loading data from:", f_name)
    true_edge_data_by_type = dict()
    false_edge_data_by_type = dict()
    all_nodes = list()
    with open(f_name, "r") as f:
        for line in f:
            words = line[:-1].split(" ")
            x, y = words[1], words[2]
            if int(words[3]) == 1:
                if words[0] not in true_edge_data_by_type:
                    true_edge_data_by_type[words[0]] = list()
                true_edge_data_by_type[words[0]].append((x, y))
            else:
                if words[0] not in false_edge_data_by_type:
                    false_edge_data_by_type[words[0]] = list()
                false_edge_data_by_type[words[0]].append((x, y))
            all_nodes.append(x)
            all_nodes.append(y)
    all_nodes = list(set(all_nodes))
    return true_edge_data_by_type, false_edge_data_by_type


def load_node_type(f_name):
    print("We are loading node type from:", f_name)
    node_type = {}
    with open(f_name, "r") as f:
        for line in f:
            items = line.strip().split()
            node_type[items[0]] = items[1]
    return node_type


def generate_pairs_parallel(walks, skip_window=None, layer_id=None):
    # pair each node in a walk with the nodes within skip_window hops on either side
    pairs = []
    for walk in walks:
        walk = walk.tolist()
        for i in range(len(walk)):
            for j in range(1, skip_window + 1):
                if i - j >= 0:
                    pairs.append((walk[i], walk[i - j], layer_id))
                if i + j < len(walk):
                    pairs.append((walk[i], walk[i + j], layer_id))
    return pairs


def generate_pairs(all_walks, window_size, num_workers):
    start_time = time.time()
    print("We are generating pairs with {} cores.".format(num_workers))
    # Start all worker processes
    pool = multiprocessing.Pool(processes=num_workers)
    pairs = []
    skip_window = window_size // 2
    for layer_id, walks in enumerate(all_walks):
        block_num = len(walks) // num_workers
        if block_num > 0:
            # split the walks into num_workers chunks; the last chunk keeps the remainder
            walks_list = [
                walks[i * block_num : (i + 1) * block_num]
                for i in range(num_workers - 1)
            ]
            walks_list.append(walks[(num_workers - 1) * block_num :])
        else:
            walks_list = [walks]
        tmp_result = pool.map(
            partial(
                generate_pairs_parallel, skip_window=skip_window, layer_id=layer_id
            ),
            walks_list,
        )
        pairs += reduce(lambda x, y: x + y, tmp_result)
    pool.close()
    end_time = time.time()
    print("Generate pairs end, use {}s.".format(end_time - start_time))
    return np.array([list(pair) for pair in set(pairs)])


def generate_vocab(network_data):
    nodes, index2word = [], []
    for edge_type in network_data:
        node1, node2 = zip(*network_data[edge_type])
        index2word = index2word + list(node1) + list(node2)

    index2word = list(set(index2word))
    vocab = {}
    for i, word in enumerate(index2word):
        vocab[word] = i

    for edge_type in network_data:
        node1, node2 = zip(*network_data[edge_type])
        tmp_nodes = list(set(list(node1) + list(node2)))
        tmp_nodes = [vocab[word] for word in tmp_nodes]
        nodes.append(tmp_nodes)

    return index2word, vocab, nodes
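
# ---------------------------------------------------------------------------
# A minimal sketch of how the helpers above chain together, assuming the random
# walks are produced elsewhere and passed in as `all_walks`: one collection of
# walks per edge type, each walk being a numpy array of vocabulary indices (so
# that the `.tolist()` call in generate_pairs_parallel applies). The function
# name and arguments are illustrative, not part of the original pipeline.
def _example_prepare_training_pairs(train_file, all_walks, window_size=5, num_workers=4):
    # {edge_type: [(node, node), ...]} read from the training file
    network_data = load_training_data(train_file)
    # vocabulary shared across all edge types, plus per-type node index lists
    index2word, vocab, nodes = generate_vocab(network_data)
    # (center, context, layer_id) skip-gram pairs extracted from the walks
    pairs = generate_pairs(all_walks, window_size, num_workers)
    return index2word, vocab, nodes, pairs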

def get_score(local_model, edge):
    # cosine similarity between the two endpoint embeddings; None if either node is missing
    node1, node2 = str(edge[0]), str(edge[1])
    try:
        vector1 = local_model[node1]
        vector2 = local_model[node2]
        return np.dot(vector1, vector2) / (
            np.linalg.norm(vector1) * np.linalg.norm(vector2)
        )
    except Exception:
        return None


def evaluate(model, true_edges, false_edges, num_workers):
    true_list = list()
    prediction_list = list()
    true_num = 0

    # Start all worker processes
    pool = multiprocessing.Pool(processes=num_workers)
    tmp_true_score_list = pool.map(partial(get_score, model), true_edges)
    tmp_false_score_list = pool.map(partial(get_score, model), false_edges)
    pool.close()

    prediction_list += [
        tmp_score for tmp_score in tmp_true_score_list if tmp_score is not None
    ]
    true_num = len(prediction_list)
    true_list += [1] * true_num

    prediction_list += [
        tmp_score for tmp_score in tmp_false_score_list if tmp_score is not None
    ]
    true_list += [0] * (len(prediction_list) - true_num)

    sorted_pred = prediction_list[:]
    sorted_pred.sort()
    threshold = sorted_pred[-true_num]

    y_pred = np.zeros(len(prediction_list), dtype=np.int32)
    for i in range(len(prediction_list)):
        if prediction_list[i] >= threshold:
            y_pred[i] = 1

    y_true = np.array(true_list)
    y_scores = np.array(prediction_list)
    ps, rs, _ = precision_recall_curve(y_true, y_scores)
    return roc_auc_score(y_true, y_scores), f1_score(y_true, y_pred), auc(rs, ps)
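
# ---------------------------------------------------------------------------
# A minimal sketch of running the link-prediction evaluation above for a single
# edge type. The file path and edge-type key are placeholders; `model` must be
# picklable (e.g. a plain dict mapping node id strings to numpy vectors),
# because `evaluate` fans the scoring out to a multiprocessing pool.
def _example_evaluate_edge_type(test_file, model, edge_type, num_workers=4):
    true_edges_by_type, false_edges_by_type = load_testing_data(test_file)
    true_edges = true_edges_by_type[edge_type]
    false_edges = false_edges_by_type[edge_type]
    # (ROC-AUC, F1, PR-AUC) for this edge type
    return evaluate(model, true_edges, false_edges, num_workers)


# Example (hypothetical path and edge-type key):
#     roc_auc, f1, pr_auc = _example_evaluate_edge_type("data/amazon/valid.txt", embeddings, "1")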