"""Training GCMC model on the MovieLens data set by mini-batch sampling. The script loads the full graph in CPU and samples subgraphs for computing gradients on the training device. The script also supports multi-GPU for further acceleration. """ import argparse import logging import os, time import random import string import traceback import dgl import numpy as np import torch as th import torch.multiprocessing as mp import torch.nn as nn import tqdm from data import MovieLens from model import BiDecoder, DenseBiDecoder, GCMCLayer from torch.nn.parallel import DistributedDataParallel from torch.utils.data import DataLoader from utils import ( get_activation, get_optimizer, MetricLogger, to_etype_name, torch_net_info, torch_total_param_num, ) class Net(nn.Module): def __init__(self, args, dev_id): super(Net, self).__init__() self._act = get_activation(args.model_activation) self.encoder = GCMCLayer( args.rating_vals, args.src_in_units, args.dst_in_units, args.gcn_agg_units, args.gcn_out_units, args.gcn_dropout, args.gcn_agg_accum, agg_act=self._act, share_user_item_param=args.share_param, device=dev_id, ) if args.mix_cpu_gpu and args.use_one_hot_fea: # if use_one_hot_fea, user and movie feature is None # W can be extremely large, with mix_cpu_gpu W should be stored in CPU self.encoder.partial_to(dev_id) else: self.encoder.to(dev_id) self.decoder = BiDecoder( in_units=args.gcn_out_units, num_classes=len(args.rating_vals), num_basis=args.gen_r_num_basis_func, ) self.decoder.to(dev_id) def forward( self, compact_g, frontier, ufeat, ifeat, possible_rating_values ): user_out, movie_out = self.encoder(frontier, ufeat, ifeat) pred_ratings = self.decoder(compact_g, user_out, movie_out) return pred_ratings def load_subtensor(input_nodes, pair_graph, blocks, dataset, parent_graph): output_nodes = pair_graph.ndata[dgl.NID] head_feat = ( input_nodes["user"] if dataset.user_feature is None else dataset.user_feature[input_nodes["user"]] ) tail_feat = ( input_nodes["movie"] if dataset.movie_feature is None else dataset.movie_feature[input_nodes["movie"]] ) for block in blocks: block.dstnodes["user"].data["ci"] = parent_graph.nodes["user"].data[ "ci" ][block.dstnodes["user"].data[dgl.NID]] block.srcnodes["user"].data["cj"] = parent_graph.nodes["user"].data[ "cj" ][block.srcnodes["user"].data[dgl.NID]] block.dstnodes["movie"].data["ci"] = parent_graph.nodes["movie"].data[ "ci" ][block.dstnodes["movie"].data[dgl.NID]] block.srcnodes["movie"].data["cj"] = parent_graph.nodes["movie"].data[ "cj" ][block.srcnodes["movie"].data[dgl.NID]] return head_feat, tail_feat, blocks def flatten_etypes(pair_graph, dataset, segment): n_users = pair_graph.number_of_nodes("user") n_movies = pair_graph.number_of_nodes("movie") src = [] dst = [] labels = [] ratings = [] for rating in dataset.possible_rating_values: src_etype, dst_etype = pair_graph.edges( order="eid", etype=to_etype_name(rating) ) src.append(src_etype) dst.append(dst_etype) label = np.searchsorted(dataset.possible_rating_values, rating) ratings.append(th.LongTensor(np.full_like(src_etype, rating))) labels.append(th.LongTensor(np.full_like(src_etype, label))) src = th.cat(src) dst = th.cat(dst) ratings = th.cat(ratings) labels = th.cat(labels) flattened_pair_graph = dgl.heterograph( {("user", "rate", "movie"): (src, dst)}, num_nodes_dict={"user": n_users, "movie": n_movies}, ) flattened_pair_graph.edata["rating"] = ratings flattened_pair_graph.edata["label"] = labels return flattened_pair_graph def evaluate(args, dev_id, net, dataset, dataloader, segment="valid"): possible_rating_values = dataset.possible_rating_values nd_possible_rating_values = th.FloatTensor(possible_rating_values).to( dev_id ) real_pred_ratings = [] true_rel_ratings = [] for input_nodes, pair_graph, blocks in dataloader: head_feat, tail_feat, blocks = load_subtensor( input_nodes, pair_graph, blocks, dataset, dataset.valid_enc_graph if segment == "valid" else dataset.test_enc_graph, ) frontier = blocks[0] true_relation_ratings = ( dataset.valid_truths[pair_graph.edata[dgl.EID]] if segment == "valid" else dataset.test_truths[pair_graph.edata[dgl.EID]] ) frontier = frontier.to(dev_id) head_feat = head_feat.to(dev_id) tail_feat = tail_feat.to(dev_id) pair_graph = pair_graph.to(dev_id) with th.no_grad(): pred_ratings = net( pair_graph, frontier, head_feat, tail_feat, possible_rating_values, ) batch_pred_ratings = ( th.softmax(pred_ratings, dim=1) * nd_possible_rating_values.view(1, -1) ).sum(dim=1) real_pred_ratings.append(batch_pred_ratings) true_rel_ratings.append(true_relation_ratings) real_pred_ratings = th.cat(real_pred_ratings, dim=0) true_rel_ratings = th.cat(true_rel_ratings, dim=0).to(dev_id) rmse = ((real_pred_ratings - true_rel_ratings) ** 2.0).mean().item() rmse = np.sqrt(rmse) return rmse def config(): parser = argparse.ArgumentParser(description="GCMC") parser.add_argument("--seed", default=123, type=int) parser.add_argument("--gpu", type=str, default="0") parser.add_argument("--save_dir", type=str, help="The saving directory") parser.add_argument("--save_id", type=int, help="The saving log id") parser.add_argument("--silent", action="store_true") parser.add_argument( "--data_name", default="ml-1m", type=str, help="The dataset name: ml-100k, ml-1m, ml-10m", ) parser.add_argument( "--data_test_ratio", type=float, default=0.1 ) ## for ml-100k the test ration is 0.2 parser.add_argument("--data_valid_ratio", type=float, default=0.1) parser.add_argument("--use_one_hot_fea", action="store_true", default=False) parser.add_argument("--model_activation", type=str, default="leaky") parser.add_argument("--gcn_dropout", type=float, default=0.7) parser.add_argument("--gcn_agg_norm_symm", type=bool, default=True) parser.add_argument("--gcn_agg_units", type=int, default=500) parser.add_argument("--gcn_agg_accum", type=str, default="sum") parser.add_argument("--gcn_out_units", type=int, default=75) parser.add_argument("--gen_r_num_basis_func", type=int, default=2) parser.add_argument("--train_max_epoch", type=int, default=1000) parser.add_argument("--train_log_interval", type=int, default=1) parser.add_argument("--train_valid_interval", type=int, default=1) parser.add_argument("--train_optimizer", type=str, default="adam") parser.add_argument("--train_grad_clip", type=float, default=1.0) parser.add_argument("--train_lr", type=float, default=0.01) parser.add_argument("--train_min_lr", type=float, default=0.0001) parser.add_argument("--train_lr_decay_factor", type=float, default=0.5) parser.add_argument("--train_decay_patience", type=int, default=25) parser.add_argument("--train_early_stopping_patience", type=int, default=50) parser.add_argument("--share_param", default=False, action="store_true") parser.add_argument("--mix_cpu_gpu", default=False, action="store_true") parser.add_argument("--minibatch_size", type=int, default=20000) parser.add_argument("--num_workers_per_gpu", type=int, default=8) args = parser.parse_args() ### configure save_fir to save all the info if args.save_dir is None: args.save_dir = ( args.data_name + "_" + "".join( random.choices(string.ascii_uppercase + string.digits, k=2) ) ) if args.save_id is None: args.save_id = np.random.randint(20) args.save_dir = os.path.join("log", args.save_dir) if not os.path.isdir(args.save_dir): os.makedirs(args.save_dir) return args def run(proc_id, n_gpus, args, devices, dataset): dev_id = devices[proc_id] if n_gpus > 1: dist_init_method = "tcp://{master_ip}:{master_port}".format( master_ip="127.0.0.1", master_port="12345" ) world_size = n_gpus th.distributed.init_process_group( backend="nccl", init_method=dist_init_method, world_size=world_size, rank=dev_id, ) if n_gpus > 0: th.cuda.set_device(dev_id) train_labels = dataset.train_labels train_truths = dataset.train_truths num_edges = train_truths.shape[0] reverse_types = { to_etype_name(k): "rev-" + to_etype_name(k) for k in dataset.possible_rating_values } reverse_types.update({v: k for k, v in reverse_types.items()}) sampler = dgl.dataloading.MultiLayerNeighborSampler( [None], return_eids=True ) sampler = dgl.dataloading.as_edge_prediction_sampler(sampler) dataloader = dgl.dataloading.DataLoader( dataset.train_enc_graph, { to_etype_name(k): th.arange( dataset.train_enc_graph.number_of_edges(etype=to_etype_name(k)) ) for k in dataset.possible_rating_values }, sampler, use_ddp=n_gpus > 1, batch_size=args.minibatch_size, shuffle=True, drop_last=False, ) if proc_id == 0: valid_dataloader = dgl.dataloading.DataLoader( dataset.valid_dec_graph, th.arange(dataset.valid_dec_graph.number_of_edges()), sampler, g_sampling=dataset.valid_enc_graph, batch_size=args.minibatch_size, shuffle=False, drop_last=False, ) test_dataloader = dgl.dataloading.DataLoader( dataset.test_dec_graph, th.arange(dataset.test_dec_graph.number_of_edges()), sampler, g_sampling=dataset.test_enc_graph, batch_size=args.minibatch_size, shuffle=False, drop_last=False, ) nd_possible_rating_values = th.FloatTensor(dataset.possible_rating_values) nd_possible_rating_values = nd_possible_rating_values.to(dev_id) net = Net(args=args, dev_id=dev_id) net = net.to(dev_id) if n_gpus > 1: net = DistributedDataParallel( net, device_ids=[dev_id], output_device=dev_id ) rating_loss_net = nn.CrossEntropyLoss() learning_rate = args.train_lr optimizer = get_optimizer(args.train_optimizer)( net.parameters(), lr=learning_rate ) print("Loading network finished ...\n") ### declare the loss information best_valid_rmse = np.inf no_better_valid = 0 best_epoch = -1 count_rmse = 0 count_num = 0 count_loss = 0 print("Start training ...") dur = [] iter_idx = 1 for epoch in range(1, args.train_max_epoch): if n_gpus > 1: dataloader.set_epoch(epoch) if epoch > 1: t0 = time.time() net.train() with tqdm.tqdm(dataloader) as tq: for step, (input_nodes, pair_graph, blocks) in enumerate(tq): head_feat, tail_feat, blocks = load_subtensor( input_nodes, pair_graph, blocks, dataset, dataset.train_enc_graph, ) frontier = blocks[0] compact_g = flatten_etypes(pair_graph, dataset, "train").to( dev_id ) true_relation_labels = compact_g.edata["label"] true_relation_ratings = compact_g.edata["rating"] head_feat = head_feat.to(dev_id) tail_feat = tail_feat.to(dev_id) frontier = frontier.to(dev_id) pred_ratings = net( compact_g, frontier, head_feat, tail_feat, dataset.possible_rating_values, ) loss = rating_loss_net( pred_ratings, true_relation_labels.to(dev_id) ).mean() count_loss += loss.item() optimizer.zero_grad() loss.backward() nn.utils.clip_grad_norm_(net.parameters(), args.train_grad_clip) optimizer.step() if proc_id == 0 and iter_idx == 1: print( "Total #Param of net: %d" % (torch_total_param_num(net)) ) real_pred_ratings = ( th.softmax(pred_ratings, dim=1) * nd_possible_rating_values.view(1, -1) ).sum(dim=1) rmse = ( (real_pred_ratings - true_relation_ratings.to(dev_id)) ** 2 ).sum() count_rmse += rmse.item() count_num += pred_ratings.shape[0] tq.set_postfix( { "loss": "{:.4f}".format(count_loss / iter_idx), "rmse": "{:.4f}".format(count_rmse / count_num), }, refresh=False, ) iter_idx += 1 if epoch > 1: epoch_time = time.time() - t0 print("Epoch {} time {}".format(epoch, epoch_time)) if epoch % args.train_valid_interval == 0: if n_gpus > 1: th.distributed.barrier() if proc_id == 0: valid_rmse = evaluate( args=args, dev_id=dev_id, net=net, dataset=dataset, dataloader=valid_dataloader, segment="valid", ) logging_str = "Val RMSE={:.4f}".format(valid_rmse) if valid_rmse < best_valid_rmse: best_valid_rmse = valid_rmse no_better_valid = 0 best_epoch = epoch test_rmse = evaluate( args=args, dev_id=dev_id, net=net, dataset=dataset, dataloader=test_dataloader, segment="test", ) best_test_rmse = test_rmse logging_str += ", Test RMSE={:.4f}".format(test_rmse) else: no_better_valid += 1 if ( no_better_valid > args.train_early_stopping_patience and learning_rate <= args.train_min_lr ): logging.info( "Early stopping threshold reached. Stop training." ) break if no_better_valid > args.train_decay_patience: new_lr = max( learning_rate * args.train_lr_decay_factor, args.train_min_lr, ) if new_lr < learning_rate: logging.info("\tChange the LR to %g" % new_lr) learning_rate = new_lr for p in optimizer.param_groups: p["lr"] = learning_rate no_better_valid = 0 print("Change the LR to %g" % new_lr) # sync on evalution if n_gpus > 1: th.distributed.barrier() if proc_id == 0: print(logging_str) if proc_id == 0: print( "Best epoch Idx={}, Best Valid RMSE={:.4f}, Best Test RMSE={:.4f}".format( best_epoch, best_valid_rmse, best_test_rmse ) ) if __name__ == "__main__": args = config() devices = list(map(int, args.gpu.split(","))) n_gpus = len(devices) # For GCMC based on sampling, we require node has its own features. # Otherwise (node_id is the feature), the model can not scale dataset = MovieLens( args.data_name, "cpu", mix_cpu_gpu=args.mix_cpu_gpu, use_one_hot_fea=args.use_one_hot_fea, symm=args.gcn_agg_norm_symm, test_ratio=args.data_test_ratio, valid_ratio=args.data_valid_ratio, ) print("Loading data finished ...\n") args.src_in_units = dataset.user_feature_shape[1] args.dst_in_units = dataset.movie_feature_shape[1] args.rating_vals = dataset.possible_rating_values # cpu if devices[0] == -1: run(0, 0, args, ["cpu"], dataset) # gpu elif n_gpus == 1: run(0, n_gpus, args, devices, dataset) # multi gpu else: # Create csr/coo/csc formats before launching training processes with multi-gpu. # This avoids creating certain formats in each sub-process, which saves momory and CPU. dataset.train_enc_graph.create_formats_() dataset.train_dec_graph.create_formats_() mp.spawn(run, args=(n_gpus, args, devices, dataset), nprocs=n_gpus)