Unverified Commit d98c71ef authored by Chao Ma, committed by GitHub

[DGL-KE] Clean up argument for DGL-KE (#1325)

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* clean up args

* change args for eval

* update

* update

* update

* update num_worker=8

* add num_proc=1

* update

* update

* add eval percent

* update

* update

* update

* save emb

* update

* update

* update

* update

* update

* remove neg_chunk_size

* update

* update

* update

* remove

* update

* remove

* change num_thread
parent 53b39f06
@@ -4,69 +4,68 @@
# DistMult 1GPU
DGLBACKEND=pytorch python3 train.py --model DistMult --dataset FB15k --batch_size 1024 \
--neg_sample_size 256 --hidden_dim 400 --gamma 143.0 --lr 0.08 --batch_size_eval 16 \
--valid --test -adv --mix_cpu_gpu --eval_interval 100000 --gpu 0 \
--num_worker=8 --max_step 40000
--valid --test -adv --mix_cpu_gpu --eval_interval 100000 --gpu 0 --num_thread 4 --max_step 40000
# DistMult 8GPU
DGLBACKEND=pytorch python3 train.py --model DistMult --dataset FB15k --batch_size 1024 \
--neg_sample_size 256 --hidden_dim 400 --gamma 143.0 --lr 0.08 --batch_size_eval 16 \
--valid --test -adv --mix_cpu_gpu --eval_interval 100000 --num_proc 8 --gpu 0 1 2 3 4 5 6 7 \
--num_worker=4 --max_step 10000 --rel_part --async_update
--max_step 10000 --num_thread 4 --rel_part --async_update
# ComplEx 1GPU
DGLBACKEND=pytorch python3 train.py --model ComplEx --dataset FB15k --batch_size 1024 \
--neg_sample_size 256 --hidden_dim 400 --gamma 143.0 --lr 0.1 --regularization_coef 2.00E-06 \
--batch_size_eval 16 --valid --test -adv --mix_cpu_gpu --eval_interval 100000 \
--gpu 0 --num_worker=8 --max_step 32000
--gpu 0 --num_thread 4 --max_step 32000
# ComplEx 8GPU
DGLBACKEND=pytorch python3 train.py --model ComplEx --dataset FB15k --batch_size 1024 \
--neg_sample_size 256 --hidden_dim 400 --gamma 143.0 --lr 0.1 --regularization_coef 2.00E-06 \
--batch_size_eval 16 --valid --test -adv --mix_cpu_gpu --eval_interval 100000 --num_proc 8 \
--gpu 0 1 2 3 4 5 6 7 --num_worker=4 --max_step 4000 --rel_part --async_update
--gpu 0 1 2 3 4 5 6 7 --max_step 4000 --num_thread 4 --rel_part --async_update
# TransE_l1 1GPU
DGLBACKEND=pytorch python3 train.py --model TransE_l1 --dataset FB15k --batch_size 1024 \
--neg_sample_size 64 --regularization_coef 1e-07 --hidden_dim 400 --gamma 16.0 --lr 0.01 \
--batch_size_eval 16 --valid --test -adv --mix_cpu_gpu --eval_interval 100000 \
--gpu 0 --num_worker=8 --max_step 48000
--gpu 0 --num_thread 4 --max_step 48000
# TransE_l1 8GPU
DGLBACKEND=pytorch python3 train.py --model TransE_l1 --dataset FB15k --batch_size 1024 \
--neg_sample_size 64 --regularization_coef 1e-07 --hidden_dim 400 --gamma 16.0 --lr 0.01 \
--batch_size_eval 16 --valid --test -adv --mix_cpu_gpu --eval_interval 100000 --num_proc 8 \
--gpu 0 1 2 3 4 5 6 7 --num_worker=4 --max_step 6000 --rel_part --async_update
--gpu 0 1 2 3 4 5 6 7 --max_step 6000 --num_thread 4 --rel_part --async_update
# TransE_l2 1GPU
DGLBACKEND=pytorch python3 train.py --model TransE_l2 --dataset FB15k --batch_size 1024 \
--neg_sample_size 256 --hidden_dim 2000 --gamma 12.0 --lr 0.1 --max_step 30000 \
--batch_size_eval 16 --gpu 0 --valid --test -adv --regularization_coef=2e-7
--batch_size_eval 16 --gpu 0 --valid --test -adv --num_thread 4 --regularization_coef=2e-7
# RESCAL 1GPU
DGLBACKEND=pytorch python3 train.py --model RESCAL --dataset FB15k --batch_size 1024 \
--neg_sample_size 256 --hidden_dim 500 --gamma 24.0 --lr 0.03 --max_step 30000 \
--batch_size_eval 16 --gpu 0 --valid --test -adv
--batch_size_eval 16 --gpu 0 --num_thread 4 --valid --test -adv
# TransR 1GPU
DGLBACKEND=pytorch python3 train.py --model TransR --dataset FB15k --batch_size 1024 \
--neg_sample_size 256 --regularization_coef 5e-8 --hidden_dim 200 --gamma 8.0 --lr 0.015 \
--batch_size_eval 16 --valid --test -adv --mix_cpu_gpu --eval_interval 100000 \
--gpu 0 --num_worker=8 --max_step 32000
--gpu 0 --num_thread 4 --max_step 32000
# TransR 8GPU
DGLBACKEND=pytorch python3 train.py --model TransR --dataset FB15k --batch_size 1024 \
--neg_sample_size 256 --regularization_coef 5e-8 --hidden_dim 200 --gamma 8.0 --lr 0.015 \
--batch_size_eval 16 --valid --test -adv --mix_cpu_gpu --eval_interval 100000 --num_proc 8 \
--gpu 0 1 2 3 4 5 6 7 --num_worker=4 --max_step 4000 --rel_part --async_update
--gpu 0 1 2 3 4 5 6 7 --max_step 4000 --num_thread 4 --rel_part --async_update
# RotatE 1GPU
DGLBACKEND=pytorch python3 train.py --model RotatE --dataset FB15k --batch_size 2048 \
--neg_sample_size 256 --regularization_coef 1e-07 --hidden_dim 200 --gamma 12.0 --lr 0.009 \
--batch_size_eval 16 --valid --test -adv --mix_cpu_gpu --eval_interval 100000 -de \
--mix_cpu_gpu --max_step 40000 --gpu 0 --num_worker=4
--mix_cpu_gpu --num_thread 4 --max_step 40000 --gpu 0
# RotatE 8GPU
DGLBACKEND=pytorch python3 train.py --model RotatE --dataset FB15k --batch_size 2048 \
--neg_sample_size 256 --regularization_coef 1e-07 --hidden_dim 200 --gamma 12.0 --lr 0.009 \
--batch_size_eval 16 --valid --test -adv --mix_cpu_gpu --eval_interval 100000 -de \
--mix_cpu_gpu --max_step 5000 --num_proc 8 --gpu 0 1 2 3 4 5 6 7 --num_worker=4 \
--rel_part --async_update
--mix_cpu_gpu --max_step 5000 --num_proc 8 --gpu 0 1 2 3 4 5 6 7 \
--num_thread 4 --rel_part --async_update
# for wn18
DGLBACKEND=pytorch python3 train.py --model TransE_l1 --dataset wn18 --batch_size 1024 \
@@ -101,16 +100,16 @@ DGLBACKEND=pytorch python3 train.py --model RotatE --dataset wn18 --batch_size 1
# for Freebase
DGLBACKEND=pytorch python3 train.py --model ComplEx --dataset Freebase --batch_size 1024 \
--neg_sample_size 256 --hidden_dim 400 --gamma 500.0 --lr 0.1 --max_step 50000 \
--batch_size_eval 128 --test -adv --eval_interval 300000 \
--neg_sample_size_test 100000 --eval_percent 0.02 --num_proc 64
--neg_sample_size 256 --hidden_dim 400 --gamma 500.0 --lr 0.1 --max_step 50000 \
--batch_size_eval 128 --test -adv --eval_interval 300000 --num_thread 1 \
--neg_sample_size_test 100000 --eval_percent 0.02 --num_proc 48
# Freebase multi-gpu
# TransE_l2 8 GPU
DGLBACKEND=pytorch python3 train.py --model TransE_l2 --dataset Freebase --batch_size 1024 \
--neg_sample_size 256 --hidden_dim 400 --gamma 10 --lr 0.1 --batch_size_eval 1000 \
--valid --test -adv --mix_cpu_gpu --neg_deg_sample_eval --neg_sample_size_test 1000 \
--num_proc 8 --gpu 0 1 2 3 4 5 6 7 --num_worker 4 --regularization_coef 1e-9 \
--num_proc 8 --gpu 0 1 2 3 4 5 6 7 --num_thread 4 --regularization_coef 1e-9 \
--no_eval_filter --max_step 400000 --rel_part --eval_interval 100000 --log_interval 10000 \
--no_eval_filter --async_update --neg_deg_sample --force_sync_interval 1000
@@ -118,6 +117,6 @@ DGLBACKEND=pytorch python3 train.py --model TransE_l2 --dataset Freebase --batch
DGLBACKEND=pytorch python3 train.py --model TransE_l2 --dataset Freebase --batch_size 1024 \
--neg_sample_size 256 --hidden_dim 400 --gamma 10 --lr 0.1 --batch_size_eval 1000 \
--valid --test -adv --mix_cpu_gpu --neg_deg_sample_eval --neg_sample_size_test 1000 \
--num_proc 16 --gpu 0 1 2 3 4 5 6 7 --num_worker 4 --regularization_coef 1e-9 \
--num_proc 16 --gpu 0 1 2 3 4 5 6 7 --num_thread 4 --regularization_coef 1e-9 \
--no_eval_filter --max_step 200000 --soft_rel_part --eval_interval 100000 --log_interval 10000 \
--no_eval_filter --async_update --neg_deg_sample --force_sync_interval 1000
##################################################################################
# This script runs the TransE_l2 model on the FB15k dataset in a distributed setting.
# You can change the hyper-parameters in this file but DO NOT run this script manually
##################################################################################
machine_id=$1
server_count=$2
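# Example invocation (hypothetical values; this script is normally invoked by the
# launch script, not by hand): bash <this_script>.sh 1 4   -> machine_id=1, server_count=4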
# Delete the temp file
rm *-shape
##################################################################################
# Start kvserver
##################################################################################
SERVER_ID_LOW=$((machine_id*server_count))
SERVER_ID_HIGH=$(((machine_id+1)*server_count))
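# For example, with machine_id=1 and server_count=4 the range above is [4, 8), so the
# loop below starts kvservers with server_id 4, 5, 6 and 7 on this machine.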
while [ $SERVER_ID_LOW -lt $SERVER_ID_HIGH ]
do
MKL_NUM_THREADS=1 OMP_NUM_THREADS=1 DGLBACKEND=pytorch python3 ../kvserver.py --model TransE_l2 --dataset FB15k \
--hidden_dim 400 --gamma 19.9 --lr 0.25 --total_client 64 --server_id $SERVER_ID_LOW &
let SERVER_ID_LOW+=1
done
##################################################################################
# Start kvclient
##################################################################################
MKL_NUM_THREADS=1 OMP_NUM_THREADS=1 DGLBACKEND=pytorch python3 ../kvclient.py --model TransE_l2 --dataset FB15k \
--batch_size 1000 --neg_sample_size 200 --hidden_dim 400 --gamma 19.9 --lr 0.25 --max_step 500 --log_interval 100 --num_thread 1 \
--batch_size_eval 16 --test -adv --regularization_coef 1e-9 --total_machine 4 --num_client 16
\ No newline at end of file
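# Note: the --total_client 64 passed to the kvservers above matches the
# 4 machines x 16 clients per machine (--total_machine 4 --num_client 16) used here.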
@@ -26,4 +26,4 @@ done
##################################################################################
MKL_NUM_THREADS=1 OMP_NUM_THREADS=1 DGLBACKEND=pytorch python3 ../kvclient.py --model ComplEx --dataset Freebase \
--batch_size 1024 --neg_sample_size 256 --hidden_dim 400 --gamma 143.0 --lr 0.1 --max_step 12500 --log_interval 100 \
--batch_size_eval 1000 --neg_sample_size_test 1000 --test -adv --total_machine 4 --num_client 40
\ No newline at end of file
--batch_size_eval 1000 --neg_sample_size_test 1000 --test -adv --total_machine 4 --num_thread 1 --num_client 40
\ No newline at end of file
@@ -26,4 +26,4 @@ done
##################################################################################
MKL_NUM_THREADS=1 OMP_NUM_THREADS=1 DGLBACKEND=pytorch python3 ../kvclient.py --model DistMult --dataset Freebase \
--batch_size 1024 --neg_sample_size 256 --hidden_dim 400 --gamma 143.0 --lr 0.08 --max_step 12500 --log_interval 100 \
--batch_size_eval 1000 --neg_sample_size_test 1000 --test -adv --total_machine 4 --num_client 40
\ No newline at end of file
--batch_size_eval 1000 --neg_sample_size_test 1000 --test -adv --total_machine 4 --num_thread 1 --num_client 40
\ No newline at end of file
@@ -25,5 +25,5 @@ done
# Start kvclient
##################################################################################
MKL_NUM_THREADS=1 OMP_NUM_THREADS=1 DGLBACKEND=pytorch python3 ../kvclient.py --model TransE_l2 --dataset Freebase \
--batch_size 1000 --neg_sample_size 200 --hidden_dim 400 --gamma 10 --lr 0.1 --max_step 12500 --log_interval 100 \
--batch_size 1000 --neg_sample_size 200 --hidden_dim 400 --gamma 10 --lr 0.1 --max_step 12500 --log_interval 100 --num_thread 1 \
--batch_size_eval 1000 --neg_sample_size_test 1000 --test -adv --regularization_coef 1e-9 --total_machine 4 --num_client 40
\ No newline at end of file
@@ -2,7 +2,7 @@
# The user runs this script to launch distributed jobs on the cluster
##################################################################################
script_path=~/dgl/apps/kg/distributed
script_file=./freebase_transe_l2.sh
script_file=./fb15k_transe_l2.sh
user_name=ubuntu
ssh_key=~/mctt.pem
......
@@ -22,8 +22,8 @@ class ArgParser(argparse.ArgumentParser):
super(ArgParser, self).__init__()
self.add_argument('--model_name', default='TransE',
choices=['TransE', 'TransE_l1', 'TransE_l2', 'TransH', 'TransR', 'TransD',
'RESCAL', 'DistMult', 'ComplEx', 'RotatE', 'pRotatE'],
choices=['TransE', 'TransE_l1', 'TransE_l2', 'TransR',
'RESCAL', 'DistMult', 'ComplEx', 'RotatE'],
help='model to use')
self.add_argument('--data_path', type=str, default='data',
help='root path of all dataset')
@@ -36,15 +36,12 @@ class ArgParser(argparse.ArgumentParser):
help='a list of data files, e.g. entity relation train valid test')
self.add_argument('--model_path', type=str, default='ckpts',
help='the place where models are saved')
self.add_argument('--batch_size', type=int, default=8,
self.add_argument('--batch_size_eval', type=int, default=8,
help='batch size used for eval and test')
self.add_argument('--neg_sample_size', type=int, default=-1,
self.add_argument('--neg_sample_size_test', type=int, default=-1,
help='negative sampling size for testing')
self.add_argument('--neg_deg_sample', action='store_true',
self.add_argument('--neg_deg_sample_eval', action='store_true',
help='negative sampling proportional to vertex degree for testing')
self.add_argument('--neg_chunk_size', type=int, default=-1,
help='chunk size of the negative edges.')
self.add_argument('--hidden_dim', type=int, default=256,
help='hidden dim used by relation and entity')
self.add_argument('-g', '--gamma', type=float, default=12.0,
@@ -53,7 +50,6 @@ class ArgParser(argparse.ArgumentParser):
help='sample some percentage for evaluation.')
self.add_argument('--no_eval_filter', action='store_true',
help='do not filter positive edges among negative edges for evaluation')
self.add_argument('--gpu', type=int, default=[-1], nargs='+',
help='a list of active gpu ids, e.g. 0')
self.add_argument('--mix_cpu_gpu', action='store_true',
@@ -62,11 +58,6 @@ class ArgParser(argparse.ArgumentParser):
help='double entity dim for complex number')
self.add_argument('-dr', '--double_rel', action='store_true',
help='double relation dim for complex number')
self.add_argument('--seed', type=int, default=0,
help='set random seed for reproducibility')
self.add_argument('--num_worker', type=int, default=16,
help='number of workers used for loading data')
self.add_argument('--num_proc', type=int, default=1,
help='number of process used')
self.add_argument('--num_thread', type=int, default=1,
@@ -96,7 +87,7 @@ def get_logger(args):
def main(args):
args.eval_filter = not args.no_eval_filter
if args.neg_deg_sample:
if args.neg_deg_sample_eval:
assert not args.eval_filter, "if negative sampling is based on degree, we can't filter positive edges."
# load dataset and samplers
@@ -108,59 +99,50 @@ def main(args):
args.strict_rel_part = False
args.soft_rel_part = False
args.async_update = False
args.batch_size_eval = args.batch_size
logger = get_logger(args)
# Here we want to use the regular negative sampler because we need to ensure that
# all positive edges are excluded.
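# (With filtering enabled, i.e. args.eval_filter is True because --no_eval_filter was
# not passed, corrupted candidates that form a true triple in the graph are skipped.)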
eval_dataset = EvalDataset(dataset, args)
args.neg_sample_size_test = args.neg_sample_size
args.neg_deg_sample_eval = args.neg_deg_sample
if args.neg_sample_size < 0:
if args.neg_sample_size_test < 0:
args.neg_sample_size_test = args.neg_sample_size = eval_dataset.g.number_of_nodes()
if args.neg_chunk_size < 0:
args.neg_chunk_size = args.neg_sample_size
num_workers = args.num_worker
# for multiprocessing evaluation, we don't need to sample multiple batches at a time
# in each process.
if args.num_proc > 1:
num_workers = 1
args.num_workers = 8 # fix num_workers to 8
if args.num_proc > 1:
test_sampler_tails = []
test_sampler_heads = []
for i in range(args.num_proc):
test_sampler_head = eval_dataset.create_sampler('test', args.batch_size,
args.neg_sample_size,
args.neg_chunk_size,
test_sampler_head = eval_dataset.create_sampler('test', args.batch_size_eval,
args.neg_sample_size_test,
args.neg_sample_size_test,
args.eval_filter,
mode='chunk-head',
num_workers=num_workers,
num_workers=args.num_workers,
rank=i, ranks=args.num_proc)
test_sampler_tail = eval_dataset.create_sampler('test', args.batch_size,
args.neg_sample_size,
args.neg_chunk_size,
test_sampler_tail = eval_dataset.create_sampler('test', args.batch_size_eval,
args.neg_sample_size_test,
args.neg_sample_size_test,
args.eval_filter,
mode='chunk-tail',
num_workers=num_workers,
num_workers=args.num_workers,
rank=i, ranks=args.num_proc)
test_sampler_heads.append(test_sampler_head)
test_sampler_tails.append(test_sampler_tail)
else:
test_sampler_head = eval_dataset.create_sampler('test', args.batch_size,
args.neg_sample_size,
args.neg_chunk_size,
test_sampler_head = eval_dataset.create_sampler('test', args.batch_size_eval,
args.neg_sample_size_test,
args.neg_sample_size_test,
args.eval_filter,
mode='chunk-head',
num_workers=num_workers,
num_workers=args.num_workers,
rank=0, ranks=1)
test_sampler_tail = eval_dataset.create_sampler('test', args.batch_size,
args.neg_sample_size,
args.neg_chunk_size,
test_sampler_tail = eval_dataset.create_sampler('test', args.batch_size_eval,
args.neg_sample_size_test,
args.neg_sample_size_test,
args.eval_filter,
mode='chunk-tail',
num_workers=num_workers,
num_workers=args.num_workers,
rank=0, ranks=1)
# load model
......
......@@ -18,8 +18,7 @@ from dataloader import get_dataset, get_partition_dataset
import dgl
import dgl.backend as F
NUM_THREAD = 1 # Fix the number of threads to 1 on kvclient
NUM_WORKER = 1 # Fix the number of worker for sampler to 1
WAIT_TIME = 10
class ArgParser(argparse.ArgumentParser):
def __init__(self):
@@ -33,48 +32,37 @@ class ArgParser(argparse.ArgumentParser):
help='root path of all dataset')
self.add_argument('--dataset', type=str, default='FB15k',
help='dataset name, under data_path')
self.add_argument('--format', type=str, default='1',
help='the format of the dataset.')
self.add_argument('--save_path', type=str, default='ckpts',
self.add_argument('--format', type=str, default='built_in',
help='the format of the dataset; it can be built_in, '\
'raw_udd_{htr}, or udd_{htr}')
self.add_argument('--save_path', type=str, default='../ckpts',
help='place to save models and logs')
self.add_argument('--save_emb', type=str, default=None,
help='save the embeddings in the specific location.')
self.add_argument('--max_step', type=int, default=80000,
help='number of steps to train')
self.add_argument('--warm_up_step', type=int, default=None,
help='for learning rate decay')
self.add_argument('--batch_size', type=int, default=1024,
help='batch size')
self.add_argument('--batch_size_eval', type=int, default=8,
help='batch size used for eval and test')
self.add_argument('--neg_sample_size', type=int, default=128,
help='negative sampling size')
self.add_argument('--neg_chunk_size', type=int, default=-1,
help='chunk size of the negative edges.')
self.add_argument('--neg_deg_sample', action='store_true',
help='negative sample proportional to vertex degree in the training')
self.add_argument('--neg_deg_sample_eval', action='store_true',
help='negative sampling proportional to vertex degree in the evaluation')
self.add_argument('--neg_sample_size_valid', type=int, default=1000,
help='negative sampling size for validation')
self.add_argument('--neg_chunk_size_valid', type=int, default=-1,
help='chunk size of the negative edges.')
self.add_argument('--neg_sample_size_test', type=int, default=-1,
help='negative sampling size for testing')
self.add_argument('--neg_chunk_size_test', type=int, default=-1,
help='chunk size of the negative edges.')
self.add_argument('--hidden_dim', type=int, default=256,
help='hidden dim used by relation and entity')
self.add_argument('--lr', type=float, default=0.0001,
help='learning rate')
self.add_argument('-g', '--gamma', type=float, default=12.0,
help='margin value')
self.add_argument('--eval_percent', type=float, default=1,
help='sample some percentage for evaluation.')
self.add_argument('--no_eval_filter', action='store_true',
help='do not filter positive edges among negative edges for evaluation')
self.add_argument('--gpu', type=int, default=[-1], nargs='+',
help='a list of active gpu ids, e.g. 0 1 2 4')
self.add_argument('--mix_cpu_gpu', action='store_true',
@@ -83,16 +71,16 @@ class ArgParser(argparse.ArgumentParser):
help='double entity dim for complex number')
self.add_argument('-dr', '--double_rel', action='store_true',
help='double relation dim for complex number')
self.add_argument('--seed', type=int, default=0,
help='set random seed for reproducibility')
self.add_argument('-log', '--log_interval', type=int, default=1000,
help='print the training log after every x steps')
self.add_argument('--eval_interval', type=int, default=10000,
help='do evaluation after every x steps')
self.add_argument('--eval_percent', type=float, default=1,
help='sample some percentage for evaluation.')
self.add_argument('-adv', '--neg_adversarial_sampling', action='store_true',
help='if use negative adversarial sampling')
self.add_argument('-a', '--adversarial_temperature', default=1.0, type=float)
self.add_argument('-a', '--adversarial_temperature', default=1.0, type=float,
help='adversarial_temperature')
self.add_argument('--valid', action='store_true',
help='whether to validate the model during training')
self.add_argument('--test', action='store_true',
@@ -101,30 +89,22 @@ class ArgParser(argparse.ArgumentParser):
help='set value > 0.0 if regularization is used')
self.add_argument('-rn', '--regularization_norm', type=int, default=3,
help='norm used in regularization')
self.add_argument('--num_worker', type=int, default=32,
help='number of workers used for loading data')
self.add_argument('--non_uni_weight', action='store_true',
help='if use uniform weight when computing loss')
self.add_argument('--init_step', type=int, default=0,
help='DONT SET MANUALLY, used for resume')
self.add_argument('--step', type=int, default=0,
help='DONT SET MANUALLY, track current step')
self.add_argument('--pickle_graph', action='store_true',
help='pickle built graph, building a huge graph is slow.')
self.add_argument('--num_proc', type=int, default=1,
help='number of process used')
self.add_argument('--num_thread', type=int, default=1,
help='number of thread used')
self.add_argument('--rel_part', action='store_true',
help='enable relation partitioning')
self.add_argument('--soft_rel_part', action='store_true',
help='enable soft relation partition')
self.add_argument('--nomp_thread_per_process', type=int, default=-1,
help='num of omp threads used per process in multi-process training')
self.add_argument('--async_update', action='store_true',
help='allow async_update on node embedding')
self.add_argument('--force_sync_interval', type=int, default=-1,
help='We force a synchronization between processes every x steps')
self.add_argument('--strict_rel_part', action='store_true',
help='Strict relation partition')
self.add_argument('--machine_id', type=int, default=0,
help='Unique ID of current machine.')
@@ -191,7 +171,8 @@ def get_local_machine_id(server_namebook):
def start_worker(args, logger):
"""Start kvclient for training
"""
train_time_start = time.time()
init_time_start = time.time()
time.sleep(WAIT_TIME) # wait for launch script
server_namebook = dgl.contrib.read_ip_config(filename=args.ip_config)
@@ -218,50 +199,44 @@ def start_worker(args, logger):
entity_partition_book.share_memory_()
local2global.share_memory_()
model = load_model(logger, args, n_entities, n_relations)
model.share_memory()
# When we generate a batch of negative edges from a set of positive edges,
# we first divide the positive edges into chunks and corrupt the edges in a chunk
# together. By default, the chunk size is equal to the negative sample size.
# Usually, this works well. But we also allow users to specify the chunk size themselves.
if args.neg_chunk_size < 0:
args.neg_chunk_size = args.neg_sample_size
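# Illustrative numbers: with batch_size=1000 and neg_sample_size=200, a batch of
# 1000 positive edges is split into 1000/200 = 5 chunks, and the 200 edges in each
# chunk are corrupted with the same set of 200 sampled negative entities.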
num_workers = NUM_WORKER
train_data = TrainDataset(dataset, args, ranks=args.num_client)
# if there is no cross-partition relation, we fall back to strict_rel_part
args.strict_rel_part = args.mix_cpu_gpu and (train_data.cross_part == False)
args.soft_rel_part = args.mix_cpu_gpu and args.soft_rel_part and train_data.cross_part
args.num_workers = 8 # fix num_workers to 8
train_samplers = []
for i in range(args.num_client):
train_sampler_head = train_data.create_sampler(args.batch_size,
args.neg_sample_size,
args.neg_chunk_size,
args.neg_sample_size,
mode='head',
num_workers=num_workers,
num_workers=args.num_workers,
shuffle=True,
exclude_positive=False,
rank=i)
train_sampler_tail = train_data.create_sampler(args.batch_size,
args.neg_sample_size,
args.neg_chunk_size,
args.neg_sample_size,
mode='tail',
num_workers=num_workers,
num_workers=args.num_workers,
shuffle=True,
exclude_positive=False,
rank=i)
train_samplers.append(NewBidirectionalOneShotIterator(train_sampler_head, train_sampler_tail,
args.neg_chunk_size, args.neg_sample_size,
args.neg_sample_size, args.neg_sample_size,
True, n_entities))
dataset = None
print('Total data loading time {:.3f} seconds'.format(time.time() - train_time_start))
model = load_model(logger, args, n_entities, n_relations)
model.share_memory()
print('Total initialize time {:.3f} seconds'.format(time.time() - init_time_start))
rel_parts = train_data.rel_parts if args.strict_rel_part or args.soft_rel_part else None
cross_rels = train_data.cross_rels if args.soft_rel_part else None
args.num_thread = NUM_THREAD
procs = []
barrier = mp.Barrier(args.num_client)
for i in range(args.num_client):
......
@@ -48,16 +48,15 @@ class ArgParser(argparse.ArgumentParser):
help='root path of all dataset')
self.add_argument('--dataset', type=str, default='FB15k',
help='dataset name, under data_path')
self.add_argument('--format', type=str, default='1',
help='the format of the dataset.')
self.add_argument('--format', type=str, default='built_in',
help='the format of the dataset; it can be built_in, '\
'raw_udd_{htr}, or udd_{htr}')
self.add_argument('--hidden_dim', type=int, default=256,
help='hidden dim used by relation and entity')
self.add_argument('--lr', type=float, default=0.0001,
help='learning rate')
self.add_argument('-g', '--gamma', type=float, default=12.0,
help='margin value')
self.add_argument('--gpu', type=int, default=[-1], nargs='+',
help='a list of active gpu ids, e.g. 0')
self.add_argument('--mix_cpu_gpu', action='store_true',
@@ -66,20 +65,8 @@ class ArgParser(argparse.ArgumentParser):
help='double entity dim for complex number')
self.add_argument('-dr', '--double_rel', action='store_true',
help='double relation dim for complex number')
self.add_argument('--seed', type=int, default=0,
help='set random seed for reproducibility')
self.add_argument('--rel_part', action='store_true',
help='enable relation partitioning')
self.add_argument('--soft_rel_part', action='store_true',
help='enable soft relation partition')
self.add_argument('--nomp_thread_per_process', type=int, default=-1,
help='num of omp threads used per process in multi-process training')
self.add_argument('--async_update', action='store_true',
help='allow async_update on node embedding')
self.add_argument('--strict_rel_part', action='store_true',
help='Strict relation partition')
self.add_argument('--num_thread', type=int, default=1,
help='number of thread used')
self.add_argument('--server_id', type=int, default=0,
help='Unique ID of each server')
self.add_argument('--ip_config', type=str, default='ip_config.txt',
@@ -107,6 +94,9 @@ def get_server_data(args, machine_id):
print('n_entities: ' + str(dataset.n_entities))
print('n_relations: ' + str(dataset.n_relations))
args.soft_rel_part = False
args.strict_rel_part = False
model = load_model(None, args, dataset.n_entities, dataset.n_relations)
return g2l, model.entity_emb.emb, model.entity_emb.state_sum, model.relation_emb.emb, model.relation_emb.state_sum
......
This diff is collapsed.
@@ -41,7 +41,7 @@ def train(args, model, train_sampler, valid_samplers=None, rank=0, rel_parts=Non
model.prepare_relation(mx.gpu(gpu_id))
start = time.time()
for step in range(args.init_step, args.max_step):
for step in range(0, args.max_step):
pos_g, neg_g = next(train_sampler)
args.step = step
with mx.autograd.record():
@@ -92,6 +92,6 @@ def test(args, model, test_samplers, rank=0, mode='Test', queue=None):
metrics[metric] = sum([log[metric] for log in logs]) / len(logs)
for k, v in metrics.items():
print('{} average {} at [{}/{}]: {}'.format(mode, k, args.step, args.max_step, v))
print('{} average {}: {}'.format(mode, k, v))
for i in range(len(test_samplers)):
test_samplers[i] = test_samplers[i].reset()
@@ -112,11 +112,10 @@ def train(args, model, train_sampler, valid_samplers=None, rank=0, rel_parts=Non
update_time = 0
forward_time = 0
backward_time = 0
for step in range(args.init_step, args.max_step):
for step in range(0, args.max_step):
start1 = time.time()
pos_g, neg_g = next(train_sampler)
sample_time += time.time() - start1
args.step = step
if client is not None:
model.pull_model(client, pos_g, neg_g)
@@ -200,7 +199,7 @@ def test(args, model, test_samplers, rank=0, mode='Test', queue=None):
queue.put(logs)
else:
for k, v in metrics.items():
print('[{}]{} average {} at [{}/{}]: {}'.format(rank, mode, k, args.step, args.max_step, v))
print('[{}]{} average {}: {}'.format(rank, mode, k, v))
test_samplers[0] = test_samplers[0].reset()
test_samplers[1] = test_samplers[1].reset()
@@ -212,6 +211,8 @@ def train_mp(args, model, train_sampler, valid_samplers=None, rank=0, rel_parts=
@thread_wrapped_func
def test_mp(args, model, test_samplers, rank=0, mode='Test', queue=None):
if args.num_proc > 1:
th.set_num_threads(args.num_thread)
test(args, model, test_samplers, rank, mode, queue)
@thread_wrapped_func
@@ -248,11 +249,6 @@ def dist_train_test(args, model, train_sampler, entity_pb, relation_pb, l2g, ran
if args.neg_deg_sample_eval:
assert not args.eval_filter, "if negative sampling is based on degree, we can't filter positive edges."
if args.neg_chunk_size_valid < 0:
args.neg_chunk_size_valid = args.neg_sample_size_valid
if args.neg_chunk_size_test < 0:
args.neg_chunk_size_test = args.neg_sample_size_test
print("Pull relation_emb ...")
relation_id = F.arange(0, model_test.n_relations)
relation_data = client.pull(name='relation_emb', id_tensor=relation_id)
@@ -278,10 +274,10 @@ def dist_train_test(args, model, train_sampler, entity_pb, relation_pb, l2g, ran
end += count
percent += 1
if args.save_emb is not None:
if not os.path.exists(args.save_emb):
os.mkdir(args.save_emb)
model_test.save_emb(args.save_emb, args.dataset)
if args.save_emb is not None:
if not os.path.exists(args.save_emb):
os.mkdir(args.save_emb)
model.save_emb(args.save_emb, args.dataset)
if args.test:
args.num_thread = 1
@@ -290,17 +286,17 @@ def dist_train_test(args, model, train_sampler, entity_pb, relation_pb, l2g, ran
for i in range(args.num_test_proc):
test_sampler_head = eval_dataset.create_sampler('test', args.batch_size_eval,
args.neg_sample_size_test,
args.neg_chunk_size_test,
args.neg_sample_size_test,
args.eval_filter,
mode='chunk-head',
num_workers=args.num_thread,
num_workers=args.num_workers,
rank=i, ranks=args.num_test_proc)
test_sampler_tail = eval_dataset.create_sampler('test', args.batch_size_eval,
args.neg_sample_size_test,
args.neg_chunk_size_test,
args.neg_sample_size_test,
args.eval_filter,
mode='chunk-tail',
num_workers=args.num_thread,
num_workers=args.num_workers,
rank=i, ranks=args.num_test_proc)
test_sampler_heads.append(test_sampler_head)
test_sampler_tails.append(test_sampler_tail)
@@ -332,7 +328,7 @@ def dist_train_test(args, model, train_sampler, entity_pb, relation_pb, l2g, ran
for metric in logs[0].keys():
metrics[metric] = sum([log[metric] for log in logs]) / len(logs)
for k, v in metrics.items():
print('Test average {} at [{}/{}]: {}'.format(k, args.step, args.max_step, v))
print('Test average {} : {}'.format(k, v))
for proc in procs:
proc.join()
......
@@ -224,7 +224,6 @@ class KVServer(object):
else: # Read shared-tensor
while True:
if (os.path.exists(name+'-data-shape')):
time.sleep(2) # wait writing finish
break
else:
time.sleep(2) # wait until the file has been created
......