Unverified Commit d628f5a2 authored by esang's avatar esang Committed by GitHub
Browse files

[Example]Add sparse embedding in GATNE-T example to support big graph (#2234)



* add sparse and mutil gpu support

* update multi gpu

* update readme

* remove multi gpu

* add multiple gpus support
Co-authored-by: default avatarQuan (Andy) Gan <coin2028@hotmail.com>
Co-authored-by: default avatarZihao Ye <expye@outlook.com>
parent 7d19b33c
...@@ -37,16 +37,29 @@ python src/main.py --input data/example ...@@ -37,16 +37,29 @@ python src/main.py --input data/example
To run on "twitter" dataset, use To run on "twitter" dataset, use
```bash ```bash
python src/main.py --input data/twitter --eval-type 1 python src/main.py --input data/twitter --eval-type 1 --gpu 0
``` ```
For a big dataset, use the sparse-embedding version to avoid CUDA out-of-memory errors in the backward pass
```bash
python src/main_sparse.py --input data/example --gpu 0
```
If you have multiple GPUs, you can also accelerate training with [`DistributedDataParallel`](https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html)
```bash
python src/main_sparse_multi_gpus.py --input data/example --gpu 0,1
```
**Note that DistributedDataParallel incurs extra CUDA memory consumption and some loss of performance.**
Results Results
------- -------
All the results match the [official code](https://github.com/THUDM/GATNE/blob/master/src/main_pytorch.py) with the same hyper parameter values, including twiiter dataset (auc, pr, f1 is 76.29, 76.17, 69.34, respectively). All the results match the [official code](https://github.com/THUDM/GATNE/blob/master/src/main_pytorch.py) with the same hyper parameter values, including twiiter dataset (auc, pr, f1 is 76.29, 76.17, 69.34, respectively).
| | auc | pr | f1 | | | auc | pr | f1 |
| ------ | ---- | --- | ----- | | ------- | ----- | ----- | ----- |
| amazon | 96.88 | 96.31 | 92.12 | | amazon | 96.88 | 96.31 | 92.12 |
| youtube | 82.29 | 80.35 | 74.63 | | youtube | 82.29 | 80.35 | 74.63 |
| twitter | 72.40 | 74.40 | 65.89 | | twitter | 72.40 | 74.40 | 65.89 |
| example | 94.65 | 94.57 | 89.99 | | example | 94.65 | 94.57 | 89.99 |
python src/main.py --input data/example python src/main.py --input data/example --gpu 0
\ No newline at end of file
python src/main_sparse.py --input data/example --gpu 0
python src/main_sparse_multi_gpus.py --input data/example
...@@ -8,7 +8,7 @@ import numpy as np ...@@ -8,7 +8,7 @@ import numpy as np
import torch import torch
import torch.nn as nn import torch.nn as nn
import torch.nn.functional as F import torch.nn.functional as F
import tqdm from tqdm.auto import tqdm
from numpy import random from numpy import random
from torch.nn.parameter import Parameter from torch.nn.parameter import Parameter
import dgl import dgl
...@@ -18,8 +18,8 @@ from utils import * ...@@ -18,8 +18,8 @@ from utils import *
def get_graph(network_data, vocab): def get_graph(network_data, vocab):
''' Build graph, treat all nodes as the same type """ Build graph, treat all nodes as the same type
Parameters Parameters
---------- ----------
network_data: a dict network_data: a dict
...@@ -30,10 +30,10 @@ def get_graph(network_data, vocab): ...@@ -30,10 +30,10 @@ def get_graph(network_data, vocab):
------ ------
DGLHeteroGraph DGLHeteroGraph
a heterogenous graph, with one node type and different edge types a heterogenous graph, with one node type and different edge types
''' """
graphs = [] graphs = []
node_type = '_N' # '_N' can be replaced by an arbitrary name node_type = "_N" # '_N' can be replaced by an arbitrary name
data_dict = dict() data_dict = dict()
num_nodes_dict = {node_type: len(vocab)} num_nodes_dict = {node_type: len(vocab)}
...@@ -46,7 +46,7 @@ def get_graph(network_data, vocab): ...@@ -46,7 +46,7 @@ def get_graph(network_data, vocab):
dst.extend([vocab[edge[1]], vocab[edge[0]]]) dst.extend([vocab[edge[1]], vocab[edge[0]]])
data_dict[(node_type, edge_type, node_type)] = (src, dst) data_dict[(node_type, edge_type, node_type)] = (src, dst)
graph = dgl.heterograph(data_dict, num_nodes_dict) graph = dgl.heterograph(data_dict, num_nodes_dict)
return graph return graph
...@@ -54,7 +54,7 @@ class NeighborSampler(object): ...@@ -54,7 +54,7 @@ class NeighborSampler(object):
def __init__(self, g, num_fanouts): def __init__(self, g, num_fanouts):
self.g = g self.g = g
self.num_fanouts = num_fanouts self.num_fanouts = num_fanouts
def sample(self, pairs): def sample(self, pairs):
heads, tails, types = zip(*pairs) heads, tails, types = zip(*pairs)
seeds, head_invmap = torch.unique(torch.LongTensor(heads), return_inverse=True) seeds, head_invmap = torch.unique(torch.LongTensor(heads), return_inverse=True)
...@@ -64,11 +64,24 @@ class NeighborSampler(object): ...@@ -64,11 +64,24 @@ class NeighborSampler(object):
sampled_block = dgl.to_block(sampled_graph, seeds) sampled_block = dgl.to_block(sampled_graph, seeds)
seeds = sampled_block.srcdata[dgl.NID] seeds = sampled_block.srcdata[dgl.NID]
blocks.insert(0, sampled_block) blocks.insert(0, sampled_block)
return blocks, torch.LongTensor(head_invmap), torch.LongTensor(tails), torch.LongTensor(types) return (
blocks,
torch.LongTensor(head_invmap),
torch.LongTensor(tails),
torch.LongTensor(types),
)
class DGLGATNE(nn.Module): class DGLGATNE(nn.Module):
def __init__(self, num_nodes, embedding_size, embedding_u_size, edge_types, edge_type_count, dim_a): def __init__(
self,
num_nodes,
embedding_size,
embedding_u_size,
edge_types,
edge_type_count,
dim_a,
):
super(DGLGATNE, self).__init__() super(DGLGATNE, self).__init__()
self.num_nodes = num_nodes self.num_nodes = num_nodes
self.embedding_size = embedding_size self.embedding_size = embedding_size
...@@ -78,9 +91,15 @@ class DGLGATNE(nn.Module): ...@@ -78,9 +91,15 @@ class DGLGATNE(nn.Module):
self.dim_a = dim_a self.dim_a = dim_a
self.node_embeddings = Parameter(torch.FloatTensor(num_nodes, embedding_size)) self.node_embeddings = Parameter(torch.FloatTensor(num_nodes, embedding_size))
self.node_type_embeddings = Parameter(torch.FloatTensor(num_nodes, edge_type_count, embedding_u_size)) self.node_type_embeddings = Parameter(
self.trans_weights = Parameter(torch.FloatTensor(edge_type_count, embedding_u_size, embedding_size)) torch.FloatTensor(num_nodes, edge_type_count, embedding_u_size)
self.trans_weights_s1 = Parameter(torch.FloatTensor(edge_type_count, embedding_u_size, dim_a)) )
self.trans_weights = Parameter(
torch.FloatTensor(edge_type_count, embedding_u_size, embedding_size)
)
self.trans_weights_s1 = Parameter(
torch.FloatTensor(edge_type_count, embedding_u_size, dim_a)
)
self.trans_weights_s2 = Parameter(torch.FloatTensor(edge_type_count, dim_a, 1)) self.trans_weights_s2 = Parameter(torch.FloatTensor(edge_type_count, dim_a, 1))
self.reset_parameters() self.reset_parameters()
...@@ -105,41 +124,65 @@ class DGLGATNE(nn.Module): ...@@ -105,41 +124,65 @@ class DGLGATNE(nn.Module):
edge_type = self.edge_types[i] edge_type = self.edge_types[i]
block.srcdata[edge_type] = self.node_type_embeddings[input_nodes, i] block.srcdata[edge_type] = self.node_type_embeddings[input_nodes, i]
block.dstdata[edge_type] = self.node_type_embeddings[output_nodes, i] block.dstdata[edge_type] = self.node_type_embeddings[output_nodes, i]
block.update_all(fn.copy_u(edge_type, 'm'), fn.sum('m', edge_type), etype=edge_type) block.update_all(
fn.copy_u(edge_type, "m"), fn.sum("m", edge_type), etype=edge_type
)
node_type_embed.append(block.dstdata[edge_type]) node_type_embed.append(block.dstdata[edge_type])
node_type_embed = torch.stack(node_type_embed, 1) node_type_embed = torch.stack(node_type_embed, 1)
tmp_node_type_embed = node_type_embed.unsqueeze(2).view(-1, 1, self.embedding_u_size) tmp_node_type_embed = node_type_embed.unsqueeze(2).view(
trans_w = self.trans_weights.unsqueeze(0).repeat(batch_size, 1, 1, 1).view( -1, 1, self.embedding_u_size
-1, self.embedding_u_size, self.embedding_size )
trans_w = (
self.trans_weights.unsqueeze(0)
.repeat(batch_size, 1, 1, 1)
.view(-1, self.embedding_u_size, self.embedding_size)
)
trans_w_s1 = (
self.trans_weights_s1.unsqueeze(0)
.repeat(batch_size, 1, 1, 1)
.view(-1, self.embedding_u_size, self.dim_a)
)
trans_w_s2 = (
self.trans_weights_s2.unsqueeze(0)
.repeat(batch_size, 1, 1, 1)
.view(-1, self.dim_a, 1)
)
attention = (
F.softmax(
torch.matmul(
torch.tanh(torch.matmul(tmp_node_type_embed, trans_w_s1)),
trans_w_s2,
)
.squeeze(2)
.view(-1, self.edge_type_count),
dim=1,
)
.unsqueeze(1)
.repeat(1, self.edge_type_count, 1)
)
node_type_embed = torch.matmul(attention, node_type_embed).view(
-1, 1, self.embedding_u_size
) )
trans_w_s1 = self.trans_weights_s1.unsqueeze(0).repeat(batch_size, 1, 1, 1).view( node_embed = node_embed[output_nodes].unsqueeze(1).repeat(
-1, self.embedding_u_size, self.dim_a 1, self.edge_type_count, 1
) + torch.matmul(node_type_embed, trans_w).view(
-1, self.edge_type_count, self.embedding_size
) )
trans_w_s2 = self.trans_weights_s2.unsqueeze(0).repeat(batch_size, 1, 1, 1).view(-1, self.dim_a, 1)
attention = F.softmax(
torch.matmul(
torch.tanh(torch.matmul(tmp_node_type_embed, trans_w_s1)), trans_w_s2
).squeeze(2).view(-1, self.edge_type_count),
dim=1,
).unsqueeze(1).repeat(1, self.edge_type_count, 1)
node_type_embed = torch.matmul(attention, node_type_embed).view(-1, 1, self.embedding_u_size)
node_embed = node_embed[output_nodes].unsqueeze(1).repeat(1, self.edge_type_count, 1) + \
torch.matmul(node_type_embed, trans_w).view(-1, self.edge_type_count, self.embedding_size)
last_node_embed = F.normalize(node_embed, dim=2) last_node_embed = F.normalize(node_embed, dim=2)
return last_node_embed # [batch_size, edge_type_count, embedding_size] return last_node_embed # [batch_size, edge_type_count, embedding_size]
class NSLoss(nn.Module): class NSLoss(nn.Module):
def __init__(self, num_nodes, num_sampled, embedding_size): def __init__(self, num_nodes, num_sampled, embedding_size):
super(NSLoss, self).__init__() super(NSLoss, self).__init__()
self.num_nodes = num_nodes self.num_nodes = num_nodes
self.num_sampled = num_sampled self.num_sampled = num_sampled
self.embedding_size = embedding_size self.embedding_size = embedding_size
self.weights = Parameter(torch.FloatTensor(num_nodes, embedding_size)) self.weights = Parameter(torch.FloatTensor(num_nodes, embedding_size))
# [ (log(i+2) - log(i+1)) / log(num_nodes + 1)] # [ (log(i+2) - log(i+1)) / log(num_nodes + 1)]
self.sample_weights = F.normalize( self.sample_weights = F.normalize(
torch.Tensor( torch.Tensor(
...@@ -176,39 +219,53 @@ class NSLoss(nn.Module): ...@@ -176,39 +219,53 @@ class NSLoss(nn.Module):
def train_model(network_data): def train_model(network_data):
index2word, vocab, type_nodes = generate_vocab(network_data) index2word, vocab, type_nodes = generate_vocab(network_data)
edge_types = list(network_data.keys()) edge_types = list(network_data.keys())
num_nodes = len(index2word) num_nodes = len(index2word)
edge_type_count = len(edge_types) edge_type_count = len(edge_types)
epochs = args.epoch epochs = args.epoch
batch_size = args.batch_size batch_size = args.batch_size
embedding_size = args.dimensions embedding_size = args.dimensions
embedding_u_size = args.edge_dim embedding_u_size = args.edge_dim
u_num = edge_type_count u_num = edge_type_count
num_sampled = args.negative_samples num_sampled = args.negative_samples
dim_a = args.att_dim dim_a = args.att_dim
att_head = 1 att_head = 1
neighbor_samples = args.neighbor_samples neighbor_samples = args.neighbor_samples
num_workers = args.workers
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") device = torch.device(
"cuda" if args.gpu is not None and torch.cuda.is_available() else "cpu"
)
g = get_graph(network_data, vocab) g = get_graph(network_data, vocab)
all_walks = [] all_walks = []
for i in range(edge_type_count): for i in range(edge_type_count):
nodes = torch.LongTensor(type_nodes[i] * args.num_walks) nodes = torch.LongTensor(type_nodes[i] * args.num_walks)
traces, types = dgl.sampling.random_walk(g, nodes, metapath=[edge_types[i]]*(neighbor_samples-1)) traces, types = dgl.sampling.random_walk(
g, nodes, metapath=[edge_types[i]] * (neighbor_samples - 1)
)
all_walks.append(traces) all_walks.append(traces)
train_pairs = generate_pairs(all_walks, args.window_size) train_pairs = generate_pairs(all_walks, args.window_size, num_workers)
neighbor_sampler = NeighborSampler(g, [neighbor_samples]) neighbor_sampler = NeighborSampler(g, [neighbor_samples])
train_dataloader = torch.utils.data.DataLoader( train_dataloader = torch.utils.data.DataLoader(
train_pairs, batch_size=batch_size, collate_fn=neighbor_sampler.sample, shuffle=True train_pairs,
batch_size=batch_size,
collate_fn=neighbor_sampler.sample,
shuffle=True,
num_workers=num_workers,
pin_memory=True,
)
model = DGLGATNE(
num_nodes, embedding_size, embedding_u_size, edge_types, edge_type_count, dim_a
) )
model = DGLGATNE(num_nodes, embedding_size, embedding_u_size, edge_types, edge_type_count, dim_a)
nsloss = NSLoss(num_nodes, num_sampled, embedding_size) nsloss = NSLoss(num_nodes, num_sampled, embedding_size)
model.to(device) model.to(device)
nsloss.to(device) nsloss.to(device)
optimizer = torch.optim.Adam([{"params": model.parameters()}, {"params": nsloss.parameters()}], lr=1e-4) optimizer = torch.optim.Adam(
[{"params": model.parameters()}, {"params": nsloss.parameters()}], lr=1e-3
)
best_score = 0 best_score = 0
patience = 0 patience = 0
...@@ -216,11 +273,10 @@ def train_model(network_data): ...@@ -216,11 +273,10 @@ def train_model(network_data):
model.train() model.train()
random.shuffle(train_pairs) random.shuffle(train_pairs)
data_iter = tqdm.tqdm( data_iter = tqdm(
train_dataloader, train_dataloader,
desc="epoch %d" % (epoch), desc="epoch %d" % (epoch),
total=(len(train_pairs) + (batch_size - 1)) // batch_size, total=(len(train_pairs) + (batch_size - 1)) // batch_size,
bar_format="{l_bar}{r_bar}",
) )
avg_loss = 0.0 avg_loss = 0.0
...@@ -229,33 +285,54 @@ def train_model(network_data): ...@@ -229,33 +285,54 @@ def train_model(network_data):
# embs: [batch_size, edge_type_count, embedding_size] # embs: [batch_size, edge_type_count, embedding_size]
block_types = block_types.to(device) block_types = block_types.to(device)
embs = model(block[0].to(device))[head_invmap] embs = model(block[0].to(device))[head_invmap]
embs = embs.gather(1, block_types.view(-1, 1, 1).expand(embs.shape[0], 1, embs.shape[2]))[:, 0] embs = embs.gather(
loss = nsloss(block[0].dstdata[dgl.NID][head_invmap].to(device), embs, tails.to(device)) 1, block_types.view(-1, 1, 1).expand(embs.shape[0], 1, embs.shape[2])
)[:, 0]
loss = nsloss(
block[0].dstdata[dgl.NID][head_invmap].to(device),
embs,
tails.to(device),
)
loss.backward() loss.backward()
optimizer.step() optimizer.step()
avg_loss += loss.item() avg_loss += loss.item()
if i % 5000 == 0: post_fix = {
post_fix = { "epoch": epoch,
"epoch": epoch, "iter": i,
"iter": i, "avg_loss": avg_loss / (i + 1),
"avg_loss": avg_loss / (i + 1), "loss": loss.item(),
"loss": loss.item(), }
} data_iter.set_postfix(post_fix)
data_iter.write(str(post_fix))
model.eval() model.eval()
# {'1': {}, '2': {}} # {'1': {}, '2': {}}
final_model = dict(zip(edge_types, [dict() for _ in range(edge_type_count)])) final_model = dict(zip(edge_types, [dict() for _ in range(edge_type_count)]))
for i in range(num_nodes): for i in range(num_nodes):
train_inputs = torch.tensor([i for _ in range(edge_type_count)]).unsqueeze(1).to(device) # [i, i] train_inputs = (
train_types = torch.tensor(list(range(edge_type_count))).unsqueeze(1).to(device) # [0, 1] torch.tensor([i for _ in range(edge_type_count)])
pairs = torch.cat((train_inputs, train_inputs, train_types), dim=1) # (2, 3) .unsqueeze(1)
train_blocks, train_invmap, fake_tails, train_types = neighbor_sampler.sample(pairs) .to(device)
) # [i, i]
train_types = (
torch.tensor(list(range(edge_type_count))).unsqueeze(1).to(device)
) # [0, 1]
pairs = torch.cat(
(train_inputs, train_inputs, train_types), dim=1
) # (2, 3)
(
train_blocks,
train_invmap,
fake_tails,
train_types,
) = neighbor_sampler.sample(pairs)
node_emb = model(train_blocks[0].to(device))[train_invmap] node_emb = model(train_blocks[0].to(device))[train_invmap]
node_emb = node_emb.gather(1, train_types.to(device).view(-1, 1, 1 node_emb = node_emb.gather(
).expand(node_emb.shape[0], 1, node_emb.shape[2]) 1,
train_types.to(device)
.view(-1, 1, 1)
.expand(node_emb.shape[0], 1, node_emb.shape[2]),
)[:, 0] )[:, 0]
for j in range(edge_type_count): for j in range(edge_type_count):
...@@ -271,6 +348,7 @@ def train_model(network_data): ...@@ -271,6 +348,7 @@ def train_model(network_data):
final_model[edge_types[i]], final_model[edge_types[i]],
valid_true_data_by_edge[edge_types[i]], valid_true_data_by_edge[edge_types[i]],
valid_false_data_by_edge[edge_types[i]], valid_false_data_by_edge[edge_types[i]],
num_workers,
) )
valid_aucs.append(tmp_auc) valid_aucs.append(tmp_auc)
valid_f1s.append(tmp_f1) valid_f1s.append(tmp_f1)
...@@ -280,6 +358,7 @@ def train_model(network_data): ...@@ -280,6 +358,7 @@ def train_model(network_data):
final_model[edge_types[i]], final_model[edge_types[i]],
testing_true_data_by_edge[edge_types[i]], testing_true_data_by_edge[edge_types[i]],
testing_false_data_by_edge[edge_types[i]], testing_false_data_by_edge[edge_types[i]],
num_workers,
) )
test_aucs.append(tmp_auc) test_aucs.append(tmp_auc)
test_f1s.append(tmp_f1) test_f1s.append(tmp_f1)
...@@ -323,4 +402,4 @@ if __name__ == "__main__": ...@@ -323,4 +402,4 @@ if __name__ == "__main__":
print("Overall ROC-AUC:", average_auc) print("Overall ROC-AUC:", average_auc)
print("Overall PR-AUC", average_pr) print("Overall PR-AUC", average_pr)
print("Overall F1:", average_f1) print("Overall F1:", average_f1)
print("Training Time", end-start) print("Training Time", end - start)
from collections import defaultdict
import math
import os
import sys
import time
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import tqdm
from numpy import random
from torch.nn.parameter import Parameter
import dgl
import dgl.function as fn
from utils import *
def get_graph(network_data, vocab):
    """Build a heterograph, treating all nodes as a single node type.

    Every input edge is inserted in both directions, so the resulting
    graph is effectively undirected.

    Parameters
    ----------
    network_data : dict
        Keys describing the edge types, values representing edges
        (each edge is a pair of raw node IDs).
    vocab : dict
        Mapping from raw node IDs to contiguous node indices.

    Returns
    -------
    DGLHeteroGraph
        A heterogeneous graph with one node type and one relation per
        edge type.
    """
    node_type = "_N"  # '_N' can be replaced by an arbitrary name
    data_dict = dict()
    num_nodes_dict = {node_type: len(vocab)}

    for edge_type in network_data:
        tmp_data = network_data[edge_type]
        src = []
        dst = []
        # Add each edge in both directions (undirected semantics).
        for edge in tmp_data:
            src.extend([vocab[edge[0]], vocab[edge[1]]])
            dst.extend([vocab[edge[1]], vocab[edge[0]]])
        data_dict[(node_type, edge_type, node_type)] = (src, dst)

    graph = dgl.heterograph(data_dict, num_nodes_dict)
    return graph
class NeighborSampler(object):
    """Multi-layer neighborhood sampler used as a DataLoader collate_fn.

    Given a batch of (head, tail, edge_type) triples, samples one DGL
    block per fanout and returns the blocks together with the index of
    each head inside the deduplicated seed set.
    """

    def __init__(self, g, num_fanouts):
        self.g = g
        self.num_fanouts = num_fanouts

    def sample(self, pairs):
        """Collate a batch of (head, tail, type) triples into sampled blocks."""
        triples = np.stack(pairs)
        head_ids = triples[:, 0]
        tail_ids = triples[:, 1]
        type_ids = triples[:, 2]
        # Deduplicate heads; head_invmap maps each pair back to its seed row.
        seeds, head_invmap = torch.unique(
            torch.LongTensor(head_ids), return_inverse=True
        )
        blocks = []
        # Sample from the outermost layer inward, prepending so that
        # blocks[0] ends up being the input-most block.
        for fanout in self.num_fanouts[::-1]:
            frontier = dgl.sampling.sample_neighbors(self.g, seeds, fanout)
            block = dgl.to_block(frontier, seeds)
            seeds = block.srcdata[dgl.NID]
            blocks.insert(0, block)
        return (
            blocks,
            torch.LongTensor(head_invmap),
            torch.LongTensor(tail_ids),
            torch.LongTensor(type_ids),
        )
class DGLGATNE(nn.Module):
    """Transductive GATNE model with sparse embedding tables.

    Each node carries one base embedding shared across edge types plus one
    type-specific embedding per edge type; an attention network mixes the
    type embeddings into the final per-edge-type output embedding.  Both
    embedding tables use ``sparse=True`` so only the rows touched by a
    batch receive gradients (pair with ``torch.optim.SparseAdam``) — this
    is what keeps memory bounded on big graphs.
    """

    def __init__(
        self,
        num_nodes,
        embedding_size,
        embedding_u_size,
        edge_types,
        edge_type_count,
        dim_a,
    ):
        """
        Parameters
        ----------
        num_nodes : int
            Number of nodes in the graph.
        embedding_size : int
            Dimension of the base node embedding (also the output dimension).
        embedding_u_size : int
            Dimension of each edge-type-specific embedding.
        edge_types : list
            Edge-type names; their order fixes the type index ``i`` below.
        edge_type_count : int
            Number of edge types, len(edge_types).
        dim_a : int
            Hidden size of the attention network.
        """
        super(DGLGATNE, self).__init__()
        self.num_nodes = num_nodes
        self.embedding_size = embedding_size
        self.embedding_u_size = embedding_u_size
        self.edge_types = edge_types
        self.edge_type_count = edge_type_count
        self.dim_a = dim_a

        # Base embeddings: one row per node.
        self.node_embeddings = nn.Embedding(num_nodes, embedding_size, sparse=True)
        # Type embeddings stored as a flattened (node, edge_type) grid;
        # row index is node_id * edge_type_count + type_index.
        self.node_type_embeddings = nn.Embedding(
            num_nodes * edge_type_count, embedding_u_size, sparse=True
        )
        # Dense per-edge-type projection and attention parameters.
        self.trans_weights = Parameter(
            torch.FloatTensor(edge_type_count, embedding_u_size, embedding_size)
        )
        self.trans_weights_s1 = Parameter(
            torch.FloatTensor(edge_type_count, embedding_u_size, dim_a)
        )
        self.trans_weights_s2 = Parameter(torch.FloatTensor(edge_type_count, dim_a, 1))

        self.reset_parameters()

    def reset_parameters(self):
        """Initialize embeddings uniformly and projections with scaled normals."""
        self.node_embeddings.weight.data.uniform_(-1.0, 1.0)
        self.node_type_embeddings.weight.data.uniform_(-1.0, 1.0)
        self.trans_weights.data.normal_(std=1.0 / math.sqrt(self.embedding_size))
        self.trans_weights_s1.data.normal_(std=1.0 / math.sqrt(self.embedding_size))
        self.trans_weights_s2.data.normal_(std=1.0 / math.sqrt(self.embedding_size))

    def forward(self, block):
        """Compute per-edge-type embeddings for the block's destination nodes.

        Parameters
        ----------
        block : DGL block
            One sampled message-passing block; srcdata/dstdata carry dgl.NID.

        Returns
        -------
        torch.Tensor
            L2-normalized embeddings of shape
            [batch_size, edge_type_count, embedding_size].
        """
        input_nodes = block.srcdata[dgl.NID]
        output_nodes = block.dstdata[dgl.NID]
        batch_size = block.number_of_dst_nodes()
        node_type_embed = []

        with block.local_scope():
            # For each edge type, sum the neighbors' type embeddings along
            # that relation into the destination nodes.
            for i in range(self.edge_type_count):
                edge_type = self.edge_types[i]
                block.srcdata[edge_type] = self.node_type_embeddings(
                    input_nodes * self.edge_type_count + i
                )
                block.dstdata[edge_type] = self.node_type_embeddings(
                    output_nodes * self.edge_type_count + i
                )
                block.update_all(
                    fn.copy_u(edge_type, "m"), fn.sum("m", edge_type), etype=edge_type
                )
                node_type_embed.append(block.dstdata[edge_type])

            # [batch_size, edge_type_count, embedding_u_size]
            node_type_embed = torch.stack(node_type_embed, 1)
            # Flatten to (batch*types, 1, u) for batched matmuls below.
            tmp_node_type_embed = node_type_embed.unsqueeze(2).view(
                -1, 1, self.embedding_u_size
            )
            # Broadcast the per-type parameter tensors across the batch.
            trans_w = (
                self.trans_weights.unsqueeze(0)
                .repeat(batch_size, 1, 1, 1)
                .view(-1, self.embedding_u_size, self.embedding_size)
            )
            trans_w_s1 = (
                self.trans_weights_s1.unsqueeze(0)
                .repeat(batch_size, 1, 1, 1)
                .view(-1, self.embedding_u_size, self.dim_a)
            )
            trans_w_s2 = (
                self.trans_weights_s2.unsqueeze(0)
                .repeat(batch_size, 1, 1, 1)
                .view(-1, self.dim_a, 1)
            )

            # Attention over edge types: softmax(tanh(u @ W1) @ W2); the same
            # weights are reused for every output type via repeat -> (B, T, T).
            attention = (
                F.softmax(
                    torch.matmul(
                        torch.tanh(torch.matmul(tmp_node_type_embed, trans_w_s1)),
                        trans_w_s2,
                    )
                    .squeeze(2)
                    .view(-1, self.edge_type_count),
                    dim=1,
                )
                .unsqueeze(1)
                .repeat(1, self.edge_type_count, 1)
            )

            # Attention-weighted mixture of the per-type embeddings.
            node_type_embed = torch.matmul(attention, node_type_embed).view(
                -1, 1, self.embedding_u_size
            )
            # Base embedding + projected type embedding, one row per edge type.
            node_embed = self.node_embeddings(output_nodes).unsqueeze(1).repeat(
                1, self.edge_type_count, 1
            ) + torch.matmul(node_type_embed, trans_w).view(
                -1, self.edge_type_count, self.embedding_size
            )
            last_node_embed = F.normalize(node_embed, dim=2)

            return last_node_embed  # [batch_size, edge_type_count, embedding_size]
class NSLoss(nn.Module):
    """Negative-sampling loss with a sparse output-embedding table.

    Negatives are drawn from a log-uniform (Zipf-like) distribution over
    node ranks; the output table uses ``sparse=True`` so only touched rows
    get gradients.
    """

    def __init__(self, num_nodes, num_sampled, embedding_size):
        super(NSLoss, self).__init__()
        self.num_nodes = num_nodes
        self.num_sampled = num_sampled
        self.embedding_size = embedding_size
        # Proposal weights: [ (log(i+2) - log(i+1)) / log(num_nodes + 1) ],
        # normalized to unit L2 norm.
        zipf = [
            (math.log(k + 2) - math.log(k + 1)) / math.log(num_nodes + 1)
            for k in range(num_nodes)
        ]
        self.sample_weights = F.normalize(torch.Tensor(zipf), dim=0)
        self.weights = nn.Embedding(num_nodes, embedding_size, sparse=True)
        self.reset_parameters()

    def reset_parameters(self):
        """Initialize the output table with N(0, 1/embedding_size)."""
        std = 1.0 / math.sqrt(self.embedding_size)
        self.weights.weight.data.normal_(std=std)

    def forward(self, input, embs, label):
        """Return the mean negative-sampling loss for a batch.

        Parameters
        ----------
        input : torch.Tensor
            Batch node IDs; only its length and device are used here.
        embs : torch.Tensor
            Input embeddings, shape [n, embedding_size].
        label : torch.Tensor
            Positive target node IDs, shape [n].
        """
        n = input.shape[0]
        # Positive term: log sigma(<emb, w_label>).
        pos_score = (embs * self.weights(label)).sum(1)
        log_target = torch.log(torch.sigmoid(pos_score))
        # Draw num_sampled negatives per example from the Zipf proposal.
        drawn = torch.multinomial(
            self.sample_weights, self.num_sampled * n, replacement=True
        )
        negs = drawn.view(n, self.num_sampled).to(input.device)
        # Negative term: sum over samples of log sigma(-<emb, w_neg>).
        noise = -self.weights(negs)
        neg_score = torch.bmm(noise, embs.unsqueeze(2))
        sum_log_sampled = torch.log(torch.sigmoid(neg_score)).sum(1).squeeze()
        loss = log_target + sum_log_sampled
        return -loss.sum() / n
def train_model(network_data):
    """Train sparse GATNE on one dataset and evaluate each epoch.

    Parameters
    ----------
    network_data : dict
        Training edge lists keyed by edge type.

    Returns
    -------
    tuple of float
        (average_auc, average_f1, average_pr) on the test split from the
        last completed epoch.

    NOTE(review): reads module-level globals set in the __main__ block:
    ``args`` plus the valid/test edge dicts; ``generate_vocab``,
    ``generate_pairs`` and ``evaluate`` presumably come from ``utils``
    (star import) — confirm against utils.py.
    """
    index2word, vocab, type_nodes = generate_vocab(network_data)
    edge_types = list(network_data.keys())

    num_nodes = len(index2word)
    edge_type_count = len(edge_types)
    epochs = args.epoch
    batch_size = args.batch_size
    embedding_size = args.dimensions
    embedding_u_size = args.edge_dim
    u_num = edge_type_count
    num_sampled = args.negative_samples
    dim_a = args.att_dim
    att_head = 1
    neighbor_samples = args.neighbor_samples
    num_workers = args.workers

    device = torch.device(
        "cuda" if args.gpu is not None and torch.cuda.is_available() else "cpu"
    )

    g = get_graph(network_data, vocab)
    # One set of random walks per edge type, restricted to that relation.
    all_walks = []
    for i in range(edge_type_count):
        nodes = torch.LongTensor(type_nodes[i] * args.num_walks)
        traces, types = dgl.sampling.random_walk(
            g, nodes, metapath=[edge_types[i]] * (neighbor_samples - 1)
        )
        all_walks.append(traces)

    # Skip-gram style (head, tail, type) pairs from the walks.
    train_pairs = generate_pairs(all_walks, args.window_size, num_workers)
    neighbor_sampler = NeighborSampler(g, [neighbor_samples])
    train_dataloader = torch.utils.data.DataLoader(
        train_pairs,
        batch_size=batch_size,
        collate_fn=neighbor_sampler.sample,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True,
    )

    model = DGLGATNE(
        num_nodes, embedding_size, embedding_u_size, edge_types, edge_type_count, dim_a,
    )
    nsloss = NSLoss(num_nodes, num_sampled, embedding_size)
    model.to(device)
    nsloss.to(device)

    # Split parameters: sparse embedding tables go to SparseAdam, the
    # dense remainder goes to Adam (Adam cannot take sparse gradients).
    embeddings_params = list(map(id, model.node_embeddings.parameters())) + list(
        map(id, model.node_type_embeddings.parameters())
    )
    weights_params = list(map(id, nsloss.weights.parameters()))
    optimizer = torch.optim.Adam(
        [
            {
                "params": filter(
                    lambda p: id(p) not in embeddings_params, model.parameters(),
                )
            },
            {
                "params": filter(
                    lambda p: id(p) not in weights_params, nsloss.parameters(),
                )
            },
        ],
        lr=1e-3,
    )
    sparse_optimizer = torch.optim.SparseAdam(
        [
            {"params": model.node_embeddings.parameters()},
            {"params": model.node_type_embeddings.parameters()},
            {"params": nsloss.weights.parameters()},
        ],
        lr=1e-3,
    )

    best_score = 0
    patience = 0
    for epoch in range(epochs):
        model.train()
        random.shuffle(train_pairs)
        data_iter = tqdm.tqdm(
            train_dataloader,
            desc="epoch %d" % (epoch),
            total=(len(train_pairs) + (batch_size - 1)) // batch_size,
        )
        avg_loss = 0.0

        for i, (block, head_invmap, tails, block_types) in enumerate(data_iter):
            optimizer.zero_grad()
            sparse_optimizer.zero_grad()
            # embs: [batch_size, edge_type_count, embedding_size]
            block_types = block_types.to(device)
            embs = model(block[0].to(device))[head_invmap]
            # Select each pair's embedding for its own edge type.
            embs = embs.gather(
                1, block_types.view(-1, 1, 1).expand(embs.shape[0], 1, embs.shape[2]),
            )[:, 0]
            loss = nsloss(
                block[0].dstdata[dgl.NID][head_invmap].to(device),
                embs,
                tails.to(device),
            )
            loss.backward()
            # Both optimizers must step: each owns a disjoint parameter set.
            optimizer.step()
            sparse_optimizer.step()
            avg_loss += loss.item()

            post_fix = {
                "epoch": epoch,
                "iter": i,
                "avg_loss": avg_loss / (i + 1),
                "loss": loss.item(),
            }
            data_iter.set_postfix(post_fix)

        model.eval()
        # Per-edge-type embedding dict, e.g. {'1': {}, '2': {}}.
        final_model = dict(zip(edge_types, [dict() for _ in range(edge_type_count)]))
        # Inference trick: for each node, build one fake pair per edge type
        # so a single forward pass yields all of its per-type embeddings.
        for i in range(num_nodes):
            train_inputs = (
                torch.tensor([i for _ in range(edge_type_count)])
                .unsqueeze(1)
                .to(device)
            )  # [i, i]
            train_types = (
                torch.tensor(list(range(edge_type_count))).unsqueeze(1).to(device)
            )  # [0, 1]
            pairs = torch.cat(
                (train_inputs, train_inputs, train_types), dim=1
            )  # (2, 3)
            (
                train_blocks,
                train_invmap,
                fake_tails,
                train_types,
            ) = neighbor_sampler.sample(pairs.cpu())

            node_emb = model(train_blocks[0].to(device))[train_invmap]
            node_emb = node_emb.gather(
                1,
                train_types.to(device)
                .view(-1, 1, 1)
                .expand(node_emb.shape[0], 1, node_emb.shape[2]),
            )[:, 0]

            for j in range(edge_type_count):
                final_model[edge_types[j]][index2word[i]] = (
                    node_emb[j].cpu().detach().numpy()
                )

        # Link-prediction evaluation per edge type on valid and test splits.
        valid_aucs, valid_f1s, valid_prs = [], [], []
        test_aucs, test_f1s, test_prs = [], [], []
        for i in range(edge_type_count):
            if args.eval_type == "all" or edge_types[i] in args.eval_type.split(","):
                tmp_auc, tmp_f1, tmp_pr = evaluate(
                    final_model[edge_types[i]],
                    valid_true_data_by_edge[edge_types[i]],
                    valid_false_data_by_edge[edge_types[i]],
                    num_workers,
                )
                valid_aucs.append(tmp_auc)
                valid_f1s.append(tmp_f1)
                valid_prs.append(tmp_pr)

                tmp_auc, tmp_f1, tmp_pr = evaluate(
                    final_model[edge_types[i]],
                    testing_true_data_by_edge[edge_types[i]],
                    testing_false_data_by_edge[edge_types[i]],
                    num_workers,
                )
                test_aucs.append(tmp_auc)
                test_f1s.append(tmp_f1)
                test_prs.append(tmp_pr)

        print("valid auc:", np.mean(valid_aucs))
        print("valid pr:", np.mean(valid_prs))
        print("valid f1:", np.mean(valid_f1s))

        average_auc = np.mean(test_aucs)
        average_f1 = np.mean(test_f1s)
        average_pr = np.mean(test_prs)

        # Early stopping on validation AUC.
        cur_score = np.mean(valid_aucs)
        if cur_score > best_score:
            best_score = cur_score
            patience = 0
        else:
            patience += 1
            if patience > args.patience:
                print("Early Stopping")
                break
    return average_auc, average_f1, average_pr
if __name__ == "__main__":
    # parse_args / load_training_data / load_testing_data presumably come
    # from `utils` (star import above) — confirm against utils.py.
    args = parse_args()
    file_name = args.input
    print(args)

    # Training edges grouped by edge type.
    training_data_by_type = load_training_data(file_name + "/train.txt")
    # Positive/negative link sets per edge type for validation and test.
    valid_true_data_by_edge, valid_false_data_by_edge = load_testing_data(
        file_name + "/valid.txt"
    )
    testing_true_data_by_edge, testing_false_data_by_edge = load_testing_data(
        file_name + "/test.txt"
    )

    start = time.time()
    average_auc, average_f1, average_pr = train_model(training_data_by_type)
    end = time.time()

    print("Overall ROC-AUC:", average_auc)
    print("Overall PR-AUC", average_pr)
    print("Overall F1:", average_f1)
    print("Training Time", end - start)
from collections import defaultdict
import math
import os
import sys
import time
import datetime
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel
from tqdm.auto import tqdm
from numpy import random
from torch.nn.parameter import Parameter
import dgl
import dgl.function as fn
from utils import *
def setup_seed(seed):
    """Seed every RNG used in training for reproducible runs.

    Covers torch (CPU and all CUDA devices) and NumPy (`random` here is
    numpy.random via the file's `from numpy import random`).

    Parameters
    ----------
    seed : int
        The seed applied to all random sources.
    """
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    # cuDNN autotuning can pick nondeterministic kernels; disable both
    # knobs — `deterministic` alone is not sufficient while `benchmark`
    # is allowed to select algorithms per input shape.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
def get_graph(network_data, vocab):
    """Build a heterograph, treating all nodes as a single node type.

    Every input edge is inserted in both directions, so the resulting
    graph is effectively undirected.

    Parameters
    ----------
    network_data : dict
        Keys describing the edge types, values representing edges
        (each edge is a pair of raw node IDs).
    vocab : dict
        Mapping from raw node IDs to contiguous node indices.

    Returns
    -------
    DGLHeteroGraph
        A heterogeneous graph with one node type and one relation per
        edge type.
    """
    node_type = "_N"  # '_N' can be replaced by an arbitrary name
    data_dict = dict()
    num_nodes_dict = {node_type: len(vocab)}

    for edge_type in network_data:
        tmp_data = network_data[edge_type]
        src = []
        dst = []
        # Add each edge in both directions (undirected semantics).
        for edge in tmp_data:
            src.extend([vocab[edge[0]], vocab[edge[1]]])
            dst.extend([vocab[edge[1]], vocab[edge[0]]])
        data_dict[(node_type, edge_type, node_type)] = (src, dst)

    graph = dgl.heterograph(data_dict, num_nodes_dict)
    return graph
class NeighborSampler(object):
    """Multi-layer neighborhood sampler used as a DataLoader collate_fn.

    Given a batch of (head, tail, edge_type) triples, samples one DGL
    block per fanout and returns the blocks together with the index of
    each head inside the deduplicated seed set.
    """

    def __init__(self, g, num_fanouts):
        self.g = g
        self.num_fanouts = num_fanouts

    def sample(self, pairs):
        """Collate a batch of (head, tail, type) triples into sampled blocks."""
        triples = np.stack(pairs)
        head_ids = triples[:, 0]
        tail_ids = triples[:, 1]
        type_ids = triples[:, 2]
        # Deduplicate heads; head_invmap maps each pair back to its seed row.
        seeds, head_invmap = torch.unique(
            torch.LongTensor(head_ids), return_inverse=True
        )
        blocks = []
        # Sample from the outermost layer inward, prepending so that
        # blocks[0] ends up being the input-most block.
        for fanout in self.num_fanouts[::-1]:
            frontier = dgl.sampling.sample_neighbors(self.g, seeds, fanout)
            block = dgl.to_block(frontier, seeds)
            seeds = block.srcdata[dgl.NID]
            blocks.insert(0, block)
        return (
            blocks,
            torch.LongTensor(head_invmap),
            torch.LongTensor(tail_ids),
            torch.LongTensor(type_ids),
        )
class DGLGATNE(nn.Module):
    """Transductive GATNE model with sparse embedding tables.

    Each node carries one base embedding shared across edge types plus one
    type-specific embedding per edge type; an attention network mixes the
    type embeddings into the final per-edge-type output embedding.  Both
    embedding tables use ``sparse=True`` so only the rows touched by a
    batch receive gradients (pair with ``torch.optim.SparseAdam``) — this
    is what keeps memory bounded on big graphs.
    """

    def __init__(
        self,
        num_nodes,
        embedding_size,
        embedding_u_size,
        edge_types,
        edge_type_count,
        dim_a,
    ):
        """
        Parameters
        ----------
        num_nodes : int
            Number of nodes in the graph.
        embedding_size : int
            Dimension of the base node embedding (also the output dimension).
        embedding_u_size : int
            Dimension of each edge-type-specific embedding.
        edge_types : list
            Edge-type names; their order fixes the type index ``i`` below.
        edge_type_count : int
            Number of edge types, len(edge_types).
        dim_a : int
            Hidden size of the attention network.
        """
        super(DGLGATNE, self).__init__()
        self.num_nodes = num_nodes
        self.embedding_size = embedding_size
        self.embedding_u_size = embedding_u_size
        self.edge_types = edge_types
        self.edge_type_count = edge_type_count
        self.dim_a = dim_a

        # Base embeddings: one row per node.
        self.node_embeddings = nn.Embedding(num_nodes, embedding_size, sparse=True)
        # Type embeddings stored as a flattened (node, edge_type) grid;
        # row index is node_id * edge_type_count + type_index.
        self.node_type_embeddings = nn.Embedding(
            num_nodes * edge_type_count, embedding_u_size, sparse=True
        )
        # Dense per-edge-type projection and attention parameters.
        self.trans_weights = Parameter(
            torch.FloatTensor(edge_type_count, embedding_u_size, embedding_size)
        )
        self.trans_weights_s1 = Parameter(
            torch.FloatTensor(edge_type_count, embedding_u_size, dim_a)
        )
        self.trans_weights_s2 = Parameter(torch.FloatTensor(edge_type_count, dim_a, 1))

        self.reset_parameters()

    def reset_parameters(self):
        """Initialize embeddings uniformly and projections with scaled normals."""
        self.node_embeddings.weight.data.uniform_(-1.0, 1.0)
        self.node_type_embeddings.weight.data.uniform_(-1.0, 1.0)
        self.trans_weights.data.normal_(std=1.0 / math.sqrt(self.embedding_size))
        self.trans_weights_s1.data.normal_(std=1.0 / math.sqrt(self.embedding_size))
        self.trans_weights_s2.data.normal_(std=1.0 / math.sqrt(self.embedding_size))

    def forward(self, block):
        """Compute per-edge-type embeddings for the block's destination nodes.

        Parameters
        ----------
        block : DGL block
            One sampled message-passing block; srcdata/dstdata carry dgl.NID.

        Returns
        -------
        torch.Tensor
            L2-normalized embeddings of shape
            [batch_size, edge_type_count, embedding_size].
        """
        input_nodes = block.srcdata[dgl.NID]
        output_nodes = block.dstdata[dgl.NID]
        batch_size = block.number_of_dst_nodes()
        node_type_embed = []

        with block.local_scope():
            # For each edge type, sum the neighbors' type embeddings along
            # that relation into the destination nodes.
            for i in range(self.edge_type_count):
                edge_type = self.edge_types[i]
                block.srcdata[edge_type] = self.node_type_embeddings(
                    input_nodes * self.edge_type_count + i
                )
                block.dstdata[edge_type] = self.node_type_embeddings(
                    output_nodes * self.edge_type_count + i
                )
                block.update_all(
                    fn.copy_u(edge_type, "m"), fn.sum("m", edge_type), etype=edge_type
                )
                node_type_embed.append(block.dstdata[edge_type])

            # [batch_size, edge_type_count, embedding_u_size]
            node_type_embed = torch.stack(node_type_embed, 1)
            # Flatten to (batch*types, 1, u) for batched matmuls below.
            tmp_node_type_embed = node_type_embed.unsqueeze(2).view(
                -1, 1, self.embedding_u_size
            )
            # Broadcast the per-type parameter tensors across the batch.
            trans_w = (
                self.trans_weights.unsqueeze(0)
                .repeat(batch_size, 1, 1, 1)
                .view(-1, self.embedding_u_size, self.embedding_size)
            )
            trans_w_s1 = (
                self.trans_weights_s1.unsqueeze(0)
                .repeat(batch_size, 1, 1, 1)
                .view(-1, self.embedding_u_size, self.dim_a)
            )
            trans_w_s2 = (
                self.trans_weights_s2.unsqueeze(0)
                .repeat(batch_size, 1, 1, 1)
                .view(-1, self.dim_a, 1)
            )

            # Attention over edge types: softmax(tanh(u @ W1) @ W2); the same
            # weights are reused for every output type via repeat -> (B, T, T).
            attention = (
                F.softmax(
                    torch.matmul(
                        torch.tanh(torch.matmul(tmp_node_type_embed, trans_w_s1)),
                        trans_w_s2,
                    )
                    .squeeze(2)
                    .view(-1, self.edge_type_count),
                    dim=1,
                )
                .unsqueeze(1)
                .repeat(1, self.edge_type_count, 1)
            )

            # Attention-weighted mixture of the per-type embeddings.
            node_type_embed = torch.matmul(attention, node_type_embed).view(
                -1, 1, self.embedding_u_size
            )
            # Base embedding + projected type embedding, one row per edge type.
            node_embed = self.node_embeddings(output_nodes).unsqueeze(1).repeat(
                1, self.edge_type_count, 1
            ) + torch.matmul(node_type_embed, trans_w).view(
                -1, self.edge_type_count, self.embedding_size
            )
            last_node_embed = F.normalize(node_embed, dim=2)

            return last_node_embed  # [batch_size, edge_type_count, embedding_size]
class NSLoss(nn.Module):
    """Skip-gram negative-sampling loss with a log-uniform proposal.

    Maintains its own output ("context") embedding table with sparse
    gradients and draws ``num_sampled`` negatives per example from a
    log-uniform (Zipfian) distribution over node ids.
    """

    def __init__(self, num_nodes, num_sampled, embedding_size):
        """
        Parameters
        ----------
        num_nodes : int
            Vocabulary size.
        num_sampled : int
            Number of negative samples per positive example.
        embedding_size : int
            Dimension of the context embeddings.
        """
        super(NSLoss, self).__init__()
        self.num_nodes = num_nodes
        self.num_sampled = num_sampled
        self.embedding_size = embedding_size
        # Log-uniform proposal: P(k) ~ (log(k+2) - log(k+1)) / log(num_nodes + 1)
        self.sample_weights = F.normalize(
            torch.Tensor(
                [
                    (math.log(k + 2) - math.log(k + 1)) / math.log(num_nodes + 1)
                    for k in range(num_nodes)
                ]
            ),
            dim=0,
        )
        # Context embeddings; sparse gradients for SparseAdam.
        self.weights = nn.Embedding(num_nodes, embedding_size, sparse=True)
        self.reset_parameters()

    def reset_parameters(self):
        """Initialize context embeddings with scaled normal noise."""
        self.weights.weight.data.normal_(std=1.0 / math.sqrt(self.embedding_size))

    def forward(self, input, embs, label):
        """Return the mean negative-sampling loss for a batch.

        Parameters
        ----------
        input : LongTensor [n]
            Center node ids (used only for batch size and device).
        embs : FloatTensor [n, embedding_size]
            Center embeddings produced by the model.
        label : LongTensor [n]
            Positive context node ids.
        """
        n = input.shape[0]
        # Positive term: log sigma(emb . w_label). F.logsigmoid is
        # numerically stable, unlike log(sigmoid(x)) which underflows
        # to -inf for large negative logits.
        log_target = F.logsigmoid(
            torch.sum(torch.mul(embs, self.weights(label)), 1)
        )
        # Sample negatives on CPU (sample_weights lives there), then move.
        negs = (
            torch.multinomial(
                self.sample_weights, self.num_sampled * n, replacement=True
            )
            .view(n, self.num_sampled)
            .to(input.device)
        )
        noise = torch.neg(self.weights(negs))
        # Negative term: sum_k log sigma(-emb . w_neg_k).
        # squeeze(1) (not squeeze()) keeps the batch dim when n == 1.
        sum_log_sampled = torch.sum(
            F.logsigmoid(torch.bmm(noise, embs.unsqueeze(2))), 1
        ).squeeze(1)
        loss = log_target + sum_log_sampled
        return -loss.sum() / n
def run(proc_id, n_gpus, args, devices, data):
    """Per-process training entry point (one process per GPU).

    Trains DGLGATNE + NSLoss on this process's shard of the training
    pairs; when ``n_gpus > 1`` the model is wrapped in
    DistributedDataParallel and processes synchronize with barriers.
    Rank 0 additionally evaluates after every epoch.

    NOTE(review): ``evaluate``, ``valid_true_data_by_edge``,
    ``valid_false_data_by_edge``, ``testing_true_data_by_edge`` and
    ``testing_false_data_by_edge`` are read as module-level globals set
    in ``__main__`` — presumably inherited by the spawned processes;
    confirm for non-fork start methods.

    Parameters
    ----------
    proc_id : int
        Rank of this process (0-based).
    n_gpus : int
        Total number of processes/GPUs.
    args : argparse.Namespace
        Parsed command-line options.
    devices : list[int]
        GPU device ids, indexed by ``proc_id``.
    data : tuple
        (g, train_pairs, index2word, edge_types, num_nodes, edge_type_count).
    """
    dev_id = devices[proc_id]
    if n_gpus > 1:
        # Single-machine setup: fixed localhost rendezvous address.
        dist_init_method = "tcp://{master_ip}:{master_port}".format(
            master_ip="127.0.0.1", master_port="12345"
        )
        world_size = n_gpus
        torch.distributed.init_process_group(
            backend="gloo",
            init_method=dist_init_method,
            world_size=world_size,
            rank=proc_id,
            timeout=datetime.timedelta(seconds=100),
        )
    torch.cuda.set_device(dev_id)
    g, train_pairs, index2word, edge_types, num_nodes, edge_type_count = data
    epochs = args.epoch
    batch_size = args.batch_size
    embedding_size = args.dimensions
    embedding_u_size = args.edge_dim
    u_num = edge_type_count
    num_sampled = args.negative_samples
    dim_a = args.att_dim
    att_head = 1
    neighbor_samples = args.neighbor_samples
    num_workers = args.workers

    # Shard the training pairs evenly; this process keeps shard proc_id.
    train_pairs = torch.split(
        torch.tensor(train_pairs), math.ceil(len(train_pairs) / n_gpus)
    )[proc_id]
    neighbor_sampler = NeighborSampler(g, [neighbor_samples])
    train_dataloader = torch.utils.data.DataLoader(
        train_pairs,
        batch_size=batch_size,
        collate_fn=neighbor_sampler.sample,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True,
    )
    model = DGLGATNE(
        num_nodes, embedding_size, embedding_u_size, edge_types, edge_type_count, dim_a,
    )
    nsloss = NSLoss(num_nodes, num_sampled, embedding_size)
    model.to(dev_id)
    if n_gpus > 1:
        model = DistributedDataParallel(
            model, device_ids=[dev_id], output_device=dev_id
        )
    nsloss.to(dev_id)
    # mmodel: the unwrapped module (DDP wraps the real model under .module).
    if n_gpus > 1:
        mmodel = model.module
    else:
        mmodel = model

    # Sparse embedding tables must go to SparseAdam; everything else to
    # Adam. Parameters are separated by object identity (id()).
    embeddings_params = list(map(id, mmodel.node_embeddings.parameters())) + list(
        map(id, mmodel.node_type_embeddings.parameters())
    )
    weights_params = list(map(id, nsloss.weights.parameters()))
    optimizer = torch.optim.Adam(
        [
            {
                "params": filter(
                    lambda p: id(p) not in embeddings_params, model.parameters(),
                )
            },
            {
                "params": filter(
                    lambda p: id(p) not in weights_params, nsloss.parameters(),
                )
            },
        ],
        lr=2e-3,
    )
    sparse_optimizer = torch.optim.SparseAdam(
        [
            {"params": mmodel.node_embeddings.parameters()},
            {"params": mmodel.node_type_embeddings.parameters()},
            {"params": nsloss.weights.parameters()},
        ],
        lr=2e-3,
    )

    if n_gpus > 1:
        torch.distributed.barrier()
    if proc_id == 0:
        start = time.time()
    for epoch in range(epochs):
        model.train()

        # Only rank 0 shows a progress bar.
        data_iter = train_dataloader
        if proc_id == 0:
            data_iter = tqdm(
                train_dataloader,
                desc="epoch %d" % (epoch),
                total=(len(train_pairs) + (batch_size - 1)) // batch_size,
            )
        avg_loss = 0.0

        for i, (block, head_invmap, tails, block_types) in enumerate(data_iter):
            optimizer.zero_grad()
            sparse_optimizer.zero_grad()
            # embs: [batch_size, edge_type_count, embedding_size]
            block_types = block_types.to(dev_id)
            embs = model(block[0].to(dev_id))[head_invmap]
            # Select the embedding row matching each pair's edge type.
            embs = embs.gather(
                1, block_types.view(-1, 1, 1).expand(embs.shape[0], 1, embs.shape[2]),
            )[:, 0]
            loss = nsloss(
                block[0].dstdata[dgl.NID][head_invmap].to(dev_id),
                embs,
                tails.to(dev_id),
            )
            loss.backward()
            # Both optimizers step: dense params via Adam, embeddings via
            # SparseAdam.
            optimizer.step()
            sparse_optimizer.step()

            if proc_id == 0:
                avg_loss += loss.item()
                post_fix = {
                    "avg_loss": avg_loss / (i + 1),
                    "loss": loss.item(),
                }
                data_iter.set_postfix(post_fix)

        if n_gpus > 1:
            torch.distributed.barrier()

        # Rank 0 evaluates after every epoch.
        if proc_id == 0:
            model.eval()
            # final_model: {edge_type: {node_word: embedding ndarray}}
            final_model = dict(
                zip(edge_types, [dict() for _ in range(edge_type_count)])
            )
            for i in range(num_nodes):
                # Build one fake pair per edge type with node i as both
                # head and tail, just to drive the sampler/model.
                train_inputs = (
                    torch.tensor([i for _ in range(edge_type_count)])
                    .unsqueeze(1)
                    .to(dev_id)
                )  # [i, i]
                train_types = (
                    torch.tensor(list(range(edge_type_count))).unsqueeze(1).to(dev_id)
                )  # [0, 1]
                pairs = torch.cat(
                    (train_inputs, train_inputs, train_types), dim=1
                )  # (2, 3)
                (
                    train_blocks,
                    train_invmap,
                    fake_tails,
                    train_types,
                ) = neighbor_sampler.sample(pairs.cpu())

                node_emb = model(train_blocks[0].to(dev_id))[train_invmap]
                node_emb = node_emb.gather(
                    1,
                    train_types.to(dev_id)
                    .view(-1, 1, 1)
                    .expand(node_emb.shape[0], 1, node_emb.shape[2]),
                )[:, 0]
                for j in range(edge_type_count):
                    final_model[edge_types[j]][index2word[i]] = (
                        node_emb[j].cpu().detach().numpy()
                    )

            valid_aucs, valid_f1s, valid_prs = [], [], []
            test_aucs, test_f1s, test_prs = [], [], []
            for i in range(edge_type_count):
                if args.eval_type == "all" or edge_types[i] in args.eval_type.split(
                    ","
                ):
                    tmp_auc, tmp_f1, tmp_pr = evaluate(
                        final_model[edge_types[i]],
                        valid_true_data_by_edge[edge_types[i]],
                        valid_false_data_by_edge[edge_types[i]],
                        num_workers,
                    )
                    valid_aucs.append(tmp_auc)
                    valid_f1s.append(tmp_f1)
                    valid_prs.append(tmp_pr)

                    tmp_auc, tmp_f1, tmp_pr = evaluate(
                        final_model[edge_types[i]],
                        testing_true_data_by_edge[edge_types[i]],
                        testing_false_data_by_edge[edge_types[i]],
                        num_workers,
                    )
                    test_aucs.append(tmp_auc)
                    test_f1s.append(tmp_f1)
                    test_prs.append(tmp_pr)
            print("valid auc:", np.mean(valid_aucs))
            print("valid pr:", np.mean(valid_prs))
            print("valid f1:", np.mean(valid_f1s))

    if proc_id == 0:
        end = time.time()
        average_auc = np.mean(test_aucs)
        average_f1 = np.mean(test_f1s)
        average_pr = np.mean(test_prs)
        print("Overall ROC-AUC:", average_auc)
        print("Overall PR-AUC", average_pr)
        print("Overall F1:", average_f1)
        print("Training Time", end - start)
def train_model(network_data):
    """Build the graph, generate random-walk pairs, and launch training.

    Spawns one ``run`` worker per GPU listed in ``args.gpu``; with a
    single GPU, runs in-process.
    """
    index2word, vocab, type_nodes = generate_vocab(network_data)
    edge_types = list(network_data.keys())
    num_nodes = len(index2word)
    edge_type_count = len(edge_types)

    devices = [int(d) for d in args.gpu.split(",")]
    n_gpus = len(devices)
    neighbor_samples = args.neighbor_samples
    num_workers = args.workers

    g = get_graph(network_data, vocab)

    # One metapath random walk per edge type; each typed node serves as a
    # walk source args.num_walks times.
    all_walks = []
    for etype_idx in range(edge_type_count):
        start_nodes = torch.LongTensor(type_nodes[etype_idx] * args.num_walks)
        traces, _types = dgl.sampling.random_walk(
            g, start_nodes, metapath=[edge_types[etype_idx]] * (neighbor_samples - 1)
        )
        all_walks.append(traces)

    train_pairs = generate_pairs(all_walks, args.window_size, num_workers)
    data = g, train_pairs, index2word, edge_types, num_nodes, edge_type_count

    if n_gpus == 1:
        run(0, n_gpus, args, devices, data)
    else:
        procs = []
        for proc_id in range(n_gpus):
            worker = mp.Process(
                target=thread_wrapped_func(run),
                args=(proc_id, n_gpus, args, devices, data),
            )
            worker.start()
            procs.append(worker)
        for worker in procs:
            worker.join()
if __name__ == "__main__":
    args = parse_args()
    file_name = args.input
    print(args)
    # Fix RNG seeds for reproducibility.
    setup_seed(1234)
    # The valid/test splits below are read as module-level globals inside
    # run() — NOTE(review): presumably inherited by the spawned worker
    # processes; confirm this holds for non-fork start methods.
    training_data_by_type = load_training_data(file_name + "/train.txt")
    valid_true_data_by_edge, valid_false_data_by_edge = load_testing_data(
        file_name + "/valid.txt"
    )
    testing_true_data_by_edge, testing_false_data_by_edge = load_testing_data(
        file_name + "/test.txt"
    )
    train_model(training_data_by_type)
...@@ -5,69 +5,159 @@ import networkx as nx ...@@ -5,69 +5,159 @@ import networkx as nx
import numpy as np import numpy as np
from gensim.models.keyedvectors import Vocab from gensim.models.keyedvectors import Vocab
from six import iteritems from six import iteritems
from sklearn.metrics import (auc, f1_score, precision_recall_curve, from sklearn.metrics import auc, f1_score, precision_recall_curve, roc_auc_score
roc_auc_score)
import torch import torch
import time
import multiprocessing
from functools import partial, reduce, wraps
import torch.multiprocessing as mp
from _thread import start_new_thread
import traceback
def thread_wrapped_func(func):
"""
Wraps a process entry point to make it work with OpenMP.
"""
@wraps(func)
def decorated_function(*args, **kwargs):
queue = mp.Queue()
def _queue_result():
exception, trace, res = None, None, None
try:
res = func(*args, **kwargs)
except Exception as e:
exception = e
trace = traceback.format_exc()
queue.put((res, exception, trace))
start_new_thread(_queue_result, ())
result, exception, trace = queue.get()
if exception is None:
return result
else:
assert isinstance(exception, Exception)
raise exception.__class__(trace)
return decorated_function
def parse_args(): def parse_args():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--input', type=str, default='data/amazon', parser.add_argument(
help='Input dataset path') "--input", type=str, default="data/amazon", help="Input dataset path"
)
parser.add_argument('--features', type=str, default=None,
help='Input node features') parser.add_argument(
"--features", type=str, default=None, help="Input node features"
parser.add_argument('--epoch', type=int, default=100, )
help='Number of epoch. Default is 100.')
parser.add_argument(
parser.add_argument('--batch-size', type=int, default=64, "--epoch", type=int, default=100, help="Number of epoch. Default is 100."
help='Number of batch_size. Default is 64.') )
parser.add_argument('--eval-type', type=str, default='all', parser.add_argument(
help='The edge type(s) for evaluation.') "--batch-size",
type=int,
parser.add_argument('--schema', type=str, default=None, default=64,
help='The metapath schema (e.g., U-I-U,I-U-I).') help="Number of batch_size. Default is 64.",
)
parser.add_argument('--dimensions', type=int, default=200,
help='Number of dimensions. Default is 200.') parser.add_argument(
"--eval-type", type=str, default="all", help="The edge type(s) for evaluation."
parser.add_argument('--edge-dim', type=int, default=10, )
help='Number of edge embedding dimensions. Default is 10.')
parser.add_argument(
parser.add_argument('--att-dim', type=int, default=20, "--schema",
help='Number of attention dimensions. Default is 20.') type=str,
default=None,
parser.add_argument('--walk-length', type=int, default=10, help="The metapath schema (e.g., U-I-U,I-U-I).",
help='Length of walk per source. Default is 10.') )
parser.add_argument('--num-walks', type=int, default=20, parser.add_argument(
help='Number of walks per source. Default is 20.') "--dimensions",
type=int,
parser.add_argument('--window-size', type=int, default=5, default=200,
help='Context size for optimization. Default is 5.') help="Number of dimensions. Default is 200.",
)
parser.add_argument('--negative-samples', type=int, default=5,
help='Negative samples for optimization. Default is 5.') parser.add_argument(
"--edge-dim",
parser.add_argument('--neighbor-samples', type=int, default=10, type=int,
help='Neighbor samples for aggregation. Default is 10.') default=10,
help="Number of edge embedding dimensions. Default is 10.",
parser.add_argument('--patience', type=int, default=5, )
help='Early stopping patience. Default is 5.')
parser.add_argument(
"--att-dim",
type=int,
default=20,
help="Number of attention dimensions. Default is 20.",
)
parser.add_argument(
"--walk-length",
type=int,
default=10,
help="Length of walk per source. Default is 10.",
)
parser.add_argument(
"--num-walks",
type=int,
default=20,
help="Number of walks per source. Default is 20.",
)
parser.add_argument(
"--window-size",
type=int,
default=5,
help="Context size for optimization. Default is 5.",
)
parser.add_argument(
"--negative-samples",
type=int,
default=5,
help="Negative samples for optimization. Default is 5.",
)
parser.add_argument(
"--neighbor-samples",
type=int,
default=10,
help="Neighbor samples for aggregation. Default is 10.",
)
parser.add_argument(
"--patience", type=int, default=5, help="Early stopping patience. Default is 5."
)
parser.add_argument(
"--gpu", type=str, default=None, help="Comma separated list of GPU device IDs."
)
parser.add_argument(
"--workers", type=int, default=4, help="Number of workers.",
)
return parser.parse_args() return parser.parse_args()
# for each line, the data is [edge_type, node, node] # for each line, the data is [edge_type, node, node]
def load_training_data(f_name): def load_training_data(f_name):
print('We are loading data from:', f_name) print("We are loading data from:", f_name)
edge_data_by_type = dict() edge_data_by_type = dict()
all_nodes = list() all_nodes = list()
with open(f_name, 'r') as f: with open(f_name, "r") as f:
for line in f: for line in f:
words = line[:-1].split(' ') # line[-1] == '\n' words = line[:-1].split(" ") # line[-1] == '\n'
if words[0] not in edge_data_by_type: if words[0] not in edge_data_by_type:
edge_data_by_type[words[0]] = list() edge_data_by_type[words[0]] = list()
x, y = words[1], words[2] x, y = words[1], words[2]
...@@ -75,20 +165,20 @@ def load_training_data(f_name): ...@@ -75,20 +165,20 @@ def load_training_data(f_name):
all_nodes.append(x) all_nodes.append(x)
all_nodes.append(y) all_nodes.append(y)
all_nodes = list(set(all_nodes)) all_nodes = list(set(all_nodes))
print('Total training nodes: ' + str(len(all_nodes))) print("Total training nodes: " + str(len(all_nodes)))
return edge_data_by_type return edge_data_by_type
# for each line, the data is [edge_type, node, node, true_or_false] # for each line, the data is [edge_type, node, node, true_or_false]
def load_testing_data(f_name): def load_testing_data(f_name):
print('We are loading data from:', f_name) print("We are loading data from:", f_name)
true_edge_data_by_type = dict() true_edge_data_by_type = dict()
false_edge_data_by_type = dict() false_edge_data_by_type = dict()
all_edges = list() all_edges = list()
all_nodes = list() all_nodes = list()
with open(f_name, 'r') as f: with open(f_name, "r") as f:
for line in f: for line in f:
words = line[:-1].split(' ') words = line[:-1].split(" ")
x, y = words[1], words[2] x, y = words[1], words[2]
if int(words[3]) == 1: if int(words[3]) == 1:
if words[0] not in true_edge_data_by_type: if words[0] not in true_edge_data_by_type:
...@@ -105,35 +195,67 @@ def load_testing_data(f_name): ...@@ -105,35 +195,67 @@ def load_testing_data(f_name):
def load_node_type(f_name): def load_node_type(f_name):
print('We are loading node type from:', f_name) print("We are loading node type from:", f_name)
node_type = {} node_type = {}
with open(f_name, 'r') as f: with open(f_name, "r") as f:
for line in f: for line in f:
items = line.strip().split() items = line.strip().split()
node_type[items[0]] = items[1] node_type[items[0]] = items[1]
return node_type return node_type
def generate_pairs(all_walks, window_size): def generate_pairs_parallel(walks, skip_window=None, layer_id=None):
pairs = []
for walk in walks:
walk = walk.tolist()
for i in range(len(walk)):
for j in range(1, skip_window + 1):
if i - j >= 0:
pairs.append((walk[i], walk[i - j], layer_id))
if i + j < len(walk):
pairs.append((walk[i], walk[i + j], layer_id))
return pairs
def generate_pairs(all_walks, window_size, num_workers):
# for each node, choose the first neighbor and second neighbor of it to form pairs # for each node, choose the first neighbor and second neighbor of it to form pairs
# Get all worker processes
start_time = time.time()
print("We are generating pairs with {} cores.".format(num_workers))
# Start all worker processes
pool = multiprocessing.Pool(processes=num_workers)
pairs = [] pairs = []
skip_window = window_size // 2 skip_window = window_size // 2
for layer_id, walks in enumerate(all_walks): for layer_id, walks in enumerate(all_walks):
for walk in walks: block_num = len(walks) // num_workers
for i in range(len(walk)): if block_num > 0:
for j in range(1, skip_window + 1): walks_list = [
if i - j >= 0: walks[i * block_num : min((i + 1) * block_num, len(walks))]
pairs.append((walk[i], walk[i-j], layer_id)) for i in range(num_workers)
if i + j < len(walk): ]
pairs.append((walk[i], walk[i+j], layer_id)) else:
return pairs walks_list = [walks]
tmp_result = pool.map(
partial(
generate_pairs_parallel, skip_window=skip_window, layer_id=layer_id
),
walks_list,
)
pairs += reduce(lambda x, y: x + y, tmp_result)
pool.close()
end_time = time.time()
print("Generate pairs end, use {}s.".format(end_time - start_time))
return np.array([list(pair) for pair in set(pairs)])
def generate_vocab(network_data): def generate_vocab(network_data):
nodes, index2word = [], [] nodes, index2word = [], []
for edge_type in network_data: for edge_type in network_data:
node1, node2 = zip(*network_data[edge_type]) node1, node2 = zip(*network_data[edge_type])
index2word = index2word + list(node1) + list(node2) index2word = index2word + list(node1) + list(node2)
index2word = list(set(index2word)) index2word = list(set(index2word))
vocab = {} vocab = {}
i = 0 i = 0
...@@ -150,31 +272,39 @@ def generate_vocab(network_data): ...@@ -150,31 +272,39 @@ def generate_vocab(network_data):
return index2word, vocab, nodes return index2word, vocab, nodes
def get_score(local_model, node1, node2): def get_score(local_model, edge):
node1, node2 = str(edge[0]), str(edge[1])
try: try:
vector1 = local_model[node1] vector1 = local_model[node1]
vector2 = local_model[node2] vector2 = local_model[node2]
return np.dot(vector1, vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2)) return np.dot(vector1, vector2) / (
np.linalg.norm(vector1) * np.linalg.norm(vector2)
)
except Exception as e: except Exception as e:
pass pass
def evaluate(model, true_edges, false_edges): def evaluate(model, true_edges, false_edges, num_workers):
true_list = list() true_list = list()
prediction_list = list() prediction_list = list()
true_num = 0 true_num = 0
for edge in true_edges:
tmp_score = get_score(model, str(edge[0]), str(edge[1])) # Start all worker processes
if tmp_score is not None: pool = multiprocessing.Pool(processes=num_workers)
true_list.append(1) tmp_true_score_list = pool.map(partial(get_score, model), true_edges)
prediction_list.append(tmp_score) tmp_false_score_list = pool.map(partial(get_score, model), false_edges)
true_num += 1 pool.close()
for edge in false_edges: prediction_list += [
tmp_score = get_score(model, str(edge[0]), str(edge[1])) tmp_score for tmp_score in tmp_true_score_list if tmp_score is not None
if tmp_score is not None: ]
true_list.append(0) true_num = len(prediction_list)
prediction_list.append(tmp_score) true_list += [1] * true_num
prediction_list += [
tmp_score for tmp_score in tmp_false_score_list if tmp_score is not None
]
true_list += [0] * (len(prediction_list) - true_num)
sorted_pred = prediction_list[:] sorted_pred = prediction_list[:]
sorted_pred.sort() sorted_pred.sort()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment