Commit 9c8a2a14 authored by xinghao's avatar xinghao
Browse files

Initial commit

parents
Pipeline #3002 canceled with stages
# @lint-ignore-every LICENSELINT
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""
Utilities for MLPerf logging
"""
import os
import torch
# Best-effort import: mlperf_logging is an optional dependency.  On failure we
# only warn and continue; any later use of the logging helpers will then fail
# with a NameError on _MLLOGGER, so callers must ensure the package is present
# before logging.
try:
    from mlperf_logging import mllog
    from mlperf_logging.mllog import constants

    _MLLOGGER = mllog.get_mllogger()
except ImportError as error:
    print("Unable to import mlperf_logging, ", error)
def log_start(*args, **kwargs):
    """Log an MLPerf message with the START tag (rank 0 only unless log_all_ranks=True)."""
    _log_print(_MLLOGGER.start, *args, **kwargs)
def log_end(*args, **kwargs):
    """Log an MLPerf message with the END tag (rank 0 only unless log_all_ranks=True)."""
    _log_print(_MLLOGGER.end, *args, **kwargs)
def log_event(*args, **kwargs):
    """Log an MLPerf message with the EVENT tag (rank 0 only unless log_all_ranks=True)."""
    _log_print(_MLLOGGER.event, *args, **kwargs)
def _log_print(logger, *args, **kwargs):
"makes mlperf logger aware of distributed execution"
if "stack_offset" not in kwargs:
kwargs["stack_offset"] = 3
if "value" not in kwargs:
kwargs["value"] = None
if kwargs.pop("log_all_ranks", False):
log = True
else:
log = get_rank() == 0
if log:
logger(*args, **kwargs)
def config_logger(benchmark):
    """Initialize the MLPerf logger, writing ``<benchmark>.log`` next to this file."""
    here = os.path.dirname(os.path.abspath(__file__))
    mllog.config(filename=os.path.join(here, f"{benchmark}.log"))
    # keep MLPerf records out of the root logger's handlers
    _MLLOGGER.logger.propagate = False
def barrier():
    """
    Temporary distributed barrier: at the time of writing PyTorch did not
    implement barrier for the NCCL backend, so we all_reduce a dummy CUDA
    tensor and synchronize with the GPU instead.  No-op when distributed
    training is unavailable or not initialized.
    """
    dist = torch.distributed
    if not (dist.is_available() and dist.is_initialized()):
        return
    dist.all_reduce(torch.cuda.FloatTensor(1))
    torch.cuda.synchronize()
def get_rank():
    """
    Return the distributed rank of this process, or 0 when distributed
    execution is unavailable or not initialized.
    """
    if not (torch.distributed.is_available() and torch.distributed.is_initialized()):
        return 0
    return torch.distributed.get_rank()
def mlperf_submission_log(benchmark):
    """
    Log the metadata fields required for an MLPerf submission.

    Configures the logger for ``benchmark`` and then emits the submission
    events in the order MLPerf expects.
    """
    config_logger(benchmark)
    # (key, value) pairs, emitted in submission order
    submission_fields = (
        (constants.SUBMISSION_BENCHMARK, benchmark),
        (constants.SUBMISSION_ORG, "reference_implementation"),
        (constants.SUBMISSION_DIVISION, "closed"),
        (constants.SUBMISSION_STATUS, "onprem"),
        (constants.SUBMISSION_PLATFORM, "reference_implementation"),
        (constants.SUBMISSION_ENTRY, "reference_implementation"),
        (constants.SUBMISSION_POC_NAME, "reference_implementation"),
        (constants.SUBMISSION_POC_EMAIL, "reference_implementation"),
    )
    for key, value in submission_fields:
        log_event(key=key, value=value)
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import torch
from torch.optim import Optimizer
class RWSAdagrad(Optimizer):
    """Implements Row Wise Sparse Adagrad algorithm.

    For sparse gradients the squared-gradient accumulator ("momentum") is
    kept per embedding row (one scalar per row) instead of per element,
    which greatly reduces optimizer-state memory for large embedding tables.
    Dense gradients fall back to standard element-wise Adagrad.

    Arguments:
        params (iterable): iterable of parameters to optimize or dicts defining
            parameter groups
        lr (float, optional): learning rate (default: 1e-2)
        lr_decay (float, optional): learning rate decay (default: 0)
        weight_decay (float, optional): weight decay (L2 penalty) (default: 0);
            not supported with sparse gradients
        initial_accumulator_value (float, optional): starting value of the
            squared-gradient accumulators (default: 0)
        eps (float, optional): term added to the denominator to improve
            numerical stability (default: 1e-10)
    """

    def __init__(
        self,
        params,
        lr=1e-2,
        lr_decay=0.0,
        weight_decay=0.0,
        initial_accumulator_value=0.0,
        eps=1e-10,
    ):
        if not 0.0 <= lr:
            raise ValueError("Invalid learning rate: {}".format(lr))
        if not 0.0 <= lr_decay:
            raise ValueError("Invalid lr_decay value: {}".format(lr_decay))
        if not 0.0 <= weight_decay:
            raise ValueError("Invalid weight_decay value: {}".format(weight_decay))
        if not 0.0 <= initial_accumulator_value:
            raise ValueError(
                "Invalid initial_accumulator_value value: {}".format(
                    initial_accumulator_value
                )
            )
        if not 0.0 <= eps:
            raise ValueError("Invalid epsilon value: {}".format(eps))

        self.defaults = dict(
            lr=lr,
            lr_decay=lr_decay,
            eps=eps,
            weight_decay=weight_decay,
            initial_accumulator_value=initial_accumulator_value,
        )
        super(RWSAdagrad, self).__init__(params, self.defaults)

        # Accumulators ("momentum" for sparse grads, "sum" for dense grads)
        # are allocated lazily on the first step(), once gradient sparsity
        # is known.
        self.momentum_initialized = False

        for group in self.param_groups:
            for p in group["params"]:
                self.state[p]["step"] = 0

    def share_memory(self):
        """Move optimizer state to shared memory (for Hogwild-style training)."""
        for group in self.param_groups:
            for p in group["params"]:
                state = self.state[p]
                # Check which accumulator actually exists instead of
                # inspecting p.grad, which is None before the first backward
                # pass (and the accumulators do not exist before the first
                # step()).
                if "momentum" in state:
                    state["momentum"].share_memory_()
                elif "sum" in state:
                    state["sum"].share_memory_()

    def step(self, closure=None):
        """Performs a single optimization step.

        Arguments:
            closure (callable, optional): A closure that reevaluates the model
                and returns the loss.

        Returns:
            The loss returned by ``closure``, or None.

        Raises:
            RuntimeError: if weight_decay is combined with sparse gradients.
        """
        loss = None
        if closure is not None:
            loss = closure()

        for group in self.param_groups:
            for p in group["params"]:
                if p.grad is None:
                    continue

                if not self.momentum_initialized:
                    if p.grad.data.is_sparse:
                        # One accumulator scalar per embedding row.
                        self.state[p]["momentum"] = torch.full(
                            [p.data.shape[0]],
                            self.defaults["initial_accumulator_value"],
                            dtype=torch.float32,
                        )
                    else:
                        # Element-wise accumulator, as in standard Adagrad.
                        self.state[p]["sum"] = torch.full_like(
                            p.data,
                            self.defaults["initial_accumulator_value"],
                            dtype=torch.float32,
                        )

                grad = p.grad
                state = self.state[p]
                state["step"] += 1

                if group["weight_decay"] != 0:
                    if p.grad.data.is_sparse:
                        raise RuntimeError(
                            "weight_decay option is not compatible with sparse gradients"
                        )
                    # Fix: use the keyword form; the positional
                    # Tensor.add(scalar, tensor) overload is deprecated and
                    # removed in recent PyTorch releases (matches the
                    # value=/alpha= style used elsewhere in this method).
                    grad = grad.add(p.data, alpha=group["weight_decay"])

                # Learning rate decayed by the number of completed steps.
                clr = group["lr"] / (1.0 + (state["step"] - 1.0) * group["lr_decay"])

                if grad.is_sparse:
                    # The update is non-linear so indices must be unique.
                    grad = grad.coalesce()
                    grad_indices = grad._indices()
                    grad_values = grad._values()
                    size = grad.size()

                    def make_sparse(values, row_wise):
                        constructor = grad.new
                        matrix_size = [size[0]] if row_wise else size
                        return constructor(grad_indices, values, matrix_size)

                    if grad_values.numel() > 0:
                        # Row-wise mean of squared gradients.
                        momentum_update = make_sparse(
                            grad_values.pow(2).mean(dim=1), True
                        )
                        state["momentum"].add_(momentum_update)  # update momentum
                        std = state["momentum"].sparse_mask(momentum_update.coalesce())
                        std_values = std._values().sqrt_().add_(group["eps"])
                        # Scale each touched row's gradient by its row-wise
                        # denominator and apply the update.
                        p.data.add_(
                            make_sparse(
                                grad_values / std_values.view(std_values.size()[0], 1),
                                False,
                            ),
                            alpha=-clr,
                        )
                else:
                    # Standard element-wise Adagrad update.
                    state["sum"].addcmul_(grad, grad, value=1.0)
                    std = state["sum"].sqrt().add_(group["eps"])
                    p.data.addcdiv_(grad, std, value=-clr)

        self.momentum_initialized = True
        return loss
#!/bin/bash
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
#
# Smoke-tests dlrm_s_pytorch.py in debug mode on tiny synthetic problems,
# saving each run's stdout to ppp1..ppp4 for later inspection/diffing.
#WARNING: must have compiled PyTorch

# check if extra argument is passed to the test; it is forwarded verbatim to
# every run (left unquoted on purpose so it may expand to several CLI flags)
if [[ $# == 1 ]]; then
    dlrm_extra_option=$1
else
    dlrm_extra_option=""
fi
#echo $dlrm_extra_option

dlrm_py="python dlrm_s_pytorch.py"

echo "Running commands ..."
#run pytorch
echo $dlrm_py
# vary batch size, data size and epoch count to exercise a few code paths
$dlrm_py --mini-batch-size=1 --data-size=1 --nepochs=1 --arch-interaction-op=dot --learning-rate=0.1 --debug-mode $dlrm_extra_option > ppp1
$dlrm_py --mini-batch-size=2 --data-size=4 --nepochs=1 --arch-interaction-op=dot --learning-rate=0.1 --debug-mode $dlrm_extra_option > ppp2
$dlrm_py --mini-batch-size=2 --data-size=5 --nepochs=1 --arch-interaction-op=dot --learning-rate=0.1 --debug-mode $dlrm_extra_option > ppp3
$dlrm_py --mini-batch-size=2 --data-size=5 --nepochs=3 --arch-interaction-op=dot --learning-rate=0.1 --debug-mode $dlrm_extra_option > ppp4

echo "All PyTorch tests completed."
echo "Output files: ppp1, ppp2, ppp3, ppp4"
\ No newline at end of file
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# This script performs the visualization of the embedding tables created in
# DLRM during the training procedure. We use two popular techniques for
# visualization: umap (https://umap-learn.readthedocs.io/en/latest/) and
# tsne (https://scikit-learn.org/stable/modules/generated/sklearn.manifold.TSNE.html).
# These links also provide instructions on how to install these packages
# in different environments.
#
# Warning: the size of the data to be visualized depends on the RAM on your machine.
#
#
# Command line examples:
#
# Full analysis of embeddings and data representations for Criteo Kaggle data:
# $python ./tools/visualize.py --data-set=kaggle --load-model=../dlrm-2020-05-25/criteo.pytorch-e-0-i-110591
# --raw-data-file=../../criteo/input/train.txt --skip-categorical-analysis
# --processed-data-file=../../criteo/input/kaggleAdDisplayChallenge_processed.npz
#
#
# To run just the analysis of categorical data for Criteo Kaggle data set:
# $python ./tools/visualize.py --data-set=kaggle --load-model=../dlrm-2020-05-25/criteo.pytorch-e-0-i-110591 \
# --raw-data-file=../../criteo/input/train.txt --data-randomize=none --processed-data-file=../../criteo/input/kaggleAdDisplayChallenge_processed.npz \
# --skip-embedding --skip-data-plots
#
#
# The following command line arguments are available to the user:
#
# --load-model - DLRM model file
# --data-set - one of ["kaggle", "terabyte"]
# --max-ind-range - max index range used during the training
# --output-dir - output directory; if not specified, it will be created from the model and dataset names
# --max-umap-size - max number of points to visualize using UMAP, default=50000
# --use-tsne - use T-SNE
# --max-tsne-size - max number of points to visualize using T-SNE, default=1000)
# --skip-embedding - skips analysis of embedding tables
# --umap-metric - metric for UMAP
# --skip-data-plots - skips data plots
# --skip-categorical-analysis - skips categorical analysis
#
# # data file related
# --raw-data-file
# --processed-data-file
# --data-sub-sample-rate
# --data-randomize
# --memory-map
# --mini-batch-size
# --num-workers
# --test-mini-batch-size
# --test-num-workers
# --num-batches
# --mlperf-logging
import argparse
import collections
import json
import math
import os
import sys
import dlrm_data_pytorch as dp
import hdbscan
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import torch
import umap
from dlrm_s_pytorch import DLRM_Net
from sklearn import manifold
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
def visualize_embeddings_umap(
    emb_l,
    output_dir="",
    max_size=500000,
    umap_metric="euclidean",
    cat_counts=None,
    use_max_count=True,
):
    """Project each embedding table to 2-D with UMAP and save scatter plots.

    For every table this saves a log-binned histogram of the rows' L2 norms
    and a 2-D UMAP scatter plot.  Tables with fewer than 20 rows are skipped.

    Arguments:
        emb_l: list of embedding modules (each exposing a ``.weight`` tensor)
        output_dir: directory the .png files are written to
        max_size: maximum number of rows fed to UMAP per table
        umap_metric: distance metric passed to umap.UMAP
        cat_counts: optional per-table array of per-row occurrence counts;
            used to color the scatter plot and, with use_max_count, to keep
            only the most frequent rows of oversized tables.  May be None.
        use_max_count: when True and cat_counts is given, subsample oversized
            tables by occurrence count instead of taking the first rows.
    """
    for k in range(0, len(emb_l)):
        E = emb_l[k].weight.detach().cpu().numpy()
        print("umap", E.shape)

        # Histogram of per-row L2 norms on log-spaced bins.
        bins = 50
        norms = [np.linalg.norm(E[i], ord=2) for i in range(0, E.shape[0])]
        _, bins = np.histogram(norms, bins=bins)
        logbins = np.logspace(np.log10(bins[0]), np.log10(bins[-1]), len(bins))
        plt.figure(figsize=(8, 8))
        # Fix: cat_counts defaults to None but was previously dereferenced
        # here unconditionally; fall back to the table's row count.
        cardinality = E.shape[0] if cat_counts is None else len(cat_counts[k])
        plt.title("Categorical norms: " + str(k) + " cardinality " + str(cardinality))
        plt.hist(norms, bins=logbins)
        plt.xscale("log")
        plt.savefig(output_dir + "/cat-norm-histogram-" + str(k) + ".png")
        plt.close()

        if E.shape[0] < 20:
            print("Skipping small embedding")
            continue

        n_vis = min(max_size, E.shape[0])
        min_cnt = 0
        # reducer = umap.UMAP(random_state=42, n_neighbors=25, min_dist=0.1)
        reducer = umap.UMAP(random_state=42, metric=umap_metric)
        # Count-based subsampling requires counts; otherwise take the first
        # n_vis rows (fix: previously crashed when cat_counts was None).
        if cat_counts is None or use_max_count is False or n_vis == E.shape[0]:
            Y = reducer.fit_transform(E[:n_vis, :])
        else:
            # Raise the count threshold until at most max_size rows survive,
            # then keep only rows seen more than min_cnt times.
            min_cnt = 1
            while (cat_counts[k] > min_cnt).sum() > max_size:
                min_cnt = min_cnt + 1
            E1 = []
            for i in range(0, E.shape[0]):
                if cat_counts[k][i] > min_cnt:
                    E1.append(E[i, :])
            print("max_count_len", len(E1), "mincount", min_cnt)
            Y = reducer.fit_transform(np.array(E1))
            n_vis = len(E1)

        plt.figure(figsize=(8, 8))
        linewidth = 0
        size = 1
        if Y.shape[0] < 2500:
            # larger markers when the plot is sparse
            linewidth = 1
            size = 5
        if cat_counts is None:
            plt.scatter(-Y[:, 0], -Y[:, 1], s=size, marker=".", linewidth=linewidth)
        else:
            n_disp = min(len(cat_counts[k]), Y.shape[0])
            cur_max = math.log(max(cat_counts[k]))
            # log-normalized occurrence counts used as point colors
            norm_cat_count = [
                math.log(cat_counts[k][i] + 1) / cur_max
                for i in range(0, len(cat_counts[k]))
            ]
            # NOTE(review): after count-based subsampling the colors come
            # from the first n_disp counts, not the counts of the selected
            # rows -- kept as-is to preserve existing output; confirm intent.
            plt.scatter(
                -Y[0:n_disp, 0],
                -Y[0:n_disp, 1],
                s=size,
                marker=".",
                linewidth=linewidth,
                c=np.array(norm_cat_count)[0:n_disp],
                cmap="viridis",
            )
            plt.colorbar()
        plt.title(
            "UMAP: categorical var. "
            + str(k)
            + " ("
            + str(n_vis)
            + " of "
            + str(E.shape[0])
            + ", min count "
            + str(min_cnt)
            + ")"
        )
        plt.savefig(
            output_dir
            + "/cat-"
            + str(k)
            + "-"
            + str(n_vis)
            + "-of-"
            + str(E.shape[0])
            + "-umap.png"
        )
        plt.close()
def visualize_embeddings_tsne(emb_l, output_dir="", max_size=10000):
    """Project each embedding table to 2-D with t-SNE and save scatter plots.

    Tables with fewer than 20 rows are skipped; at most ``max_size`` rows per
    table are projected.  Each plot is saved under ``output_dir``.
    """
    for idx, emb in enumerate(emb_l):
        W = emb.weight.detach().cpu()
        print("tsne", W.shape)

        if W.shape[0] < 20:
            print("Skipping small embedding")
            continue

        n_vis = min(max_size, W.shape[0])
        projector = manifold.TSNE(init="pca", random_state=0, method="exact")
        Y = projector.fit_transform(W[:n_vis, :])

        plt.figure(figsize=(8, 8))
        # thicker markers when the plot is sparse
        linewidth = 1 if Y.shape[0] < 5000 else 0
        plt.scatter(-Y[:, 0], -Y[:, 1], s=1, marker=".", linewidth=linewidth)
        plt.title(f"TSNE: categorical var. {idx} ({n_vis} of {W.shape[0]})")
        plt.savefig(f"{output_dir}/cat-{idx}-{n_vis}-of-{W.shape[0]}-tsne.png")
        plt.close()
def analyse_categorical_data(X_cat, n_days=10, output_dir=""):
    """Analyse how categorical values persist across successive day splits.

    For every categorical feature the samples are split at each of the
    ``n_days`` day boundaries into a "before" and "after" part; the function
    prints and plots the distinct-value counts of both sides, their
    intersection, and how many values disappear after the split.

    Arguments:
        X_cat: sequence of per-sample categorical feature vectors
        n_days: number of equal-sized "days" the samples are divided into
        output_dir: directory where the cat-XXX.png plots are written
    """
    # analyse categorical variables
    n_vec = len(X_cat)
    n_cat = len(X_cat[0])
    n_days = n_days
    print("n_vec", n_vec, "n_cat", n_cat)
    # for c in train_data.X_cat:
    # print(n_cat, c)
    all_cat = np.array(X_cat)
    print("all_cat.shape", all_cat.shape)
    day_size = all_cat.shape[0] / n_days
    for i in range(0, n_cat):
        l_d = []    # day index of each split
        l_s1 = []   # distinct values before the split
        l_s2 = []   # distinct values after the split
        l_int = []  # values present on both sides
        l_rem = []  # values that disappear after the split
        cat = all_cat[:, i]
        print("cat", i, cat.shape)
        for d in range(1, n_days):
            offset = int(d * day_size)
            # print(offset)
            cat1 = cat[:offset]
            cat2 = cat[offset:]
            s1 = set(cat1)
            s2 = set(cat2)
            intersect = list(s1 & s2)
            # print(intersect)
            l_d.append(d)
            l_s1.append(len(s1))
            l_s2.append(len(s2))
            l_int.append(len(intersect))
            l_rem.append((len(s1) - len(intersect)))
            print(
                d,
                ",",
                len(s1),
                ",",
                len(s2),
                ",",
                len(intersect),
                ",",
                (len(s1) - len(intersect)),
            )
        # NOTE(review): "spit" below is presumably a typo for "split"; kept
        # as-is since it is runtime output.
        print("spit", l_d)
        print("before", l_s1)
        print("after", l_s2)
        print("inters.", l_int)
        print("removed", l_rem)
        plt.figure(figsize=(8, 8))
        plt.plot(l_d, l_s1, "g", label="before")
        plt.plot(l_d, l_s2, "r", label="after")
        plt.plot(l_d, l_int, "b", label="intersect")
        plt.plot(l_d, l_rem, "y", label="removed")
        plt.title("categorical var. " + str(i))
        plt.legend()
        plt.savefig(output_dir + "/cat-" + str(i).zfill(3) + ".png")
        plt.close()
def analyse_categorical_counts(X_cat, emb_l=None, output_dir=""):
    """Count occurrences of every categorical value and plot the counts.

    When ``emb_l`` is given, the count vector for feature i is sized to the
    embedding table's row count and each figure also shows the per-row L2
    norms of that embedding; otherwise the size is the number of distinct
    values observed in ``X_cat``.

    Arguments:
        X_cat: sequence of per-sample categorical feature vectors
        emb_l: optional list of embedding modules matching the features
        output_dir: directory where the cat_counts-XXX.png plots are written

    Returns:
        list of per-feature occurrence-count arrays (one entry per feature).
    """
    # analyse categorical variables
    n_vec = len(X_cat)
    n_cat = len(X_cat[0])
    print("n_vec", n_vec, "n_cat", n_cat)
    # for c in train_data.X_cat:
    # print(n_cat, c)
    all_cat = np.array(X_cat)
    print("all_cat.shape", all_cat.shape)
    all_counts = []
    for i in range(0, n_cat):
        cat = all_cat[:, i]
        if emb_l is None:
            # size by the number of distinct values actually observed
            s = set(cat)
            counts = np.zeros((len(s)))
            print("cat", i, cat.shape, len(s))
        else:
            # size by the embedding table's row count
            s = emb_l[i].weight.detach().cpu().shape[0]
            counts = np.zeros((s))
            print("cat", i, cat.shape, s)
        for d in range(0, n_vec):
            cv = int(cat[d])
            counts[cv] = counts[cv] + 1
        all_counts.append(counts)
        if emb_l is None:
            plt.figure(figsize=(8, 8))
            plt.plot(counts)
            plt.title("Categorical var " + str(i) + " cardinality " + str(len(counts)))
            # plt.legend()
        else:
            # two stacked panels: occurrence counts (log scale) and row norms
            E = emb_l[i].weight.detach().cpu().numpy()
            norms = [np.linalg.norm(E[i], ord=2) for i in range(0, E.shape[0])]
            fig, (ax0, ax1) = plt.subplots(2, 1)
            fig.suptitle(
                "Categorical variable: " + str(i) + " cardinality " + str(len(counts))
            )
            ax0.plot(counts)
            ax0.set_yscale("log")
            ax0.set_title("Counts", fontsize=10)
            ax1.plot(norms)
            ax1.set_title("Norms", fontsize=10)
        plt.savefig(output_dir + "/cat_counts-" + str(i).zfill(3) + ".png")
        plt.close()
    return all_counts
def dlrm_output_wrap(dlrm, X, lS_o, lS_i, T):
    """Run one DLRM forward pass and capture its intermediate representations.

    Every capture indexes element [0] of the batch, so this appears to assume
    mini-batches of size 1 -- TODO confirm against the callers' data loaders.

    Arguments:
        dlrm: trained DLRM_Net model
        X: dense (continuous) features
        lS_o, lS_i: sparse feature offsets and indices
        T: target labels

    Returns:
        Tuple ``(all_feat_vec, x_vec, all_cat_vec, t_out, c_out, z_out,
        p_out)`` where ``all_feat_vec`` is the concatenated dense+embedding
        representation, ``x_vec`` the bottom-MLP output, ``all_cat_vec`` the
        concatenated embedding vectors, ``t_out`` the integer target,
        ``c_out`` 0 if the prediction matches the target else 1, ``z_out``
        the interaction output followed by each top-MLP layer's output, and
        ``p_out`` the thresholded 0/1 prediction.
    """
    all_feat_vec = []
    all_cat_vec = []
    x_vec = None
    t_out = None
    c_out = None
    z_out = []
    p_out = None
    z_size = len(dlrm.top_l)
    # bottom MLP over the dense features
    x = dlrm.apply_mlp(X, dlrm.bot_l)
    # debug prints
    # print("intermediate")
    # print(x[0].detach().cpu().numpy())
    x_vec = x[0].detach().cpu().numpy()
    all_feat_vec.append(x_vec)
    # all_X.append(x[0].detach().cpu().numpy())
    # process sparse features(using embeddings), resulting in a list of row vectors
    ly = dlrm.apply_emb(lS_o, lS_i, dlrm.emb_l)
    for e in ly:
        # print(e.detach().cpu().numpy())
        all_feat_vec.append(e[0].detach().cpu().numpy())
        all_cat_vec.append(e[0].detach().cpu().numpy())
    all_feat_vec = np.concatenate(all_feat_vec, axis=0)
    all_cat_vec = np.concatenate(all_cat_vec, axis=0)
    # all_features.append(all_feat_vec)
    # all_cat.append(all_cat_vec)
    t_out = int(T.detach().cpu().numpy()[0, 0])
    # all_T.append(int(T.detach().cpu().numpy()[0,0]))
    z = dlrm.interact_features(x, ly)
    # print(z.detach().cpu().numpy())
    # z_out = z.detach().cpu().numpy().flatten()
    z_out.append(z.detach().cpu().numpy().flatten())
    # all_z[0].append(z.detach().cpu().numpy().flatten())
    # obtain probability of a click (using top mlp)
    # print(dlrm.top_l)
    # p = dlrm.apply_mlp(z, dlrm.top_l)
    # apply the top MLP one layer at a time so each activation can be captured
    for i in range(0, z_size):
        z = dlrm.top_l[i](z)
        # if i < z_size-1:
        # curr_z = z.detach().cpu().numpy().flatten()
        z_out.append(z.detach().cpu().numpy().flatten())
        # all_z[i+1].append(curr_z)
        # print("z append", i)
        # print("z",i, z.detach().cpu().numpy().flatten().shape)
    p = z
    # clamp output if needed
    if 0.0 < dlrm.loss_threshold and dlrm.loss_threshold < 1.0:
        z = torch.clamp(p, min=dlrm.loss_threshold, max=(1.0 - dlrm.loss_threshold))
    else:
        z = p
    # round the (shifted) probability into a 0/1 prediction
    class_thresh = 0.0  # -0.25
    zp = z.detach().cpu().numpy()[0, 0] + class_thresh
    p_out = int(zp + 0.5)
    if p_out > 1:
        p_out = 1
    if p_out < 0:
        p_out = 0
    # all_pred.append(int(z.detach().cpu().numpy()[0,0]+0.5))
    # print(int(z.detach().cpu().numpy()[0,0]+0.5))
    # c_out flags misclassification: 0 = correct, 1 = error
    if int(p_out) == t_out:
        c_out = 0
    else:
        c_out = 1
    return all_feat_vec, x_vec, all_cat_vec, t_out, c_out, z_out, p_out
def create_umap_data(dlrm, data_ld, max_size=50000, offset=0, info=""):
    """Collect per-sample model representations for UMAP fitting/plotting.

    Iterates ``data_ld``, skipping the first ``offset`` samples and stopping
    after ``max_size`` samples, gathering the outputs of dlrm_output_wrap.
    Also prints accuracy/F1/precision/recall over the collected slice.

    Returns:
        Tuple ``(all_features, all_X, all_cat, all_T, all_z, all_c,
        all_pred)`` of per-sample lists (``all_z`` is a list per top-MLP
        stage).
    """
    all_features = []
    all_X = []
    all_cat = []
    all_T = []
    all_c = []
    all_pred = []
    z_size = len(dlrm.top_l)
    print("z_size", z_size)
    all_z = [[] for _ in range(z_size)]

    last = max_size + offset
    for j, (X, lS_o, lS_i, T) in enumerate(data_ld):
        if j < offset:
            continue
        if j >= last:
            break
        af, x, cat, t, c, z, p = dlrm_output_wrap(dlrm, X, lS_o, lS_i, T)
        all_features.append(af)
        all_X.append(x)
        all_cat.append(cat)
        all_T.append(t)
        all_c.append(c)
        all_pred.append(p)
        for stage in range(z_size):
            all_z[stage].append(z[stage])

    # # calculate classifier metrics
    ac = accuracy_score(all_T, all_pred)
    f1 = f1_score(all_T, all_pred)
    ps = precision_score(all_T, all_pred)
    rc = recall_score(all_T, all_pred)
    print(info, "accuracy", ac, "f1", f1, "precision", ps, "recall", rc)
    return all_features, all_X, all_cat, all_T, all_z, all_c, all_pred
def plot_all_data_3(
    umap_Y,
    umap_T,
    train_Y=None,
    train_T=None,
    test_Y=None,
    test_T=None,
    total_train_size="",
    total_test_size="",
    info="",
    output_dir="",
    orig_space_dim=0,
):
    """Draw a 1x3 panel of 2-D scatter plots colored by binary labels.

    Panel 0 shows the points UMAP was fit on; panels 1 and 2 show the
    transformed train and test points (each drawn only when both coordinates
    and labels are provided).  Labels are mapped via a two-color map:
    0 -> red, 1 -> green.  The figure is saved as
    "<output_dir>/<info>-umap.png".
    """
    size = 1
    # label 0 -> red, label 1 -> green
    colors = ["red", "green"]
    fig, (ax0, ax1, ax2) = plt.subplots(1, 3)
    fig.suptitle("UMAP: " + info + " space dim " + str(orig_space_dim))
    ax0.scatter(
        umap_Y[:, 0],
        umap_Y[:, 1],
        s=size,
        c=umap_T,
        cmap=matplotlib.colors.ListedColormap(colors),
        marker=".",
        linewidth=0,
    )
    ax0.set_title(
        "UMAP (" + str(len(umap_T)) + " of " + total_train_size + ")", fontsize=7
    )
    if train_Y is not None and train_T is not None:
        ax1.scatter(
            train_Y[:, 0],
            train_Y[:, 1],
            s=size,
            c=train_T,
            cmap=matplotlib.colors.ListedColormap(colors),
            marker=".",
            linewidth=0,
        )
        ax1.set_title(
            "Train (" + str(len(train_T)) + " of " + total_train_size + ")", fontsize=7
        )
    if test_Y is not None and test_T is not None:
        ax2.scatter(
            test_Y[:, 0],
            test_Y[:, 1],
            s=size,
            c=test_T,
            cmap=matplotlib.colors.ListedColormap(colors),
            marker=".",
            linewidth=0,
        )
        ax2.set_title(
            "Test (" + str(len(test_T)) + " of " + total_test_size + ")", fontsize=7
        )
    plt.savefig(output_dir + "/" + info + "-umap.png")
    plt.close()
def plot_one_class_3(
    umap_Y,
    umap_T,
    train_Y,
    train_T,
    test_Y,
    test_T,
    target=0,
    col="red",
    total_train_size="",
    total_test_size="",
    info="",
    output_dir="",
    orig_space_dim=0,
):
    """Draw a 1x3 panel showing only the points whose label equals ``target``.

    Panel 0 plots the UMAP-fit points, panels 1 and 2 the train and test
    points (each drawn only when both coordinates and labels are provided),
    all in the single color ``col``.  The figure is saved as
    "<output_dir>/<info>-umap.png".
    """
    size = 1
    fig, (ax0, ax1, ax2) = plt.subplots(1, 3)
    fig.suptitle("UMAP: " + info + " space dim " + str(orig_space_dim))
    # select only the points of the requested class
    ind_l_umap = [i for i, x in enumerate(umap_T) if x == target]
    Y_umap_l = np.array([umap_Y[i, :] for i in ind_l_umap])
    ax0.scatter(Y_umap_l[:, 0], Y_umap_l[:, 1], s=size, c=col, marker=".", linewidth=0)
    ax0.set_title(
        "UMAP, (" + str(len(umap_T)) + " of " + total_train_size + ")", fontsize=7
    )
    if train_Y is not None and train_T is not None:
        ind_l_test = [i for i, x in enumerate(train_T) if x == target]
        Y_test_l = np.array([train_Y[i, :] for i in ind_l_test])
        ax1.scatter(
            Y_test_l[:, 0], Y_test_l[:, 1], s=size, c=col, marker=".", linewidth=0
        )
        ax1.set_title(
            "Train, (" + str(len(train_T)) + " of " + total_train_size + ")", fontsize=7
        )
    if test_Y is not None and test_T is not None:
        ind_l_test = [i for i, x in enumerate(test_T) if x == target]
        Y_test_l = np.array([test_Y[i, :] for i in ind_l_test])
        ax2.scatter(
            Y_test_l[:, 0], Y_test_l[:, 1], s=size, c=col, marker=".", linewidth=0
        )
        ax2.set_title(
            "Test, (" + str(len(test_T)) + " of " + total_test_size + ")", fontsize=7
        )
    plt.savefig(output_dir + "/" + info + "-umap.png")
    plt.close()
def visualize_umap_data(
    umap_Y,
    umap_T,
    umap_C,
    umap_P,
    train_Y,
    train_T,
    train_C,
    train_P,
    test_Y=None,
    test_T=None,
    test_C=None,
    test_P=None,
    total_train_size="",
    total_test_size="",
    info="",
    output_dir="",
    orig_space_dim=0,
):
    """Produce the full set of UMAP scatter plots for one feature space.

    Emits two all-class overviews (true labels, then predictions) followed
    by six single-class plots: each true class, correct/error indicators,
    and each predicted class.  T/C/P arguments carry true labels,
    correctness flags (0 = correct, 1 = error) and predictions respectively.
    """
    common = dict(
        train_Y=train_Y,
        test_Y=test_Y,
        total_train_size=total_train_size,
        total_test_size=total_test_size,
        output_dir=output_dir,
        orig_space_dim=orig_space_dim,
    )

    # all classes
    plot_all_data_3(
        umap_Y=umap_Y,
        umap_T=umap_T,
        train_T=train_T,
        test_T=test_T,
        info=info,
        **common,
    )
    # all predictions
    plot_all_data_3(
        umap_Y=umap_Y,
        umap_T=umap_P,
        train_T=train_P,
        test_T=test_P,
        info=info + ", all-predictions",
        **common,
    )

    # single-class views: (label arrays, target value, color, info suffix)
    single_class_specs = (
        ((umap_T, train_T, test_T), 0, "red", " class 0"),
        ((umap_T, train_T, test_T), 1, "green", " class 1"),
        ((umap_C, train_C, test_C), 0, "green", " correct "),
        ((umap_C, train_C, test_C), 1, "red", " errors "),
        ((umap_P, train_P, test_P), 0, "red", " predict-0 "),
        ((umap_P, train_P, test_P), 1, "green", " predict-1 "),
    )
    for (u_lab, tr_lab, te_lab), target, col, suffix in single_class_specs:
        plot_one_class_3(
            umap_Y=umap_Y,
            umap_T=u_lab,
            train_T=tr_lab,
            test_T=te_lab,
            target=target,
            col=col,
            info=info + suffix,
            **common,
        )
def hdbscan_clustering(umap_data, train_data, test_data, info="", output_dir=""):
    """Cluster the 2-D UMAP projection with HDBSCAN and plot the result.

    The clusterer is fit on ``umap_data``; ``train_data`` and ``test_data``
    are assigned to the discovered clusters via hdbscan.approximate_predict.
    A 2x3 figure is saved as "<output_dir>/<info>-hdbscan.png": the top row
    shows outliers (HDBSCAN label < 0, i.e. noise) in grey, the bottom row
    shows inliers colored by cluster; columns are umap/train/test data.

    NOTE(review): the rendered strings contain the typos "clastering" and
    "Tets"; they are runtime output and are deliberately left unchanged here.
    """
    clusterer = hdbscan.HDBSCAN(
        min_samples=10, min_cluster_size=500, prediction_data=True
    )
    umap_labels = clusterer.fit_predict(umap_data)
    train_labels, _ = hdbscan.approximate_predict(clusterer, train_data)
    test_labels, _ = hdbscan.approximate_predict(clusterer, test_data)
    fig, ((ax00, ax01, ax02), (ax10, ax11, ax12)) = plt.subplots(2, 3)
    fig.suptitle("HDBSCAN clastering: " + info)
    # plot umap data
    umap_clustered = umap_labels >= 0  # boolean inlier mask
    umap_coll = collections.Counter(umap_clustered)
    print("umap_clustered", umap_coll)
    # print("umap_data", umap_data.shape)
    # print("~umap_clustered", umap_clustered.count(False), ~umap_clustered)
    ax00.scatter(
        umap_data[~umap_clustered, 0],
        umap_data[~umap_clustered, 1],
        c=(0.5, 0.5, 0.5),
        s=0.1,
        alpha=0.5,
    )
    ax00.set_title("UMAP Outliers " + str(umap_coll[False]), fontsize=7)
    ax10.scatter(
        umap_data[umap_clustered, 0],
        umap_data[umap_clustered, 1],
        c=umap_labels[umap_clustered],
        s=0.1,
        cmap="Spectral",
    )
    ax10.set_title("UMAP Inliers " + str(umap_coll[True]), fontsize=7)
    # plot train data
    train_clustered = train_labels >= 0
    train_coll = collections.Counter(train_clustered)
    ax01.scatter(
        train_data[~train_clustered, 0],
        train_data[~train_clustered, 1],
        c=(0.5, 0.5, 0.5),
        s=0.1,
        alpha=0.5,
    )
    ax01.set_title("Train Outliers " + str(train_coll[False]), fontsize=7)
    ax11.scatter(
        train_data[train_clustered, 0],
        train_data[train_clustered, 1],
        c=train_labels[train_clustered],
        s=0.1,
        cmap="Spectral",
    )
    ax11.set_title("Train Inliers " + str(train_coll[True]), fontsize=7)
    # plot test data
    test_clustered = test_labels >= 0
    test_coll = collections.Counter(test_clustered)
    ax02.scatter(
        test_data[~test_clustered, 0],
        test_data[~test_clustered, 1],
        c=(0.5, 0.5, 0.5),
        s=0.1,
        alpha=0.5,
    )
    ax02.set_title("Tets Outliers " + str(test_coll[False]), fontsize=7)
    ax12.scatter(
        test_data[test_clustered, 0],
        test_data[test_clustered, 1],
        c=test_labels[test_clustered],
        s=0.1,
        cmap="Spectral",
    )
    ax12.set_title("Test Inliers " + str(test_coll[True]), fontsize=7)
    plt.savefig(output_dir + "/" + info + "-hdbscan.png")
    plt.close()
def visualize_all_data_umap(
    dlrm,
    train_ld,
    test_ld=None,
    max_umap_size=50000,
    output_dir="",
    umap_metric="euclidean",
):
    """Fit UMAP on model representations and plot train/test projections.

    Four families of plots are produced: the concatenated dense+embedding
    features, the dense (bottom-MLP) features alone, the embedding features
    alone, and each intermediate top-MLP activation ("z" features).  For
    each family a UMAP reducer is fit on up to ``max_umap_size`` training
    samples and then applied to a disjoint slice of train samples and to
    test samples.  HDBSCAN clustering plots are additionally produced for
    the concatenated features.
    """
    data_ratio = 1  # train/test slices have the same size as the UMAP fit set
    print("creating umap data")
    # samples used to *fit* the reducers
    (
        umap_train_feat,
        umap_train_X,
        umap_train_cat,
        umap_train_T,
        umap_train_z,
        umap_train_c,
        umap_train_p,
    ) = create_umap_data(
        dlrm=dlrm, data_ld=train_ld, max_size=max_umap_size, offset=0, info="umap"
    )
    # transform train and test data
    (
        train_feat,
        train_X,
        train_cat,
        train_T,
        train_z,
        train_c,
        train_p,
    ) = create_umap_data(
        dlrm=dlrm,
        data_ld=train_ld,
        max_size=max_umap_size * data_ratio,
        offset=max_umap_size,  # disjoint from the fit slice
        info="train",
    )
    test_feat, test_X, test_cat, test_T, test_z, test_c, test_p = create_umap_data(
        dlrm=dlrm,
        data_ld=test_ld,
        max_size=max_umap_size * data_ratio,
        offset=0,
        info="test",
    )
    # --- concatenated dense+embedding features ---
    print("umap_train_feat", np.array(umap_train_feat).shape)
    reducer_all_feat = umap.UMAP(random_state=42, metric=umap_metric)
    umap_feat_Y = reducer_all_feat.fit_transform(umap_train_feat)
    train_feat_Y = reducer_all_feat.transform(train_feat)
    test_feat_Y = reducer_all_feat.transform(test_feat)
    visualize_umap_data(
        umap_Y=umap_feat_Y,
        umap_T=umap_train_T,
        umap_C=umap_train_c,
        umap_P=umap_train_p,
        train_Y=train_feat_Y,
        train_T=train_T,
        train_C=train_c,
        train_P=train_p,
        test_Y=test_feat_Y,
        test_T=test_T,
        test_C=test_c,
        test_P=test_p,
        total_train_size=str(len(train_ld)),
        total_test_size=str(len(test_ld)),
        info="all-features",
        output_dir=output_dir,
        orig_space_dim=np.array(umap_train_feat).shape[1],
    )
    hdbscan_clustering(
        umap_data=umap_feat_Y,
        train_data=train_feat_Y,
        test_data=test_feat_Y,
        info="umap-all-features",
        output_dir=output_dir,
    )
    # hdbscan_clustering(umap_data = np.array(umap_train_feat),
    #    train_data = np.array(train_feat),
    #    test_data = np.array(test_feat),
    #    info = "all-features",
    #    output_dir = output_dir)
    # --- dense (bottom-MLP) features only ---
    print("umap_train_X", np.array(umap_train_X).shape)
    reducer_X = umap.UMAP(random_state=42, metric=umap_metric)
    umap_X_Y = reducer_X.fit_transform(umap_train_X)
    train_X_Y = reducer_X.transform(train_X)
    test_X_Y = reducer_X.transform(test_X)
    visualize_umap_data(
        umap_Y=umap_X_Y,
        umap_T=umap_train_T,
        umap_C=umap_train_c,
        umap_P=umap_train_p,
        train_Y=train_X_Y,
        train_T=train_T,
        train_C=train_c,
        train_P=train_p,
        test_Y=test_X_Y,
        test_T=test_T,
        test_C=test_c,
        test_P=test_p,
        total_train_size=str(len(train_ld)),
        total_test_size=str(len(test_ld)),
        info="cont-features",
        output_dir=output_dir,
        orig_space_dim=np.array(umap_train_X).shape[1],
    )
    # --- embedding (categorical) features only ---
    print("umap_train_cat", np.array(umap_train_cat).shape)
    reducer_cat = umap.UMAP(random_state=42, metric=umap_metric)
    umap_cat_Y = reducer_cat.fit_transform(umap_train_cat)
    train_cat_Y = reducer_cat.transform(train_cat)
    test_cat_Y = reducer_cat.transform(test_cat)
    visualize_umap_data(
        umap_Y=umap_cat_Y,
        umap_T=umap_train_T,
        umap_C=umap_train_c,
        umap_P=umap_train_p,
        train_Y=train_cat_Y,
        train_T=train_T,
        train_C=train_c,
        train_P=train_p,
        test_Y=test_cat_Y,
        test_T=test_T,
        test_C=test_c,
        test_P=test_p,
        total_train_size=str(len(train_ld)),
        total_test_size=str(len(test_ld)),
        info="cat-features",
        output_dir=output_dir,
        orig_space_dim=np.array(umap_train_cat).shape[1],
    )
    # UMAP for z data (one reducer per captured top-MLP stage)
    for i in range(0, len(umap_train_z)):
        print("z", i, np.array(umap_train_z[i]).shape)
        reducer_z = umap.UMAP(random_state=42, metric=umap_metric)
        umap_z_Y = reducer_z.fit_transform(umap_train_z[i])
        train_z_Y = reducer_z.transform(train_z[i])
        test_z_Y = reducer_z.transform(test_z[i])
        visualize_umap_data(
            umap_Y=umap_z_Y,
            umap_T=umap_train_T,
            umap_C=umap_train_c,
            umap_P=umap_train_p,
            train_Y=train_z_Y,
            train_T=train_T,
            train_C=train_c,
            train_P=train_p,
            test_Y=test_z_Y,
            test_T=test_T,
            test_C=test_c,
            test_P=test_p,
            total_train_size=str(len(train_ld)),
            total_test_size=str(len(test_ld)),
            info="z-features-" + str(i),
            output_dir=output_dir,
            orig_space_dim=np.array(umap_train_z[i]).shape[1],
        )
def analyze_model_data(
    output_dir,
    dlrm,
    train_ld,
    test_ld,
    train_data,
    skip_embedding=False,
    use_tsne=False,
    max_umap_size=50000,
    max_tsne_size=10000,
    skip_categorical_analysis=False,
    skip_data_plots=False,
    umap_metric="euclidean",
    data_randomize="none",
):
    """
    Run embedding visualizations, data-space UMAP plots and categorical
    analysis for a trained DLRM model, writing all output under ``output_dir``.

    Args:
        output_dir: Directory for plots/stats; created if it does not exist.
        dlrm: DLRM_Net model whose embedding tables (``emb_l``) are analyzed.
        train_ld, test_ld: Train / test data loaders, forwarded to the
            data-space UMAP visualization.
        train_data: Training dataset; its ``X_cat`` feeds categorical analysis.
        skip_embedding: When True, skip embedding UMAP/t-SNE plots.
        use_tsne: When True, additionally produce t-SNE embedding plots.
        max_umap_size: Sample-size cap for UMAP computations.
        max_tsne_size: Sample-size cap for t-SNE computations.
        skip_categorical_analysis: When True, skip per-day categorical analysis.
        skip_data_plots: When True, skip data-space UMAP plots.
        umap_metric: Distance metric passed to UMAP.
        data_randomize: Data-shuffling mode; categorical analysis only runs
            when this is "none". (BUGFIX: previously read from the global
            ``args``, which raised NameError when this function was called
            from outside the script's __main__ block.)
    """
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(output_dir, exist_ok=True)
    if not skip_embedding:
        cat_counts = analyse_categorical_counts(
            X_cat=train_data.X_cat, emb_l=dlrm.emb_l, output_dir=output_dir
        )
        visualize_embeddings_umap(
            emb_l=dlrm.emb_l,
            output_dir=output_dir,
            max_size=max_umap_size,
            umap_metric=umap_metric,
            cat_counts=cat_counts,
        )
        if use_tsne:
            visualize_embeddings_tsne(
                emb_l=dlrm.emb_l, output_dir=output_dir, max_size=max_tsne_size
            )
    # data visualization and analysis
    if not skip_data_plots:
        visualize_all_data_umap(
            dlrm=dlrm,
            train_ld=train_ld,
            test_ld=test_ld,
            max_umap_size=max_umap_size,
            output_dir=output_dir,
            umap_metric=umap_metric,
        )
    # analyse categorical variables (only meaningful on unshuffled data)
    if not skip_categorical_analysis and data_randomize == "none":
        analyse_categorical_data(
            X_cat=train_data.X_cat, n_days=10, output_dir=output_dir
        )
if __name__ == "__main__":
    ### parse arguments ###
    parser = argparse.ArgumentParser(description="Exploratory DLRM analysis")
    parser.add_argument("--load-model", type=str, default="")
    parser.add_argument("--data-set", choices=["kaggle", "terabyte"], help="dataset")
    # parser.add_argument("--dataset-path", required=True, help="path to the dataset")
    parser.add_argument("--max-ind-range", type=int, default=-1)
    # parser.add_argument("--mlperf-bin-loader", action="store_true", default=False)
    parser.add_argument("--output-dir", type=str, default="")
    parser.add_argument("--skip-embedding", action="store_true", default=False)
    parser.add_argument("--umap-metric", type=str, default="euclidean")
    parser.add_argument("--skip-data-plots", action="store_true", default=False)
    parser.add_argument(
        "--skip-categorical-analysis", action="store_true", default=False
    )
    # umap related
    parser.add_argument("--max-umap-size", type=int, default=50000)
    # tsne related
    parser.add_argument("--use-tsne", action="store_true", default=False)
    parser.add_argument("--max-tsne-size", type=int, default=1000)
    # data file related
    parser.add_argument("--raw-data-file", type=str, default="")
    parser.add_argument("--processed-data-file", type=str, default="")
    parser.add_argument("--data-sub-sample-rate", type=float, default=0.0)  # in [0, 1]
    parser.add_argument(
        "--data-randomize", type=str, default="total"
    )  # none, total or day
    parser.add_argument("--memory-map", action="store_true", default=False)
    parser.add_argument("--mini-batch-size", type=int, default=1)
    parser.add_argument("--num-workers", type=int, default=0)
    parser.add_argument("--test-mini-batch-size", type=int, default=1)
    parser.add_argument("--test-num-workers", type=int, default=0)
    parser.add_argument("--num-batches", type=int, default=0)
    # mlperf logging (disables other output and stops early)
    parser.add_argument("--mlperf-logging", action="store_true", default=False)
    args = parser.parse_args()
    print("command line args: ", json.dumps(vars(args)))
    # Honor --output-dir; fall back to a name derived from dataset and model.
    # (BUGFIX: args.output_dir was parsed but never read before.)
    output_dir = args.output_dir
    if output_dir == "":
        output_dir = (
            args.data_set + "-" + os.path.split(args.load_model)[-1] + "-vis_all"
        )
    print("output_dir:", output_dir)
    # Select the model architecture matching the chosen dataset. The embedding
    # table sizes below are the standard per-feature cardinalities for each
    # benchmark configuration.
    if args.data_set == "kaggle":
        # 1. Criteo Kaggle Display Advertisement Challenge Dataset (see ./bench/dlrm_s_criteo_kaggle.sh)
        m_spa = 16
        ln_emb = np.array(
            [
                1460,
                583,
                10131227,
                2202608,
                305,
                24,
                12517,
                633,
                3,
                93145,
                5683,
                8351593,
                3194,
                27,
                14992,
                5461306,
                10,
                5652,
                2173,
                4,
                7046547,
                18,
                15,
                286181,
                105,
                142572,
            ]
        )
        ln_bot = np.array([13, 512, 256, 64, 16])
        ln_top = np.array([367, 512, 256, 1])
    # BUGFIX: argparse stores --data-set as args.data_set; args.dataset raised
    # AttributeError on this branch.
    elif args.data_set == "terabyte":
        if args.max_ind_range == 10000000:
            # 2. Criteo Terabyte (see ./bench/dlrm_s_criteo_terabyte.sh [--sub-sample=0.875] --max-in-range=10000000)
            m_spa = 64
            ln_emb = np.array(
                [
                    9980333,
                    36084,
                    17217,
                    7378,
                    20134,
                    3,
                    7112,
                    1442,
                    61,
                    9758201,
                    1333352,
                    313829,
                    10,
                    2208,
                    11156,
                    122,
                    4,
                    970,
                    14,
                    9994222,
                    7267859,
                    9946608,
                    415421,
                    12420,
                    101,
                    36,
                ]
            )
            ln_bot = np.array([13, 512, 256, 64])
            ln_top = np.array([415, 512, 512, 256, 1])
        elif args.max_ind_range == 40000000:
            # 3. Criteo Terabyte MLPerf training (see ./bench/run_and_time.sh --max-in-range=40000000)
            m_spa = 128
            ln_emb = np.array(
                [
                    39884406,
                    39043,
                    17289,
                    7420,
                    20263,
                    3,
                    7120,
                    1543,
                    63,
                    38532951,
                    2953546,
                    403346,
                    10,
                    2208,
                    11938,
                    155,
                    4,
                    976,
                    14,
                    39979771,
                    25641295,
                    39664984,
                    585935,
                    12972,
                    108,
                    36,
                ]
            )
            ln_bot = np.array([13, 512, 256, 128])
            ln_top = np.array([479, 1024, 1024, 512, 256, 1])
        else:
            raise ValueError("only --max-in-range 10M or 40M is supported")
    else:
        raise ValueError("only kaggle|terabyte dataset options are supported")
    # check input parameters: categorical analysis is only valid on unshuffled data
    if args.data_randomize != "none" and args.skip_categorical_analysis is not True:
        print("Incorrect option for categorical analysis, use: --data-randomize=none")
        sys.exit(-1)
    dlrm = DLRM_Net(
        m_spa,
        ln_emb,
        ln_bot,
        ln_top,
        arch_interaction_op="dot",
        arch_interaction_itself=False,
        sigmoid_bot=-1,
        sigmoid_top=ln_top.size - 2,
        sync_dense_params=True,
        loss_threshold=0.0,
        ndevices=-1,
        qr_flag=False,
        qr_operation=None,
        qr_collisions=None,
        qr_threshold=None,
        md_flag=False,
        md_threshold=None,
    )
    # Load model if specified
    if args.load_model != "":
        print("Loading saved model {}".format(args.load_model))
        ld_model = torch.load(args.load_model, map_location=torch.device("cpu"))
        dlrm.load_state_dict(ld_model["state_dict"])
        print("Model loaded", args.load_model)
        # print(dlrm)
    z_size = len(dlrm.top_l)
    for i in range(0, z_size):
        print("z", i, dlrm.top_l[i])
    # load data
    train_data = None
    test_data = None
    # BUGFIX: `is not ""` compares object identity, not equality (and emits a
    # SyntaxWarning on modern Python); use != for string comparison.
    if args.raw_data_file != "" or args.processed_data_file != "":
        train_data, train_ld, test_data, test_ld = dp.make_criteo_data_and_loaders(args)
    # NOTE(review): if neither data file option is given, train_ld/test_ld are
    # undefined here and this call raises NameError — confirm intended usage.
    analyze_model_data(
        output_dir=output_dir,
        dlrm=dlrm,
        train_ld=train_ld,
        test_ld=test_ld,
        train_data=train_data,
        skip_embedding=args.skip_embedding,
        use_tsne=args.use_tsne,
        max_umap_size=args.max_umap_size,
        max_tsne_size=args.max_tsne_size,
        skip_categorical_analysis=args.skip_categorical_analysis,
        skip_data_plots=args.skip_data_plots,
        umap_metric=args.umap_metric,
    )
# Base image is parameterized so builds can override the PyTorch version:
#   docker build --build-arg FROM_IMAGE_NAME=<image> .
ARG FROM_IMAGE_NAME=pytorch/pytorch:1.13.1-cuda11.6-cudnn8-runtime
FROM ${FROM_IMAGE_NAME}
# All subsequent commands (and the default shell) run from this directory.
WORKDIR /workspace/torchrec_dlrm
# Copy the whole build context into the image.
COPY . .
# --no-cache-dir keeps the pip download cache out of the image layer.
RUN pip install --no-cache-dir -r requirements.txt
# TorchRec DLRM Example
`dlrm_main.py` trains, validates, and tests a [Deep Learning Recommendation Model](https://arxiv.org/abs/1906.00091) (DLRM) with TorchRec. The DLRM model contains both data parallel components (e.g. multi-layer perceptrons & interaction arch) and model parallel components (e.g. embedding tables). The DLRM model is pipelined so that dataloading, data-parallel to model-parallel comms, and forward/backward are overlapped. Can be run with either a random dataloader or [Criteo 1 TB click logs dataset](https://ailab.criteo.com/download-criteo-1tb-click-logs-dataset/).
It has been tested on the following cloud instance types:
| Cloud | Instance Type | GPUs | vCPUs | Memory (GB) |
| ------ | ------------------- | ---------------- | ----- | ----------- |
| AWS | p4d.24xlarge | 8 x A100 (40GB) | 96 | 1152 |
| Azure | Standard_ND96asr_v4 | 8 x A100 (40GB) | 96 | 900 |
| GCP | a2-megagpu-16g | 16 x A100 (40GB) | 96 | 1300 |
A basic understanding of [TorchRec](https://github.com/pytorch/torchrec) will help in understanding `dlrm_main.py`. See this [tutorial](https://pytorch.org/tutorials/intermediate/torchrec_tutorial.html).
# Running
## Install dependencies
`pip install tqdm torchmetrics`
## Torchx
We recommend using [torchx](https://pytorch.org/torchx/main/quickstart.html) to run. Here we use the [DDP builtin](https://pytorch.org/torchx/main/components/distributed.html)
1. pip install torchx
2. (optional) setup a slurm or kubernetes cluster
3.
a. locally: `torchx run -s local_cwd dist.ddp -j 1x2 --script dlrm_main.py`
b. remotely: `torchx run -s slurm dist.ddp -j 1x8 --script dlrm_main.py`
## TorchRun
You can also use [torchrun](https://pytorch.org/docs/stable/elastic/run.html).
* e.g. `torchrun --nnodes 1 --nproc_per_node 2 --rdzv_backend c10d --rdzv_endpoint localhost --rdzv_id 54321 --role trainer dlrm_main.py`
## Preliminary Training Results
**Setup:**
* Dataset: Criteo 1TB Click Logs dataset
* CUDA 11.0, NCCL 2.10.3.
* AWS p4d24xlarge instances, each with 8 40GB NVIDIA A100s.
**Results**
Common settings across all runs:
```
--num_embeddings_per_feature 40000000,39060,17295,7424,20265,3,7122,1543,63,40000000,3067956,405282,10,2209,11938,155,4,976,14,40000000,40000000,40000000,590152,12973,108,36 --embedding_dim 128 --pin_memory --over_arch_layer_sizes 1024,1024,512,256,1 --dense_arch_layer_sizes 512,256,128 --epochs 1
```
|Number of GPUs|Collective Size of Embedding Tables (GiB)|Local Batch Size|Global Batch Size|Learning Rate|Interaction Type|Optimizer|AUROC over Val Set After 1 Epoch|AUROC Over Test Set After 1 Epoch|Training speed|Time to Train 1 Epoch|Unique Flags|
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
|8|104.54|256|2,048|1.0|Dot product interaction|SGD|0.8032|0.8030|~100.0 batches/s == ~204,800 samples/s|6h30m08s |`--batch_size 256 --learning_rate 1.0`|
|8|104.54|2,048|16,384|0.006|Dot product interaction|Adagrad|0.8021|0.7959|~56.5 batches/s == ~925,696 samples/s|1h16m15s |`--batch_size 2048 --learning_rate 0.006 --adagrad` |
|8|104.54|2,048|16,384|0.006|DCN v2|Adagrad|0.8035|0.7973|~55.0 batches/s == ~901,120 samples/s|1h20m21s |`--batch_size 2048 --learning_rate 0.006 --adagrad --interaction_type=dcn` |
|8|104.54|16,384|131,072|0.006|DCN v2|Adagrad|0.8025|0.7963|~9.08 batches/s == ~1,190,128 samples/s|58m 49s |`--batch_size 16384 --learning_rate 0.006 --adagrad --interaction_type=dcn`|
Training speed is calculated using the formula: `average it/s * local batch size * number of GPUs used`. The benchmark displays `it/s` measurements
during the run.
**Reproduce**
Run the following command to reproduce the results for a single node (8 GPUs) on AWS. This command makes use of the `aws_component.py` script.
Ensure to:
- set $PATH_TO_1TB_NUMPY_FILES to the path with the pre-processed .npy files of the Criteo 1TB dataset.
- set $TRAIN_QUEUE to the partition that handles training jobs
**NVTabular**
For an alternative way of preprocessing the dataset, use NVTabular, which can decrease the time required from several days to just hours. See the run instructions [here](https://github.com/pytorch/torchrec/tree/main/examples/nvt_dataloader).
Preprocessing command (numpy):
After downloading and uncompressing the Criteo 1TB Click Logs dataset (consisting of 24 files, from [day 0](https://storage.googleapis.com/criteo-cail-datasets/day_0.gz) to [day 23](https://storage.googleapis.com/criteo-cail-datasets/day_23.gz)), process the raw tsv files into the proper format for training by running `./scripts/process_Criteo_1TB_Click_Logs_dataset.sh` with necessary command line arguments.
Example usage:
```
bash ./scripts/process_Criteo_1TB_Click_Logs_dataset.sh \
./criteo_1tb/raw_input_dataset_dir \
./criteo_1tb/temp_intermediate_files_dir \
./criteo_1tb/numpy_contiguous_shuffled_output_dataset_dir
```
The script requires 700GB of RAM and takes 1-2 days to run. We currently have features in development to reduce the preprocessing time and memory overhead.
MD5 checksums of the expected final preprocessed dataset files are in md5sums_preprocessed_criteo_click_logs_dataset.txt.
We are working on improving this experience, for updates about this see https://github.com/pytorch/torchrec/tree/main/examples/nvt_dataloader
Example command:
```
torchx run --scheduler slurm --scheduler_args partition=$TRAIN_QUEUE,time=5:00:00 aws_component.py:run_dlrm_main --num_trainers=8 -- --pin_memory --batch_size 2048 --epochs 1 --num_embeddings_per_feature "45833188,36746,17245,7413,20243,3,7114,1441,62,29275261,1572176,345138,10,2209,11267,128,4,974,14,48937457,11316796,40094537,452104,12606,104,35" --embedding_dim 128 --dense_arch_layer_sizes 512,256,128 --over_arch_layer_sizes 1024,1024,512,256,1 --in_memory_binary_criteo_path $PATH_TO_1TB_NUMPY_FILES --learning_rate 15.0
```
Upon scheduling the job, there should be an output that looks like this:
```
warnings.warn(
slurm://torchx/14731
torchx 2022-01-07 21:06:59 INFO Launched app: slurm://torchx/14731
torchx 2022-01-07 21:06:59 INFO AppStatus:
msg: ''
num_restarts: -1
roles: []
state: UNKNOWN (7)
structured_error_msg: <NONE>
ui_url: null
torchx 2022-01-07 21:06:59 INFO Job URL: None
```
In this example, the job was launched to: `slurm://torchx/14731`.
Run the following commands to check the status of your job and read the logs:
```
# Status should be "RUNNING" if properly scheduled
torchx status slurm://torchx/14731
# Log file was automatically created in the directory where you launched the job from
cat slurm-14731.out
```
The results from the training can be found in the log file (e.g. `slurm-14731.out`).
**Debugging**
The `--validation_freq_within_epoch x` parameter can be used to print the AUROC every `x` iterations through an epoch.
The in-memory dataloader can take approximately 20-30 minutes to load the data into memory before training starts. The
`--mmap_mode` parameter can be used to load data from disk which reduces start-up time for training at the cost
of QPS.
**Inference**
A module which can be used for DLRM inference exists [here](https://github.com/pytorch/torchrec/blob/main/examples/inference/dlrm_predict.py#L49). Please see the [TorchRec inference examples](https://github.com/pytorch/torchrec/tree/main/examples/inference) for more information.
# Running the MLPerf DLRM v2 benchmark
## Create the synthetic multi-hot dataset
### Step 1: Download and uncompress the [Criteo 1TB Click Logs dataset](https://storage.googleapis.com/criteo-cail-datasets/day_{0-23}.gz)
### Step 2: Run the 1TB Criteo Preprocess script.
Example usage:
```
bash ./scripts/process_Criteo_1TB_Click_Logs_dataset.sh \
./criteo_1tb/raw_input_dataset_dir \
./criteo_1tb/temp_intermediate_files_dir \
./criteo_1tb/numpy_contiguous_shuffled_output_dataset_dir
```
The script requires 700GB of RAM and takes 1-2 days to run. MD5 checksums for the output dataset files are in md5sums_preprocessed_criteo_click_logs_dataset.txt.
### Step 3: Run the `materialize_synthetic_multihot_dataset.py` script
#### Single-process version:
```
python materialize_synthetic_multihot_dataset.py \
--in_memory_binary_criteo_path $PREPROCESSED_CRITEO_1TB_CLICK_LOGS_DATASET_PATH \
--output_path $MATERIALIZED_DATASET_PATH \
--num_embeddings_per_feature 40000000,39060,17295,7424,20265,3,7122,1543,63,40000000,3067956,405282,10,2209,11938,155,4,976,14,40000000,40000000,40000000,590152,12973,108,36 \
--multi_hot_sizes 3,2,1,2,6,1,1,1,1,7,3,8,1,6,9,5,1,1,1,12,100,27,10,3,1,1 \
--multi_hot_distribution_type uniform
```
#### Multiple-processes version:
```
torchx run -s local_cwd dist.ddp -j 1x8 --script -- materialize_synthetic_multihot_dataset.py -- \
--in_memory_binary_criteo_path $PREPROCESSED_CRITEO_1TB_CLICK_LOGS_DATASET_PATH \
--output_path $MATERIALIZED_DATASET_PATH \
--num_embeddings_per_feature 40000000,39060,17295,7424,20265,3,7122,1543,63,40000000,3067956,405282,10,2209,11938,155,4,976,14,40000000,40000000,40000000,590152,12973,108,36 \
--multi_hot_sizes 3,2,1,2,6,1,1,1,1,7,3,8,1,6,9,5,1,1,1,12,100,27,10,3,1,1 \
--multi_hot_distribution_type uniform
```
### Run the MLPerf DLRM v2 benchmark, which uses the materialized multi-hot dataset
Example running 8 GPUs:
```
export MULTIHOT_PREPROCESSED_DATASET=$your_path_here
export TOTAL_TRAINING_SAMPLES=4195197692 ;
export GLOBAL_BATCH_SIZE=65536 ;
export WORLD_SIZE=8 ;
torchx run -s local_cwd dist.ddp -j 1x8 --script dlrm_main.py -- \
--embedding_dim 128 \
--dense_arch_layer_sizes 512,256,128 \
--over_arch_layer_sizes 1024,1024,512,256,1 \
--synthetic_multi_hot_criteo_path $MULTIHOT_PREPROCESSED_DATASET \
--num_embeddings_per_feature 40000000,39060,17295,7424,20265,3,7122,1543,63,40000000,3067956,405282,10,2209,11938,155,4,976,14,40000000,40000000,40000000,590152,12973,108,36 \
--validation_freq_within_epoch $((TOTAL_TRAINING_SAMPLES / (GLOBAL_BATCH_SIZE * 20))) \
--epochs 1 \
--pin_memory \
--mmap_mode \
--batch_size $((GLOBAL_BATCH_SIZE / WORLD_SIZE)) \
--interaction_type=dcn \
--dcn_num_layers=3 \
--dcn_low_rank_dim=512 \
--adagrad \
--learning_rate 0.005
```
Note: The proposed target AUROC to reach within one epoch is 0.8030.
## (Alternative method that trains multi-hot data generated on-the-fly)
It is possible to use the 1-hot preprocessed dataset (the output of `./scripts/process_Criteo_1TB_Click_Logs_dataset.sh`) to create the synthetic multi-hot data on-the-fly during training. This is useful if your system does not have the space to store the 3.8 TB materialized multi-hot dataset. Example run command:
```
export PREPROCESSED_DATASET=$insert_your_path_here
export TOTAL_TRAINING_SAMPLES=4195197692 ;
export BATCHSIZE=65536 ;
export WORLD_SIZE=8 ;
torchx run -s local_cwd dist.ddp -j 1x8 --script dlrm_main.py -- \
--embedding_dim 128 \
--dense_arch_layer_sizes 512,256,128 \
--over_arch_layer_sizes 1024,1024,512,256,1 \
--in_memory_binary_criteo_path $PREPROCESSED_DATASET \
--num_embeddings_per_feature 40000000,39060,17295,7424,20265,3,7122,1543,63,40000000,3067956,405282,10,2209,11938,155,4,976,14,40000000,40000000,40000000,590152,12973,108,36 \
--validation_freq_within_epoch $((TOTAL_TRAINING_SAMPLES / (BATCHSIZE * 20))) \
--epochs 1 \
--pin_memory \
--mmap_mode \
--batch_size $((GLOBAL_BATCH_SIZE / WORLD_SIZE)) \
--interaction_type=dcn \
--dcn_num_layers=3 \
--dcn_low_rank_dim=512 \
--adagrad \
--learning_rate 0.005 \
--multi_hot_distribution_type uniform \
--multi_hot_sizes=3,2,1,2,6,1,1,1,1,7,3,8,1,6,9,5,1,1,1,12,100,27,10,3,1,1
```
# Replicating the MLPerf DLRM v1 benchmark using the [TorchRec-based implementation](./torchrec_dlrm/dlrm_main.py)
## Create the 1-hot preprocessed dataset
### Step 1: [Download](./torchrec_dlrm/scripts/download_Criteo_1TB_Click_Logs_dataset.sh) and uncompress the Criteo 1TB Click Logs dataset (24 files from [day 0](https://storage.googleapis.com/criteo-cail-datasets/day_0.gz) to [day 23](https://storage.googleapis.com/criteo-cail-datasets/day_23.gz))
### Step 2: Run the 1TB Criteo Preprocess script.
Example usage:
```
bash ./scripts/process_Criteo_1TB_Click_Logs_dataset.sh \
./criteo_1tb/raw_input_dataset_dir \
./criteo_1tb/temp_intermediate_files_dir \
./criteo_1tb/numpy_contiguous_shuffled_output_dataset_dir
```
The script requires 700GB of RAM and takes 1-2 days to run. MD5 checksums for the output dataset files are in md5sums_preprocessed_criteo_click_logs_dataset.txt.
## Run the TorchRec-based implementation with the MLPerf DLRM v1 benchmark settings
Example running 8 GPUs:
```
export PREPROCESSED_DATASET=$insert_your_path_here
export TOTAL_TRAINING_SAMPLES=4195197692 ;
export GLOBAL_BATCH_SIZE=16384 ;
export WORLD_SIZE=8 ;
torchx run -s local_cwd dist.ddp -j 1x8 --script dlrm_main.py -- \
--embedding_dim 128 \
--dense_arch_layer_sizes 512,256,128 \
--over_arch_layer_sizes 1024,1024,512,256,1 \
--in_memory_binary_criteo_path $PREPROCESSED_DATASET \
--num_embeddings_per_feature 40000000,39060,17295,7424,20265,3,7122,1543,63,40000000,3067956,405282,10,2209,11938,155,4,976,14,40000000,40000000,40000000,590152,12973,108,36 \
--validation_freq_within_epoch $((TOTAL_TRAINING_SAMPLES / (GLOBAL_BATCH_SIZE * 20))) \
--epochs 1 \
--pin_memory \
--mmap_mode \
--batch_size $((GLOBAL_BATCH_SIZE / WORLD_SIZE)) \
--learning_rate 1.0
```
## Comparison of MLPerf DLRM Benchmark Settings: v1 vs. v2:
||v1|v2|
| --- | --- | --- |
|Optimizer|SGD|Adagrad|
|Learning rate|1.0|0.005|
|Batch size|16384|65536|
|Interaction type|Dot product|DCN v2|
|Benchmark Script|[v1](https://github.com/facebookresearch/dlrm/blob/mlperf/dlrm_s_pytorch.py)|[v2 (using TorchRec)](./torchrec_dlrm/dlrm_main.py)|
|Dataset preprocessing scripts/instructions|[v1](https://github.com/facebookresearch/dlrm/blob/main/data_utils.py)|[v2](https://github.com/facebookresearch/dlrm/tree/main/torchrec_dlrm#create-the-synthetic-multi-hot-dataset)|
|Synthetically-generated multi-hot sparse features|No (Uses 1-hot sparse features) |Yes (synthetically-generated multi-hot sparse features generated from the original 1-hot sparse features)|
# Criteo Kaggle Display Advertising Challenge dataset usage.
### Preliminary
- Python >= 3.9
- Cuda >= 12.0
### Setup environment
Install PyTorch nightly version
```bash
pip install torch --index-url https://download.pytorch.org/whl/nightly/cu126
```
Install FBGEMM-GPU
```bash
pip install fbgemm-gpu --index-url https://download.pytorch.org/whl/nightly/cu126
```
Install torchrec from local build
```bash
git clone https://github.com/pytorch/torchrec.git
python -m pip install -e torchrec
```
Install additional dependencies
```bash
pip install -r requirements.txt
```
### Download the dataset.
```
wget http://go.criteo.net/criteo-research-kaggle-display-advertising-challenge-dataset.tar.gz
```
### Uncompress
```
tar zxvf criteo-research-kaggle-display-advertising-challenge-dataset.tar.gz
```
### Preprocess the dataset to numpy files.
```
python -m torchrec.datasets.scripts.npy_preproc_criteo --input_dir $INPUT_PATH --output_dir $OUTPUT_PATH --dataset_name criteo_kaggle
```
### Run the benchmark.
```
export PREPROCESSED_DATASET=$insert_your_path_here
export GLOBAL_BATCH_SIZE=16384 ;
export WORLD_SIZE=8 ;
export LEARNING_RATE=0.5 ;
torchx run -s local_cwd dist.ddp -j 1x8 --script dlrm_main.py -- \
--in_memory_binary_criteo_path $PREPROCESSED_DATASET \
--pin_memory \
--mmap_mode \
--batch_size $((GLOBAL_BATCH_SIZE / WORLD_SIZE)) \
--learning_rate $LEARNING_RATE \
--dataset_name criteo_kaggle \
--num_embeddings_per_feature 40000000,39060,17295,7424,20265,3,7122,1543,63,40000000,3067956,405282,10,2209,11938,155,4,976,14,40000000,40000000,40000000,590152,12973,108,36 \
--embedding_dim 128 \
--over_arch_layer_sizes 1024,1024,512,256,1 \
--dense_arch_layer_sizes 512,256,128 \
--epochs 1 \
--validation_freq_within_epoch 12802
```
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import os
import torchx.specs as specs
from torchx.components.dist import ddp
def run_dlrm_main(num_trainers: int = 8, *script_args: str) -> specs.AppDef:
    """
    Build a torchx DDP app definition that launches ``dlrm_main.py``.

    Args:
        num_trainers: The number of trainers to use.
        script_args: A variable number of parameters to provide dlrm_main.py.
    """
    # Multi-host jobs must fully occupy each 8-GPU host.
    if num_trainers > 8 and num_trainers % 8:
        raise ValueError(
            "Trainer jobs spanning multiple hosts must be in multiples of 8."
        )
    # A single host drives at most 8 processes; extra trainers become replicas.
    procs_per_host = min(num_trainers, 8)
    num_hosts = max(num_trainers // 8, 1)
    script_path = os.path.join(os.getcwd(), "dlrm_main.py")
    home_image = f"/data/home/{os.environ.get('USER')}"
    return ddp(
        *script_args,
        name="train_dlrm",
        image=home_image,
        # AWS p4d instance (https://aws.amazon.com/ec2/instance-types/p4/).
        cpu=96,
        gpu=8,
        memMB=-1,
        script=script_path,
        j=f"{num_hosts}x{procs_per_host}",
    )
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import argparse
import os
from torch import distributed as dist
from torch.utils.data import DataLoader
from torchrec.datasets.criteo import (
CAT_FEATURE_COUNT,
DAYS,
DEFAULT_CAT_NAMES,
DEFAULT_INT_NAMES,
InMemoryBinaryCriteoIterDataPipe,
)
from torchrec.datasets.random import RandomRecDataset
# OSS import
try:
# pyre-ignore[21]
# @manual=//ai_codesign/benchmarks/dlrm/torchrec_dlrm/data:multi_hot_criteo
from data.multi_hot_criteo import MultiHotCriteoIterDataPipe
except ImportError:
pass
# internal import
try:
from .multi_hot_criteo import MultiHotCriteoIterDataPipe # noqa F811
except ImportError:
pass
STAGES = ["train", "val", "test"]
def _get_random_dataloader(
    args: argparse.Namespace,
    stage: str,
) -> DataLoader:
    """Wrap a RandomRecDataset in a DataLoader for the requested stage."""
    # Per-stage batch cap comes from flags like ``limit_train_batches``.
    num_batches = getattr(args, f"limit_{stage}_batches")
    # Evaluation stages may override the training batch size.
    batch_size = args.batch_size
    if stage in ("val", "test") and args.test_batch_size is not None:
        batch_size = args.test_batch_size
    dataset = RandomRecDataset(
        keys=DEFAULT_CAT_NAMES,
        batch_size=batch_size,
        hash_size=args.num_embeddings,
        hash_sizes=getattr(args, "num_embeddings_per_feature", None),
        manual_seed=getattr(args, "seed", None),
        ids_per_feature=1,
        num_dense=len(DEFAULT_INT_NAMES),
        num_batches=num_batches,
    )
    # The dataset already yields full batches, so the DataLoader must not
    # re-batch (batch_size=None) or sample (batch_sampler=None).
    return DataLoader(
        dataset,
        batch_size=None,
        batch_sampler=None,
        pin_memory=args.pin_memory,
        num_workers=0,
    )
def _get_in_memory_dataloader(
    args: argparse.Namespace,
    stage: str,
) -> DataLoader:
    """Build a DataLoader over preprocessed Criteo .npy/.npz files.

    Chooses between the 1-hot (InMemoryBinaryCriteoIterDataPipe) and the
    materialized multi-hot (MultiHotCriteoIterDataPipe) datapipes based on
    which path argument is set, then selects the file lists for the stage.
    """
    # The two path options are mutually exclusive upstream; the 1-hot binary
    # path takes precedence when set.
    if args.in_memory_binary_criteo_path is not None:
        dir_path = args.in_memory_binary_criteo_path
        sparse_part = "sparse.npy"
        datapipe = InMemoryBinaryCriteoIterDataPipe
    else:
        dir_path = args.synthetic_multi_hot_criteo_path
        sparse_part = "sparse_multi_hot.npz"
        datapipe = MultiHotCriteoIterDataPipe

    if args.dataset_name == "criteo_kaggle":
        # criteo_kaggle has no validation set, so use 2nd half of training set for now.
        # Setting stage to "test" will get the 2nd half of the dataset.
        # Setting root_name to "train" reads from the training set file.
        (root_name, stage) = (
            ("train", "train") if stage == "train" else ("train", "test")
        )
        stage_files: list[list[str]] = [
            [os.path.join(dir_path, f"{root_name}_dense.npy")],
            [os.path.join(dir_path, f"{root_name}_{sparse_part}")],
            [os.path.join(dir_path, f"{root_name}_labels.npy")],
        ]
    # criteo_1tb code path uses below two conditionals
    elif stage == "train":
        # Days 0..DAYS-2 are the training set; the last day is held out.
        stage_files: list[list[str]] = [
            [os.path.join(dir_path, f"day_{i}_dense.npy") for i in range(DAYS - 1)],
            [os.path.join(dir_path, f"day_{i}_{sparse_part}") for i in range(DAYS - 1)],
            [os.path.join(dir_path, f"day_{i}_labels.npy") for i in range(DAYS - 1)],
        ]
    elif stage in ["val", "test"]:
        # The final day serves both validation and test (the datapipe splits it).
        stage_files: list[list[str]] = [
            [os.path.join(dir_path, f"day_{DAYS-1}_dense.npy")],
            [os.path.join(dir_path, f"day_{DAYS-1}_{sparse_part}")],
            [os.path.join(dir_path, f"day_{DAYS-1}_labels.npy")],
        ]
    # Evaluation stages may override the training batch size.
    if stage in ["val", "test"] and args.test_batch_size is not None:
        batch_size = args.test_batch_size
    else:
        batch_size = args.batch_size
    dataloader = DataLoader(
        datapipe(
            stage,
            *stage_files,  # pyre-ignore[6]
            batch_size=batch_size,
            # Each rank reads only its shard of the data.
            rank=dist.get_rank(),
            world_size=dist.get_world_size(),
            drop_last=args.drop_last_training_batch if stage == "train" else False,
            shuffle_batches=args.shuffle_batches,
            shuffle_training_set=args.shuffle_training_set,
            shuffle_training_set_random_seed=args.seed,
            mmap_mode=args.mmap_mode,
            hashes=(
                args.num_embeddings_per_feature
                if args.num_embeddings is None
                else ([args.num_embeddings] * CAT_FEATURE_COUNT)
            ),
        ),
        # The datapipe yields ready-made batches; pass them through unchanged.
        batch_size=None,
        pin_memory=args.pin_memory,
        collate_fn=lambda x: x,
    )
    return dataloader
def get_dataloader(args: argparse.Namespace, backend: str, stage: str) -> DataLoader:
    """
    Return the dataloader requested by dlrm_main's command line options.

    Dispatches to a DataLoader over a RandomRecDataset when no dataset path
    is configured, otherwise to one over an in-memory Criteo datapipe.

    Args:
        args (argparse.Namespace): Command line options supplied to dlrm_main.py's
            main function.
        backend (str): "nccl" or "gloo".
        stage (str): "train", "val", or "test".

    Returns:
        dataloader (DataLoader): PyTorch dataloader for the specified options.
    """
    stage = stage.lower()
    if stage not in STAGES:
        raise ValueError(f"Supplied stage was {stage}. Must be one of {STAGES}.")
    # Default pin_memory on for NCCL (GPU) runs unless explicitly configured.
    if not hasattr(args, "pin_memory"):
        args.pin_memory = backend == "nccl"
    no_dataset_configured = (
        args.in_memory_binary_criteo_path is None
        and args.synthetic_multi_hot_criteo_path is None
    )
    if no_dataset_configured:
        return _get_random_dataloader(args, stage)
    return _get_in_memory_dataloader(args, stage)
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import zipfile
from collections.abc import Iterator
import numpy as np
import torch
from iopath.common.file_io import PathManager, PathManagerFactory
from pyre_extensions import none_throws
from torch.utils.data import IterableDataset
from torchrec.datasets.criteo import CAT_FEATURE_COUNT, DEFAULT_CAT_NAMES
from torchrec.datasets.utils import Batch, PATH_MANAGER_KEY
from torchrec.sparse.jagged_tensor import KeyedJaggedTensor
class MultiHotCriteoIterDataPipe(IterableDataset):
"""
Datapipe designed to operate over the MLPerf DLRM v2 synthetic multi-hot dataset.
This dataset can be created by following the steps in
torchrec_dlrm/scripts/materialize_synthetic_multihot_dataset.py.
Each rank reads only the data for the portion of the dataset it is responsible for.
Args:
stage (str): "train", "val", or "test".
dense_paths (List[str]): List of path strings to dense npy files.
sparse_paths (List[str]): List of path strings to multi-hot sparse npz files.
labels_paths (List[str]): List of path strings to labels npy files.
batch_size (int): batch size.
rank (int): rank.
world_size (int): world size.
drop_last (Optional[bool]): Whether to drop the last batch if it is incomplete.
shuffle_batches (bool): Whether to shuffle batches
shuffle_training_set (bool): Whether to shuffle all samples in the dataset.
shuffle_training_set_random_seed (int): The random generator seed used when
shuffling the training set.
hashes (Optional[int]): List of max categorical feature value for each feature.
Length of this list should be CAT_FEATURE_COUNT.
path_manager_key (str): Path manager key used to load from different
filesystems.
Example::
datapipe = MultiHotCriteoIterDataPipe(
dense_paths=["day_0_dense.npy"],
sparse_paths=["day_0_sparse_multi_hot.npz"],
labels_paths=["day_0_labels.npy"],
batch_size=1024,
rank=torch.distributed.get_rank(),
world_size=torch.distributed.get_world_size(),
)
batch = next(iter(datapipe))
"""
    def __init__(
        self,
        stage: str,
        dense_paths: list[str],
        sparse_paths: list[str],
        labels_paths: list[str],
        batch_size: int,
        rank: int,
        world_size: int,
        drop_last: bool | None = False,
        shuffle_batches: bool = False,
        shuffle_training_set: bool = False,
        shuffle_training_set_random_seed: int = 0,
        mmap_mode: bool = False,
        hashes: list[int] | None = None,
        path_manager_key: str = PATH_MANAGER_KEY,
    ) -> None:
        """Load (or memory-map) the dataset files and precompute batch bookkeeping.

        See the class docstring for parameter semantics.
        """
        self.stage = stage
        self.dense_paths = dense_paths
        self.sparse_paths = sparse_paths
        self.labels_paths = labels_paths
        self.batch_size = batch_size
        self.rank = rank
        self.world_size = world_size
        self.drop_last = drop_last
        self.shuffle_batches = shuffle_batches
        self.shuffle_training_set = shuffle_training_set
        # NOTE(review): seeds numpy's *global* RNG; affects any other numpy
        # randomness in the process — confirm this is intended.
        np.random.seed(shuffle_training_set_random_seed)
        self.mmap_mode = mmap_mode
        # hashes are not used because they were already applied in the
        # script that generates the multi-hot dataset.
        self.hashes: np.ndarray = np.array(hashes).reshape((CAT_FEATURE_COUNT, 1))
        self.path_manager_key = path_manager_key
        self.path_manager: PathManager = PathManagerFactory().get(path_manager_key)
        if shuffle_training_set and stage == "train":
            # Currently not implemented for the materialized multi-hot dataset.
            self._shuffle_and_load_data_for_rank()
        else:
            # mmap_mode avoids loading entire files into RAM up front.
            m = "r" if mmap_mode else None
            self.dense_arrs: list[np.ndarray] = [
                np.load(f, mmap_mode=m) for f in self.dense_paths
            ]
            self.labels_arrs: list[np.ndarray] = [
                np.load(f, mmap_mode=m) for f in self.labels_paths
            ]
            # Each sparse .npz holds one .npy member per categorical feature;
            # memory-map each member individually.
            self.sparse_arrs: list = []
            for sparse_path in self.sparse_paths:
                multi_hot_ids_l = []
                for feat_id_num in range(CAT_FEATURE_COUNT):
                    multi_hot_ft_ids = self._load_from_npz(
                        sparse_path, f"{feat_id_num}.npy"
                    )
                    multi_hot_ids_l.append(multi_hot_ft_ids)
                self.sparse_arrs.append(multi_hot_ids_l)
            # val/test split the (single) held-out day file in half:
            # val takes the first half, test the second.
            len_d0 = len(self.dense_arrs[0])
            second_half_start_index = int(len_d0 // 2 + len_d0 % 2)
            if stage == "val":
                self.dense_arrs[0] = self.dense_arrs[0][:second_half_start_index, :]
                self.labels_arrs[0] = self.labels_arrs[0][:second_half_start_index, :]
                self.sparse_arrs[0] = [
                    feats[:second_half_start_index, :] for feats in self.sparse_arrs[0]
                ]
            elif stage == "test":
                self.dense_arrs[0] = self.dense_arrs[0][second_half_start_index:, :]
                self.labels_arrs[0] = self.labels_arrs[0][second_half_start_index:, :]
                self.sparse_arrs[0] = [
                    feats[second_half_start_index:, :] for feats in self.sparse_arrs[0]
                ]
        # When mmap_mode is enabled, sparse features are hashed when
        # samples are batched in def __iter__. Otherwise, the dataset has been
        # preloaded with sparse features hashed in the preload stage, here:
        # if not self.mmap_mode and self.hashes is not None:
        #     for k, _ in enumerate(self.sparse_arrs):
        #         self.sparse_arrs[k] = [
        #             feat % hash
        #             for (feat, hash) in zip(self.sparse_arrs[k], self.hashes)
        #         ]
        self.num_rows_per_file: list[int] = list(map(len, self.dense_arrs))
        total_rows = sum(self.num_rows_per_file)
        # Number of full global batches, rounded down to a multiple of
        # world_size so every rank sees the same count.
        self.num_full_batches: int = (
            total_rows // batch_size // self.world_size * self.world_size
        )
        self.last_batch_sizes: np.ndarray = np.array(
            [0 for _ in range(self.world_size)]
        )
        remainder = total_rows % (self.world_size * batch_size)
        if not self.drop_last and 0 < remainder:
            if remainder < self.world_size:
                # Too few leftover rows for one per rank: give back one full
                # global batch and spread it so every rank gets a last batch.
                self.num_full_batches -= self.world_size
                self.last_batch_sizes += batch_size
            else:
                # Distribute the remainder as evenly as possible across ranks.
                self.last_batch_sizes += remainder // self.world_size
                self.last_batch_sizes[: remainder % self.world_size] += 1
        # Multi-hot width per categorical feature (columns of each sparse array).
        self.multi_hot_sizes: list[int] = [
            multi_hot_feat.shape[-1] for multi_hot_feat in self.sparse_arrs[0]
        ]
        # These values are the same for the KeyedJaggedTensors in all batches, so they
        # are computed once here. This avoids extra work from the KeyedJaggedTensor sync
        # functions.
        self.keys: list[str] = DEFAULT_CAT_NAMES
        self.index_per_key: dict[str, int] = {
            key: i for (i, key) in enumerate(self.keys)
        }
    def _load_from_npz(self, fname, npy_name):
        """Memory-map a single uncompressed .npy member stored inside a .npz.

        Locates the member's raw data offset within the zip archive and
        returns a read-only ``np.memmap`` over it, so the array is never
        fully loaded into memory.

        Args:
            fname: path to the .npz archive on disk.
            npy_name: name of the .npy member inside the archive.

        Returns:
            np.memmap: read-only view of the member's array data.
        """
        # figure out offset of .npy in .npz
        zf = zipfile.ZipFile(fname)
        # NOTE(review): NameToInfo is a CPython zipfile implementation detail
        # (member name -> ZipInfo); not part of the documented API.
        info = zf.NameToInfo[npy_name]
        # compress_type 0 == ZIP_STORED; memmap only works on uncompressed data.
        assert info.compress_type == 0
        # Seek past the member's local file header. The extra 20 bytes
        # presumably account for a difference between the reconstructed
        # FileHeader() and the on-disk header — TODO confirm.
        zf.fp.seek(info.header_offset + len(info.FileHeader()) + 20)
        # read .npy header
        zf.open(npy_name, "r")
        version = np.lib.format.read_magic(zf.fp)
        # NOTE(review): _read_array_header is a private numpy API and may
        # change between numpy versions.
        shape, fortran_order, dtype = np.lib.format._read_array_header(zf.fp, version)
        assert (
            dtype == "int32"
        ), f"sparse multi-hot dtype is {dtype} but should be int32"
        # Array data begins right after the .npy header.
        offset = zf.fp.tell()
        # create memmap
        return np.memmap(
            zf.filename,
            dtype=dtype,
            shape=shape,
            order="F" if fortran_order else "C",
            mode="r",
            offset=offset,
        )
def _np_arrays_to_batch(
self,
dense: np.ndarray,
sparse: list[np.ndarray],
labels: np.ndarray,
) -> Batch:
if self.shuffle_batches:
# Shuffle all 3 in unison
shuffler = np.random.permutation(len(dense))
sparse = [multi_hot_ft[shuffler, :] for multi_hot_ft in sparse]
dense = dense[shuffler]
labels = labels[shuffler]
batch_size = len(dense)
lengths = torch.ones((CAT_FEATURE_COUNT * batch_size), dtype=torch.int32)
for k, multi_hot_size in enumerate(self.multi_hot_sizes):
lengths[k * batch_size : (k + 1) * batch_size] = multi_hot_size
offsets = torch.cumsum(torch.concat((torch.tensor([0]), lengths)), dim=0)
length_per_key = [
batch_size * multi_hot_size for multi_hot_size in self.multi_hot_sizes
]
offset_per_key = torch.cumsum(
torch.concat((torch.tensor([0]), torch.tensor(length_per_key))), dim=0
)
values = torch.concat([torch.from_numpy(feat).flatten() for feat in sparse])
return Batch(
dense_features=torch.from_numpy(dense.copy()),
sparse_features=KeyedJaggedTensor(
keys=self.keys,
values=values,
lengths=lengths,
offsets=offsets,
stride=batch_size,
length_per_key=length_per_key,
offset_per_key=offset_per_key.tolist(),
index_per_key=self.index_per_key,
),
labels=torch.from_numpy(labels.reshape(-1).copy()),
)
    def __iter__(self) -> Iterator[Batch]:
        """Yield this rank's batches, round-robining batch indices over ranks.

        Rows are accumulated file by file into a buffer of at most
        ``cur_batch_size`` rows; a batch is emitted only once the buffer is
        full (or the input is exhausted), and only when the global batch
        index is assigned to this rank (batch_idx % world_size == rank).
        """
        # Invariant: buffer never contains more than batch_size rows.
        buffer: list[np.ndarray] | None = None

        def append_to_buffer(
            dense: np.ndarray,
            sparse: list[np.ndarray],
            labels: np.ndarray,
        ) -> None:
            # Start the buffer on first use; afterwards extend each component.
            nonlocal buffer
            if buffer is None:
                buffer = [dense, sparse, labels]
            else:
                buffer[0] = np.concatenate((buffer[0], dense))
                buffer[1] = [np.concatenate((b, s)) for b, s in zip(buffer[1], sparse)]
                buffer[2] = np.concatenate((buffer[2], labels))

        # Maintain a buffer that can contain up to batch_size rows. Fill buffer as
        # much as possible on each iteration. Only return a new batch when batch_size
        # rows are filled.
        file_idx = 0
        row_idx = 0
        batch_idx = 0
        buffer_row_count = 0
        # If there are no full batches at all, start directly with the
        # (possibly smaller) trailing-batch size for rank 0.
        cur_batch_size = (
            self.batch_size if self.num_full_batches > 0 else self.last_batch_sizes[0]
        )
        # Total batches = full batches plus one trailing (possibly ragged)
        # batch per rank when a remainder exists.
        while (
            batch_idx
            < self.num_full_batches + (self.last_batch_sizes[0] > 0) * self.world_size
        ):
            if buffer_row_count == cur_batch_size or file_idx == len(self.dense_arrs):
                # The buffer holds a complete batch; emit it if it belongs
                # to this rank.
                if batch_idx % self.world_size == self.rank:
                    yield self._np_arrays_to_batch(*none_throws(buffer))
                    buffer = None
                buffer_row_count = 0
                batch_idx += 1
                # Entering the trailing ragged region: switch to this
                # batch's per-rank size.
                if 0 <= batch_idx - self.num_full_batches < self.world_size and (
                    self.last_batch_sizes[0] > 0
                ):
                    cur_batch_size = self.last_batch_sizes[
                        batch_idx - self.num_full_batches
                    ]
            else:
                # Pull as many rows as possible from the current file without
                # overfilling the buffer.
                rows_to_get = min(
                    cur_batch_size - buffer_row_count,
                    self.num_rows_per_file[file_idx] - row_idx,
                )
                buffer_row_count += rows_to_get
                slice_ = slice(row_idx, row_idx + rows_to_get)
                # Only materialize slices for batches this rank will yield;
                # other ranks' batches only advance the counters.
                if batch_idx % self.world_size == self.rank:
                    dense_inputs = self.dense_arrs[file_idx][slice_, :]
                    sparse_inputs = [
                        feats[slice_, :] for feats in self.sparse_arrs[file_idx]
                    ]
                    target_labels = self.labels_arrs[file_idx][slice_, :]
                    # if self.mmap_mode and self.hashes is not None:
                    #     sparse_inputs = [
                    #         feats % hash
                    #         for (feats, hash) in zip(sparse_inputs, self.hashes)
                    #     ]
                    append_to_buffer(
                        dense_inputs,
                        sparse_inputs,
                        target_labels,
                    )
                row_idx += rows_to_get
                # Advance to the next file once this one is exhausted.
                if row_idx >= self.num_rows_per_file[file_idx]:
                    file_idx += 1
                    row_idx = 0
def __len__(self) -> int:
return self.num_full_batches // self.world_size + (self.last_batch_sizes[0] > 0)
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import argparse
import itertools
import os
import sys
from collections.abc import Iterator
from dataclasses import dataclass, field
from enum import Enum
import torch
import torchmetrics as metrics
from pyre_extensions import none_throws
from torch import distributed as dist
from torch.utils.data import DataLoader
from torchrec import EmbeddingBagCollection
from torchrec.datasets.criteo import DEFAULT_CAT_NAMES, DEFAULT_INT_NAMES
from torchrec.distributed import TrainPipelineSparseDist
from torchrec.distributed.comm import get_local_size
from torchrec.distributed.model_parallel import (
DistributedModelParallel,
get_default_sharders,
)
from torchrec.distributed.planner import EmbeddingShardingPlanner, Topology
from torchrec.distributed.planner.storage_reservations import (
HeuristicalStorageReservation,
)
from torchrec.models.dlrm import DLRM, DLRM_DCN, DLRM_Projection, DLRMTrain
from torchrec.modules.embedding_configs import EmbeddingBagConfig
from torchrec.optim.apply_optimizer_in_backward import apply_optimizer_in_backward
from torchrec.optim.keyed import CombinedOptimizer, KeyedOptimizerWrapper
from torchrec.optim.optimizers import in_backward_optimizer_filter
from tqdm import tqdm
# OSS import
try:
# pyre-ignore[21]
# @manual=//ai_codesign/benchmarks/dlrm/torchrec_dlrm/data:dlrm_dataloader
from data.dlrm_dataloader import get_dataloader
# pyre-ignore[21]
# @manual=//ai_codesign/benchmarks/dlrm/torchrec_dlrm:lr_scheduler
from lr_scheduler import LRPolicyScheduler
# pyre-ignore[21]
# @manual=//ai_codesign/benchmarks/dlrm/torchrec_dlrm:multi_hot
from multi_hot import Multihot, RestartableMap
except ImportError:
pass
# internal import
try:
from .data.dlrm_dataloader import get_dataloader # noqa F811
from .lr_scheduler import LRPolicyScheduler # noqa F811
from .multi_hot import Multihot, RestartableMap # noqa F811
except ImportError:
pass
# Depth of TrainPipelineSparseDist's overlapped pipeline. Not referenced in
# this file's visible code; presumably consumed elsewhere — confirm before removing.
TRAIN_PIPELINE_STAGES = 3  # Number of stages in TrainPipelineSparseDist.
class InteractionType(Enum):
    """Feature-interaction architectures selectable via --interaction_type."""

    ORIGINAL = "original"
    DCN = "dcn"
    PROJECTION = "projection"

    def __str__(self) -> str:
        # Render as the bare value so argparse choices/help read naturally.
        return self.value
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse command-line arguments for the DLRM trainer.

    Args:
        argv (list[str]): command line args, excluding the program name.

    Returns:
        argparse.Namespace: parsed arguments.
    """
    parser = argparse.ArgumentParser(description="torchrec dlrm example trainer")
    parser.add_argument(
        "--epochs",
        type=int,
        default=1,
        help="number of epochs to train",
    )
    parser.add_argument(
        "--batch_size",
        type=int,
        default=32,
        help="batch size to use for training",
    )
    parser.add_argument(
        "--drop_last_training_batch",
        dest="drop_last_training_batch",
        action="store_true",
        help="Drop the last non-full training batch",
    )
    parser.add_argument(
        "--test_batch_size",
        type=int,
        default=None,
        help="batch size to use for validation and testing",
    )
    parser.add_argument(
        "--limit_train_batches",
        type=int,
        default=None,
        help="number of train batches",
    )
    parser.add_argument(
        "--limit_val_batches",
        type=int,
        default=None,
        help="number of validation batches",
    )
    parser.add_argument(
        "--limit_test_batches",
        type=int,
        default=None,
        help="number of test batches",
    )
    parser.add_argument(
        "--dataset_name",
        type=str,
        choices=["criteo_1t", "criteo_kaggle"],
        default="criteo_1t",
        # Fixed help text: it previously said "criteo_1tb", which does not
        # match the actual choice string "criteo_1t".
        help="dataset for experiment, current support criteo_1t, criteo_kaggle",
    )
    parser.add_argument(
        "--num_embeddings",
        type=int,
        default=100_000,
        help="max_ind_size. The number of embeddings in each embedding table. Defaults"
        " to 100_000 if num_embeddings_per_feature is not supplied.",
    )
    parser.add_argument(
        "--num_embeddings_per_feature",
        type=str,
        default=None,
        help="Comma separated max_ind_size per sparse feature. The number of embeddings"
        " in each embedding table. 26 values are expected for the Criteo dataset.",
    )
    parser.add_argument(
        "--dense_arch_layer_sizes",
        type=str,
        default="512,256,64",
        help="Comma separated layer sizes for dense arch.",
    )
    parser.add_argument(
        "--over_arch_layer_sizes",
        type=str,
        default="512,512,256,1",
        help="Comma separated layer sizes for over arch.",
    )
    parser.add_argument(
        "--embedding_dim",
        type=int,
        default=64,
        help="Size of each embedding.",
    )
    parser.add_argument(
        "--interaction_branch1_layer_sizes",
        type=str,
        default="2048,2048",
        help="Comma separated layer sizes for interaction branch1 (only on dlrm with projection).",
    )
    parser.add_argument(
        "--interaction_branch2_layer_sizes",
        type=str,
        default="2048,2048",
        help="Comma separated layer sizes for interaction branch2 (only on dlrm with projection).",
    )
    parser.add_argument(
        "--dcn_num_layers",
        type=int,
        default=3,
        help="Number of DCN layers in interaction layer (only on dlrm with DCN).",
    )
    parser.add_argument(
        "--dcn_low_rank_dim",
        type=int,
        default=512,
        help="Low rank dimension for DCN in interaction layer (only on dlrm with DCN).",
    )
    parser.add_argument(
        "--undersampling_rate",
        type=float,
        help="Desired proportion of zero-labeled samples to retain (i.e. undersampling zero-labeled rows)."
        " Ex. 0.3 indicates only 30pct of the rows with label 0 will be kept."
        " All rows with label 1 will be kept. Value should be between 0 and 1."
        " When not supplied, no undersampling occurs.",
    )
    parser.add_argument(
        "--seed",
        type=int,
        help="Random seed for reproducibility.",
    )
    parser.add_argument(
        "--pin_memory",
        dest="pin_memory",
        action="store_true",
        help="Use pinned memory when loading data.",
    )
    parser.add_argument(
        "--mmap_mode",
        dest="mmap_mode",
        action="store_true",
        help="--mmap_mode mmaps the dataset."
        " That is, the dataset is kept on disk but is accessed as if it were in memory."
        " --mmap_mode is intended mostly for faster debugging. Use --mmap_mode to bypass"
        " preloading the dataset when preloading takes too long or when there is "
        " insufficient memory available to load the full dataset.",
    )
    parser.add_argument(
        "--in_memory_binary_criteo_path",
        type=str,
        default=None,
        help="Directory path containing the Criteo dataset npy files.",
    )
    parser.add_argument(
        "--synthetic_multi_hot_criteo_path",
        type=str,
        default=None,
        help="Directory path containing the MLPerf v2 synthetic multi-hot dataset npz files.",
    )
    parser.add_argument(
        "--learning_rate",
        type=float,
        default=15.0,
        help="Learning rate.",
    )
    parser.add_argument(
        "--eps",
        type=float,
        default=1e-8,
        help="Epsilon for Adagrad optimizer.",
    )
    parser.add_argument(
        "--shuffle_batches",
        dest="shuffle_batches",
        action="store_true",
        help="Shuffle each batch during training.",
    )
    parser.add_argument(
        "--shuffle_training_set",
        dest="shuffle_training_set",
        action="store_true",
        help="Shuffle the training set in memory. This will override mmap_mode",
    )
    parser.add_argument(
        "--validation_freq_within_epoch",
        type=int,
        default=None,
        help="Frequency at which validation will be run within an epoch.",
    )
    # store_true flags default to False; override the tri-state flags to None
    # so downstream code can distinguish "unset" from "explicitly disabled".
    parser.set_defaults(
        pin_memory=None,
        mmap_mode=None,
        # Fix: the drop-last flag's dest is `drop_last_training_batch`, so
        # the bare `drop_last` key only created a stray attribute and left
        # the real flag defaulting to False instead of None like its
        # siblings. `drop_last` is kept for backward compatibility with any
        # reader of args.drop_last.
        drop_last=None,
        drop_last_training_batch=None,
        shuffle_batches=None,
        shuffle_training_set=None,
    )
    parser.add_argument(
        "--adagrad",
        dest="adagrad",
        action="store_true",
        help="Flag to determine if adagrad optimizer should be used.",
    )
    parser.add_argument(
        "--interaction_type",
        type=InteractionType,
        choices=list(InteractionType),
        default=InteractionType.ORIGINAL,
        help="Determine the interaction type to be used (original, dcn, or projection)"
        " default is original DLRM with pairwise dot product",
    )
    parser.add_argument(
        "--collect_multi_hot_freqs_stats",
        dest="collect_multi_hot_freqs_stats",
        action="store_true",
        help="Flag to determine whether to collect stats on freq of embedding access.",
    )
    parser.add_argument(
        "--multi_hot_sizes",
        type=str,
        default=None,
        help="Comma separated multihot size per sparse feature. 26 values are expected for the Criteo dataset.",
    )
    parser.add_argument(
        "--multi_hot_distribution_type",
        type=str,
        choices=["uniform", "pareto"],
        default=None,
        help="Multi-hot distribution options.",
    )
    parser.add_argument("--lr_warmup_steps", type=int, default=0)
    parser.add_argument("--lr_decay_start", type=int, default=0)
    parser.add_argument("--lr_decay_steps", type=int, default=0)
    parser.add_argument(
        "--print_lr",
        action="store_true",
        help="Print learning rate every iteration.",
    )
    parser.add_argument(
        "--allow_tf32",
        action="store_true",
        help="Enable TensorFloat-32 mode for matrix multiplications on A100 (or newer) GPUs.",
    )
    parser.add_argument(
        "--print_sharding_plan",
        action="store_true",
        help="Print the sharding plan used for each embedding table.",
    )
    return parser.parse_args(argv)
def _evaluate(
    limit_batches: int | None,
    pipeline: TrainPipelineSparseDist,
    eval_dataloader: DataLoader,
    stage: str,
) -> float:
    """
    Evaluates model. Computes and prints AUROC. Helper function for train_val_test.

    Args:
        limit_batches (Optional[int]): Limits the dataloader to the first `limit_batches` batches.
        pipeline (TrainPipelineSparseDist): data pipeline.
        eval_dataloader (DataLoader): Dataloader for either the validation set or test set.
        stage (str): "val" or "test".

    Returns:
        float: auroc result
    """
    pipeline._model.eval()
    device = pipeline._device
    iterator = itertools.islice(iter(eval_dataloader), limit_batches)
    # Binary AUROC phrased as 2-class multiclass so torchmetrics accepts
    # the stacked (1 - p, p) probability columns built below.
    auroc = metrics.AUROC(task="multiclass", num_classes=2).to(device)
    is_rank_zero = dist.get_rank() == 0
    if is_rank_zero:
        pbar = tqdm(
            # iter(int, 1) is an endless dummy iterable; progress is driven
            # manually via pbar.update(1) below.
            iter(int, 1),
            desc=f"Evaluating {stage} set",
            total=len(eval_dataloader),
            disable=False,
        )
    with torch.no_grad():
        while True:
            try:
                _loss, logits, labels = pipeline.progress(iterator)
                preds = torch.sigmoid(logits)
                # Column 0 = P(negative), column 1 = P(positive).
                preds_reshaped = torch.stack((1 - preds, preds), dim=1)
                auroc(preds_reshaped, labels)
                if is_rank_zero:
                    pbar.update(1)
            except StopIteration:
                # The pipeline raises StopIteration once the iterator drains.
                break
    auroc_result = auroc.compute().item()
    # NOTE(review): reads torchmetrics' internal `target` state to count
    # locally-seen samples; may break across torchmetrics versions — confirm.
    num_samples = torch.tensor(sum(map(len, auroc.target)), device=device)
    # Only rank 0 receives the reduced total; other ranks keep local counts.
    dist.reduce(num_samples, 0, op=dist.ReduceOp.SUM)
    if is_rank_zero:
        print(f"AUROC over {stage} set: {auroc_result}.")
        print(f"Number of {stage} samples: {num_samples}")
    return auroc_result
def batched(it: Iterator, n: int):
    """Yield successive lazy chunks of up to ``n`` items drawn from ``it``.

    Each yielded value is an iterator (a chain) over the next ``n`` items of
    ``it``; the final chunk may be shorter. Chunks share the underlying
    iterator, so each chunk must be exhausted before advancing to the next
    one, otherwise leftover items bleed into the following chunk.

    Args:
        it (Iterator): source iterator to be chunked.
        n (int): maximum chunk size; must be at least 1.

    Raises:
        ValueError: if ``n`` is less than 1.
    """
    # Raise instead of assert: asserts are stripped under `python -O`,
    # which would silently yield empty chunks forever for n <= 0.
    if n < 1:
        raise ValueError(f"chunk size n must be at least 1, got {n}")
    for first in it:
        # Prepend the already-consumed item, then lazily take up to n - 1 more.
        yield itertools.chain((first,), itertools.islice(it, n - 1))
def _train(
    pipeline: TrainPipelineSparseDist,
    train_dataloader: DataLoader,
    val_dataloader: DataLoader,
    epoch: int,
    lr_scheduler,
    print_lr: bool,
    validation_freq: int | None,
    limit_train_batches: int | None,
    limit_val_batches: int | None,
) -> None:
    """
    Trains model for 1 epoch. Helper function for train_val_test.

    Args:
        pipeline (TrainPipelineSparseDist): data pipeline.
        train_dataloader (DataLoader): Training set's dataloader.
        val_dataloader (DataLoader): Validation set's dataloader.
        epoch (int): The number of complete passes through the training set so far.
        lr_scheduler (LRPolicyScheduler): Learning rate scheduler.
        print_lr (bool): Whether to print the learning rate every training step.
        validation_freq (Optional[int]): The number of training steps between validation runs within an epoch.
        limit_train_batches (Optional[int]): Limits the training set to the first `limit_train_batches` batches.
        limit_val_batches (Optional[int]): Limits the validation set to the first `limit_val_batches` batches.

    Returns:
        None.
    """
    pipeline._model.train()
    iterator = itertools.islice(iter(train_dataloader), limit_train_batches)
    is_rank_zero = dist.get_rank() == 0
    if is_rank_zero:
        pbar = tqdm(
            # iter(int, 1) is an endless dummy iterable; progress is driven
            # manually via pbar.update(1) below.
            iter(int, 1),
            desc=f"Epoch {epoch}",
            total=len(train_dataloader),
            disable=False,
        )
    start_it = 0
    # Chunk size between validation runs: validation_freq when given,
    # otherwise one chunk covering the whole (possibly limited) epoch.
    n = (
        validation_freq
        if validation_freq
        else limit_train_batches
        if limit_train_batches
        else len(train_dataloader)
    )
    for batched_iterator in batched(iterator, n):
        # itertools.count keeps a running global step across chunks.
        for it in itertools.count(start_it):
            try:
                if is_rank_zero and print_lr:
                    for i, g in enumerate(pipeline._optimizer.param_groups):
                        print(f"lr: {it} {i} {g['lr']:.6f}")
                pipeline.progress(batched_iterator)
                lr_scheduler.step()
                if is_rank_zero:
                    pbar.update(1)
            except StopIteration:
                # Chunk exhausted; carry the step counter into the next chunk.
                if is_rank_zero:
                    print("Total number of iterations:", it)
                start_it = it
                break
        # Mid-epoch validation after each full chunk, then back to train mode.
        # NOTE(review): the modulo check appears intended to skip validation
        # after a final partial chunk — confirm against the chunk sizing above.
        if validation_freq and start_it % validation_freq == 0:
            _evaluate(limit_val_batches, pipeline, val_dataloader, "val")
            pipeline._model.train()
@dataclass
class TrainValTestResults:
    """Aggregated metric results from the train/validation/test loop."""

    # Per-epoch validation AUROCs, appended as training progresses.
    val_aurocs: list[float] = field(default_factory=list)
    # Final test-set AUROC; None until the test evaluation has run.
    test_auroc: float | None = None
def train_val_test(
    args: argparse.Namespace,
    model: torch.nn.Module,
    optimizer: torch.optim.Optimizer,
    device: torch.device,
    train_dataloader: DataLoader,
    val_dataloader: DataLoader,
    test_dataloader: DataLoader,
    lr_scheduler: LRPolicyScheduler,
) -> TrainValTestResults:
    """Run the full train/validation/test loop.

    Trains for ``args.epochs`` epochs, evaluating on the validation set after
    each epoch, then evaluates once on the test set.

    Args:
        args (argparse.Namespace): parsed command line args.
        model (torch.nn.Module): model to train.
        optimizer (torch.optim.Optimizer): optimizer to use.
        device (torch.device): device to use.
        train_dataloader (DataLoader): Training set's dataloader.
        val_dataloader (DataLoader): Validation set's dataloader.
        test_dataloader (DataLoader): Test set's dataloader.
        lr_scheduler (LRPolicyScheduler): Learning rate scheduler.

    Returns:
        TrainValTestResults: per-epoch validation AUROCs and the test AUROC.
    """
    # The pipeline overlaps dataloading, dist comms, and compute.
    pipeline = TrainPipelineSparseDist(
        model, optimizer, device, execute_all_batches=True
    )
    results = TrainValTestResults()

    for epoch in range(args.epochs):
        _train(
            pipeline,
            train_dataloader,
            val_dataloader,
            epoch,
            lr_scheduler,
            args.print_lr,
            args.validation_freq_within_epoch,
            args.limit_train_batches,
            args.limit_val_batches,
        )
        results.val_aurocs.append(
            _evaluate(args.limit_val_batches, pipeline, val_dataloader, "val")
        )

    results.test_auroc = _evaluate(
        args.limit_test_batches, pipeline, test_dataloader, "test"
    )
    return results
def main(argv: list[str]) -> None:
    """
    Trains, validates, and tests a Deep Learning Recommendation Model (DLRM)
    (https://arxiv.org/abs/1906.00091). The DLRM model contains both data parallel
    components (e.g. multi-layer perceptrons & interaction arch) and model parallel
    components (e.g. embedding tables). The DLRM model is pipelined so that dataloading,
    data-parallel to model-parallel comms, and forward/backward are overlapped. Can be
    run with either a random dataloader or an in-memory Criteo 1 TB click logs dataset
    (https://ailab.criteo.com/download-criteo-1tb-click-logs-dataset/).

    Args:
        argv (List[str]): command line args.

    Returns:
        None.
    """
    args = parse_args(argv)
    # Post-process args: any string argument holding comma-separated ints
    # (layer sizes, per-feature embedding counts, multi-hot sizes) is
    # converted into a list of ints; everything else raises
    # ValueError/AttributeError and is deliberately left untouched.
    for name, val in vars(args).items():
        try:
            vars(args)[name] = list(map(int, val.split(",")))
        except (ValueError, AttributeError):
            pass
    torch.backends.cuda.matmul.allow_tf32 = args.allow_tf32

    # Validate co-dependent / mutually-exclusive CLI arguments up front.
    if args.multi_hot_sizes is not None:
        assert (
            args.num_embeddings_per_feature is not None
            and len(args.multi_hot_sizes) == len(args.num_embeddings_per_feature)
            or args.num_embeddings_per_feature is None
            and len(args.multi_hot_sizes) == len(DEFAULT_CAT_NAMES)
        ), "--multi_hot_sizes must be a comma delimited list the same size as the number of embedding tables."
    assert (
        args.in_memory_binary_criteo_path is None
        or args.synthetic_multi_hot_criteo_path is None
    ), "--in_memory_binary_criteo_path and --synthetic_multi_hot_criteo_path are mutually exclusive CLI arguments."
    assert (
        args.multi_hot_sizes is None or args.synthetic_multi_hot_criteo_path is None
    ), "--multi_hot_sizes is used to convert 1-hot to multi-hot. It's inapplicable with --synthetic_multi_hot_criteo_path."
    assert (
        args.multi_hot_distribution_type is None
        or args.synthetic_multi_hot_criteo_path is None
    ), "--multi_hot_distribution_type is used to convert 1-hot to multi-hot. It's inapplicable with --synthetic_multi_hot_criteo_path."

    rank = int(os.environ["LOCAL_RANK"])
    if torch.cuda.is_available():
        device: torch.device = torch.device(f"cuda:{rank}")
        backend = "nccl"
        torch.cuda.set_device(device)
    else:
        device: torch.device = torch.device("cpu")
        backend = "gloo"
    if rank == 0:
        print(
            "PARAMS: (lr, batch_size, warmup_steps, decay_start, decay_steps): "
            f"{(args.learning_rate, args.batch_size, args.lr_warmup_steps, args.lr_decay_start, args.lr_decay_steps)}"
        )
    dist.init_process_group(backend=backend)

    # Explicit per-feature table sizes take precedence over the global size.
    if args.num_embeddings_per_feature is not None:
        args.num_embeddings = None

    # Sets default limits for random dataloader iterations when left unspecified.
    if (
        args.in_memory_binary_criteo_path
        is args.synthetic_multi_hot_criteo_path
        is None
    ):
        for split in ["train", "val", "test"]:
            attr = f"limit_{split}_batches"
            if getattr(args, attr) is None:
                setattr(args, attr, 10)

    train_dataloader = get_dataloader(args, backend, "train")
    val_dataloader = get_dataloader(args, backend, "val")
    test_dataloader = get_dataloader(args, backend, "test")

    # One embedding table per categorical feature; tables are built on the
    # meta device and materialized later by DistributedModelParallel.
    eb_configs = [
        EmbeddingBagConfig(
            name=f"t_{feature_name}",
            embedding_dim=args.embedding_dim,
            num_embeddings=(
                none_throws(args.num_embeddings_per_feature)[feature_idx]
                if args.num_embeddings is None
                else args.num_embeddings
            ),
            feature_names=[feature_name],
        )
        for feature_idx, feature_name in enumerate(DEFAULT_CAT_NAMES)
    ]
    sharded_module_kwargs = {}
    if args.over_arch_layer_sizes is not None:
        sharded_module_kwargs["over_arch_layer_sizes"] = args.over_arch_layer_sizes

    if args.interaction_type == InteractionType.ORIGINAL:
        dlrm_model = DLRM(
            embedding_bag_collection=EmbeddingBagCollection(
                tables=eb_configs, device=torch.device("meta")
            ),
            dense_in_features=len(DEFAULT_INT_NAMES),
            dense_arch_layer_sizes=args.dense_arch_layer_sizes,
            over_arch_layer_sizes=args.over_arch_layer_sizes,
            dense_device=device,
        )
    elif args.interaction_type == InteractionType.DCN:
        dlrm_model = DLRM_DCN(
            embedding_bag_collection=EmbeddingBagCollection(
                tables=eb_configs, device=torch.device("meta")
            ),
            dense_in_features=len(DEFAULT_INT_NAMES),
            dense_arch_layer_sizes=args.dense_arch_layer_sizes,
            over_arch_layer_sizes=args.over_arch_layer_sizes,
            dcn_num_layers=args.dcn_num_layers,
            dcn_low_rank_dim=args.dcn_low_rank_dim,
            dense_device=device,
        )
    elif args.interaction_type == InteractionType.PROJECTION:
        dlrm_model = DLRM_Projection(
            embedding_bag_collection=EmbeddingBagCollection(
                tables=eb_configs, device=torch.device("meta")
            ),
            dense_in_features=len(DEFAULT_INT_NAMES),
            dense_arch_layer_sizes=args.dense_arch_layer_sizes,
            over_arch_layer_sizes=args.over_arch_layer_sizes,
            interaction_branch1_layer_sizes=args.interaction_branch1_layer_sizes,
            interaction_branch2_layer_sizes=args.interaction_branch2_layer_sizes,
            dense_device=device,
        )
    else:
        raise ValueError(
            "Unknown interaction option set. Should be original, dcn, or projection."
        )

    train_model = DLRMTrain(dlrm_model)
    embedding_optimizer = torch.optim.Adagrad if args.adagrad else torch.optim.SGD
    # This will apply the Adagrad optimizer in the backward pass for the embeddings (sparse_arch). This means that
    # the optimizer update will be applied in the backward pass, in this case through a fused op.
    # TorchRec will use the FBGEMM implementation of EXACT_ADAGRAD. For GPU devices, a fused CUDA kernel is invoked. For CPU, FBGEMM_GPU invokes CPU kernels
    # https://github.com/pytorch/FBGEMM/blob/2cb8b0dff3e67f9a009c4299defbd6b99cc12b8f/fbgemm_gpu/fbgemm_gpu/split_table_batched_embeddings_ops.py#L676-L678
    # Note that lr_decay, weight_decay and initial_accumulator_value for Adagrad optimizer in FBGEMM v0.3.2
    # cannot be specified below. This equivalently means that all these parameters are hardcoded to zero.
    optimizer_kwargs = {"lr": args.learning_rate}
    if args.adagrad:
        optimizer_kwargs["eps"] = args.eps
    apply_optimizer_in_backward(
        embedding_optimizer,
        train_model.model.sparse_arch.parameters(),
        optimizer_kwargs,
    )
    planner = EmbeddingShardingPlanner(
        topology=Topology(
            local_world_size=get_local_size(),
            world_size=dist.get_world_size(),
            compute_device=device.type,
        ),
        batch_size=args.batch_size,
        # If experience OOM, increase the percentage. see
        # https://pytorch.org/torchrec/torchrec.distributed.planner.html#torchrec.distributed.planner.storage_reservations.HeuristicalStorageReservation
        storage_reservation=HeuristicalStorageReservation(percentage=0.05),
    )
    plan = planner.collective_plan(
        train_model, get_default_sharders(), dist.GroupMember.WORLD
    )

    model = DistributedModelParallel(
        module=train_model,
        device=device,
        plan=plan,
    )
    if rank == 0 and args.print_sharding_plan:
        for collectionkey, plans in model._plan.plan.items():
            print(collectionkey)
            for table_name, plan in plans.items():
                print(table_name, "\n", plan, "\n")

    def optimizer_with_params():
        # Factory for the dense (non-embedding) parameters' optimizer.
        if args.adagrad:
            return lambda params: torch.optim.Adagrad(
                params, lr=args.learning_rate, eps=args.eps
            )
        else:
            return lambda params: torch.optim.SGD(params, lr=args.learning_rate)

    # Dense params get a regular optimizer; embedding params were already
    # fused into the backward pass above, so filter them out here.
    dense_optimizer = KeyedOptimizerWrapper(
        dict(in_backward_optimizer_filter(model.named_parameters())),
        optimizer_with_params(),
    )
    optimizer = CombinedOptimizer([model.fused_optimizer, dense_optimizer])
    lr_scheduler = LRPolicyScheduler(
        optimizer, args.lr_warmup_steps, args.lr_decay_start, args.lr_decay_steps
    )

    if args.multi_hot_sizes is not None:
        # Wrap the dataloaders to synthetically expand 1-hot Criteo samples
        # into multi-hot samples on the fly.
        multihot = Multihot(
            args.multi_hot_sizes,
            args.num_embeddings_per_feature,
            args.batch_size,
            collect_freqs_stats=args.collect_multi_hot_freqs_stats,
            dist_type=args.multi_hot_distribution_type,
        )
        multihot.pause_stats_collection_during_val_and_test(model)
        train_dataloader = RestartableMap(
            multihot.convert_to_multi_hot, train_dataloader
        )
        val_dataloader = RestartableMap(multihot.convert_to_multi_hot, val_dataloader)
        test_dataloader = RestartableMap(multihot.convert_to_multi_hot, test_dataloader)
    train_val_test(
        args,
        model,
        optimizer,
        device,
        train_dataloader,
        val_dataloader,
        test_dataloader,
        lr_scheduler,
    )
    # Fix: also require multi_hot_sizes — `multihot` is only bound inside the
    # block above, so previously passing --collect_multi_hot_freqs_stats
    # without --multi_hot_sizes raised NameError here.
    if args.multi_hot_sizes is not None and args.collect_multi_hot_freqs_stats:
        multihot.save_freqs_stats()
def invoke_main() -> None:
    """Console entry point: run main() with the program name stripped."""
    cli_args = sys.argv[1:]
    main(cli_args)


if __name__ == "__main__":
    invoke_main()  # pragma: no cover
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# Copied from https://github.com/facebookresearch/dlrm/blob/mlperf/dlrm_s_pytorch.py
import sys
from torch.optim.lr_scheduler import _LRScheduler
class LRPolicyScheduler(_LRScheduler):
    """MLPerf DLRM learning-rate policy.

    Linear warmup over ``num_warmup_steps``, a hold at the last computed rate
    until ``decay_start_step``, then quadratic polynomial decay over
    ``num_decay_steps`` clamped at a small positive floor. With no decay
    configured, the base rates are returned unchanged outside warmup.
    """

    def __init__(self, optimizer, num_warmup_steps, decay_start_step, num_decay_steps):
        self.num_warmup_steps = num_warmup_steps
        self.decay_start_step = decay_start_step
        self.num_decay_steps = num_decay_steps
        self.decay_end_step = decay_start_step + num_decay_steps
        # The warmup and decay windows must not overlap.
        if self.decay_start_step < self.num_warmup_steps:
            sys.exit("Learning rate warmup must finish before the decay starts")
        super().__init__(optimizer)

    def get_lr(self):
        step = self._step_count
        if step < self.num_warmup_steps:
            # Linear ramp toward the base rate.
            scale = 1.0 - (self.num_warmup_steps - step) / self.num_warmup_steps
            lr = [base_lr * scale for base_lr in self.base_lrs]
            self.last_lr = lr
        elif self.decay_start_step <= step < self.decay_end_step:
            # Quadratic polynomial decay, clamped at a tiny positive floor.
            progressed = step - self.decay_start_step
            remaining = (self.num_decay_steps - progressed) / self.num_decay_steps
            scale = remaining * remaining
            floor = 0.0000001
            lr = [max(floor, base_lr * scale) for base_lr in self.base_lrs]
            self.last_lr = lr
        elif self.num_decay_steps > 0:
            # Freeze at the last computed rate: either after decay has
            # finished, or in the gap between warmup and decay start.
            lr = self.last_lr
        else:
            # No decay configured: do not adjust.
            lr = self.base_lrs
        return lr
9287c283b01087427df915257300bf36 day_0_sparse_multi_hot.npz
f4fedd921a4b214b03d0dfcd31cc30e6 day_1_sparse_multi_hot.npz
2a15bdd8d25781c4cdcf5d791dfd19f9 day_2_sparse_multi_hot.npz
0341aaee2f661f9e939a39d7a6be0aea day_3_sparse_multi_hot.npz
e8db54dbf5fe438ecb76fcc2d520f31a day_4_sparse_multi_hot.npz
fd35a7a2bc0ba63935b4b1c742eca018 day_5_sparse_multi_hot.npz
7d5c72b6bbe8be1dfa1f69db5c7e64fd day_6_sparse_multi_hot.npz
59bcb9855243d3a5c0ae56daa18e1033 day_7_sparse_multi_hot.npz
b9f7fbccae6bb9fdabf30259b5e305f0 day_8_sparse_multi_hot.npz
03da5bf484870a3c77f66befc680ad04 day_9_sparse_multi_hot.npz
eb048fc4fbd7ffa7932b81a523fe5a39 day_10_sparse_multi_hot.npz
a2ebee45c9836c8c8598610a6a3e9d60 day_11_sparse_multi_hot.npz
0dd59855e1a7b65c42f7e7af303c610e day_12_sparse_multi_hot.npz
7510698f3fcff9f3d7ef9dd478e637aa day_13_sparse_multi_hot.npz
562978b7b93e179f1adb9b9f5e1dc338 day_14_sparse_multi_hot.npz
042967232f016fbccf0d40d72d0b48bb day_15_sparse_multi_hot.npz
7b59170fb0e2d1e78f15cb60cea22723 day_16_sparse_multi_hot.npz
5054c54515d2574cda0f11646787df44 day_17_sparse_multi_hot.npz
28d3dbf6c70e68f01df12a4c3298f754 day_18_sparse_multi_hot.npz
db7554263a1754d3e29341d0f03bc2f0 day_19_sparse_multi_hot.npz
91ee92ffb4810c26e157c1335ef4de06 day_20_sparse_multi_hot.npz
2c99fad7b146b0ba581dce34f640f44e day_21_sparse_multi_hot.npz
c7ba52c5aaf24d76acca22a0cb13b737 day_22_sparse_multi_hot.npz
c46b7e31ec6f2f8768fa60bdfc0f6e40 day_23_sparse_multi_hot.npz
427113b0c4d85a8fceaf793457302067 day_0_dense.npy
4db255ce4388893e7aa1dcf157077975 day_0_labels.npy
8b444e74159dbede896e2f3b5ed31ac0 day_0_sparse.npy
3afc11c56062d8bbea4df300b5a42966 day_1_dense.npy
fb40746738a7c6f4ee021033bdd518c5 day_1_labels.npy
61e95a487c955b515155b31611444f32 day_1_sparse.npy
4e73d5bb330c43826665bec142c6b407 day_2_dense.npy
f0adfec8191781e3f201d45f923e6ea1 day_2_labels.npy
0473d30872cd6e582c5da0272a0569f8 day_2_sparse.npy
df1f3395e0da4a06aa23b2e069ff3ad9 day_3_dense.npy
69caadf4d219f18b83f3591fe76f17c7 day_3_labels.npy
d6b0d02ff18da470b7ee17f97d5380e0 day_3_sparse.npy
27868a93adc66c47d4246acbad8bb689 day_4_dense.npy
c4a6a16342f0770d67d689c6c173c681 day_4_labels.npy
ca54008489cb84becc3f37e7b29035c7 day_4_sparse.npy
e9bc6de06d09b1feebf857d9786ee15c day_5_dense.npy
9e3e17f345474cfbde5d62b543e07d6b day_5_labels.npy
d1374ee84f80ea147957f8af0e12ebe4 day_5_sparse.npy
09c8bf0fd4798172e0369134ddc7204a day_6_dense.npy
945cef1132ceab8b23f4d0e269522be2 day_6_labels.npy
e4df1c271e1edd72ee4658a39cca2888 day_6_sparse.npy
ae718f0d6d29a8b605ae5d12fad3ffcc day_7_dense.npy
5ff5e7eef5b88b80ef03d06fc7e81bcf day_7_labels.npy
cbcb7501a6b74a45dd5c028c13a4afbc day_7_sparse.npy
5a589746fd15819afbc70e2503f94b35 day_8_dense.npy
43871397750dfdc69cadcbee7e95f2bd day_8_labels.npy
c1fb4369c7da27d23f4c7f97c8893250 day_8_sparse.npy
4bb86eecb92eb4e3368085c2b1bab131 day_9_dense.npy
f851934555147d436131230ebbdd5609 day_9_labels.npy
e4ac0fb8a030f0769541f88142c9f931 day_9_sparse.npy
7fc29f50da6c60185381ca4ad1cb2059 day_10_dense.npy
e3b3f6f974c4820064db0046bbf954c8 day_10_labels.npy
1018a9ab88c4a7369325c9d6df73b411 day_10_sparse.npy
df822ae73cbaa016bf7d371d87313b56 day_11_dense.npy
26219e9c89c6ce831e7da273da666df1 day_11_labels.npy
f1596fc0337443a6672a864cd541fb05 day_11_sparse.npy
015968b4d9940ec9e28cc34788013d6e day_12_dense.npy
f0ca7ce0ab6033cdd355df94d11c7ed7 day_12_labels.npy
03a2ebd22b01cc18b6e338de77b4103f day_12_sparse.npy
9d79239a9e976e4dd9b8839c7cbe1eba day_13_dense.npy
4b099b9200bbb490afc08b5cd63daa0e day_13_labels.npy
2b507e0f97d972ea6ada9b3af64de151 day_13_sparse.npy
9242e6c974603ec235f163f72fdbc766 day_14_dense.npy
80cae15e032ffb9eff292738ba4d0dce day_14_labels.npy
3dccc979f7c71fae45a10c98ba6c9cb7 day_14_sparse.npy
64c6c0fcd0940f7e0d7001aa945ec8f8 day_15_dense.npy
a6a730d1ef55368f3f0b21d32b039662 day_15_labels.npy
c852516852cc404cb40d4de8626d2ca1 day_15_sparse.npy
5c75b60e63e9cf98dec13fbb64839c10 day_16_dense.npy
5a71a29d8df1e8baf6bf28353f1588d4 day_16_labels.npy
6c838050751697a91bbf3e68ffd4a696 day_16_sparse.npy
9798bccb5a67c5eac834153ea8bbe110 day_17_dense.npy
0a814b7eb83f375dd5a555ade6908356 day_17_labels.npy
40d2bc23fbcccb3ddb1390cc5e694cf0 day_17_sparse.npy
cda094dfe7f5711877a6486f9863cd4b day_18_dense.npy
a4fa26ada0d4c312b7e3354de0f5ee30 day_18_labels.npy
51711de9194737813a74bfb25c0f5d30 day_18_sparse.npy
0f0b2c0ed279462cdcc6f79252fd3395 day_19_dense.npy
b21ad457474b01bd3f95fc46b6b9f04b day_19_labels.npy
dd4b72cd704981441d17687f526e42ae day_19_sparse.npy
95ffc084f6cafe382afe72cbcae186bc day_20_dense.npy
9555e572e8bee22d71db8c2ac121ea8a day_20_labels.npy
bc9a8c79c93ea39f32230459b4c4572a day_20_sparse.npy
4680683973be5b1a890c9314cfb2e93b day_21_dense.npy
672edc866e7ff1928d15338a99e5f336 day_21_labels.npy
e4a8ae42a6d46893da6edb73e7d8a3f7 day_21_sparse.npy
3d56f190639398da2bfdc33f87cd34f0 day_22_dense.npy
733da710c5981cb67d041aa1039e4e6b day_22_labels.npy
42ef88d6bb2550a88711fed6fc144846 day_22_sparse.npy
cdf7af87cbc7e9b468c0be46b1767601 day_23_dense.npy
dd68f93301812026ed6f58dfb0757fa7 day_23_labels.npy
0c33f1562529cc3bca7f3708e2be63c9 day_23_sparse.npy
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment