"git@developer.sourcefind.cn:renzhc/diffusers_dcu.git" did not exist on "fa9e35fca4f32436f4c6bb890a1b3dfcefa465f7"
Unverified commit 94c67203, authored by Hao Xiong and committed by GitHub

[Example] Implement deepwalk by dgl and pytorch (#1503)



* deepwalk

* add docstr, etc.

* add tested version

* some doc

* some doc

* add sample and some docs, fix a bug

* update speed

* update speed

* docs
Co-authored-by: xiang song (charlie.song) <classicxsong@gmail.com>
# DeepWalk
- Paper link: [here](https://arxiv.org/pdf/1403.6652.pdf)
- Other implementation: [gensim](https://github.com/phanein/deepwalk), [deepwalk-c](https://github.com/xgfs/deepwalk-c)
The implementation includes multi-processing training with CPU and mixed training with CPU and multi-GPU.
## Dependencies
- PyTorch 1.0.1+
## Tested versions
- PyTorch 1.5.0
- DGL 0.4.3
## How to run the code
Format of a network file:
```
1(node id) 2(node id)
1 3
...
```
To run the code:
```
python3 deepwalk.py --net_file net.txt --emb_file emb.txt --adam --mix --lr 0.2 --num_procs 4 --batch_size 100 --negative 5
```
## How to save the embedding
Functions:
```
SkipGramModel.save_embedding(dataset, file_name)
SkipGramModel.save_embedding_txt(dataset, file_name)
```
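A minimal usage sketch (assuming training has finished; `trainer` here denotes the `DeepwalkTrainer` instance created in `deepwalk.py`, and the file names are placeholders):
```
# save the trained embedding as a binary .npy file
trainer.emb_model.save_embedding(trainer.dataset, "emb.npy")
# or as a word2vec-style text file: one node id and its vector per line
trainer.emb_model.save_embedding_txt(trainer.dataset, "emb.txt")
```
Note that `deepwalk.py` already calls `save_embedding` with the path given by `--emb_file` at the end of training, so an explicit call is only needed for the text format.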
## Evaluation
To evaluate the embedding on multi-label classification, please refer to [here](https://github.com/ShawXh/Evaluate-Embedding).
The dataset is YouTube (1M nodes).
| Implementation | Macro-F1 (%) <br> 1% &emsp;&emsp; 3% &emsp;&emsp; 5% &emsp;&emsp; 7% &emsp;&emsp; 9% | Micro-F1 (%) <br> 1% &emsp;&emsp; 3% &emsp;&emsp; 5% &emsp;&emsp; 7% &emsp;&emsp; 9% |
|----|----|----|
| gensim.word2vec(hs) | 28.73 &emsp; 32.51 &emsp; 33.67 &emsp; 34.28 &emsp; 34.79 | 35.73 &emsp; 38.34 &emsp; 39.37 &emsp; 40.08 &emsp; 40.77 |
| gensim.word2vec(ns) | 28.18 &emsp; 32.25 &emsp; 33.56 &emsp; 34.60 &emsp; 35.22 | 35.35 &emsp; 37.69 &emsp; 38.08 &emsp; 40.24 &emsp; 41.09 |
| ours | 24.58 &emsp; 31.23 &emsp; 33.97 &emsp; 35.41 &emsp; 36.48 | 38.93 &emsp; 43.17 &emsp; 44.73 &emsp; 45.42 &emsp; 45.92 |
The comparison of running time is shown below, where the numbers in brackets denote the time spent on random walks.
| Implementation | gensim.word2vec(hs) | gensim.word2vec(ns) | Ours |
|----|----|----|----|
| Time (s) | 27119.6(1759.8) | 10580.3(1704.3) | 428.89 |
Parameters:
- walk_length = 80, number_walks = 10, window_size = 5
- Ours: 4 GPUs (Tesla V100), lr = 0.2, batch_size = 128, neg_weight = 5, negative = 1, num_thread = 4
- Others: workers = 8, negative = 5
Speedup with mixed CPU & multi-GPU training. The parameters are the same as above.
| #GPUs | 1 | 2 | 4 |
|----------|-------|-------|-------|
| Time (s) |1419.64| 952.04|428.89 |
import torch
import argparse
import dgl
import torch.multiprocessing as mp
from torch.utils.data import DataLoader
import os
import random
import time
import numpy as np
from reading_data import DeepwalkDataset
from model import SkipGramModel
from utils import thread_wrapped_func, shuffle_walks
class DeepwalkTrainer:
def __init__(self, args):
""" Initializing the trainer with the input arguments """
self.args = args
self.dataset = DeepwalkDataset(
net_file=args.net_file,
map_file=args.map_file,
walk_length=args.walk_length,
window_size=args.window_size,
num_walks=args.num_walks,
batch_size=args.batch_size,
negative=args.negative,
num_procs=args.num_procs,
fast_neg=args.fast_neg,
)
self.emb_size = len(self.dataset.net)
self.emb_model = None
def init_device_emb(self):
""" set the device before training
will be called once in fast_train_mp / fast_train
"""
choices = sum([self.args.only_gpu, self.args.only_cpu, self.args.mix])
assert choices == 1, "Must choose only *one* training mode in [only_cpu, only_gpu, mix]"
        assert self.args.num_procs >= 1, "The number of processes must be at least 1"
choices = sum([self.args.sgd, self.args.adam, self.args.avg_sgd])
assert choices == 1, "Must choose only *one* gradient descent strategy in [sgd, avg_sgd, adam]"
# initializing embedding on CPU
self.emb_model = SkipGramModel(
emb_size=self.emb_size,
emb_dimension=self.args.dim,
walk_length=self.args.walk_length,
window_size=self.args.window_size,
batch_size=self.args.batch_size,
only_cpu=self.args.only_cpu,
only_gpu=self.args.only_gpu,
mix=self.args.mix,
neg_weight=self.args.neg_weight,
negative=self.args.negative,
lr=self.args.lr,
lap_norm=self.args.lap_norm,
adam=self.args.adam,
sgd=self.args.sgd,
avg_sgd=self.args.avg_sgd,
fast_neg=self.args.fast_neg,
)
torch.set_num_threads(self.args.num_threads)
if self.args.only_gpu:
print("Run in 1 GPU")
self.emb_model.all_to_device(0)
elif self.args.mix:
print("Mix CPU with %d GPU" % self.args.num_procs)
if self.args.num_procs == 1:
self.emb_model.set_device(0)
else:
print("Run in %d CPU process" % self.args.num_procs)
def train(self):
""" train the embedding """
if self.args.num_procs > 1:
self.fast_train_mp()
else:
self.fast_train()
def fast_train_mp(self):
""" multi-cpu-core or mix cpu & multi-gpu """
self.init_device_emb()
self.emb_model.share_memory()
start_all = time.time()
ps = []
np_ = self.args.num_procs
for i in range(np_):
p = mp.Process(target=self.fast_train_sp, args=(i,))
ps.append(p)
p.start()
for p in ps:
p.join()
print("Used time: %.2fs" % (time.time()-start_all))
self.emb_model.save_embedding(self.dataset, self.args.emb_file)
@thread_wrapped_func
def fast_train_sp(self, gpu_id):
""" a subprocess for fast_train_mp """
if self.args.mix:
self.emb_model.set_device(gpu_id)
torch.set_num_threads(self.args.num_threads)
sampler = self.dataset.create_sampler(gpu_id)
dataloader = DataLoader(
dataset=sampler.seeds,
batch_size=self.args.batch_size,
collate_fn=sampler.sample,
shuffle=False,
drop_last=False,
num_workers=4,
)
num_batches = len(dataloader)
print("num batchs: %d in subprocess [%d]" % (num_batches, gpu_id))
# number of positive node pairs in a sequence
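        # each of the walk_length centers pairs with up to window_size nodes on each
        # side; the first and last window_size positions lose 1..window_size pairs
        # each, which is where the window_size * (window_size + 1) correction comes from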
num_pos = int(2 * self.args.walk_length * self.args.window_size\
- self.args.window_size * (self.args.window_size + 1))
start = time.time()
with torch.no_grad():
max_i = self.args.iterations * num_batches
for i, walks in enumerate(dataloader):
# decay learning rate for SGD
lr = self.args.lr * (max_i - i) / max_i
if lr < 0.00001:
lr = 0.00001
if self.args.fast_neg:
self.emb_model.fast_learn(walks, lr)
else:
# do negative sampling
bs = len(walks)
neg_nodes = torch.LongTensor(
np.random.choice(self.dataset.neg_table,
bs * num_pos * self.args.negative,
replace=True))
self.emb_model.fast_learn(walks, lr, neg_nodes=neg_nodes)
if i > 0 and i % self.args.print_interval == 0:
print("Solver [%d] batch %d tt: %.2fs" % (gpu_id, i, time.time()-start))
start = time.time()
def fast_train(self):
""" fast train with dataloader """
        # the number of positive node pairs of a node sequence
num_pos = 2 * self.args.walk_length * self.args.window_size\
- self.args.window_size * (self.args.window_size + 1)
num_pos = int(num_pos)
self.init_device_emb()
sampler = self.dataset.create_sampler(0)
dataloader = DataLoader(
dataset=sampler.seeds,
batch_size=self.args.batch_size,
collate_fn=sampler.sample,
shuffle=False,
drop_last=False,
num_workers=4,
)
num_batches = len(dataloader)
print("num batchs: %d" % num_batches)
start_all = time.time()
start = time.time()
with torch.no_grad():
max_i = self.args.iterations * num_batches
for iteration in range(self.args.iterations):
print("\nIteration: " + str(iteration + 1))
for i, walks in enumerate(dataloader):
# decay learning rate for SGD
lr = self.args.lr * (max_i - i) / max_i
if lr < 0.00001:
lr = 0.00001
if self.args.fast_neg:
self.emb_model.fast_learn(walks, lr)
else:
# do negative sampling
bs = len(walks)
neg_nodes = torch.LongTensor(
np.random.choice(self.dataset.neg_table,
bs * num_pos * self.args.negative,
replace=True))
self.emb_model.fast_learn(walks, lr, neg_nodes=neg_nodes)
if i > 0 and i % self.args.print_interval == 0:
print("Batch %d, training time: %.2fs" % (i, time.time()-start))
start = time.time()
print("Training used time: %.2fs" % (time.time()-start_all))
self.emb_model.save_embedding(self.dataset, self.args.emb_file)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="DeepWalk")
parser.add_argument('--net_file', type=str,
help="path of the txt network file")
parser.add_argument('--emb_file', type=str, default="emb.npy",
help='path of the npy embedding file')
parser.add_argument('--map_file', type=str, default="nodeid_to_index.pickle",
help='path of the mapping dict that maps node ids to embedding index')
parser.add_argument('--dim', default=128, type=int,
help="embedding dimensions")
parser.add_argument('--window_size', default=5, type=int,
help="context window size")
parser.add_argument('--num_walks', default=10, type=int,
help="number of walks for each node")
parser.add_argument('--negative', default=5, type=int,
help="negative samples for each positve node pair")
parser.add_argument('--iterations', default=1, type=int,
help="iterations")
parser.add_argument('--batch_size', default=10, type=int,
help="number of node sequences in each batch")
parser.add_argument('--print_interval', default=1000, type=int,
help="number of batches between printing")
parser.add_argument('--walk_length', default=80, type=int,
help="number of nodes in a sequence")
parser.add_argument('--lr', default=0.2, type=float,
help="learning rate")
parser.add_argument('--neg_weight', default=1., type=float,
help="negative weight")
parser.add_argument('--lap_norm', default=0.01, type=float,
help="weight of laplacian normalization")
parser.add_argument('--mix', default=False, action="store_true",
help="mixed training with CPU and GPU")
parser.add_argument('--only_cpu', default=False, action="store_true",
help="training with CPU")
parser.add_argument('--only_gpu', default=False, action="store_true",
help="training with GPU")
parser.add_argument('--fast_neg', default=True, action="store_true",
help="do negative sampling inside a batch")
    parser.add_argument('--adam', default=False, action="store_true",
            help="use adam for embedding updates, recommended")
    parser.add_argument('--sgd', default=False, action="store_true",
            help="use sgd for embedding updates")
    parser.add_argument('--avg_sgd', default=False, action="store_true",
            help="average gradients of sgd for embedding updates")
parser.add_argument('--num_threads', default=2, type=int,
help="number of threads used for each CPU-core/GPU")
parser.add_argument('--num_procs', default=1, type=int,
help="number of GPUs/CPUs when mixed training")
args = parser.parse_args()
start_time = time.time()
trainer = DeepwalkTrainer(args)
trainer.train()
print("Total used time: %.2f" % (time.time() - start_time))
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import init
import random
import numpy as np
def init_emb2pos_index(walk_length, window_size, batch_size):
''' select embedding of positive nodes from a batch of node embeddings
Return
------
index_emb_posu torch.LongTensor : the indices of u_embeddings
index_emb_posv torch.LongTensor : the indices of v_embeddings
Usage
-----
# emb_u.shape: [batch_size * walk_length, dim]
batch_emb2posu = torch.index_select(emb_u, 0, index_emb_posu)
'''
idx_list_u = []
idx_list_v = []
for b in range(batch_size):
for i in range(walk_length):
for j in range(i-window_size, i):
if j >= 0:
idx_list_u.append(j + b * walk_length)
idx_list_v.append(i + b * walk_length)
for j in range(i + 1, i + 1 + window_size):
if j < walk_length:
idx_list_u.append(j + b * walk_length)
idx_list_v.append(i + b * walk_length)
# [num_pos * batch_size]
index_emb_posu = torch.LongTensor(idx_list_u)
index_emb_posv = torch.LongTensor(idx_list_v)
return index_emb_posu, index_emb_posv
def init_emb2neg_index(walk_length, window_size, negative, batch_size):
'''select embedding of negative nodes from a batch of node embeddings
for fast negative sampling
Return
------
index_emb_negu torch.LongTensor : the indices of u_embeddings
index_emb_negv torch.LongTensor : the indices of v_embeddings
Usage
-----
# emb_u.shape: [batch_size * walk_length, dim]
batch_emb2negu = torch.index_select(emb_u, 0, index_emb_negu)
'''
idx_list_u = []
for b in range(batch_size):
for i in range(walk_length):
for j in range(i-window_size, i):
if j >= 0:
idx_list_u += [i + b * walk_length] * negative
for j in range(i+1, i+1+window_size):
if j < walk_length:
idx_list_u += [i + b * walk_length] * negative
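    # the negative samples are drawn uniformly from all positions in the batch:
    # every position is replicated, the list is shuffled, and then truncated so
    # that each (center, negative) pair gets exactly one context position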
idx_list_v = list(range(batch_size * walk_length))\
* negative * window_size * 2
random.shuffle(idx_list_v)
idx_list_v = idx_list_v[:len(idx_list_u)]
# [bs * walk_length * negative]
index_emb_negu = torch.LongTensor(idx_list_u)
index_emb_negv = torch.LongTensor(idx_list_v)
return index_emb_negu, index_emb_negv
def init_grad_avg(walk_length, window_size, batch_size):
    '''compute the per-position coefficients used to average the gradients

    Each coefficient is 1 / (number of positive node pairs the position
    participates in); positions near the two ends of a walk participate in
    fewer pairs.
    '''
grad_avg = []
for b in range(batch_size):
for i in range(walk_length):
if i < window_size:
grad_avg.append(1. / float(i+window_size))
elif i >= walk_length - window_size:
grad_avg.append(1. / float(walk_length - i - 1 + window_size))
else:
grad_avg.append(0.5 / window_size)
    # [batch_size * walk_length, 1]
return torch.Tensor(grad_avg).unsqueeze(1)
def init_empty_grad(emb_dimension, walk_length, batch_size):
""" initialize gradient matrix """
grad_u = torch.zeros((batch_size * walk_length, emb_dimension))
grad_v = torch.zeros((batch_size * walk_length, emb_dimension))
return grad_u, grad_v
def adam(grad, state_sum, nodes, lr, device, only_gpu):
""" calculate gradients according to adam """
grad_sum = (grad * grad).mean(1)
if not only_gpu:
grad_sum = grad_sum.cpu()
state_sum.index_add_(0, nodes, grad_sum) # cpu
std = state_sum[nodes].to(device) # gpu
std_values = std.sqrt_().add_(1e-10).unsqueeze(1)
grad = (lr * grad / std_values) # gpu
return grad
class SkipGramModel(nn.Module):
""" Negative sampling based skip-gram """
def __init__(self,
emb_size,
emb_dimension,
walk_length,
window_size,
batch_size,
only_cpu,
only_gpu,
mix,
neg_weight,
negative,
lr,
lap_norm,
adam,
sgd,
avg_sgd,
fast_neg,
):
""" initialize embedding on CPU
        Parameters
----------
emb_size int : number of nodes
emb_dimension int : embedding dimension
walk_length int : number of nodes in a sequence
window_size int : context window size
batch_size int : number of node sequences in each batch
only_cpu bool : training with CPU
only_gpu bool : training with GPU
mix bool : mixed training with CPU and GPU
        negative int : negative samples for each positive node pair
neg_weight float : negative weight
lr float : initial learning rate
lap_norm float : weight of laplacian normalization
        adam bool : use adam for embedding updates
        sgd bool : use sgd for embedding updates
        avg_sgd bool : average gradients of sgd for embedding updates
fast_neg bool : do negative sampling inside a batch
"""
super(SkipGramModel, self).__init__()
self.emb_size = emb_size
self.emb_dimension = emb_dimension
self.walk_length = walk_length
self.window_size = window_size
self.batch_size = batch_size
self.only_cpu = only_cpu
self.only_gpu = only_gpu
self.mixed_train = mix
self.neg_weight = neg_weight
self.negative = negative
self.lr = lr
self.lap_norm = lap_norm
self.adam = adam
self.sgd = sgd
self.avg_sgd = avg_sgd
self.fast_neg = fast_neg
# initialize the device as cpu
self.device = torch.device("cpu")
# content embedding
self.u_embeddings = nn.Embedding(
self.emb_size, self.emb_dimension, sparse=True)
# context embedding
self.v_embeddings = nn.Embedding(
self.emb_size, self.emb_dimension, sparse=True)
        # initialize embedding
initrange = 1.0 / self.emb_dimension
init.uniform_(self.u_embeddings.weight.data, -initrange, initrange)
init.constant_(self.v_embeddings.weight.data, 0)
# lookup_table is used for fast sigmoid computing
self.lookup_table = torch.sigmoid(torch.arange(-6.01, 6.01, 0.01))
self.lookup_table[0] = 0.
self.lookup_table[-1] = 1.
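        # the table covers scores in [-6, 6] at a resolution of 0.01; scores are
        # clamped to this range before fast_sigmoid looks them up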
# indexes to select positive/negative node pairs from batch_walks
self.index_emb_posu, self.index_emb_posv = init_emb2pos_index(
self.walk_length,
self.window_size,
self.batch_size)
if self.fast_neg:
self.index_emb_negu, self.index_emb_negv = init_emb2neg_index(
self.walk_length,
self.window_size,
self.negative,
self.batch_size)
# coefficients for averaging the gradients
if self.avg_sgd:
self.grad_avg = init_grad_avg(
self.walk_length,
self.window_size,
self.batch_size)
# adam
if self.adam:
self.state_sum_u = torch.zeros(self.emb_size)
self.state_sum_v = torch.zeros(self.emb_size)
# gradients of nodes in batch_walks
self.grad_u, self.grad_v = init_empty_grad(
self.emb_dimension,
self.walk_length,
self.batch_size)
def share_memory(self):
""" share the parameters across subprocesses """
self.u_embeddings.weight.share_memory_()
self.v_embeddings.weight.share_memory_()
if self.adam:
self.state_sum_u.share_memory_()
self.state_sum_v.share_memory_()
def set_device(self, gpu_id):
""" set gpu device """
self.device = torch.device("cuda:%d" % gpu_id)
print("The device is", self.device)
self.lookup_table = self.lookup_table.to(self.device)
self.index_emb_posu = self.index_emb_posu.to(self.device)
self.index_emb_posv = self.index_emb_posv.to(self.device)
if self.fast_neg:
self.index_emb_negu = self.index_emb_negu.to(self.device)
self.index_emb_negv = self.index_emb_negv.to(self.device)
self.grad_u = self.grad_u.to(self.device)
self.grad_v = self.grad_v.to(self.device)
if self.avg_sgd:
self.grad_avg = self.grad_avg.to(self.device)
def all_to_device(self, gpu_id):
""" move all of the parameters to a single GPU """
self.device = torch.device("cuda:%d" % gpu_id)
self.set_device(gpu_id)
self.u_embeddings = self.u_embeddings.cuda(gpu_id)
self.v_embeddings = self.v_embeddings.cuda(gpu_id)
if self.adam:
self.state_sum_u = self.state_sum_u.to(self.device)
self.state_sum_v = self.state_sum_v.to(self.device)
def fast_sigmoid(self, score):
""" do fast sigmoid by looking up in a pre-defined table """
idx = torch.floor((score + 6.01) / 0.01).long()
return self.lookup_table[idx]
def fast_learn(self, batch_walks, lr, neg_nodes=None):
""" Learn a batch of random walks in a fast way. It has the following features:
        1. It calculates the gradients directly without the forward operation.
        2. It computes sigmoid by looking up a pre-computed table.
        Specifically, for each positive/negative node pair (i,j), the updating procedure is as follows:
            score = self.fast_sigmoid(u_embedding[i].dot(v_embedding[j]))
            # label = 1 for positive samples; label = 0 for negative samples.
            u_embedding[i] += (label - score) * v_embedding[j]
            v_embedding[j] += (label - score) * u_embedding[i]
Parameters
----------
        batch_walks list : a list of node sequences
lr float : current learning rate
neg_nodes torch.LongTensor : a long tensor of sampled true negative nodes. If neg_nodes is None,
then do negative sampling randomly from the nodes in batch_walks as an alternative.
Usage example
-------------
        batch_walks = [torch.LongTensor([1,2,3,4]),
                       torch.LongTensor([2,3,4,2])]
lr = 0.01
neg_nodes = None
"""
if self.adam:
lr = self.lr
# [batch_size, walk_length]
if isinstance(batch_walks, list):
nodes = torch.stack(batch_walks)
elif isinstance(batch_walks, torch.LongTensor):
nodes = batch_walks
if self.only_gpu:
nodes = nodes.to(self.device)
if neg_nodes is not None:
neg_nodes = neg_nodes.to(self.device)
emb_u = self.u_embeddings(nodes).view(-1, self.emb_dimension).to(self.device)
emb_v = self.v_embeddings(nodes).view(-1, self.emb_dimension).to(self.device)
        ## Positive
bs = len(batch_walks)
if bs < self.batch_size:
index_emb_posu, index_emb_posv = init_emb2pos_index(
self.walk_length,
self.window_size,
bs)
index_emb_posu = index_emb_posu.to(self.device)
index_emb_posv = index_emb_posv.to(self.device)
else:
index_emb_posu = self.index_emb_posu
index_emb_posv = self.index_emb_posv
# num_pos: the number of positive node pairs generated by a single walk sequence
# [batch_size * num_pos, dim]
emb_pos_u = torch.index_select(emb_u, 0, index_emb_posu)
emb_pos_v = torch.index_select(emb_v, 0, index_emb_posv)
pos_score = torch.sum(torch.mul(emb_pos_u, emb_pos_v), dim=1)
pos_score = torch.clamp(pos_score, max=6, min=-6)
# [batch_size * num_pos, 1]
score = (1 - self.fast_sigmoid(pos_score)).unsqueeze(1)
# [batch_size * num_pos, dim]
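        # the lap_norm term adds a Laplacian-style regularizer that pulls the
        # embeddings of each positive (center, context) pair toward each other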
if self.lap_norm > 0:
grad_u_pos = score * emb_pos_v + self.lap_norm * (emb_pos_v - emb_pos_u)
grad_v_pos = score * emb_pos_u + self.lap_norm * (emb_pos_u - emb_pos_v)
else:
grad_u_pos = score * emb_pos_v
grad_v_pos = score * emb_pos_u
# [batch_size * walk_length, dim]
if bs < self.batch_size:
grad_u, grad_v = init_empty_grad(
self.emb_dimension,
self.walk_length,
bs)
grad_u = grad_u.to(self.device)
grad_v = grad_v.to(self.device)
else:
self.grad_u = self.grad_u.to(self.device)
self.grad_u.zero_()
self.grad_v = self.grad_v.to(self.device)
self.grad_v.zero_()
grad_u = self.grad_u
grad_v = self.grad_v
grad_u.index_add_(0, index_emb_posu, grad_u_pos)
grad_v.index_add_(0, index_emb_posv, grad_v_pos)
## Negative
if bs < self.batch_size:
index_emb_negu, index_emb_negv = init_emb2neg_index(
self.walk_length, self.window_size, self.negative, bs)
index_emb_negu = index_emb_negu.to(self.device)
index_emb_negv = index_emb_negv.to(self.device)
else:
index_emb_negu = self.index_emb_negu
index_emb_negv = self.index_emb_negv
emb_neg_u = torch.index_select(emb_u, 0, index_emb_negu)
if neg_nodes is None:
emb_neg_v = torch.index_select(emb_v, 0, index_emb_negv)
else:
emb_neg_v = self.v_embeddings.weight[neg_nodes].to(self.device)
# [batch_size * walk_length * negative, dim]
neg_score = torch.sum(torch.mul(emb_neg_u, emb_neg_v), dim=1)
neg_score = torch.clamp(neg_score, max=6, min=-6)
# [batch_size * walk_length * negative, 1]
score = - self.fast_sigmoid(neg_score).unsqueeze(1)
grad_u_neg = self.neg_weight * score * emb_neg_v
grad_v_neg = self.neg_weight * score * emb_neg_u
grad_u.index_add_(0, index_emb_negu, grad_u_neg)
if neg_nodes is None:
grad_v.index_add_(0, index_emb_negv, grad_v_neg)
## Update
nodes = nodes.view(-1)
if self.avg_sgd:
            # since each node participates in a different number of positive pairs,
            # its accumulated gradient is averaged with a per-position weight.
            # e.g. for sequence [1, 2, 3, ...] with window_size = 5, we have positive node
            # pairs [(1,2), (1,3), (1,4), ...]. To average the gradients for each node, we
            # weight the gradients of node pairs accordingly.
            # The weights are: [1/5, 1/6, ..., 1/9, 1/10, ..., 1/10, 1/9, ..., 1/6, 1/5].
if bs < self.batch_size:
grad_avg = init_grad_avg(
self.walk_length,
self.window_size,
bs).to(self.device)
else:
grad_avg = self.grad_avg
grad_u = grad_avg * grad_u * lr
grad_v = grad_avg * grad_v * lr
elif self.sgd:
grad_u = grad_u * lr
grad_v = grad_v * lr
elif self.adam:
# use adam optimizer
grad_u = adam(grad_u, self.state_sum_u, nodes, lr, self.device, self.only_gpu)
grad_v = adam(grad_v, self.state_sum_v, nodes, lr, self.device, self.only_gpu)
if self.mixed_train:
grad_u = grad_u.cpu()
grad_v = grad_v.cpu()
if neg_nodes is not None:
grad_v_neg = grad_v_neg.cpu()
self.u_embeddings.weight.data.index_add_(0, nodes.view(-1), grad_u)
self.v_embeddings.weight.data.index_add_(0, nodes.view(-1), grad_v)
if neg_nodes is not None:
self.v_embeddings.weight.data.index_add_(0, neg_nodes.view(-1), lr * grad_v_neg)
return
def forward(self, pos_u, pos_v, neg_v):
''' Do forward and backward. It is designed for future use. '''
emb_u = self.u_embeddings(pos_u)
emb_v = self.v_embeddings(pos_v)
emb_neg_v = self.v_embeddings(neg_v)
score = torch.sum(torch.mul(emb_u, emb_v), dim=1)
score = torch.clamp(score, max=6, min=-6)
score = -F.logsigmoid(score)
neg_score = torch.bmm(emb_neg_v, emb_u.unsqueeze(2)).squeeze()
neg_score = torch.clamp(neg_score, max=6, min=-6)
neg_score = -torch.sum(F.logsigmoid(-neg_score), dim=1)
#return torch.mean(score + neg_score)
return torch.sum(score), torch.sum(neg_score)
def save_embedding(self, dataset, file_name):
""" Write embedding to local file.
Parameter
---------
dataset DeepwalkDataset : the dataset
file_name str : the file name
"""
embedding = self.u_embeddings.weight.cpu().data.numpy()
np.save(file_name, embedding)
def save_embedding_txt(self, dataset, file_name):
""" Write embedding to local file. For future use.
Parameter
---------
dataset DeepwalkDataset : the dataset
file_name str : the file name
"""
embedding = self.u_embeddings.weight.cpu().data.numpy()
with open(file_name, 'w') as f:
f.write('%d %d\n' % (self.emb_size, self.emb_dimension))
for wid in range(self.emb_size):
e = ' '.join(map(lambda x: str(x), embedding[wid]))
f.write('%s %s\n' % (str(dataset.id2node[wid]), e))
import numpy as np
import scipy.sparse as sp
import pickle
import torch
from torch.utils.data import DataLoader
import random
import time
import dgl
from utils import shuffle_walks
np.random.seed(3141592653)
def ReadTxtNet(file_path="", undirected=True):
""" Read the txt network file.
    Note: the network is treated as unweighted.
Parameters
----------
file_path str : path of network file
undirected bool : whether the edges are undirected
Return
------
net dict : a dict recording the connections in the graph
node2id dict : a dict mapping the nodes to their embedding indices
    id2node dict : a dict mapping embedding indices back to the nodes
"""
node2id = {}
id2node = {}
cid = 0
src = []
dst = []
net = {}
with open(file_path, "r") as f:
for line in f.readlines():
n1, n2 = list(map(int, line.strip().split(" ")[:2]))
if n1 not in node2id:
node2id[n1] = cid
id2node[cid] = n1
cid += 1
if n2 not in node2id:
node2id[n2] = cid
id2node[cid] = n2
cid += 1
n1 = node2id[n1]
n2 = node2id[n2]
if n1 not in net:
net[n1] = {n2: 1}
src.append(n1)
dst.append(n2)
elif n2 not in net[n1]:
net[n1][n2] = 1
src.append(n1)
dst.append(n2)
if undirected:
if n2 not in net:
net[n2] = {n1: 1}
src.append(n2)
dst.append(n1)
elif n1 not in net[n2]:
net[n2][n1] = 1
src.append(n2)
dst.append(n1)
print("node num: %d" % len(net))
print("edge num: %d" % len(src))
assert max(net.keys()) == len(net) - 1, "error reading net, quit"
sm = sp.coo_matrix(
(np.ones(len(src)), (src, dst)),
dtype=np.float32)
return net, node2id, id2node, sm
def net2graph(net_sm):
""" Transform the network to DGL graph
Return
------
G DGLGraph : graph by DGL
"""
start = time.time()
G = dgl.DGLGraph(net_sm)
end = time.time()
t = end - start
print("Building DGLGraph in %.2fs" % t)
return G
class DeepwalkDataset:
def __init__(self,
net_file,
map_file,
walk_length=80,
window_size=5,
num_walks=10,
batch_size=32,
negative=5,
num_procs=4,
fast_neg=True,
):
""" This class has the following functions:
1. Transform the txt network file into DGL graph;
2. Generate random walk sequences for the trainer;
3. Provide the negative table if the user hopes to sample negative
nodes according to nodes' degrees;
Parameter
---------
net_file str : path of the txt network file
walk_length int : number of nodes in a sequence
window_size int : context window size
num_walks int : number of walks for each node
batch_size int : number of node sequences in each batch
        negative int : negative samples for each positive node pair
fast_neg bool : whether do negative sampling inside a batch
"""
self.walk_length = walk_length
self.window_size = window_size
self.num_walks = num_walks
self.batch_size = batch_size
self.negative = negative
self.num_procs = num_procs
self.fast_neg = fast_neg
self.net, self.node2id, self.id2node, self.sm = ReadTxtNet(net_file)
self.save_mapping(map_file)
self.G = net2graph(self.sm)
# random walk seeds
start = time.time()
seeds = torch.cat([torch.LongTensor(self.G.nodes())] * num_walks)
self.seeds = torch.split(shuffle_walks(seeds), int(np.ceil(len(self.net) * self.num_walks / self.num_procs)), 0)
end = time.time()
t = end - start
print("%d seeds in %.2fs" % (len(seeds), t))
# negative table for true negative sampling
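        # node degrees are raised to the 0.75 power (as in word2vec) and each node
        # occupies a share of the table proportional to that weight, so negatives
        # are drawn according to the smoothed degree distribution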
if not fast_neg:
node_degree = np.array(list(map(lambda x: len(self.net[x]), self.net.keys())))
node_degree = np.power(node_degree, 0.75)
node_degree /= np.sum(node_degree)
node_degree = np.array(node_degree * 1e8, dtype=np.int)
self.neg_table = []
for idx, node in enumerate(self.net.keys()):
self.neg_table += [node] * node_degree[idx]
self.neg_table_size = len(self.neg_table)
self.neg_table = np.array(self.neg_table, dtype=np.long)
del node_degree
def create_sampler(self, gpu_id):
""" Still in construction...
Several mode:
1. do true negative sampling.
1.1 from random walk sequence
1.2 from node degree distribution
return the sampled node ids
        2. do negative sampling from the random walk sequences (may include false negatives)
            saves GPU memory and is faster
return the node indices in the sequences
"""
return DeepwalkSampler(self.G, self.seeds[gpu_id], self.walk_length)
def save_mapping(self, map_file):
with open(map_file, "wb") as f:
pickle.dump(self.node2id, f)
class DeepwalkSampler(object):
def __init__(self, G, seeds, walk_length):
self.G = G
self.seeds = seeds
self.walk_length = walk_length
def sample(self, seeds):
walks = dgl.contrib.sampling.random_walk(self.G, seeds,
1, self.walk_length-1)
return walks
import torch
import traceback
from functools import wraps
from _thread import start_new_thread
import torch.multiprocessing as mp
def thread_wrapped_func(func):
"""Wrapped func for torch.multiprocessing.Process.
With this wrapper we can use OMP threads in subprocesses
otherwise, OMP_NUM_THREADS=1 is mandatory.
How to use:
@thread_wrapped_func
def func_to_wrap(args ...):
"""
@wraps(func)
def decorated_function(*args, **kwargs):
queue = mp.Queue()
def _queue_result():
exception, trace, res = None, None, None
try:
res = func(*args, **kwargs)
except Exception as e:
exception = e
trace = traceback.format_exc()
queue.put((res, exception, trace))
start_new_thread(_queue_result, ())
result, exception, trace = queue.get()
if exception is None:
return result
else:
assert isinstance(exception, Exception)
raise exception.__class__(trace)
return decorated_function
def shuffle_walks(walks):
seeds = torch.randperm(walks.size()[0])
return walks[seeds]