Commit c43a53e4 authored by sunxx1

Merge branch 'add_Recommendation' into 'main'

Add VAE-CF and DLRM

See merge request dcutoolkit/deeplearing/dlexamples_new!24
parents 5394b117 56225fdf
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""
Utilities for MLPerf logging
"""
import os
import torch
try:
from mlperf_logging import mllog
from mlperf_logging.mllog import constants
_MLLOGGER = mllog.get_mllogger()
except ImportError as error:
print("Unable to import mlperf_logging, ", error)
def log_start(*args, **kwargs):
"log with start tag"
_log_print(_MLLOGGER.start, *args, **kwargs)
def log_end(*args, **kwargs):
"log with end tag"
_log_print(_MLLOGGER.end, *args, **kwargs)
def log_event(*args, **kwargs):
"log with event tag"
_log_print(_MLLOGGER.event, *args, **kwargs)
def _log_print(logger, *args, **kwargs):
"makes mlperf logger aware of distributed execution"
if 'stack_offset' not in kwargs:
kwargs['stack_offset'] = 3
if 'value' not in kwargs:
kwargs['value'] = None
if kwargs.pop('log_all_ranks', False):
log = True
else:
log = (get_rank() == 0)
if log:
logger(*args, **kwargs)
def config_logger(benchmark):
"initiates mlperf logger"
mllog.config(filename=os.path.join(os.path.dirname(os.path.abspath(__file__)), f'{benchmark}.log'))
_MLLOGGER.logger.propagate = False
def barrier():
"""
Works as a temporary distributed barrier; at the time this was written,
PyTorch did not implement barrier() for the NCCL backend.
Calls all_reduce on a dummy tensor and synchronizes with the GPU.
"""
if torch.distributed.is_available() and torch.distributed.is_initialized():
torch.distributed.all_reduce(torch.cuda.FloatTensor(1))
torch.cuda.synchronize()
def get_rank():
"""
Gets distributed rank or returns zero if distributed is not initialized.
"""
if torch.distributed.is_available() and torch.distributed.is_initialized():
rank = torch.distributed.get_rank()
else:
rank = 0
return rank
def mlperf_submission_log(benchmark):
"""
Logs information needed for MLPerf submission
"""
config_logger(benchmark)
log_event(
key=constants.SUBMISSION_BENCHMARK,
value=benchmark,
)
log_event(
key=constants.SUBMISSION_ORG,
value='reference_implementation')
log_event(
key=constants.SUBMISSION_DIVISION,
value='closed')
log_event(
key=constants.SUBMISSION_STATUS,
value='onprem')
log_event(
key=constants.SUBMISSION_PLATFORM,
value='reference_implementation')
log_event(
key=constants.SUBMISSION_ENTRY,
value="reference_implementation")
log_event(
key=constants.SUBMISSION_POC_NAME,
value='reference_implementation')
log_event(
key=constants.SUBMISSION_POC_EMAIL,
value='reference_implementation')
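# Example usage (a minimal sketch; the benchmark name "dlrm" and the keys used
# below are illustrative rather than required by this module): configure the
# logger once, then emit start/end/event records from the benchmark driver.
if __name__ == "__main__":
    mlperf_submission_log("dlrm")  # creates dlrm.log next to this file
    log_start(key=constants.INIT_START, log_all_ranks=True)
    # ... build the model, load the data, etc. ...
    log_end(key=constants.INIT_STOP)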
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import torch
from torch.optim import Optimizer
class RWSAdagrad(Optimizer):
"""Implements Row Wise Sparse Adagrad algorithm.
Arguments:
params (iterable): iterable of parameters to optimize or dicts defining
parameter groups
lr (float, optional): learning rate (default: 1e-2)
lr_decay (float, optional): learning rate decay (default: 0)
weight_decay (float, optional): weight decay (L2 penalty) (default: 0)
eps (float, optional): term added to the denominator to improve
numerical stability (default: 1e-10)
"""
def __init__(self, params, lr=1e-2, lr_decay=0.0, weight_decay=0.0, initial_accumulator_value=0.0, eps=1e-10):
if not 0.0 <= lr:
raise ValueError("Invalid learning rate: {}".format(lr))
if not 0.0 <= lr_decay:
raise ValueError("Invalid lr_decay value: {}".format(lr_decay))
if not 0.0 <= weight_decay:
raise ValueError("Invalid weight_decay value: {}".format(weight_decay))
if not 0.0 <= initial_accumulator_value:
raise ValueError("Invalid initial_accumulator_value value: {}".format(initial_accumulator_value))
if not 0.0 <= eps:
raise ValueError("Invalid epsilon value: {}".format(eps))
self.defaults = dict(lr=lr, lr_decay=lr_decay, eps=eps, weight_decay=weight_decay,
initial_accumulator_value=initial_accumulator_value)
super(RWSAdagrad, self).__init__(params, self.defaults)
self.momentum_initialized = False
for group in self.param_groups:
for p in group['params']:
self.state[p]['step'] = 0
def share_memory(self):
for group in self.param_groups:
for p in group['params']:
state = self.state[p]
if p.grad.data.is_sparse:
state['momentum'].share_memory_()
else:
state['sum'].share_memory_()
def step(self, closure=None):
"""Performs a single optimization step.
Arguments:
closure (callable, optional): A closure that reevaluates the model
and returns the loss.
"""
loss = None
if closure is not None:
loss = closure()
for group in self.param_groups:
for p in group['params']:
if p.grad is None:
continue
if not self.momentum_initialized:
if p.grad.data.is_sparse:
self.state[p]['momentum'] = torch.full(
[p.data.shape[0]],
self.defaults["initial_accumulator_value"],
dtype=torch.float32,
)
else:
self.state[p]['sum'] = torch.full_like(p.data,
self.defaults["initial_accumulator_value"],
dtype=torch.float32,
)
grad = p.grad
state = self.state[p]
state['step'] += 1
if group['weight_decay'] != 0:
if p.grad.data.is_sparse:
raise RuntimeError("weight_decay option is not compatible with sparse gradients")
grad = grad.add(p.data, alpha=group['weight_decay'])
clr = group['lr'] / (1.0 + (state['step'] - 1.0) * group['lr_decay'])
if grad.is_sparse:
grad = grad.coalesce() # the update is non-linear so indices must be unique
grad_indices = grad._indices()
grad_values = grad._values()
size = grad.size()
def make_sparse(values, row_wise):
constructor = grad.new
matrix_size = [size[0]] if row_wise else size
return constructor(grad_indices, values, matrix_size)
if grad_values.numel() > 0:
momentum_update = make_sparse(grad_values.pow(2).mean(dim=1), True)
state['momentum'].add_(momentum_update) # update momentum
std = state['momentum'].sparse_mask(momentum_update.coalesce())
std_values = std._values().sqrt_().add_(group['eps'])
p.data.add_(make_sparse(grad_values / std_values.view(std_values.size()[0], 1), False), alpha=-clr)
else:
state['sum'].addcmul_(grad, grad, value=1.0)
std = state['sum'].sqrt().add_(group['eps'])
p.data.addcdiv_(grad, std, value=-clr)
self.momentum_initialized = True
return loss
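# Example usage (a minimal, illustrative sketch; the table size, embedding
# dimension, and random lookups below are made up). RWSAdagrad keeps a single
# accumulator value per embedding row for sparse gradients, so it is typically
# paired with a sparse embedding bag:
if __name__ == "__main__":
    emb = torch.nn.EmbeddingBag(1000, 16, mode="sum", sparse=True)
    optimizer = RWSAdagrad(emb.parameters(), lr=0.01, eps=1e-8)
    ids = torch.randint(0, 1000, (8,))  # 8 single-id bags
    offsets = torch.arange(0, 8)
    loss = emb(ids, offsets).sum()
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()  # row-wise sparse update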
future
numpy
onnx
pydot
torch
torchviz
scikit-learn
tqdm
torchrec-nightly
torchx-nightly
#!/bin/bash
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
#
#WARNING: must have compiled PyTorch and caffe2
#check if extra argument is passed to the test
if [[ $# == 1 ]]; then
dlrm_extra_option=$1
else
dlrm_extra_option=""
fi
#echo $dlrm_extra_option
dlrm_py="python dlrm_s_pytorch.py"
dlrm_c2="python dlrm_s_caffe2.py"
echo "Running commands ..."
#run pytorch
echo $dlrm_py
$dlrm_py --mini-batch-size=1 --data-size=1 --nepochs=1 --arch-interaction-op=dot --learning-rate=0.1 --debug-mode $dlrm_extra_option > ppp1
$dlrm_py --mini-batch-size=2 --data-size=4 --nepochs=1 --arch-interaction-op=dot --learning-rate=0.1 --debug-mode $dlrm_extra_option > ppp2
$dlrm_py --mini-batch-size=2 --data-size=5 --nepochs=1 --arch-interaction-op=dot --learning-rate=0.1 --debug-mode $dlrm_extra_option > ppp3
$dlrm_py --mini-batch-size=2 --data-size=5 --nepochs=3 --arch-interaction-op=dot --learning-rate=0.1 --debug-mode $dlrm_extra_option > ppp4
#run caffe2
echo $dlrm_c2
$dlrm_c2 --mini-batch-size=1 --data-size=1 --nepochs=1 --arch-interaction-op=dot --learning-rate=0.1 --debug-mode $dlrm_extra_option > ccc1
$dlrm_c2 --mini-batch-size=2 --data-size=4 --nepochs=1 --arch-interaction-op=dot --learning-rate=0.1 --debug-mode $dlrm_extra_option > ccc2
$dlrm_c2 --mini-batch-size=2 --data-size=5 --nepochs=1 --arch-interaction-op=dot --learning-rate=0.1 --debug-mode $dlrm_extra_option > ccc3
$dlrm_c2 --mini-batch-size=2 --data-size=5 --nepochs=3 --arch-interaction-op=dot --learning-rate=0.1 --debug-mode $dlrm_extra_option > ccc4
echo "Checking results ..."
#check results
#WARNING: correct test will have no difference in numeric values
#(but might have some verbal difference, e.g. due to warnings)
#in the output file
echo "diff test1 (no numeric values in the output = SUCCESS)"
diff ccc1 ppp1
echo "diff test2 (no numeric values in the output = SUCCESS)"
diff ccc2 ppp2
echo "diff test3 (no numeric values in the output = SUCCESS)"
diff ccc3 ppp3
echo "diff test4 (no numeric values in the output = SUCCESS)"
diff ccc4 ppp4
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
#
#
# This script performs the visualization of the embedding tables created in
# DLRM during the training procedure. We use two popular techniques for
# visualization: umap (https://umap-learn.readthedocs.io/en/latest/) and
# tsne (https://scikit-learn.org/stable/modules/generated/sklearn.manifold.TSNE.html).
# These links also provide instructions on how to install these packages
# in different environments.
#
# Warning: the size of the data to be visualized depends on the RAM on your machine.
#
#
# Command line examples:
#
# Full analysis of embeddings and data representations for Criteo Kaggle data:
# $python ./tools/visualize.py --data-set=kaggle --load-model=../dlrm-2020-05-25/criteo.pytorch-e-0-i-110591
# --raw-data-file=../../criteo/input/train.txt --skip-categorical-analysis
# --processed-data-file=../../criteo/input/kaggleAdDisplayChallenge_processed.npz
#
#
# To run just the analysis of categorical data for the Criteo Kaggle data set:
# $python ./tools/visualize.py --data-set=kaggle --load-model=../dlrm-2020-05-25/criteo.pytorch-e-0-i-110591 \
# --raw-data-file=../../criteo/input/train.txt --data-randomize=none --processed-data-file=../../criteo/input/kaggleAdDisplayChallenge_processed.npz \
# --skip-embedding --skip-data-plots
#
#
# The following command line arguments are available to the user:
#
# --load-model - DLRM model file
# --data-set - one of ["kaggle", "terabyte"]
# --max-ind-range - max index range used during the training
# --output-dir - output directory; if not specified, it will be derived from the model and dataset names
# --max-umap-size - max number of points to visualize using UMAP, default=50000
# --use-tsne - use T-SNE
# --max-tsne-size - max number of points to visualize using T-SNE, default=1000
# --skip-embedding - skips analysis of embedding tables
# --umap-metric - metric for UMAP
# --skip-data-plots - skips data plots
# --skip-categorical-analysis - skips categorical analysis
#
# # data file related
# --raw-data-file
# --processed-data-file
# --data-sub-sample-rate
# --data-randomize
# --memory-map
# --mini-batch-size
# --num-workers
# --test-mini-batch-size
# --test-num-workers
# --num-batches
# --mlperf-logging
import os
import sys
import argparse
import numpy as np
import umap
import hdbscan
import json
import torch
import math
import matplotlib
import matplotlib.pyplot as plt
import collections
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn import manifold
import dlrm_data_pytorch as dp
from dlrm_s_pytorch import DLRM_Net
def visualize_embeddings_umap(emb_l,
output_dir = "",
max_size = 500000,
umap_metric = "euclidean",
cat_counts = None,
use_max_count = True):
for k in range(0, len(emb_l)):
E = emb_l[k].weight.detach().cpu().numpy()
print("umap", E.shape)
# create histogram of norms
bins = 50
norms = [np.linalg.norm(E[i], ord=2) for i in range(0,E.shape[0])]
# plt.hist(norms, bins = bins)
# plt.title("Cat norm hist var. "+str(k))
hist, bins = np.histogram(norms, bins=bins)
logbins = np.logspace(np.log10(bins[0]),np.log10(bins[-1]),len(bins))
plt.figure(figsize=(8,8))
plt.title("Categorical norms: " + str(k) + " cardinality " + str(len(cat_counts[k])))
plt.hist(norms, bins=logbins)
plt.xscale("log")
# plt.legend()
plt.savefig(output_dir+"/cat-norm-histogram-"+str(k)+".png")
plt.close()
if E.shape[0] < 20:
print("Skipping small embedding")
continue
n_vis = min(max_size, E.shape[0])
min_cnt = 0
# reducer = umap.UMAP(random_state=42, n_neighbors=25, min_dist=0.1)
reducer = umap.UMAP(random_state=42, metric=umap_metric)
if use_max_count is False or n_vis == E.shape[0]:
Y = reducer.fit_transform(E[:n_vis,:])
else:
# select rows whose category count exceeds min_cnt
done = False
min_cnt = 1
while done == False:
el_cnt = (cat_counts[k] > min_cnt).sum()
if el_cnt <= max_size:
done = True
else:
min_cnt = min_cnt+1
E1= []
for i in range(0, E.shape[0]):
if cat_counts[k][i] > min_cnt:
E1.append(E[i,:])
print("max_count_len", len(E1), "mincount", min_cnt)
Y = reducer.fit_transform(np.array(E1))
n_vis = len(E1)
plt.figure(figsize=(8,8))
linewidth = 0
size = 1
if Y.shape[0] < 2500:
linewidth = 1
size = 5
if cat_counts is None:
plt.scatter(-Y[:,0], -Y[:,1], s=size, marker=".", linewidth=linewidth)
else:
#print(cat_counts[k])
n_disp = min(len(cat_counts[k]), Y.shape[0])
cur_max = math.log(max(cat_counts[k]))
norm_cat_count = [math.log(cat_counts[k][i]+1)/cur_max for i in range(0, len(cat_counts[k]))]
plt.scatter(-Y[0:n_disp,0], -Y[0:n_disp,1], s=size, marker=".", linewidth=linewidth, c=np.array(norm_cat_count)[0:n_disp], cmap="viridis")
plt.colorbar()
plt.title("UMAP: categorical var. " + str(k) + " (" + str(n_vis) + " of " + str(E.shape[0]) + ", min count " + str(min_cnt) + ")")
plt.savefig(output_dir + "/cat-" + str(k) + "-" + str(n_vis) + "-of-" + str(E.shape[0]) + "-umap.png")
plt.close()
def visualize_embeddings_tsne(emb_l,
output_dir = "",
max_size = 10000):
for k in range(0, len(emb_l)):
E = emb_l[k].weight.detach().cpu()
print("tsne", E.shape)
if E.shape[0] < 20:
print("Skipping small embedding")
continue
n_vis = min(max_size, E.shape[0])
tsne = manifold.TSNE(init="pca", random_state=0, method="exact")
Y = tsne.fit_transform(E[:n_vis,:])
plt.figure(figsize=(8, 8))
linewidth = 0
if Y.shape[0] < 5000:
linewidth = 1
plt.scatter(-Y[:,0], -Y[:,1], s=1, marker=".", linewidth=linewidth)
plt.title("TSNE: categorical var. " + str(k) + " (" + str(n_vis) + " of " + str(E.shape[0]) + ")")
plt.savefig(output_dir + "/cat-" + str(k) + "-" + str(n_vis) + "-of-" + str(E.shape[0]) + "-tsne.png")
plt.close()
def analyse_categorical_data(X_cat, n_days=10, output_dir=""):
# analyse categorical variables
n_vec = len(X_cat)
n_cat = len(X_cat[0])
n_days = n_days
print("n_vec", n_vec, "n_cat", n_cat)
# for c in train_data.X_cat:
# print(n_cat, c)
all_cat = np.array(X_cat)
print("all_cat.shape", all_cat.shape)
day_size = all_cat.shape[0]/n_days
for i in range(0,n_cat):
l_d = []
l_s1 = []
l_s2 = []
l_int = []
l_rem = []
cat = all_cat[:,i]
print("cat", i, cat.shape)
for d in range(1,n_days):
offset = int(d*day_size)
#print(offset)
cat1 = cat[:offset]
cat2 = cat[offset:]
s1 = set(cat1)
s2 = set(cat2)
intersect = list(s1 & s2)
#print(intersect)
l_d.append(d)
l_s1.append(len(s1))
l_s2.append(len(s2))
l_int.append(len(intersect))
l_rem.append((len(s1)-len(intersect)))
print(d, ",", len(s1), ",", len(s2), ",", len(intersect), ",", (len(s1)-len(intersect)))
print("spit", l_d)
print("before", l_s1)
print("after", l_s2)
print("inters.", l_int)
print("removed", l_rem)
plt.figure(figsize=(8,8))
plt.plot(l_d, l_s1, "g", label="before")
plt.plot(l_d, l_s2, "r", label="after")
plt.plot(l_d, l_int, "b", label="intersect")
plt.plot(l_d, l_rem, "y", label="removed")
plt.title("categorical var. "+str(i))
plt.legend()
plt.savefig(output_dir+"/cat-"+str(i).zfill(3)+".png")
plt.close()
def analyse_categorical_counts(X_cat, emb_l=None, output_dir=""):
# analyse categorical variables
n_vec = len(X_cat)
n_cat = len(X_cat[0])
print("n_vec", n_vec, "n_cat", n_cat)
# for c in train_data.X_cat:
# print(n_cat, c)
all_cat = np.array(X_cat)
print("all_cat.shape", all_cat.shape)
all_counts = []
for i in range(0,n_cat):
cat = all_cat[:,i]
if emb_l is None:
s = set(cat)
counts = np.zeros((len(s)))
print("cat", i, cat.shape, len(s))
else:
s = emb_l[i].weight.detach().cpu().shape[0]
counts = np.zeros((s))
print("cat", i, cat.shape, s)
for d in range(0,n_vec):
cv = int(cat[d])
counts[cv] = counts[cv]+1
all_counts.append(counts)
if emb_l is None:
plt.figure(figsize=(8,8))
plt.plot(counts)
plt.title("Categorical var "+str(i) + " cardinality " + str(len(counts)))
# plt.legend()
else:
E = emb_l[i].weight.detach().cpu().numpy()
norms = [np.linalg.norm(E[i], ord=2) for i in range(0,E.shape[0])]
fig, (ax0, ax1) = plt.subplots(2, 1)
fig.suptitle("Categorical variable: " + str(i)+" cardinality "+str(len(counts)))
ax0.plot(counts)
ax0.set_yscale("log")
ax0.set_title("Counts", fontsize=10)
ax1.plot(norms)
ax1.set_title("Norms", fontsize=10)
plt.savefig(output_dir+"/cat_counts-"+str(i).zfill(3)+".png")
plt.close()
return all_counts
def dlrm_output_wrap(dlrm, X, lS_o, lS_i, T):
all_feat_vec = []
all_cat_vec = []
x_vec = None
t_out = None
c_out = None
z_out = []
p_out = None
z_size = len(dlrm.top_l)
x = dlrm.apply_mlp(X, dlrm.bot_l)
# debug prints
#print("intermediate")
#print(x[0].detach().cpu().numpy())
x_vec = x[0].detach().cpu().numpy()
all_feat_vec.append(x_vec)
# all_X.append(x[0].detach().cpu().numpy())
# process sparse features(using embeddings), resulting in a list of row vectors
ly = dlrm.apply_emb(lS_o, lS_i, dlrm.emb_l)
for e in ly:
#print(e.detach().cpu().numpy())
all_feat_vec.append(e[0].detach().cpu().numpy())
all_cat_vec.append(e[0].detach().cpu().numpy())
all_feat_vec= np.concatenate(all_feat_vec, axis=0)
all_cat_vec= np.concatenate(all_cat_vec, axis=0)
# all_features.append(all_feat_vec)
# all_cat.append(all_cat_vec)
t_out = int(T.detach().cpu().numpy()[0,0])
# all_T.append(int(T.detach().cpu().numpy()[0,0]))
z = dlrm.interact_features(x, ly)
# print(z.detach().cpu().numpy())
# z_out = z.detach().cpu().numpy().flatten()
z_out.append(z.detach().cpu().numpy().flatten())
# all_z[0].append(z.detach().cpu().numpy().flatten())
# obtain probability of a click (using top mlp)
# print(dlrm.top_l)
# p = dlrm.apply_mlp(z, dlrm.top_l)
for i in range(0, z_size):
z = dlrm.top_l[i](z)
# if i < z_size-1:
# curr_z = z.detach().cpu().numpy().flatten()
z_out.append(z.detach().cpu().numpy().flatten())
# all_z[i+1].append(curr_z)
# print("z append", i)
# print("z",i, z.detach().cpu().numpy().flatten().shape)
p = z
# clamp output if needed
if 0.0 < dlrm.loss_threshold and dlrm.loss_threshold < 1.0:
z = torch.clamp(p, min=dlrm.loss_threshold, max=(1.0 - dlrm.loss_threshold))
else:
z = p
class_thresh = 0.0 #-0.25
zp = z.detach().cpu().numpy()[0,0]+ class_thresh
p_out = int(zp+0.5)
if p_out > 1:
p_out = 1
if p_out < 0:
p_out = 0
# all_pred.append(int(z.detach().cpu().numpy()[0,0]+0.5))
#print(int(z.detach().cpu().numpy()[0,0]+0.5))
if int(p_out) == t_out:
c_out = 0
else:
c_out = 1
return all_feat_vec, x_vec, all_cat_vec, t_out, c_out, z_out, p_out
def create_umap_data(dlrm, data_ld, max_size=50000, offset=0, info=""):
all_features = []
all_X = []
all_cat = []
all_T = []
all_c = []
all_z = []
all_pred = []
z_size = len(dlrm.top_l)
print("z_size", z_size)
for i in range(0, z_size):
all_z.append([])
for j, (X, lS_o, lS_i, T) in enumerate(data_ld):
if j < offset:
continue
if j >= max_size+offset:
break
af, x, cat, t, c, z, p = dlrm_output_wrap(dlrm, X, lS_o, lS_i, T)
all_features.append(af)
all_X.append(x)
all_cat.append(cat)
all_T.append(t)
all_c.append(c)
all_pred.append(p)
for i in range(0, z_size):
all_z[i].append(z[i])
# # calculate classifier metrics
ac = accuracy_score(all_T, all_pred)
f1 = f1_score(all_T, all_pred)
ps = precision_score(all_T, all_pred)
rc = recall_score(all_T, all_pred)
print(info, "accuracy", ac, "f1", f1, "precision", ps, "recall", rc)
return all_features, all_X, all_cat, all_T, all_z, all_c, all_pred
def plot_all_data_3(umap_Y,
umap_T,
train_Y = None,
train_T = None,
test_Y = None,
test_T = None,
total_train_size = "",
total_test_size = "",
info = "",
output_dir = "",
orig_space_dim = 0):
size = 1
colors = ["red","green"]
fig, (ax0, ax1, ax2) = plt.subplots(1, 3)
fig.suptitle("UMAP: " + info + " space dim "+str(orig_space_dim))
ax0.scatter(umap_Y[:,0], umap_Y[:,1], s=size, c=umap_T, cmap=matplotlib.colors.ListedColormap(colors), marker=".", linewidth=0)
ax0.set_title("UMAP ("+str(len(umap_T))+" of "+ total_train_size+")", fontsize=7)
if train_Y is not None and train_T is not None:
ax1.scatter(train_Y[:,0], train_Y[:,1], s=size, c=train_T, cmap=matplotlib.colors.ListedColormap(colors), marker=".", linewidth=0)
ax1.set_title("Train ("+str(len(train_T))+" of "+ total_train_size+")", fontsize=7)
if test_Y is not None and test_T is not None:
ax2.scatter(test_Y[:,0], test_Y[:,1], s=size, c=test_T, cmap=matplotlib.colors.ListedColormap(colors), marker=".", linewidth=0)
ax2.set_title("Test ("+str(len(test_T))+" of "+ total_test_size+")", fontsize=7)
plt.savefig(output_dir+"/"+info+"-umap.png")
plt.close()
def plot_one_class_3(umap_Y,
umap_T,
train_Y,
train_T,
test_Y,
test_T,
target = 0,
col = "red",
total_train_size = "",
total_test_size = "",
info = "",
output_dir = "",
orig_space_dim = 0):
size = 1
fig, (ax0, ax1, ax2) = plt.subplots(1, 3)
fig.suptitle("UMAP: "+ info + " space dim "+str(orig_space_dim))
ind_l_umap = [i for i,x in enumerate(umap_T) if x == target]
Y_umap_l = np.array([umap_Y[i,:] for i in ind_l_umap])
ax0.scatter(Y_umap_l[:,0], Y_umap_l[:,1], s=size, c=col, marker=".", linewidth=0)
ax0.set_title("UMAP, ("+str(len(umap_T))+" of "+ total_train_size+")", fontsize=7)
if train_Y is not None and train_T is not None:
ind_l_test = [i for i,x in enumerate(train_T) if x == target]
Y_test_l = np.array([train_Y[i,:] for i in ind_l_test])
ax1.scatter(Y_test_l[:,0], Y_test_l[:,1], s=size, c=col, marker=".", linewidth=0)
ax1.set_title("Train, ("+str(len(train_T))+" of "+ total_train_size+")", fontsize=7)
if test_Y is not None and test_T is not None:
ind_l_test = [i for i,x in enumerate(test_T) if x == target]
Y_test_l = np.array([test_Y[i,:] for i in ind_l_test])
ax2.scatter(Y_test_l[:,0], Y_test_l[:,1], s=size, c=col, marker=".", linewidth=0)
ax2.set_title("Test, ("+str(len(test_T))+" of "+ total_test_size+")", fontsize=7)
plt.savefig(output_dir+"/"+info+"-umap.png")
plt.close()
def visualize_umap_data(umap_Y,
umap_T,
umap_C,
umap_P,
train_Y,
train_T,
train_C,
train_P,
test_Y = None,
test_T = None,
test_C = None,
test_P = None,
total_train_size = "",
total_test_size = "",
info = "",
output_dir = "",
orig_space_dim = 0):
# all classes
plot_all_data_3(umap_Y = umap_Y,
umap_T = umap_T,
train_Y = train_Y,
train_T = train_T,
test_Y = test_Y,
test_T = test_T,
total_train_size = total_train_size,
total_test_size = total_test_size,
info = info,
output_dir = output_dir,
orig_space_dim = orig_space_dim)
# all predictions
plot_all_data_3(umap_Y = umap_Y,
umap_T = umap_P,
train_Y = train_Y,
train_T = train_P,
test_Y = test_Y,
test_T = test_P,
total_train_size = total_train_size,
total_test_size = total_test_size,
info = info+", all-predictions",
output_dir = output_dir,
orig_space_dim = orig_space_dim)
# class 0
plot_one_class_3(umap_Y = umap_Y,
umap_T = umap_T,
train_Y = train_Y,
train_T = train_T,
test_Y = test_Y,
test_T = test_T,
target = 0,
col = "red",
total_train_size = total_train_size,
total_test_size = total_test_size,
info = info+" class " + str(0),
output_dir = output_dir,
orig_space_dim = orig_space_dim)
# class 1
plot_one_class_3(umap_Y = umap_Y,
umap_T = umap_T,
train_Y = train_Y,
train_T = train_T,
test_Y = test_Y,
test_T = test_T,
target = 1,
col = "green",
total_train_size = total_train_size,
total_test_size = total_test_size,
info = info + " class " + str(1),
output_dir = output_dir,
orig_space_dim = orig_space_dim)
# correct classification
plot_one_class_3(umap_Y = umap_Y,
umap_T = umap_C,
train_Y = train_Y,
train_T = train_C,
test_Y = test_Y,
test_T = test_C,
target = 0,
col = "green",
total_train_size = total_train_size,
total_test_size = total_test_size,
info = info + " correct ",
output_dir = output_dir,
orig_space_dim = orig_space_dim)
# errors
plot_one_class_3(umap_Y = umap_Y,
umap_T = umap_C,
train_Y = train_Y,
train_T = train_C,
test_Y = test_Y,
test_T = test_C,
target = 1,
col = "red",
total_train_size = total_train_size,
total_test_size = total_test_size,
info = info + " errors ",
output_dir = output_dir,
orig_space_dim = orig_space_dim)
# prediction 0
plot_one_class_3(umap_Y = umap_Y,
umap_T = umap_P,
train_Y = train_Y,
train_T = train_P,
test_Y = test_Y,
test_T = test_P,
target = 0,
col = "red",
total_train_size = total_train_size,
total_test_size = total_test_size,
info = info + " predict-0 ",
output_dir = output_dir,
orig_space_dim = orig_space_dim)
# prediction 1
plot_one_class_3(umap_Y = umap_Y,
umap_T = umap_P,
train_Y = train_Y,
train_T = train_P,
test_Y = test_Y,
test_T = test_P,
target = 1,
col = "green",
total_train_size = total_train_size,
total_test_size = total_test_size,
info = info + " predict-1 ",
output_dir = output_dir,
orig_space_dim = orig_space_dim)
def hdbscan_clustering(umap_data, train_data, test_data, info="", output_dir=""):
clusterer = hdbscan.HDBSCAN(min_samples=10, min_cluster_size=500, prediction_data=True)
umap_labels = clusterer.fit_predict(umap_data)
train_labels, _ = hdbscan.approximate_predict(clusterer, train_data)
test_labels, _ = hdbscan.approximate_predict(clusterer, test_data)
fig, ((ax00, ax01, ax02), (ax10, ax11, ax12)) = plt.subplots(2, 3)
fig.suptitle("HDBSCAN clastering: "+ info )
# plot umap data
umap_clustered = (umap_labels >= 0)
umap_coll = collections.Counter(umap_clustered)
print("umap_clustered", umap_coll)
# print("umap_data", umap_data.shape)
# print("~umap_clustered", umap_clustered.count(False), ~umap_clustered)
ax00.scatter(umap_data[~umap_clustered, 0],
umap_data[~umap_clustered, 1],
c=(0.5, 0.5, 0.5),
s=0.1,
alpha=0.5)
ax00.set_title("UMAP Outliers " + str(umap_coll[False]), fontsize=7)
ax10.scatter(umap_data[umap_clustered, 0],
umap_data[umap_clustered, 1],
c=umap_labels[umap_clustered],
s=0.1,
cmap="Spectral")
ax10.set_title("UMAP Inliers " + str(umap_coll[True]), fontsize=7)
# plot train data
train_clustered = (train_labels >= 0)
train_coll = collections.Counter(train_clustered)
ax01.scatter(train_data[~train_clustered, 0],
train_data[~train_clustered, 1],
c=(0.5, 0.5, 0.5),
s=0.1,
alpha=0.5)
ax01.set_title("Train Outliers " + str(train_coll[False]), fontsize=7)
ax11.scatter(train_data[train_clustered, 0],
train_data[train_clustered, 1],
c=train_labels[train_clustered],
s=0.1,
cmap="Spectral")
ax11.set_title("Train Inliers " + str(train_coll[True]), fontsize=7)
# plot test data
test_clustered = (test_labels >= 0)
test_coll = collections.Counter(test_clustered)
ax02.scatter(test_data[~test_clustered, 0],
test_data[~test_clustered, 1],
c=(0.5, 0.5, 0.5),
s=0.1,
alpha=0.5)
ax02.set_title("Tets Outliers " + str(test_coll[False]), fontsize=7)
ax12.scatter(test_data[test_clustered, 0],
test_data[test_clustered, 1],
c=test_labels[test_clustered],
s=0.1,
cmap="Spectral")
ax12.set_title("Test Inliers " + str(test_coll[True]), fontsize=7)
plt.savefig(output_dir+"/"+info+"-hdbscan.png")
plt.close()
def visualize_all_data_umap(dlrm,
train_ld,
test_ld = None,
max_umap_size = 50000,
output_dir = "",
umap_metric = "euclidean"):
data_ratio = 1
print("creating umap data")
umap_train_feat, umap_train_X, umap_train_cat, umap_train_T, umap_train_z, umap_train_c, umap_train_p = create_umap_data(dlrm=dlrm, data_ld=train_ld, max_size=max_umap_size, offset=0, info="umap")
# transform train and test data
train_feat, train_X, train_cat, train_T, train_z, train_c, train_p = create_umap_data(dlrm=dlrm, data_ld=train_ld, max_size=max_umap_size*data_ratio, offset=max_umap_size, info="train")
test_feat, test_X, test_cat, test_T, test_z, test_c, test_p = create_umap_data(dlrm=dlrm, data_ld=test_ld, max_size=max_umap_size*data_ratio, offset=0, info="test")
print("umap_train_feat", np.array(umap_train_feat).shape)
reducer_all_feat = umap.UMAP(random_state=42, metric=umap_metric)
umap_feat_Y = reducer_all_feat.fit_transform(umap_train_feat)
train_feat_Y = reducer_all_feat.transform(train_feat)
test_feat_Y = reducer_all_feat.transform(test_feat)
visualize_umap_data(umap_Y = umap_feat_Y,
umap_T = umap_train_T,
umap_C = umap_train_c,
umap_P = umap_train_p,
train_Y = train_feat_Y,
train_T = train_T,
train_C = train_c,
train_P = train_p,
test_Y = test_feat_Y,
test_T = test_T,
test_C = test_c,
test_P = test_p,
total_train_size = str(len(train_ld)),
total_test_size = str(len(test_ld)),
info = "all-features",
output_dir = output_dir,
orig_space_dim = np.array(umap_train_feat).shape[1])
hdbscan_clustering(umap_data = umap_feat_Y,
train_data = train_feat_Y,
test_data = test_feat_Y,
info = "umap-all-features",
output_dir = output_dir)
# hdbscan_clustering(umap_data = np.array(umap_train_feat),
# train_data = np.array(train_feat),
# test_data = np.array(test_feat),
# info = "all-features",
# output_dir = output_dir)
print("umap_train_X", np.array(umap_train_X).shape)
reducer_X = umap.UMAP(random_state=42, metric=umap_metric)
umap_X_Y = reducer_X.fit_transform(umap_train_X)
train_X_Y = reducer_X.transform(train_X)
test_X_Y = reducer_X.transform(test_X)
visualize_umap_data(umap_Y = umap_X_Y,
umap_T = umap_train_T,
umap_C = umap_train_c,
umap_P = umap_train_p,
train_Y = train_X_Y,
train_T = train_T,
train_C = train_c,
train_P = train_p,
test_Y = test_X_Y,
test_T = test_T,
test_C = test_c,
test_P = test_p,
total_train_size = str(len(train_ld)),
total_test_size = str(len(test_ld)),
info = "cont-features",
output_dir = output_dir,
orig_space_dim = np.array(umap_train_X).shape[1])
print("umap_train_cat", np.array(umap_train_cat).shape)
reducer_cat = umap.UMAP(random_state=42, metric=umap_metric)
umap_cat_Y = reducer_cat.fit_transform(umap_train_cat)
train_cat_Y = reducer_cat.transform(train_cat)
test_cat_Y = reducer_cat.transform(test_cat)
visualize_umap_data(umap_Y = umap_cat_Y,
umap_T = umap_train_T,
umap_C = umap_train_c,
umap_P = umap_train_p,
train_Y = train_cat_Y,
train_T = train_T,
train_C = train_c,
train_P = train_p,
test_Y = test_cat_Y,
test_T = test_T,
test_C = test_c,
test_P = test_p,
total_train_size = str(len(train_ld)),
total_test_size = str(len(test_ld)),
info = "cat-features",
output_dir = output_dir,
orig_space_dim = np.array(umap_train_cat).shape[1])
# UMAP for z data
for i in range(0,len(umap_train_z)):
print("z", i, np.array(umap_train_z[i]).shape)
reducer_z = umap.UMAP(random_state=42, metric=umap_metric)
umap_z_Y = reducer_z.fit_transform(umap_train_z[i])
train_z_Y = reducer_z.transform(train_z[i])
test_z_Y = reducer_z.transform(test_z[i])
visualize_umap_data(umap_Y = umap_z_Y,
umap_T = umap_train_T,
umap_C = umap_train_c,
umap_P = umap_train_p,
train_Y = train_z_Y,
train_T = train_T,
train_C = train_c,
train_P = train_p,
test_Y = test_z_Y,
test_T = test_T,
test_C = test_c,
test_P = test_p,
total_train_size = str(len(train_ld)),
total_test_size = str(len(test_ld)),
info = "z-features-"+str(i),
output_dir = output_dir,
orig_space_dim = np.array(umap_train_z[i]).shape[1])
def analyze_model_data(output_dir,
dlrm,
train_ld,
test_ld,
train_data,
skip_embedding = False,
use_tsne = False,
max_umap_size = 50000,
max_tsne_size = 10000,
skip_categorical_analysis = False,
skip_data_plots = False,
umap_metric = "euclidean"):
if not os.path.exists(output_dir):
os.makedirs(output_dir)
if skip_embedding is False:
cat_counts = None
cat_counts = analyse_categorical_counts(X_cat=train_data.X_cat, emb_l=dlrm.emb_l, output_dir=output_dir)
visualize_embeddings_umap(emb_l = dlrm.emb_l,
output_dir = output_dir,
max_size = max_umap_size,
umap_metric = umap_metric,
cat_counts = cat_counts)
if use_tsne is True:
visualize_embeddings_tsne(emb_l = dlrm.emb_l,
output_dir = output_dir,
max_size = max_tsne_size)
# data visualization and analysis
if skip_data_plots is False:
visualize_all_data_umap(dlrm=dlrm, train_ld=train_ld, test_ld=test_ld, max_umap_size=max_umap_size, output_dir=output_dir, umap_metric=umap_metric)
# analyse categorical variables
if skip_categorical_analysis is False and args.data_randomize == "none":
analyse_categorical_data(X_cat=train_data.X_cat, n_days=10, output_dir=output_dir)
if __name__ == "__main__":
output_dir = ""
### parse arguments ###
parser = argparse.ArgumentParser(
description="Exploratory DLRM analysis"
)
parser.add_argument("--load-model", type=str, default="")
parser.add_argument("--data-set", choices=["kaggle", "terabyte"], help="dataset")
# parser.add_argument("--dataset-path", required=True, help="path to the dataset")
parser.add_argument("--max-ind-range", type=int, default=-1)
# parser.add_argument("--mlperf-bin-loader", action="store_true", default=False)
parser.add_argument("--output-dir", type=str, default="")
parser.add_argument("--skip-embedding", action="store_true", default=False)
parser.add_argument("--umap-metric", type=str, default="euclidean")
parser.add_argument("--skip-data-plots", action="store_true", default=False)
parser.add_argument("--skip-categorical-analysis", action="store_true", default=False)
# umap related
parser.add_argument("--max-umap-size", type=int, default=50000)
# tsne related
parser.add_argument("--use-tsne", action="store_true", default=False)
parser.add_argument("--max-tsne-size", type=int, default=1000)
# data file related
parser.add_argument("--raw-data-file", type=str, default="")
parser.add_argument("--processed-data-file", type=str, default="")
parser.add_argument("--data-sub-sample-rate", type=float, default=0.0) # in [0, 1]
parser.add_argument("--data-randomize", type=str, default="total") # none, total or day or none
parser.add_argument("--memory-map", action="store_true", default=False)
parser.add_argument("--mini-batch-size", type=int, default=1)
parser.add_argument("--num-workers", type=int, default=0)
parser.add_argument("--test-mini-batch-size", type=int, default=1)
parser.add_argument("--test-num-workers", type=int, default=0)
parser.add_argument("--num-batches", type=int, default=0)
# mlperf logging (disables other output and stops early)
parser.add_argument("--mlperf-logging", action="store_true", default=False)
args = parser.parse_args()
print("command line args: ", json.dumps(vars(args)))
if output_dir == "":
output_dir = args.data_set+"-"+os.path.split(args.load_model)[-1]+"-vis_all"
print("output_dir:", output_dir)
if args.data_set == "kaggle":
# 1. Criteo Kaggle Display Advertisement Challenge Dataset (see ./bench/dlrm_s_criteo_kaggle.sh)
m_spa=16
ln_emb=np.array([1460,583,10131227,2202608,305,24,12517,633,3,93145,5683,8351593,3194,27,14992,5461306,10,5652,2173,4,7046547,18,15,286181,105,142572])
ln_bot=np.array([13,512,256,64,16])
ln_top=np.array([367,512,256,1])
elif args.data_set == "terabyte":
if args.max_ind_range == 10000000:
# 2. Criteo Terabyte (see ./bench/dlrm_s_criteo_terabyte.sh [--sub-sample=0.875] --max-ind-range=10000000)
m_spa=64
ln_emb=np.array([9980333,36084,17217,7378,20134,3,7112,1442,61, 9758201,1333352,313829,10,2208,11156,122,4,970,14, 9994222, 7267859, 9946608,415421,12420,101, 36])
ln_bot=np.array([13,512,256,64])
ln_top=np.array([415,512,512,256,1])
elif args.max_ind_range == 40000000:
# 3. Criteo Terabyte MLPerf training (see ./bench/run_and_time.sh --max-ind-range=40000000)
m_spa=128
ln_emb=np.array([39884406,39043,17289,7420,20263,3,7120,1543,63,38532951,2953546,403346,10,2208,11938,155,4,976,14,39979771,25641295,39664984,585935,12972,108,36])
ln_bot=np.array([13,512,256,128])
ln_top=np.array([479,1024,1024,512,256,1])
else:
raise ValueError("only --max-in-range 10M or 40M is supported")
else:
raise ValueError("only kaggle|terabyte dataset options are supported")
# check input parameters
if args.data_randomize != "none" and args.skip_categorical_analysis is not True:
print("Incorrect option for categoricat analysis, use: --data-randomize=none")
sys.exit(-1)
dlrm = DLRM_Net(
m_spa,
ln_emb,
ln_bot,
ln_top,
arch_interaction_op="dot",
arch_interaction_itself=False,
sigmoid_bot=-1,
sigmoid_top=ln_top.size - 2,
sync_dense_params=True,
loss_threshold=0.0,
ndevices=-1,
qr_flag=False,
qr_operation=None,
qr_collisions=None,
qr_threshold=None,
md_flag=False,
md_threshold=None,
)
# Load model if specified
if not (args.load_model == ""):
print("Loading saved model {}".format(args.load_model))
ld_model = torch.load(args.load_model, map_location=torch.device("cpu"))
dlrm.load_state_dict(ld_model["state_dict"])
print("Model loaded", args.load_model)
#print(dlrm)
z_size = len(dlrm.top_l)
for i in range(0, z_size):
print("z", i, dlrm.top_l[i])
# load data
train_data = None
test_data = None
if args.raw_data_file != "" or args.processed_data_file != "":
train_data, train_ld, test_data, test_ld = dp.make_criteo_data_and_loaders(args)
analyze_model_data(output_dir = output_dir,
dlrm = dlrm,
train_ld = train_ld,
test_ld = test_ld,
train_data = train_data,
skip_embedding = args.skip_embedding,
use_tsne = args.use_tsne,
max_umap_size = args.max_umap_size,
max_tsne_size = args.max_tsne_size,
skip_categorical_analysis = args.skip_categorical_analysis,
skip_data_plots = args.skip_data_plots,
umap_metric = args.umap_metric)
# TorchRec DLRM Example
`dlrm_main.py` trains, validates, and tests a [Deep Learning Recommendation Model](https://arxiv.org/abs/1906.00091) (DLRM) with TorchRec. The DLRM model contains both data parallel components (e.g. multi-layer perceptrons & interaction arch) and model parallel components (e.g. embedding tables). The DLRM model is pipelined so that dataloading, data-parallel to model-parallel comms, and forward/backward are overlapped. It can be run with either a random dataloader or the [Criteo 1 TB click logs dataset](https://ailab.criteo.com/download-criteo-1tb-click-logs-dataset/).
It has been tested on the following cloud instance types:
| Cloud | Instance Type | GPUs | vCPUs | Memory (GB) |
| ------ | ------------------- | ---------------- | ----- | ----------- |
| AWS | p4d.24xlarge | 8 x A100 (40GB) | 96 | 1152 |
| Azure | Standard_ND96asr_v4 | 8 x A100 (40GB) | 96 | 900 |
| GCP | a2-megagpu-16g | 16 x A100 (40GB) | 96 | 1300 |
# Running
## Install dependencies
`pip install tqdm torchmetrics`
## Torchx
We recommend using [torchx](https://pytorch.org/torchx/main/quickstart.html) to run. Here we use the [DDP builtin](https://pytorch.org/torchx/main/components/distributed.html).
1. `pip install torchx`
2. (optional) set up a slurm or kubernetes cluster
3. Run the example:
   * locally: `torchx run -s local_cwd dist.ddp -j 1x2 --script dlrm_main.py`
   * remotely: `torchx run -s slurm dist.ddp -j 1x8 --script dlrm_main.py`
## TorchRun
You can also use [torchrun](https://pytorch.org/docs/stable/elastic/run.html).
* e.g. `torchrun --nnodes 1 --nproc_per_node 2 --rdzv_backend c10d --rdzv_endpoint localhost --rdzv_id 54321 --role trainer dlrm_main.py`
## Preliminary Training Results
**Setup:**
* Dataset: Criteo 1TB Click Logs dataset
* CUDA 11.0, NCCL 2.10.3.
* AWS p4d24xlarge instances, each with 8 40GB NVIDIA A100s.
**Results**
Common settings across all runs:
```
--num_embeddings_per_feature "45833188,36746,17245,7413,20243,3,7114,1441,62,29275261,1572176,345138,10,2209,11267,128,4,974,14,48937457,11316796,40094537,452104,12606,104,35" --embedding_dim_size 128 --pin_memory --over_arch_layer_sizes "1024,1024,512,256,1" --dense_arch_layer_sizes "512,256,128" --epochs 1 --shuffle_batches
```
|Number of GPUs|Collective Size of Embedding Tables (GiB)|Local Batch Size|Global Batch Size|Learning Rate|AUROC over Val Set After 1 Epoch|AUROC Over Test Set After 1 Epoch|Train Records/Second|Time to Train 1 Epoch | Unique Flags |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
|8|91.10|256|2048|1.0|0.8032480478286743|0.8032934069633484|~284,615 rec/s| 4h7m00s | `--batch_size 256 --learning_rate 1.0`|
|1|91.10|16384|16384|15.0|0.8025434017181396|0.8026024103164673|~740,065 rec/s| 1h35m29s | `--batch_size 16384 --learning_rate 15.0 --change_lr --lr_change_point 0.65 --lr_after_change_point 0.035` |
|4|91.10|4096|16384|15.0|0.8030692934989929|0.8030484914779663|~1,458,176 rec/s| 48m39s | `--batch_size 4096 --learning_rate 15.0 --change_lr --lr_change_point 0.80 --lr_after_change_point 0.20` |
|8|91.10|2048|16384|15.0|0.802501916885376|0.8025660514831543|~1,671,168 rec/s| 43m24s | `--batch_size 2048 --learning_rate 15.0 --change_lr --lr_change_point 0.80 --lr_after_change_point 0.20` |
|8|91.10|8192|65536|15.0|0.7996258735656738|0.7996508479118347|~5,373,952 rec/s| 13m40s | `--batch_size 8192 --learning_rate 15.0`|
QPS (train records/second) is calculated using the following formula: `x it/s * local_batch_size * num_gpus`. The `it/s`
value can be found in the logs of the training runs.
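For example, taking the 8-GPU run with local batch size 2048 from the table above (the ~102 it/s figure is inferred from that row's reported throughput rather than read from a log):
```
QPS ≈ 102 it/s * 2048 (local batch size) * 8 (GPUs) ≈ 1,671,168 records/second
```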
The final row, using 8 GPUs with a batch size of 8192, was not tuned to hit the MLPerf benchmark but is shown to
highlight the QPS (train record/second) achievable with torchrec.
The `change_lr` flag activates the variable learning rate schedule. `lr_change_point` sets the point in training (as a fraction of
the total iterations) at which the learning rate changes to the value specified by `lr_after_change_point`. We found that having a high learning rate to start (e.g. 15.0) and dropping to a smaller learning rate (e.g. 0.20) near the end of the first epoch (e.g. 80% through) helped us converge faster to the 0.8025 MLPerf AUROC metric.
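As a rough illustration of what this schedule amounts to (a sketch only, not the exact code in `dlrm_main.py`; it assumes a plain `torch.optim`-style optimizer and an externally tracked iteration count):
```
def maybe_change_lr(optimizer, iteration, total_iterations,
                    lr_change_point=0.80, lr_after_change_point=0.20):
    # Once past lr_change_point of all iterations, switch every parameter
    # group to the smaller post-change learning rate.
    if iteration >= lr_change_point * total_iterations:
        for param_group in optimizer.param_groups:
            param_group["lr"] = lr_after_change_point
```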
**Reproduce**
Run the following command to reproduce the results for a single node (8 GPUs) on AWS. This command makes use of the `aws_component.py` script.
Before running, make sure to:
- set $PATH_TO_1TB_NUMPY_FILES to the path with the pre-processed .npy files of the Criteo 1TB dataset.
- set $TRAIN_QUEUE to the partition that handles training jobs
Example command:
```
torchx run --scheduler slurm --scheduler_args partition=$TRAIN_QUEUE,time=5:00:00 aws_component.py:run_dlrm_main --num_trainers=8 -- --pin_memory --batch_size 2048 --epochs 1 --num_embeddings_per_feature "45833188,36746,17245,7413,20243,3,7114,1441,62,29275261,1572176,345138,10,2209,11267,128,4,974,14,48937457,11316796,40094537,452104,12606,104,35" --embedding_dim 128 --dense_arch_layer_sizes "512,256,128" --over_arch_layer_sizes "1024,1024,512,256,1" --in_memory_binary_criteo_path $PATH_TO_1TB_NUMPY_FILES --learning_rate 15.0 --shuffle_batches --change_lr --lr_change_point 0.80 --lr_after_change_point 0.20
```
Upon scheduling the job, there should be an output that looks like this:
```
warnings.warn(
slurm://torchx/14731
torchx 2022-01-07 21:06:59 INFO Launched app: slurm://torchx/14731
torchx 2022-01-07 21:06:59 INFO AppStatus:
msg: ''
num_restarts: -1
roles: []
state: UNKNOWN (7)
structured_error_msg: <NONE>
ui_url: null
torchx 2022-01-07 21:06:59 INFO Job URL: None
```
In this example, the job was launched to: `slurm://torchx/14731`.
Run the following commands to check the status of your job and read the logs:
```
# Status should be "RUNNING" if properly scheduled
torchx status slurm://torchx/14731
# Log file was automatically created in the directory where you launched the job from
cat slurm-14731.out
```
The results from the training can be found in the log file (e.g. `slurm-14731.out`).
**Debugging**
The `--validation_freq_within_epoch x` parameter can be used to print the AUROC every `x` iterations within an epoch.
The in-memory dataloader can take approximately 20-30 minutes to load the data into memory before training starts. The
`--mmap_mode` parameter can be used to load data from disk instead, which reduces start-up time for training at the cost
of QPS.
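For example, a quick local debugging run might look like the following (the flag values are illustrative):
```
torchx run -s local_cwd dist.ddp -j 1x2 --script dlrm_main.py -- --mmap_mode --in_memory_binary_criteo_path $PATH_TO_1TB_NUMPY_FILES --validation_freq_within_epoch 1000 --limit_train_batches 5000
```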
**TODO/Work In Progress**
* Write section on how to pre-process the dataset.
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import os
import torchx.specs as specs
from torchx.components.dist import ddp
from torchx.specs.api import Resource
def run_dlrm_main(num_trainers: int = 8, *script_args: str) -> specs.AppDef:
"""
Args:
num_trainers: The number of trainers to use.
script_args: A variable number of parameters to provide dlrm_main.py.
"""
cwd = os.getcwd()
entrypoint = os.path.join(cwd, "dlrm_main.py")
user = os.environ.get("USER")
image = f"/data/home/{user}"
if num_trainers > 8 and num_trainers % 8 != 0:
raise ValueError(
"Trainer jobs spanning multiple hosts must be in multiples of 8."
)
nproc_per_node = 8 if num_trainers >= 8 else num_trainers
num_replicas = max(num_trainers // 8, 1)
return ddp(
*script_args,
name="train_dlrm",
image=image,
# AWS p4d instance (https://aws.amazon.com/ec2/instance-types/p4/).
cpu=96,
gpu=8,
memMB=-1,
script=entrypoint,
j=f"{num_replicas}x{nproc_per_node}",
)
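# Example invocation via the torchx CLI (assuming this file is saved as
# aws_component.py, as referenced in the README above; the partition name and
# the arguments after "--", which are forwarded to dlrm_main.py, are
# illustrative):
#
#   torchx run --scheduler slurm --scheduler_args partition=$TRAIN_QUEUE,time=5:00:00 \
#       aws_component.py:run_dlrm_main --num_trainers=8 -- --batch_size 2048 --epochs 1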
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import argparse
import os
from typing import List
from torch import distributed as dist
from torch.utils.data import DataLoader
from torchrec.datasets.criteo import (
CAT_FEATURE_COUNT,
DAYS,
DEFAULT_CAT_NAMES,
DEFAULT_INT_NAMES,
InMemoryBinaryCriteoIterDataPipe,
)
from torchrec.datasets.random import RandomRecDataset
STAGES = ["train", "val", "test"]
def _get_random_dataloader(
args: argparse.Namespace,
) -> DataLoader:
return DataLoader(
RandomRecDataset(
keys=DEFAULT_CAT_NAMES,
batch_size=args.batch_size,
hash_size=args.num_embeddings,
hash_sizes=args.num_embeddings_per_feature
if hasattr(args, "num_embeddings_per_feature")
else None,
manual_seed=args.seed if hasattr(args, "seed") else None,
ids_per_feature=1,
num_dense=len(DEFAULT_INT_NAMES),
),
batch_size=None,
batch_sampler=None,
pin_memory=args.pin_memory,
num_workers=0,
)
def _get_in_memory_dataloader(
args: argparse.Namespace,
stage: str,
) -> DataLoader:
files = os.listdir(args.in_memory_binary_criteo_path)
def is_final_day(s: str) -> bool:
return f"day_{DAYS - 1}" in s
if stage == "train":
# Train set gets all data except from the final day.
files = list(filter(lambda s: not is_final_day(s), files))
rank = dist.get_rank()
world_size = dist.get_world_size()
batch_size = args.batch_size
else:
# Validation set gets the first half of the final day's samples. Test set gets
# the other half.
files = list(filter(is_final_day, files))
rank = (
dist.get_rank()
if stage == "val"
else dist.get_rank() + dist.get_world_size()
)
world_size = dist.get_world_size() * 2
batch_size = (
args.batch_size if args.test_batch_size is None else args.test_batch_size
)
stage_files: List[List[str]] = [
sorted(
map(
lambda x: os.path.join(args.in_memory_binary_criteo_path, x),
filter(lambda s: kind in s, files),
)
)
for kind in ["dense", "sparse", "labels"]
]
dataloader = DataLoader(
InMemoryBinaryCriteoIterDataPipe(
*stage_files, # pyre-ignore[6]
batch_size=batch_size,
rank=rank,
world_size=world_size,
shuffle_batches=args.shuffle_batches,
mmap_mode=args.mmap_mode,
hashes=args.num_embeddings_per_feature
if args.num_embeddings is None
else ([args.num_embeddings] * CAT_FEATURE_COUNT),
),
batch_size=None,
pin_memory=args.pin_memory,
collate_fn=lambda x: x,
)
return dataloader
def get_dataloader(args: argparse.Namespace, backend: str, stage: str) -> DataLoader:
"""
Gets desired dataloader from dlrm_main command line options. Currently, this
function is able to return either a DataLoader wrapped around a RandomRecDataset or
a Dataloader wrapped around an InMemoryBinaryCriteoIterDataPipe.
Args:
args (argparse.Namespace): Command line options supplied to dlrm_main.py's main
function.
backend (str): "nccl" or "gloo".
stage (str): "train", "val", or "test".
Returns:
dataloader (DataLoader): PyTorch dataloader for the specified options.
"""
stage = stage.lower()
if stage not in STAGES:
raise ValueError(f"Supplied stage was {stage}. Must be one of {STAGES}.")
args.pin_memory = (
(backend == "nccl") if not hasattr(args, "pin_memory") else args.pin_memory
)
if (
not hasattr(args, "in_memory_binary_criteo_path")
or args.in_memory_binary_criteo_path is None
):
return _get_random_dataloader(args)
else:
return _get_in_memory_dataloader(args, stage)
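# Example usage (a minimal sketch; the Namespace below carries only the fields
# read by the random-dataloader path, and the values are illustrative):
#
#   args = argparse.Namespace(batch_size=32, num_embeddings=100_000)
#   train_loader = get_dataloader(args, backend="gloo", stage="train")
#   batch = next(iter(train_loader))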
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import argparse
import itertools
import os
import sys
from dataclasses import dataclass, field
from typing import cast, Iterator, List, Optional, Tuple
import torch
import torchmetrics as metrics
from fbgemm_gpu.split_embedding_configs import EmbOptimType as OptimType
from pyre_extensions import none_throws
from torch import distributed as dist, nn
from torch.utils.data import DataLoader
from torchrec import EmbeddingBagCollection
from torchrec.datasets.criteo import (
DEFAULT_CAT_NAMES,
DEFAULT_INT_NAMES,
TOTAL_TRAINING_SAMPLES,
)
from torchrec.datasets.utils import Batch
from torchrec.distributed import TrainPipelineSparseDist
from torchrec.distributed.embeddingbag import EmbeddingBagCollectionSharder
from torchrec.distributed.model_parallel import DistributedModelParallel
from torchrec.distributed.types import ModuleSharder
from torchrec.models.dlrm import DLRM, DLRMV2, DLRMTrain
from torchrec.modules.embedding_configs import EmbeddingBagConfig
from torchrec.optim.keyed import CombinedOptimizer, KeyedOptimizerWrapper
from tqdm import tqdm
# OSS import
try:
# pyre-ignore[21]
# @manual=//ai_codesign/benchmarks/dlrm/torchrec_dlrm/data:dlrm_dataloader
from data.dlrm_dataloader import get_dataloader, STAGES
# pyre-ignore[21]
# @manual=//ai_codesign/benchmarks/dlrm/torchrec_dlrm:multi_hot
from multi_hot import Multihot
except ImportError:
pass
# internal import
try:
from .data.dlrm_dataloader import get_dataloader, STAGES # noqa F811
from .multi_hot import Multihot # noqa F811
except ImportError:
pass
TRAIN_PIPELINE_STAGES = 3 # Number of stages in TrainPipelineSparseDist.
def parse_args(argv: List[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(description="torchrec dlrm example trainer")
parser.add_argument(
"--epochs", type=int, default=1, help="number of epochs to train"
)
parser.add_argument(
"--batch_size", type=int, default=32, help="batch size to use for training"
)
parser.add_argument(
"--test_batch_size",
type=int,
default=None,
help="batch size to use for validation and testing",
)
parser.add_argument(
"--limit_train_batches",
type=int,
default=None,
help="number of train batches",
)
parser.add_argument(
"--limit_val_batches",
type=int,
default=None,
help="number of validation batches",
)
parser.add_argument(
"--limit_test_batches",
type=int,
default=None,
help="number of test batches",
)
parser.add_argument(
"--dataset_name",
type=str,
default="criteo_1t",
help="dataset for experiment, current support criteo_1tb, criteo_kaggle",
)
parser.add_argument(
"--num_embeddings",
type=int,
default=100_000,
help="max_ind_size. The number of embeddings in each embedding table. Defaults"
" to 100_000 if num_embeddings_per_feature is not supplied.",
)
parser.add_argument(
"--num_embeddings_per_feature",
type=str,
default=None,
help="Comma separated max_ind_size per sparse feature. The number of embeddings"
" in each embedding table. 26 values are expected for the Criteo dataset.",
)
parser.add_argument(
"--dense_arch_layer_sizes",
type=str,
default="512,256,64",
help="Comma separated layer sizes for dense arch.",
)
parser.add_argument(
"--over_arch_layer_sizes",
type=str,
default="512,512,256,1",
help="Comma separated layer sizes for over arch.",
)
parser.add_argument(
"--embedding_dim",
type=int,
default=64,
help="Size of each embedding.",
)
parser.add_argument(
"--interaction_branch1_layer_sizes",
type=str,
default="2048,2048",
help="Comma separated layer sizes for interaction branch1 (only on dlrmv2).",
)
parser.add_argument(
"--interaction_branch2_layer_sizes",
type=str,
default="2048,2048",
help="Comma separated layer sizes for interaction branch2 (only on dlrmv2).",
)
parser.add_argument(
"--undersampling_rate",
type=float,
help="Desired proportion of zero-labeled samples to retain (i.e. undersampling zero-labeled rows)."
" Ex. 0.3 indicates only 30pct of the rows with label 0 will be kept."
" All rows with label 1 will be kept. Value should be between 0 and 1."
" When not supplied, no undersampling occurs.",
)
parser.add_argument(
"--seed",
type=int,
help="Random seed for reproducibility.",
)
parser.add_argument(
"--pin_memory",
dest="pin_memory",
action="store_true",
help="Use pinned memory when loading data.",
)
parser.add_argument(
"--mmap_mode",
dest="mmap_mode",
action="store_true",
help="--mmap_mode mmaps the dataset."
" That is, the dataset is kept on disk but is accessed as if it were in memory."
" --mmap_mode is intended mostly for faster debugging. Use --mmap_mode to bypass"
" preloading the dataset when preloading takes too long or when there is "
" insufficient memory available to load the full dataset.",
)
parser.add_argument(
"--in_memory_binary_criteo_path",
type=str,
default=None,
help="Path to a folder containing the binary (npy) files for the Criteo dataset."
" When supplied, InMemoryBinaryCriteoIterDataPipe is used.",
)
parser.add_argument(
"--learning_rate",
type=float,
default=15.0,
help="Learning rate.",
)
parser.add_argument(
"--shuffle_batches",
dest="shuffle_batches",
action="store_true",
help="Shuffle each batch during training.",
)
parser.add_argument(
"--validation_freq_within_epoch",
type=int,
default=None,
help="Frequency at which validation will be run within an epoch.",
)
parser.add_argument(
"--change_lr",
dest="change_lr",
action="store_true",
help="Flag to determine whether learning rate should be changed part way through training.",
)
parser.add_argument(
"--lr_change_point",
type=float,
default=0.80,
help="The point through training at which learning rate should change to the value set by"
" lr_after_change_point. The default value is 0.80 which means that 80% through the total iterations (totaled"
" across all epochs), the learning rate will change.",
)
parser.add_argument(
"--lr_after_change_point",
type=float,
default=0.20,
help="Learning rate after change point in first epoch.",
)
parser.set_defaults(
pin_memory=None,
mmap_mode=None,
shuffle_batches=None,
change_lr=None,
)
parser.add_argument(
"--adagrad",
dest="adagrad",
action="store_true",
help="Flag to determine if adagrad optimizer should be used.",
)
parser.add_argument(
"--dlrmv2",
dest="dlrmv2",
action="store_true",
help="Flag to determine if dlrmv2 should be used.",
)
parser.add_argument(
"--collect_multi_hot_freqs_stats",
dest="collect_multi_hot_freqs_stats",
action="store_true",
help="Flag to determine whether to collect stats on freq of embedding access.",
)
parser.add_argument(
"--multi_hot_size",
type=int,
default=1,
help="The number of Multi-hot indices to use. When 1, multi-hot is disabled.",
)
parser.add_argument(
"--multi_hot_min_table_size",
type=int,
default=200,
help="The minimum number of rows an embedding table must have to run multi-hot inputs.",
)
parser.add_argument(
"--multi_hot_distribution_type",
type=str,
choices=["uniform", "pareto"],
default="uniform",
help="Path to a folder containing the binary (npy) files for the Criteo dataset."
" When supplied, InMemoryBinaryCriteoIterDataPipe is used.",
)
return parser.parse_args(argv)
def _evaluate(
limit_batches: Optional[int],
train_pipeline: TrainPipelineSparseDist,
iterator: Iterator[Batch],
next_iterator: Iterator[Batch],
stage: str,
) -> Tuple[float, float]:
"""
Evaluates model. Computes and prints metrics including AUROC and Accuracy. Helper
function for train_val_test.
Args:
limit_batches (Optional[int]): number of batches.
train_pipeline (TrainPipelineSparseDist): pipelined model.
iterator (Iterator[Batch]): Iterator used for val/test batches.
next_iterator (Iterator[Batch]): Iterator used for the next phase (either train
if there are more epochs to train on or test if all epochs are complete).
Used to queue up the next TRAIN_PIPELINE_STAGES - 1 batches before
train_val_test switches to the next phase. This is done so that when the
next phase starts, the first batch train_pipeline produces an output for
is the 1st batch of that phase.
stage (str): "val" or "test".
Returns:
Tuple[float, float]: auroc and accuracy result
"""
model = train_pipeline._model
model.eval()
device = train_pipeline._device
if limit_batches is not None:
limit_batches -= TRAIN_PIPELINE_STAGES - 1
# Because TrainPipelineSparseDist buffers batches internally, we load in
# TRAIN_PIPELINE_STAGES - 1 batches from the next_iterator into the buffers so that
# when train_val_test switches to the next phase, train_pipeline will start
# producing results for the TRAIN_PIPELINE_STAGES - 1 buffered batches (as opposed
# to the last TRAIN_PIPELINE_STAGES - 1 batches from iterator).
combined_iterator = itertools.chain(
iterator
if limit_batches is None
else itertools.islice(iterator, limit_batches),
itertools.islice(next_iterator, TRAIN_PIPELINE_STAGES - 1),
)
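    # Worked example (assuming TRAIN_PIPELINE_STAGES == 3 and limit_batches == 10):
    # 2 batches from `iterator` are already sitting in the pipeline's buffers from the
    # previous phase, so islice draws only 10 - 2 = 8 fresh batches from `iterator`
    # plus 2 batches from `next_iterator`; progress() then emits results for exactly
    # 10 batches of this stage while pre-buffering the next phase.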
auroc = metrics.AUROC(compute_on_step=False).to(device)
accuracy = metrics.Accuracy(compute_on_step=False).to(device)
# Infinite iterator instead of while-loop to leverage tqdm progress bar.
with torch.no_grad():
for _ in tqdm(iter(int, 1), desc=f"Evaluating {stage} set"):
try:
_loss, logits, labels = train_pipeline.progress(combined_iterator)
preds = torch.sigmoid(logits)
auroc(preds, labels)
accuracy(preds, labels)
except StopIteration:
break
auroc_result = auroc.compute().item()
accuracy_result = accuracy.compute().item()
if dist.get_rank() == 0:
print(f"AUROC over {stage} set: {auroc_result}.")
print(f"Accuracy over {stage} set: {accuracy_result}.")
return auroc_result, accuracy_result
def _train(
train_pipeline: TrainPipelineSparseDist,
iterator: Iterator[Batch],
next_iterator: Iterator[Batch],
within_epoch_val_dataloader: DataLoader,
epoch: int,
epochs: int,
change_lr: bool,
lr_change_point: float,
lr_after_change_point: float,
validation_freq_within_epoch: Optional[int],
limit_train_batches: Optional[int],
limit_val_batches: Optional[int],
) -> None:
"""
Trains model for 1 epoch. Helper function for train_val_test.
Args:
train_pipeline (TrainPipelineSparseDist): pipelined model.
iterator (Iterator[Batch]): Iterator used for training batches.
next_iterator (Iterator[Batch]): Iterator used for validation batches
in between epochs. Used to queue up the next TRAIN_PIPELINE_STAGES - 1
batches before train_val_test switches to validation mode. This is done
            so that when validation starts, the first batch train_pipeline produces
            output for is the 1st validation batch (as opposed to a buffered train
            batch).
within_epoch_val_dataloader (DataLoader): Dataloader to create iterators for
validation within an epoch. This is only used if
validation_freq_within_epoch is specified.
epoch (int): Which epoch the model is being trained on.
epochs (int): Number of epochs to train.
change_lr (bool): Whether learning rate should be changed part way through
training.
lr_change_point (float): The point through training at which learning rate
should change to the value set by lr_after_change_point.
Applied only if change_lr is set to True.
lr_after_change_point (float): Learning rate after change point in first epoch.
Applied only if change_lr is set to True.
validation_freq_within_epoch (Optional[int]): Frequency at which validation
will be run within an epoch.
limit_train_batches (Optional[int]): Number of train batches.
limit_val_batches (Optional[int]): Number of validation batches.
Returns:
None.
"""
train_pipeline._model.train()
# For the first epoch, train_pipeline has no buffered batches, but for all other
# epochs, train_pipeline will have TRAIN_PIPELINE_STAGES - 1 from iterator already
# present in its buffer.
if limit_train_batches is not None and epoch > 0:
limit_train_batches -= TRAIN_PIPELINE_STAGES - 1
    # Because TrainPipelineSparseDist buffers batches internally, we load in
# TRAIN_PIPELINE_STAGES - 1 batches from the next_iterator into the buffers so that
# when train_val_test switches to the next phase, train_pipeline will start
# producing results for the TRAIN_PIPELINE_STAGES - 1 buffered batches (as opposed
# to the last TRAIN_PIPELINE_STAGES - 1 batches from iterator).
combined_iterator = itertools.chain(
iterator
if limit_train_batches is None
else itertools.islice(iterator, limit_train_batches),
itertools.islice(next_iterator, TRAIN_PIPELINE_STAGES - 1),
)
samples_per_trainer = TOTAL_TRAINING_SAMPLES / dist.get_world_size() * epochs
# Infinite iterator instead of while-loop to leverage tqdm progress bar.
for it in tqdm(itertools.count(), desc=f"Epoch {epoch}"):
try:
train_pipeline.progress(combined_iterator)
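            # The condition below estimates the fraction of total training progress
            # completed so far; once it exceeds lr_change_point (e.g. 0.80), every
            # parameter group's learning rate is switched to lr_after_change_point.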
if change_lr and (
(it * (epoch + 1) / samples_per_trainer) > lr_change_point
): # progress made through the epoch
print(f"Changing learning rate to: {lr_after_change_point}")
optimizer = train_pipeline._optimizer
lr = lr_after_change_point
for g in optimizer.param_groups:
g["lr"] = lr
if (
validation_freq_within_epoch
and it > 0
and it % validation_freq_within_epoch == 0
):
_evaluate(
limit_val_batches,
train_pipeline,
iter(within_epoch_val_dataloader),
iterator,
"val",
)
train_pipeline._model.train()
except StopIteration:
break
@dataclass
class TrainValTestResults:
val_accuracies: List[float] = field(default_factory=list)
val_aurocs: List[float] = field(default_factory=list)
test_accuracy: Optional[float] = None
test_auroc: Optional[float] = None
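# After train_val_test completes, an instance might look like (hypothetical numbers):
#   TrainValTestResults(val_accuracies=[0.78, 0.79], val_aurocs=[0.75, 0.76],
#                       test_accuracy=0.79, test_auroc=0.76)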
def train_val_test(
args: argparse.Namespace,
train_pipeline: TrainPipelineSparseDist,
train_dataloader: DataLoader,
val_dataloader: DataLoader,
test_dataloader: DataLoader,
) -> TrainValTestResults:
"""
Train/validation/test loop. Contains customized logic to ensure each dataloader's
batches are used for the correct designated purpose (train, val, test). This logic
is necessary because TrainPipelineSparseDist buffers batches internally (so we
avoid batches designated for one purpose like training getting buffered and used for
another purpose like validation).
Args:
args (argparse.Namespace): parsed command line args.
train_pipeline (TrainPipelineSparseDist): pipelined model.
train_dataloader (DataLoader): DataLoader used for training.
val_dataloader (DataLoader): DataLoader used for validation.
test_dataloader (DataLoader): DataLoader used for testing.
Returns:
TrainValTestResults.
"""
train_val_test_results = TrainValTestResults()
train_iterator = iter(train_dataloader)
test_iterator = iter(test_dataloader)
for epoch in range(args.epochs):
val_iterator = iter(val_dataloader)
_train(
train_pipeline,
train_iterator,
val_iterator,
val_dataloader,
epoch,
args.epochs,
args.change_lr,
args.lr_change_point,
args.lr_after_change_point,
args.validation_freq_within_epoch,
args.limit_train_batches,
args.limit_val_batches,
)
train_iterator = iter(train_dataloader)
val_next_iterator = (
test_iterator if epoch == args.epochs - 1 else train_iterator
)
        val_auroc, val_accuracy = _evaluate(
args.limit_val_batches,
train_pipeline,
val_iterator,
val_next_iterator,
"val",
)
train_val_test_results.val_accuracies.append(val_accuracy)
train_val_test_results.val_aurocs.append(val_auroc)
    test_auroc, test_accuracy = _evaluate(
args.limit_test_batches,
train_pipeline,
test_iterator,
iter(test_dataloader),
"test",
)
train_val_test_results.test_accuracy = test_accuracy
train_val_test_results.test_auroc = test_auroc
return train_val_test_results
def main(argv: List[str]) -> None:
"""
Trains, validates, and tests a Deep Learning Recommendation Model (DLRM)
(https://arxiv.org/abs/1906.00091). The DLRM model contains both data parallel
components (e.g. multi-layer perceptrons & interaction arch) and model parallel
components (e.g. embedding tables). The DLRM model is pipelined so that dataloading,
data-parallel to model-parallel comms, and forward/backward are overlapped. Can be
run with either a random dataloader or an in-memory Criteo 1 TB click logs dataset
(https://ailab.criteo.com/download-criteo-1tb-click-logs-dataset/).
Args:
argv (List[str]): command line args.
Returns:
None.
"""
args = parse_args(argv)
rank = int(os.environ["LOCAL_RANK"])
if torch.cuda.is_available():
device: torch.device = torch.device(f"cuda:{rank}")
backend = "nccl"
torch.cuda.set_device(device)
else:
device: torch.device = torch.device("cpu")
backend = "gloo"
if not torch.distributed.is_initialized():
dist.init_process_group(backend=backend)
if args.num_embeddings_per_feature is not None:
args.num_embeddings_per_feature = list(
map(int, args.num_embeddings_per_feature.split(","))
)
args.num_embeddings = None
# TODO add CriteoIterDataPipe support and add random_dataloader arg
train_dataloader = get_dataloader(args, backend, "train")
val_dataloader = get_dataloader(args, backend, "val")
test_dataloader = get_dataloader(args, backend, "test")
# Sets default limits for random dataloader iterations when left unspecified.
if args.in_memory_binary_criteo_path is None:
for stage in STAGES:
attr = f"limit_{stage}_batches"
if getattr(args, attr) is None:
setattr(args, attr, 10)
eb_configs = [
EmbeddingBagConfig(
name=f"t_{feature_name}",
embedding_dim=args.embedding_dim,
num_embeddings=none_throws(args.num_embeddings_per_feature)[feature_idx]
if args.num_embeddings is None
else args.num_embeddings,
feature_names=[feature_name],
)
for feature_idx, feature_name in enumerate(DEFAULT_CAT_NAMES)
]
sharded_module_kwargs = {}
if args.over_arch_layer_sizes is not None:
sharded_module_kwargs["over_arch_layer_sizes"] = list(
map(int, args.over_arch_layer_sizes.split(","))
)
if args.dlrmv2:
dlrm_model = DLRMV2(
embedding_bag_collection=EmbeddingBagCollection(
tables=eb_configs, device=torch.device("meta")
),
dense_in_features=len(DEFAULT_INT_NAMES),
dense_arch_layer_sizes=list(map(int, args.dense_arch_layer_sizes.split(","))),
over_arch_layer_sizes=list(map(int, args.over_arch_layer_sizes.split(","))),
interaction_branch1_layer_sizes=list(map(int, args.interaction_branch1_layer_sizes.split(","))),
interaction_branch2_layer_sizes=list(map(int, args.interaction_branch2_layer_sizes.split(","))),
dense_device=device,
)
else:
dlrm_model = DLRM(
embedding_bag_collection=EmbeddingBagCollection(
tables=eb_configs, device=torch.device("meta")
),
dense_in_features=len(DEFAULT_INT_NAMES),
dense_arch_layer_sizes=list(map(int, args.dense_arch_layer_sizes.split(","))),
over_arch_layer_sizes=list(map(int, args.over_arch_layer_sizes.split(","))),
dense_device=device,
)
train_model = DLRMTrain(dlrm_model)
fused_params = {
"learning_rate": args.learning_rate,
"optimizer": OptimType.EXACT_ROWWISE_ADAGRAD
if args.adagrad
else OptimType.EXACT_SGD,
}
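    # fused_params configure the optimizer that is fused into the sharded embedding
    # lookups (the model-parallel side, exposed as model.fused_optimizer below);
    # the dense, data-parallel parameters are optimized separately by dense_optimizer,
    # and CombinedOptimizer drives both.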
sharders = [
EmbeddingBagCollectionSharder(fused_params=fused_params),
]
model = DistributedModelParallel(
module=train_model,
device=device,
sharders=cast(List[ModuleSharder[nn.Module]], sharders),
)
def optimizer_with_params():
if args.adagrad:
return lambda params: torch.optim.Adagrad(params, lr=args.learning_rate)
else:
return lambda params: torch.optim.SGD(params, lr=args.learning_rate)
dense_optimizer = KeyedOptimizerWrapper(
dict(model.named_parameters()),
optimizer_with_params(),
)
optimizer = CombinedOptimizer([model.fused_optimizer, dense_optimizer])
train_pipeline = TrainPipelineSparseDist(
model,
optimizer,
device,
)
if 1 < args.multi_hot_size:
multihot = Multihot(
args.multi_hot_size,
args.multi_hot_min_table_size,
args.num_embeddings_per_feature,
args.batch_size,
collect_freqs_stats=args.collect_multi_hot_freqs_stats,
type=args.multi_hot_distribution_type,
)
multihot.pause_stats_collection_during_val_and_test(train_pipeline._model)
train_dataloader = map(multihot.convert_to_multi_hot, train_dataloader)
val_dataloader = map(multihot.convert_to_multi_hot, val_dataloader)
test_dataloader = map(multihot.convert_to_multi_hot, test_dataloader)
train_val_test(
args, train_pipeline, train_dataloader, val_dataloader, test_dataloader
)
if 1 < args.multi_hot_size and multihot.collect_freqs_stats:
multihot.save_freqs_stats()
if __name__ == "__main__":
main(sys.argv[1:])
from typing import (
List,
Tuple,
)
import torch
import numpy as np
from torchrec.datasets.utils import Batch
from torchrec.sparse.jagged_tensor import KeyedJaggedTensor
class Multihot():
def __init__(
self,
multi_hot_size: int,
multi_hot_min_table_size: int,
ln_emb: List[int],
batch_size: int,
collect_freqs_stats: bool,
type: str = "uniform",
):
if type != "uniform" and type != "pareto":
raise ValueError(
"Multi-hot distribution type {} is not supported."
"Only \"uniform\" and \"pareto\" are supported.".format(type)
)
        self.multi_hot_min_table_size = multi_hot_min_table_size
        self.multi_hot_size = multi_hot_size
        self.distribution_type = type
        self.batch_size = batch_size
        self.ln_emb = ln_emb
        self.lS_i_offsets_cache = self.__make_multi_hot_indices_cache(multi_hot_size, ln_emb)
self.lS_o_cache = self.__make_offsets_cache(multi_hot_size, multi_hot_min_table_size, ln_emb, batch_size)
# For plotting frequency access
self.collect_freqs_stats = collect_freqs_stats
self.model_to_track = None
self.freqs_pre_hash = []
self.freqs_post_hash = []
for row_count in ln_emb:
self.freqs_pre_hash.append(np.zeros((row_count)))
self.freqs_post_hash.append(np.zeros((row_count)))
def save_freqs_stats(self) -> None:
if torch.distributed.is_available() and torch.distributed.is_initialized():
rank = torch.distributed.get_rank()
else:
rank = 0
pre_dict = {str(k) : e for k, e in enumerate(self.freqs_pre_hash)}
np.save(f"stats_pre_hash_{rank}_pareto.npy", pre_dict)
post_dict = {str(k) : e for k, e in enumerate(self.freqs_post_hash)}
np.save(f"stats_post_hash_{rank}_pareto.npy", post_dict)
def pause_stats_collection_during_val_and_test(self, model: torch.nn.Module) -> None:
self.model_to_track = model
def __make_multi_hot_indices_cache(
self,
multi_hot_size: int,
ln_emb: List[int],
    ) -> List[torch.Tensor]:
cache = [ np.zeros((rows_count, multi_hot_size)) for rows_count in ln_emb ]
for k, e in enumerate(ln_emb):
            np.random.seed(k)  # The seed is necessary so that all ranks produce the same lookup values.
            if self.distribution_type == "uniform":
                cache[k][:, 1:] = np.random.randint(0, e, size=(e, multi_hot_size - 1))
            elif self.distribution_type == "pareto":
                cache[k][:, 1:] = np.random.pareto(a=0.25, size=(e, multi_hot_size - 1)).astype(np.int32) % e
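        # Each cache[k] has one row per embedding row of table k: column 0 is later
        # overwritten with the original lookup index in __make_new_batch, and the
        # remaining multi_hot_size - 1 columns hold the synthetic indices drawn above.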
        # cache axes are [table, table row, multi-hot offset]
cache = [ torch.from_numpy(table_cache).int() for table_cache in cache ]
return cache
def __make_offsets_cache(
self,
multi_hot_size: int,
multi_hot_min_table_size: int,
ln_emb: List[int],
batch_size: int,
) -> List[torch.Tensor]:
lS_o = torch.ones((len(ln_emb) * self.batch_size), dtype=torch.int32)
for cf, table_length in enumerate(ln_emb):
if table_length >= multi_hot_min_table_size:
lS_o[cf*batch_size : (cf+1)*batch_size] = multi_hot_size
lS_o = torch.cumsum( torch.concat((torch.tensor([0]), lS_o)), axis=0)
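        # Worked example: with batch_size=2, ln_emb=[100, 300], multi_hot_size=3 and
        # multi_hot_min_table_size=200, only the second table exceeds the size
        # threshold, so lS_o = cumsum([0, 1, 1, 3, 3]) = [0, 1, 2, 5, 8].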
return lS_o
def __make_new_batch(
self,
lS_o: torch.Tensor,
lS_i: torch.Tensor
) -> Tuple[torch.Tensor, torch.Tensor]:
lS_i = lS_i.reshape(-1, self.batch_size)
if 1 < self.multi_hot_size:
multi_hot_i_l = []
for cf, table_length in enumerate(self.ln_emb):
if table_length < self.multi_hot_min_table_size:
multi_hot_i_l.append(lS_i[cf])
else:
keys = lS_i[cf]
multi_hot_i = torch.nn.functional.embedding(keys, self.lS_i_offsets_cache[cf])
multi_hot_i[:,0] = keys
multi_hot_i = multi_hot_i.reshape(-1)
multi_hot_i_l.append(multi_hot_i)
if self.collect_freqs_stats and (
self.model_to_track is None or self.model_to_track.training
):
self.freqs_pre_hash[cf][lS_i[cf]] += 1
self.freqs_post_hash[cf][multi_hot_i] += 1
lS_i = torch.cat(multi_hot_i_l)
return self.lS_o_cache, lS_i
else:
lS_i = torch.cat(lS_i)
return lS_o, lS_i
def convert_to_multi_hot(self, batch: Batch) -> Batch:
lS_i = batch.sparse_features._values
lS_o = batch.sparse_features._offsets
lS_o, lS_i = self.__make_new_batch(lS_o, lS_i)
new_sparse_features=KeyedJaggedTensor.from_offsets_sync(
keys=batch.sparse_features._keys,
values=lS_i,
offsets=lS_o,
)
return Batch(
dense_features=batch.dense_features,
sparse_features=new_sparse_features,
labels=batch.labels,
)
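# Minimal usage sketch (hypothetical sizes), mirroring how dlrm_main.py wires the
# converter into its dataloaders:
#   multihot = Multihot(multi_hot_size=10, multi_hot_min_table_size=200,
#                       ln_emb=[1000, 5000], batch_size=32,
#                       collect_freqs_stats=False, type="uniform")
#   train_dataloader = map(multihot.convert_to_multi_hot, train_dataloader)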
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import os
import tempfile
import unittest
import uuid
from torch.distributed.launcher.api import elastic_launch, LaunchConfig
from torchrec import test_utils
from torchrec.datasets.test_utils.criteo_test_utils import CriteoTest
from ..dlrm_main import main
class MainTest(unittest.TestCase):
@classmethod
def _run_trainer_random(cls) -> None:
main(
[
"--limit_train_batches",
"10",
"--limit_val_batches",
"8",
"--limit_test_batches",
"6",
"--over_arch_layer_sizes",
"8,1",
"--dense_arch_layer_sizes",
"8,8",
"--embedding_dim",
"8",
"--num_embeddings",
"8",
]
)
@test_utils.skip_if_asan
def test_main_function(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
lc = LaunchConfig(
min_nodes=1,
max_nodes=1,
nproc_per_node=2,
run_id=str(uuid.uuid4()),
rdzv_backend="c10d",
rdzv_endpoint=os.path.join(tmpdir, "rdzv"),
rdzv_configs={"store_type": "file"},
start_method="spawn",
monitor_interval=1,
max_restarts=0,
)
elastic_launch(config=lc, entrypoint=self._run_trainer_random)()
@classmethod
def _run_trainer_criteo_in_memory(cls) -> None:
with CriteoTest._create_dataset_npys(
num_rows=50, filenames=[f"day_{i}" for i in range(24)]
) as files:
main(
[
"--over_arch_layer_sizes",
"8,1",
"--dense_arch_layer_sizes",
"8,8",
"--embedding_dim",
"8",
"--num_embeddings",
"64",
"--batch_size",
"2",
"--in_memory_binary_criteo_path",
os.path.dirname(files[0]),
"--epochs",
"2",
]
)
@test_utils.skip_if_asan
def test_main_function_criteo_in_memory(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
lc = LaunchConfig(
min_nodes=1,
max_nodes=1,
nproc_per_node=2,
run_id=str(uuid.uuid4()),
rdzv_backend="c10d",
rdzv_endpoint=os.path.join(tmpdir, "rdzv"),
rdzv_configs={"store_type": "file"},
start_method="spawn",
monitor_interval=1,
max_restarts=0,
)
elastic_launch(config=lc, entrypoint=self._run_trainer_criteo_in_memory)()
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
#
# Mixed-Dimensions Trick
#
# Description: Applies mixed dimension trick to embeddings to reduce
# embedding sizes.
#
# References:
# [1] Antonio Ginart, Maxim Naumov, Dheevatsa Mudigere, Jiyan Yang, James Zou,
# "Mixed Dimension Embeddings with Application to Memory-Efficient Recommendation
# Systems", CoRR, arXiv:1909.11810, 2019
from __future__ import absolute_import, division, print_function, unicode_literals
import torch
import torch.nn as nn
def md_solver(n, alpha, d0=None, B=None, round_dim=True, k=None):
'''
An external facing function call for mixed-dimension assignment
with the alpha power temperature heuristic
Inputs:
n -- (torch.LongTensor) ; Vector of num of rows for each embedding matrix
alpha -- (torch.FloatTensor); Scalar, non-negative, controls dim. skew
d0 -- (torch.FloatTensor); Scalar, baseline embedding dimension
B -- (torch.FloatTensor); Scalar, parameter budget for embedding layer
round_dim -- (bool); flag for rounding dims to nearest pow of 2
k -- (torch.LongTensor) ; Vector of average number of queries per inference
'''
n, indices = torch.sort(n)
k = k[indices] if k is not None else torch.ones(len(n))
d = alpha_power_rule(n.type(torch.float) / k, alpha, d0=d0, B=B)
if round_dim:
d = pow_2_round(d)
undo_sort = [0] * len(indices)
for i, v in enumerate(indices):
undo_sort[v] = i
return d[undo_sort]
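# Worked example (values derived from alpha_power_rule and pow_2_round below):
# with alpha = 0.5 and baseline dimension d0 = 32,
#   md_solver(torch.tensor([100, 1000, 10000]), alpha=0.5, d0=32)
# yields per-table dimensions [32, 10, 3] before rounding, which pow_2_round maps
# to tensor([32., 8., 4.]) -- larger tables receive smaller embedding dimensions.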
def alpha_power_rule(n, alpha, d0=None, B=None):
if d0 is not None:
lamb = d0 * (n[0].type(torch.float) ** alpha)
elif B is not None:
lamb = B / torch.sum(n.type(torch.float) ** (1 - alpha))
else:
raise ValueError("Must specify either d0 or B")
d = torch.ones(len(n)) * lamb * (n.type(torch.float) ** (-alpha))
for i in range(len(d)):
if i == 0 and d0 is not None:
d[i] = d0
else:
d[i] = 1 if d[i] < 1 else d[i]
return (torch.round(d).type(torch.long))
def pow_2_round(dims):
return 2 ** torch.round(torch.log2(dims.type(torch.float)))
class PrEmbeddingBag(nn.Module):
def __init__(self, num_embeddings, embedding_dim, base_dim):
super(PrEmbeddingBag, self).__init__()
self.embs = nn.EmbeddingBag(
num_embeddings, embedding_dim, mode="sum", sparse=True)
torch.nn.init.xavier_uniform_(self.embs.weight)
if embedding_dim < base_dim:
self.proj = nn.Linear(embedding_dim, base_dim, bias=False)
torch.nn.init.xavier_uniform_(self.proj.weight)
elif embedding_dim == base_dim:
self.proj = nn.Identity()
else:
raise ValueError(
"Embedding dim " + str(embedding_dim) + " > base dim " + str(base_dim)
)
def forward(self, input, offsets=None, per_sample_weights=None):
return self.proj(self.embs(
input, offsets=offsets, per_sample_weights=per_sample_weights))
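# Minimal usage sketch (hypothetical sizes): a table whose dimension was reduced by
# md_solver still presents base_dim-wide vectors to the rest of the model, because
# the summed bag output is projected from embedding_dim up to base_dim.
#   bag = PrEmbeddingBag(num_embeddings=1000, embedding_dim=8, base_dim=32)
#   out = bag(torch.tensor([1, 2, 3]), offsets=torch.tensor([0]))  # shape (1, 32)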
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
#
# Quotient-Remainder Trick
#
# Description: Applies quotient remainder-trick to embeddings to reduce
# embedding sizes.
#
# References:
# [1] Hao-Jun Michael Shi, Dheevatsa Mudigere, Maxim Naumov, Jiyan Yang,
# "Compositional Embeddings Using Complementary Partitions for Memory-Efficient
# Recommendation Systems", CoRR, arXiv:1909.02107, 2019
from __future__ import absolute_import, division, print_function, unicode_literals
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.parameter import Parameter
import numpy as np
class QREmbeddingBag(nn.Module):
r"""Computes sums or means over two 'bags' of embeddings, one using the quotient
of the indices and the other using the remainder of the indices, without
instantiating the intermediate embeddings, then performs an operation to combine these.
For bags of constant length and no :attr:`per_sample_weights`, this class
* with ``mode="sum"`` is equivalent to :class:`~torch.nn.Embedding` followed by ``torch.sum(dim=0)``,
* with ``mode="mean"`` is equivalent to :class:`~torch.nn.Embedding` followed by ``torch.mean(dim=0)``,
* with ``mode="max"`` is equivalent to :class:`~torch.nn.Embedding` followed by ``torch.max(dim=0)``.
However, :class:`~torch.nn.EmbeddingBag` is much more time and memory efficient than using a chain of these
operations.
QREmbeddingBag also supports per-sample weights as an argument to the forward
pass. This scales the output of the Embedding before performing a weighted
    reduction as specified by ``mode``. If :attr:`per_sample_weights` is passed, the
only supported ``mode`` is ``"sum"``, which computes a weighted sum according to
:attr:`per_sample_weights`.
Known Issues:
Autograd breaks with multiple GPUs. It breaks only with multiple embeddings.
Args:
num_categories (int): total number of unique categories. The input indices must be in
0, 1, ..., num_categories - 1.
embedding_dim (list): list of sizes for each embedding vector in each table. If ``"add"``
or ``"mult"`` operation are used, these embedding dimensions must be
the same. If a single embedding_dim is used, then it will use this
embedding_dim for both embedding tables.
num_collisions (int): number of collisions to enforce.
        operation (string, optional): ``"concat"``, ``"add"``, or ``"mult"``. Specifies the operation
to compose embeddings. ``"concat"`` concatenates the embeddings,
``"add"`` sums the embeddings, and ``"mult"`` multiplies
(component-wise) the embeddings.
Default: ``"mult"``
max_norm (float, optional): If given, each embedding vector with norm larger than :attr:`max_norm`
is renormalized to have norm :attr:`max_norm`.
norm_type (float, optional): The p of the p-norm to compute for the :attr:`max_norm` option. Default ``2``.
scale_grad_by_freq (boolean, optional): if given, this will scale gradients by the inverse of frequency of
the words in the mini-batch. Default ``False``.
Note: this option is not supported when ``mode="max"``.
mode (string, optional): ``"sum"``, ``"mean"`` or ``"max"``. Specifies the way to reduce the bag.
``"sum"`` computes the weighted sum, taking :attr:`per_sample_weights`
into consideration. ``"mean"`` computes the average of the values
in the bag, ``"max"`` computes the max value over each bag.
Default: ``"mean"``
sparse (bool, optional): if ``True``, gradient w.r.t. :attr:`weight` matrix will be a sparse tensor. See
Notes for more details regarding sparse gradients. Note: this option is not
supported when ``mode="max"``.
Attributes:
        weight (Tensor): the learnable weights of each embedding table of the module, of shape
            `(num_embeddings, embedding_dim)`, initialized using a uniform distribution
            with sqrt(1 / num_categories).
Inputs: :attr:`input` (LongTensor), :attr:`offsets` (LongTensor, optional), and
:attr:`per_index_weights` (Tensor, optional)
- If :attr:`input` is 2D of shape `(B, N)`,
it will be treated as ``B`` bags (sequences) each of fixed length ``N``, and
this will return ``B`` values aggregated in a way depending on the :attr:`mode`.
:attr:`offsets` is ignored and required to be ``None`` in this case.
- If :attr:`input` is 1D of shape `(N)`,
it will be treated as a concatenation of multiple bags (sequences).
:attr:`offsets` is required to be a 1D tensor containing the
starting index positions of each bag in :attr:`input`. Therefore,
for :attr:`offsets` of shape `(B)`, :attr:`input` will be viewed as
having ``B`` bags. Empty bags (i.e., having 0-length) will have
returned vectors filled by zeros.
per_sample_weights (Tensor, optional): a tensor of float / double weights, or None
to indicate all weights should be taken to be ``1``. If specified, :attr:`per_sample_weights`
must have exactly the same shape as input and is treated as having the same
:attr:`offsets`, if those are not ``None``. Only supported for ``mode='sum'``.
Output shape: `(B, embedding_dim)`
"""
__constants__ = ['num_categories', 'embedding_dim', 'num_collisions',
'operation', 'max_norm', 'norm_type', 'scale_grad_by_freq',
'mode', 'sparse']
def __init__(self, num_categories, embedding_dim, num_collisions,
operation='mult', max_norm=None, norm_type=2.,
scale_grad_by_freq=False, mode='mean', sparse=False,
_weight=None):
super(QREmbeddingBag, self).__init__()
assert operation in ['concat', 'mult', 'add'], 'Not valid operation!'
self.num_categories = num_categories
if isinstance(embedding_dim, int) or len(embedding_dim) == 1:
self.embedding_dim = [embedding_dim, embedding_dim]
else:
self.embedding_dim = embedding_dim
self.num_collisions = num_collisions
self.operation = operation
self.max_norm = max_norm
self.norm_type = norm_type
self.scale_grad_by_freq = scale_grad_by_freq
if self.operation == 'add' or self.operation == 'mult':
assert self.embedding_dim[0] == self.embedding_dim[1], \
'Embedding dimensions do not match!'
self.num_embeddings = [int(np.ceil(num_categories / num_collisions)),
num_collisions]
if _weight is None:
self.weight_q = Parameter(torch.Tensor(self.num_embeddings[0], self.embedding_dim[0]))
self.weight_r = Parameter(torch.Tensor(self.num_embeddings[1], self.embedding_dim[1]))
self.reset_parameters()
else:
assert list(_weight[0].shape) == [self.num_embeddings[0], self.embedding_dim[0]], \
'Shape of weight for quotient table does not match num_embeddings and embedding_dim'
assert list(_weight[1].shape) == [self.num_embeddings[1], self.embedding_dim[1]], \
'Shape of weight for remainder table does not match num_embeddings and embedding_dim'
self.weight_q = Parameter(_weight[0])
self.weight_r = Parameter(_weight[1])
self.mode = mode
self.sparse = sparse
def reset_parameters(self):
nn.init.uniform_(self.weight_q, np.sqrt(1 / self.num_categories))
nn.init.uniform_(self.weight_r, np.sqrt(1 / self.num_categories))
def forward(self, input, offsets=None, per_sample_weights=None):
input_q = (input / self.num_collisions).long()
input_r = torch.remainder(input, self.num_collisions).long()
embed_q = F.embedding_bag(input_q, self.weight_q, offsets, self.max_norm,
self.norm_type, self.scale_grad_by_freq, self.mode,
self.sparse, per_sample_weights)
embed_r = F.embedding_bag(input_r, self.weight_r, offsets, self.max_norm,
self.norm_type, self.scale_grad_by_freq, self.mode,
self.sparse, per_sample_weights)
if self.operation == 'concat':
embed = torch.cat((embed_q, embed_r), dim=1)
elif self.operation == 'add':
embed = embed_q + embed_r
elif self.operation == 'mult':
embed = embed_q * embed_r
return embed
def extra_repr(self):
s = '{num_embeddings}, {embedding_dim}'
if self.max_norm is not None:
s += ', max_norm={max_norm}'
if self.norm_type != 2:
s += ', norm_type={norm_type}'
if self.scale_grad_by_freq is not False:
s += ', scale_grad_by_freq={scale_grad_by_freq}'
s += ', mode={mode}'
return s.format(**self.__dict__)
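# Minimal usage sketch (hypothetical sizes): 1000 categories are split across a
# quotient table of ceil(1000 / 4) = 250 rows and a remainder table of 4 rows,
# and the two lookups are combined element-wise via operation='mult'.
#   qr_bag = QREmbeddingBag(num_categories=1000, embedding_dim=16, num_collisions=4,
#                           operation='mult', mode='sum')
#   out = qr_bag(torch.tensor([3, 17, 999]), offsets=torch.tensor([0]))  # shape (1, 16)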