"vscode:/vscode.git/clone" did not exist on "bf140c2abfe17c3692ebd542188f8ee424ab727d"
Commit 45b5353f authored by Chris Waterson, committed by GitHub

Merge pull request #1122 from vmarkovtsev/distr

Swivel: add multiple GPU support
parents 3be9ece9 ceee992a
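The patch parallelizes Swivel training with TensorFlow 1.x in-graph replication: the embedding variables and the Adagrad optimizer are pinned to /cpu:0, each GPU gets a "tower" that computes its own loss and gradients against the shared variables, and the CPU merges the per-tower losses and applies all of the gradients. The stand-alone sketch below illustrates only that pattern; the toy loss, the variable names, and the two-CPU-device fallback are illustrative assumptions, not code from this patch.

import tensorflow as tf
from tensorflow.python.client import device_lib


def get_available_gpus():
  return [d.name for d in device_lib.list_local_devices()
          if d.device_type == 'GPU']


# Shared parameters live on the CPU so every tower reads and updates
# the same copy.
with tf.device('/cpu:0'):
  w = tf.get_variable('w', shape=[4], initializer=tf.ones_initializer())
  global_step = tf.Variable(0, name='global_step')
  opt = tf.train.AdagradOptimizer(0.1)

# Fall back to two CPU towers on a GPU-less host (demo assumption).
devices = get_available_gpus() or ['/cpu:0', '/cpu:0']
losses, all_grads = [], []
with tf.variable_scope(tf.get_variable_scope()):
  for i, dev in enumerate(devices):
    with tf.device(dev), tf.name_scope('tower_%d' % i):
      # Each tower computes a loss and its gradients on its own device.
      loss = tf.reduce_sum(tf.square(w - float(i)))
      losses.append(tf.expand_dims(loss, 0))
      all_grads.append(opt.compute_gradients(loss))

with tf.device('/cpu:0'):
  # The CPU merges per-tower losses for reporting and applies every
  # tower's gradients to the shared variables.
  mean_loss = tf.reduce_mean(tf.concat(losses, 0), 0)
  apply_ops = [opt.apply_gradients(g, global_step=global_step)
               for g in all_grads]
  train_op = tf.group(*apply_ops)

with tf.Session() as sess:
  sess.run(tf.global_variables_initializer())
  for _ in range(3):
    _, l = sess.run((train_op, mean_loss))
    print(l)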
@@ -52,7 +52,6 @@ embeddings are stored in separate files.
 """
 from __future__ import print_function
 
-import argparse
 import glob
 import math
 import os
@@ -62,6 +61,7 @@ import threading
 import numpy as np
 import tensorflow as tf
+from tensorflow.python.client import device_lib
 
 flags = tf.app.flags
@@ -85,13 +85,26 @@ flags.DEFINE_float('confidence_base', 0.1, 'Base for l2 confidence function')
 flags.DEFINE_float('learning_rate', 1.0, 'Initial learning rate')
 flags.DEFINE_integer('num_concurrent_steps', 2,
                      'Number of threads to train with')
+flags.DEFINE_integer('num_readers', 4,
+                     'Number of threads to read the input data and feed it')
 flags.DEFINE_float('num_epochs', 40, 'Number epochs to train for')
-flags.DEFINE_float('per_process_gpu_memory_fraction', 0.25,
-                   'Fraction of GPU memory to use')
+flags.DEFINE_float('per_process_gpu_memory_fraction', 0,
+                   'Fraction of GPU memory to use, 0 means allow_growth')
+flags.DEFINE_integer('num_gpus', 0,
+                     'Number of GPUs to use, 0 means all available')
 
 FLAGS = flags.FLAGS
 
+
+def log(message, *args, **kwargs):
+  tf.logging.info(message, *args, **kwargs)
+
+
+def get_available_gpus():
+  return [d.name for d in device_lib.list_local_devices()
+          if d.device_type == 'GPU']
+
 
 def embeddings_with_init(vocab_size, embedding_dim, name):
   """Creates and initializes the embedding tensors."""
   return tf.get_variable(name=name,
@@ -130,7 +143,7 @@ def count_matrix_input(filenames, submatrix_rows, submatrix_cols):
   queued_global_row, queued_global_col, queued_count = tf.train.batch(
       [global_row, global_col, count],
       batch_size=1,
-      num_threads=4,
+      num_threads=FLAGS.num_readers,
       capacity=32)
 
   queued_global_row = tf.reshape(queued_global_row, [submatrix_rows])
@@ -164,16 +177,14 @@ def write_embeddings_to_disk(config, model, sess):
   # Row Embedding
   row_vocab_path = config.input_base_path + '/row_vocab.txt'
   row_embedding_output_path = config.output_base_path + '/row_embedding.tsv'
-  print('Writing row embeddings to:', row_embedding_output_path)
-  sys.stdout.flush()
+  log('Writing row embeddings to: %s', row_embedding_output_path)
   write_embedding_tensor_to_disk(row_vocab_path, row_embedding_output_path,
                                  sess, model.row_embedding)
 
   # Column Embedding
   col_vocab_path = config.input_base_path + '/col_vocab.txt'
   col_embedding_output_path = config.output_base_path + '/col_embedding.tsv'
-  print('Writing column embeddings to:', col_embedding_output_path)
-  sys.stdout.flush()
+  log('Writing column embeddings to: %s', col_embedding_output_path)
   write_embedding_tensor_to_disk(col_vocab_path, col_embedding_output_path,
                                  sess, model.col_embedding)
@@ -186,8 +197,7 @@ class SwivelModel(object):
     self._config = config
 
     # Create paths to input data files
-    print('Reading model from:', config.input_base_path)
-    sys.stdout.flush()
+    log('Reading model from: %s', config.input_base_path)
    count_matrix_files = glob.glob(config.input_base_path + '/shard-*.pb')
     row_sums_path = config.input_base_path + '/row_sums.txt'
     col_sums_path = config.input_base_path + '/col_sums.txt'
@@ -198,16 +208,19 @@ class SwivelModel(object):
     self.n_rows = len(row_sums)
     self.n_cols = len(col_sums)
-    print('Matrix dim: (%d,%d) SubMatrix dim: (%d,%d) ' % (
-        self.n_rows, self.n_cols, config.submatrix_rows, config.submatrix_cols))
-    sys.stdout.flush()
+    log('Matrix dim: (%d,%d) SubMatrix dim: (%d,%d)',
+        self.n_rows, self.n_cols, config.submatrix_rows, config.submatrix_cols)
     self.n_submatrices = (self.n_rows * self.n_cols /
                           (config.submatrix_rows * config.submatrix_cols))
-    print('n_submatrices: %d' % (self.n_submatrices))
-    sys.stdout.flush()
+    log('n_submatrices: %d', self.n_submatrices)
 
+    with tf.device('/cpu:0'):
       # ===== CREATE VARIABLES ======
-    # embeddings
+      # Get input
+      global_row, global_col, count = count_matrix_input(
+          count_matrix_files, config.submatrix_rows, config.submatrix_cols)
+
+      # Embeddings
       self.row_embedding = embeddings_with_init(
           embedding_dim=config.embedding_size,
           vocab_size=self.n_rows,
@@ -229,12 +242,22 @@ class SwivelModel(object):
       tf.summary.histogram('row_bias', self.row_bias)
       tf.summary.histogram('col_bias', self.col_bias)
 
-    # ===== CREATE GRAPH =====
-
-    # Get input
-    global_row, global_col, count = count_matrix_input(
-        count_matrix_files, config.submatrix_rows, config.submatrix_cols)
+      # Add optimizer
+      l2_losses = []
+      sigmoid_losses = []
+      self.global_step = tf.Variable(0, name='global_step')
+      opt = tf.train.AdagradOptimizer(config.learning_rate)
+      all_grads = []
 
+    devices = ['/gpu:%d' % i for i in range(FLAGS.num_gpus)] \
+        if FLAGS.num_gpus > 0 else get_available_gpus()
+    self.devices_number = len(devices)
+    with tf.variable_scope(tf.get_variable_scope()):
+      for dev in devices:
+        with tf.device(dev):
+          with tf.name_scope(dev[1:].replace(':', '_')):
+            # ===== CREATE GRAPH =====
             # Fetch embeddings.
             selected_row_embedding = tf.nn.embedding_lookup(
                 self.row_embedding, global_row)
@@ -242,49 +265,72 @@ class SwivelModel(object):
             selected_col_embedding = tf.nn.embedding_lookup(
                 self.col_embedding, global_col)
 
             # Fetch biases.
-    selected_row_bias = tf.nn.embedding_lookup([self.row_bias], global_row)
-    selected_col_bias = tf.nn.embedding_lookup([self.col_bias], global_col)
+            selected_row_bias = tf.nn.embedding_lookup(
+                [self.row_bias], global_row)
+            selected_col_bias = tf.nn.embedding_lookup(
+                [self.col_bias], global_col)
 
             # Multiply the row and column embeddings to generate predictions.
-    predictions = tf.matmul(
-        selected_row_embedding, selected_col_embedding, transpose_b=True)
+            predictions = tf.matmul(
+                selected_row_embedding, selected_col_embedding,
+                transpose_b=True)
 
             # These binary masks separate zero from non-zero values.
             count_is_nonzero = tf.to_float(tf.cast(count, tf.bool))
-    count_is_zero = 1 - tf.to_float(tf.cast(count, tf.bool))
+            count_is_zero = 1 - count_is_nonzero
 
             objectives = count_is_nonzero * tf.log(count + 1e-30)
-    objectives -= tf.reshape(selected_row_bias, [config.submatrix_rows, 1])
+            objectives -= tf.reshape(
+                selected_row_bias, [config.submatrix_rows, 1])
             objectives -= selected_col_bias
             objectives += matrix_log_sum
 
             err = predictions - objectives
 
-    # The confidence function scales the L2 loss based on the raw co-occurrence
-    # count.
-    l2_confidence = (config.confidence_base + config.confidence_scale * tf.pow(
-        count, config.confidence_exponent))
+            # The confidence function scales the L2 loss based on the raw
+            # co-occurrence count.
+            l2_confidence = (config.confidence_base +
                             config.confidence_scale * tf.pow(
                                 count, config.confidence_exponent))
 
             l2_loss = config.loss_multiplier * tf.reduce_sum(
                 0.5 * l2_confidence * err * err * count_is_nonzero)
+            l2_losses.append(tf.expand_dims(l2_loss, 0))
 
             sigmoid_loss = config.loss_multiplier * tf.reduce_sum(
                 tf.nn.softplus(err) * count_is_zero)
+            sigmoid_losses.append(tf.expand_dims(sigmoid_loss, 0))
 
-    self.loss = l2_loss + sigmoid_loss
+            loss = l2_loss + sigmoid_loss
+            grads = opt.compute_gradients(loss)
+            all_grads.append(grads)
 
+    with tf.device('/cpu:0'):
+      # ===== MERGE LOSSES =====
+      l2_loss = tf.reduce_mean(tf.concat(l2_losses, 0), 0, name="l2_loss")
+      sigmoid_loss = tf.reduce_mean(tf.concat(sigmoid_losses, 0), 0,
+                                    name="sigmoid_loss")
+      self.loss = l2_loss + sigmoid_loss
+      average = tf.train.ExponentialMovingAverage(0.8, self.global_step)
+      loss_average_op = average.apply((self.loss,))
       tf.summary.scalar("l2_loss", l2_loss)
       tf.summary.scalar("sigmoid_loss", sigmoid_loss)
       tf.summary.scalar("loss", self.loss)
 
-    # Add optimizer.
-    self.global_step = tf.Variable(0, name='global_step')
-    opt = tf.train.AdagradOptimizer(config.learning_rate)
-    self.train_op = opt.minimize(self.loss, global_step=self.global_step)
+      # Apply the gradients to adjust the shared variables.
+      apply_gradient_ops = []
+      for grads in all_grads:
+        apply_gradient_ops.append(opt.apply_gradients(
+            grads, global_step=self.global_step))
+
+      self.train_op = tf.group(loss_average_op, *apply_gradient_ops)
 
     self.saver = tf.train.Saver(sharded=True)
 
 
 def main(_):
+  tf.logging.set_verbosity(tf.logging.INFO)
+  start_time = time.time()
+
   # Create the output path. If this fails, it really ought to fail
   # now. :)
   if not os.path.isdir(FLAGS.output_base_path):
@@ -295,8 +341,13 @@ def main(_):
   model = SwivelModel(FLAGS)
 
   # Create a session for running Ops on the Graph.
-  gpu_options = tf.GPUOptions(
-      per_process_gpu_memory_fraction=FLAGS.per_process_gpu_memory_fraction)
+  gpu_opts = {}
+  if FLAGS.per_process_gpu_memory_fraction > 0:
+    gpu_opts["per_process_gpu_memory_fraction"] = \
+        FLAGS.per_process_gpu_memory_fraction
+  else:
+    gpu_opts["allow_growth"] = True
+  gpu_options = tf.GPUOptions(**gpu_opts)
   sess = tf.Session(config=tf.ConfigProto(gpu_options=gpu_options))
 
   # Run the Op to initialize the variables.
@@ -309,21 +360,32 @@ def main(_):
   # Calculate how many steps each thread should run
   n_total_steps = int(FLAGS.num_epochs * model.n_rows * model.n_cols) / (
       FLAGS.submatrix_rows * FLAGS.submatrix_cols)
-  n_steps_per_thread = n_total_steps / FLAGS.num_concurrent_steps
+  n_steps_per_thread = n_total_steps / (
+      FLAGS.num_concurrent_steps * model.devices_number)
   n_submatrices_to_train = model.n_submatrices * FLAGS.num_epochs
   t0 = [time.time()]
+  n_steps_between_status_updates = 100
+  status_i = [0]
+  status_lock = threading.Lock()
+  msg = ('%%%dd/%%d submatrices trained (%%.1f%%%%), %%5.1f submatrices/sec |'
+         ' loss %%f') % len(str(n_submatrices_to_train))
 
   def TrainingFn():
     for _ in range(int(n_steps_per_thread)):
-      _, global_step = sess.run([model.train_op, model.global_step])
-
-      n_steps_between_status_updates = 100
-      if (global_step % n_steps_between_status_updates) == 0:
+      _, global_step, loss = sess.run((
+          model.train_op, model.global_step, model.loss))
+      show_status = False
+      with status_lock:
+        new_i = global_step // n_steps_between_status_updates
+        if new_i > status_i[0]:
+          status_i[0] = new_i
+          show_status = True
+      if show_status:
         elapsed = float(time.time() - t0[0])
-        print('%d/%d submatrices trained (%.1f%%), %.1f submatrices/sec' % (
-            global_step, n_submatrices_to_train,
+        log(msg, global_step, n_submatrices_to_train,
             100.0 * global_step / n_submatrices_to_train,
-            n_steps_between_status_updates / elapsed))
-        sys.stdout.flush()
+            n_steps_between_status_updates / elapsed, loss)
         t0[0] = time.time()
 
   # Start training threads
@@ -343,8 +405,9 @@ def main(_):
   # Write out vectors
   write_embeddings_to_disk(FLAGS, model, sess)
 
-  #Shutdown
+  # Shutdown
   sess.close()
+  log("Elapsed: %s", time.time() - start_time)
 
 if __name__ == '__main__':
...
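With the patch applied, device selection and GPU memory policy are flag-driven. A hypothetical invocation (the paths are placeholders) that trains on the first two GPUs and keeps the new allow_growth default for GPU memory:

python swivel.py --input_base_path /tmp/swivel/input \
    --output_base_path /tmp/swivel/output \
    --num_gpus 2 --per_process_gpu_memory_fraction 0

Leaving --num_gpus at its default of 0 uses every GPU that device_lib reports.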