Commit 7d1a83a9 authored by aiss

push Deepspeed 0.6.3 rocm version

parent ab5534fc
import copy
from numpy import BUFSIZE
import json
import os
import subprocess
import sys
import threading
import time
from pathlib import Path
from typing import List
import hjson
from tqdm import tqdm
from ..utils import logger
from .constants import *
from .constants import AUTOTUNING, AUTOTUNING_METRIC_PATH
from .utils import get_val_by_key, search_error, was_interruptted
"""
thread-0: loops over the experiment queue, dispatching experiments as resources become available
thread-N: runs each dispatched experiment in its own thread
"""
import torch.distributed as dist
from datetime import datetime
TIMEOUT = 5
class ResourceManager:
def __init__(self,
args,
hosts,
num_gpus_per_node,
results_dir,
exps_dir,
arg_mappings):
self.results_dir = results_dir
self.exps_dir = exps_dir
self.nodes = []
self.num_gpus_per_node = num_gpus_per_node
for host in hosts:
self.nodes.append(Node(host, num_gpus_per_node))
self.experiment_queue = []
self.running_experiments = {}
self.finished_experiments = {}
self.experiment_count = 0
self.exp_paths = set()
self.args = args
self.arg_mappings = {}
if arg_mappings is not None:
for k, v in arg_mappings.items():
k = k.strip()
v = v.strip()
if k not in self.arg_mappings:
self.arg_mappings[k] = v
def schedule_experiments(self, exp_paths):
for exp_path in exp_paths:
if exp_path in self.exp_paths:
continue
else:
self.exp_paths.add(exp_path)
with open(exp_path, "r") as fd:
exp = hjson.load(fd)
exp["exp_id"] = self.experiment_count
self.experiment_count += 1
result_dir = exp["result_dir"] = os.path.join(
self.results_dir,
exp['name'])
if AUTOTUNING in exp["ds_config"]:
metric_file = os.path.join(result_dir, "metrics.json")
exp["ds_config"][AUTOTUNING][
AUTOTUNING_METRIC_PATH] = metric_file
stderr_file = os.path.join(result_dir, "stderr.log")
model_info_file = os.path.join(result_dir, "model_info.json")
metric_file = os.path.join(result_dir, "metrics.json")
# skip existing experiments (except for the ones that were interrupted)
if os.path.exists(result_dir) and os.path.exists(stderr_file):
if not was_interruptted(stderr_file):
err = search_error(stderr_file)
exp_id = exp["exp_id"]
self.finished_experiments[exp_id] = (exp, err)
if err or os.path.exists(metric_file) or os.path.exists(
model_info_file):
logger.info(
f"Skipping exp {exp['name']} whose result already exists"
)
continue
self.experiment_queue.append(exp)
def run_job(self, exp: dict, reservations):
exp_id = exp["exp_id"]
exp["master_port"] = self.args.master_port + exp_id
exp["result_dir"] = os.path.join(self.results_dir, exp['name'])
user_script = self.args.user_script
user_args = self.args.user_args
# overwrite the user arg in the arg_mappings
for key, val in self.arg_mappings.items():
nval = get_val_by_key(exp, key)
if nval and str(nval) != "auto":
if val in user_args:
idx = user_args.index(val)
user_args[idx + 1] = str(nval)
else:
user_args.append(val)
user_args.append(str(nval))
t = threading.Thread(target=run_experiment,
args=(exp,
reservations,
user_script,
user_args))
t.start()
self.running_experiments[exp_id] = (t, exp, reservations, time.time())
def experiment_check(self, pbar):
finished_exps = []
for exp_id, exp_data in self.running_experiments.items():
thread, exp_json, reservations, start_time = exp_data
logger.debug(f"Checking exp_id = {exp_id}, alive = {thread.is_alive()}")
thread.join(timeout=TIMEOUT)
if not thread.is_alive():
exp_dir = exp_json["result_dir"]
stderr_file = os.path.join(exp_dir, "stderr.log")
err = search_error(stderr_file)
finished_exps.append((exp_id, reservations))
self.finished_experiments[exp_id] = (exp_json, err)
duration = time.time() - start_time
logger.debug(f"Finished exp_id = {exp_id}, duration={duration:.2f} sec")
pbar.update(len(finished_exps))
for exp_id, reservations in finished_exps:
for reservation in reservations:
reservation.restore_slots()
self.running_experiments.pop(exp_id)
time.sleep(TIMEOUT)
def resource_request(self, exp):
num_gpus, num_nodes = exp['num_gpus'], exp['num_nodes']
slot_request = num_gpus
reservations = []
for node in self.nodes:
if num_nodes == 0:
break
slots = node.reserve_slots(slot_request=slot_request)
if slots:
reservations.append(Reservation(node=node, slots=slots))
num_nodes -= 1
if num_nodes == 0:
# request satisfied
return reservations
else:
# request not satisfied
for reservation in reservations:
reservation.restore_slots()
def status(self):
status = ""
for node in self.nodes:
status += f"{node.host} ({len(node.idle_slots)} idle gpus), "
return status[:-1]
def run(self):
pbar = tqdm(total=len(self.experiment_queue))
while len(self.experiment_queue) > 0:
exp = self.experiment_queue.pop(0)
logger.debug(f'Popped exp_id = {exp["exp_id"]} from the queue')
logger.debug(f'Resource status: {self.status()}')
reservations = self.resource_request(exp)
if not reservations:
logger.debug(f'Unable to schedule exp_id = {exp["exp_id"]}')
self.experiment_queue.insert(0, exp)
logger.debug(f'Put exp_id = {exp["exp_id"]} back into the queue')
self.experiment_check(pbar)
else:
desc = ""
for reservation in reservations:
reservation.slots.sort()
slots = ",".join(map(str, reservation.slots))
desc += f"{reservation.node.host}:{slots}@"
desc = desc[:-1]
logger.debug(f'Running exp_id = {exp["exp_id"]} on {desc}')
self.run_job(exp, reservations)
# All pending experiments are scheduled, waiting for them to complete
while len(self.running_experiments) > 0:
self.experiment_check(pbar)
def save_exp_results_to_database(self, message, ranks=None, path=None):
"""Print message when one of following condition meets
+ not dist.is_initialized()
+ dist.get_rank() in ranks if ranks is not None or ranks = [-1]
Args:
message (str)
ranks (list)
path (str)
"""
should_log = not dist.is_initialized()
ranks = ranks or []
my_rank = dist.get_rank() if dist.is_initialized() else -1
if ranks and not should_log:
should_log = ranks[0] == -1
should_log = should_log or (my_rank in set(ranks))
logger.debug(f"*** Should log: {should_log}")
if should_log:
message['rank'] = my_rank
with open(path, 'a') as outfile:
json.dump(message, outfile)
outfile.write('\n')
def parse_results(self, metric):
""" Parses the metric file of the finished experiments to select the optimal DeepSpeed configuration.
Args:
finished_experiments (dcit): a dictionary of experiment id and experiment description.
Returns:
The path to the result folder of the experiment with the optimal configuration.
"""
max_throughput = sys.float_info.min
best_exp_id = -1
for exp_id, (exp, err) in self.finished_experiments.items():
if err:
logger.info(
f"The experiment exp_id = {exp_id}, exp_name = {exp['name']}, did not run successfully with error = {err}, thus a metrics.txt does not exist for it. Check the stderr.log in {exp['result_dir']}"
)
continue
metric_file = exp["ds_config"][AUTOTUNING][AUTOTUNING_METRIC_PATH]
if os.path.exists(metric_file):
with open(metric_file, 'r') as f:
results = hjson.load(f)
curr_throughput = results[metric]
if curr_throughput > max_throughput:
max_throughput = curr_throughput
best_exp_id = exp_id
exp['results'] = results
if best_exp_id != -1:
best_exp, _ = self.finished_experiments[best_exp_id]
return best_exp, max_throughput
return exp, None
def clear(self):
"""Clear experiment queues, does not reset self.experiment_count
"""
self.experiment_queue = []
# clean up the running experiments
for exp_id, exp_data in self.running_experiments.items():
thread, exp_json, reservations, start_time = exp_data
clean_up(exp_json, reservations)
self.running_experiments = {}
self.finished_experiments = {}
self.exp_paths = set()
class Node:
def __init__(self, host, max_slots):
self.host = host
self.max_slots = max_slots
self.idle_slots = list(range(max_slots))
def reserve_slots(self, slot_request: int) -> list:
if len(self.idle_slots) >= slot_request:
return [self.idle_slots.pop(0) for _ in range(slot_request)]
def restore_slots(self, slots: list):
self.idle_slots += slots
class Reservation:
def __init__(self, node, slots):
self.node = node
self.slots = slots
def restore_slots(self):
self.node.restore_slots(self.slots)
def desc(self):
slots = ",".join(map(str, self.slots))
return f"{self.node.host}:{slots}@"
def get_job_id():
# Infrastructure-specific job-id
infra_job_id = None
if "DLWS_JOB_ID" in os.environ:
infra_job_id = os.environ["DLWS_JOB_ID"]
elif "DLTS_JOB_ID" in os.environ:
infra_job_id = os.environ["DLTS_JOB_ID"]
else:
infra_job_id = "unknown-job-id"
return infra_job_id
def get_user():
user = None
if "USER" in os.environ:
user = os.environ["USER"]
else:
user = "unknown-user"
return user
def run_experiment(exp: dict, reservations, user_script, user_args):
include_str = ""
for reservation in reservations:
reservation.slots.sort()
slots = ",".join(map(str, reservation.slots))
include_str += f"{reservation.node.host}:{slots}@"
include_str = include_str[:-1]
master_port = exp["master_port"]
exp["launcher_args"] = [
"--include",
f"{include_str}",
"--master_port",
str(master_port),
]
logger.debug(f'launcher args={exp["launcher_args"]}')
exp["user"] = get_user()
exp["job_id"] = get_job_id()
exp_dir = exp["result_dir"]
os.makedirs(exp_dir, exist_ok=True)
exp["ds_config_path"] = os.path.join(exp_dir, "ds_config.json")
ds_config = copy.deepcopy(exp["ds_config"])
with open(exp["ds_config_path"], "w", buffering=BUFSIZE) as fd:
json.dump(ds_config, fd)
fd.flush()
os.fsync(fd)
with open(os.path.join(exp_dir, "exp.json"), "w", buffering=BUFSIZE) as fd:
json.dump(exp, fd)
fd.flush()
os.fsync(fd)
# remove "--deepspeed_config ds_config.json" from user_args
if user_args:
if "--deepspeed_config" in user_args:
idx = user_args.index("--deepspeed_config")
# "--deepspeed_config" is omitted in HF
elif "--deepspeed" in user_args:
idx = user_args.index("--deepspeed")
assert idx + 1 < len(user_args) and ".json" in user_args[idx + 1], "there is no ds_config file specified after --deepspeed_config or --deepspeed"
user_args[idx + 1] = exp["ds_config_path"]
exp["user_script"] = user_script
exp["user_args"] = user_args
cmd = ["deepspeed"] + exp["launcher_args"] + [user_script] + user_args
assert len(exp["launcher_args"]) > 0, "must provide launcher args"
with open(os.path.join(exp_dir, "cmd.txt"), "w", buffering=BUFSIZE) as fd:
fd.write(" ".join(cmd))
fd.write("\n")
fd.flush()
os.fsync(fd)
logger.info(f"Launching exp_id = {exp['exp_id']}, exp_name = {exp['name']}")
with open(os.path.join(exp_dir, "stdout.log"), "wb") as out, open(
os.path.join(exp_dir, "stderr.log"), "wb"
) as err:
result = subprocess.Popen(cmd, stdout=out, stderr=err)
result.wait()
out.flush()
err.flush()
os.fsync(out)
os.fsync(err)
clean_up(exp, reservations)
logger.info(f"Done running exp_id = {exp['exp_id']}, exp_name = {exp['name']}")
PDSH_MAX_FAN_OUT = 1024
def clean_up(exp: dict, reservations):
env = os.environ.copy()
env['PDSH_RCMD_TYPE'] = 'ssh'
nodes_str = ""
for reservation in reservations:
nodes_str += f"{reservation.node.host},"
nodes_str = nodes_str[:-1]
logger.debug(
f"Cleaning up exp_id = {exp['exp_id']} on the following workers: {nodes_str}")
# PDSH flags for max node fan out and specific hosts to launch on
# See https://linux.die.net/man/1/pdsh for flag details
pdsh_cmd = ['pdsh', '-f', str(PDSH_MAX_FAN_OUT), '-w', nodes_str]
kill_cmd = [
'pkill',
'-f',
exp['name'],
]
cmd = pdsh_cmd + kill_cmd
logger.debug("cmd = {}".format(' '.join(cmd)))
result = subprocess.Popen(cmd, env=env)
result.wait()
# In case of failure must propagate the error-condition back to the caller (usually shell). The
# actual error and traceback should have been printed in the subprocess, so in order to avoid
# unnecessary noise we just quietly exit here with the same code as the subprocess
if result.returncode > 0:
sys.exit(result.returncode)
logger.info(
f"Done cleaning up exp_id = {exp['exp_id']} on the following workers: {nodes_str}"
)
# Tuner
`exps` is a list of experiment descriptions (dictionaries).
An experiment description has a `ds_config` field that stores the DeepSpeed configuration to be used in the experiment.
A tuner is based on BaseTuner and at least implements the `next_batch` method. It may also override BaseTuner's `tune` method.
```python
class NewTuner(BaseTuner):
def __init__(self, exps: list, resource_manager):
super(NewTuner, self).__init__(exps, resource_manager)
def next_batch(self, sample_size=1):
pass
def tune(self): # if it differs from BaseTuner
pass
```
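The tuners in this package are driven through `BaseTuner.tune`, which repeatedly samples a batch, writes the experiments, and runs them via the resource manager. A minimal usage sketch, assuming an existing `ResourceManager` instance `rm`, a list of experiment dicts `exps`, and an illustrative metric name:

```python
from deepspeed.autotuning.tuner import GridSearchTuner

tuner = GridSearchTuner(exps, rm, metric="throughput")  # metric name is illustrative
num_evaluated = tuner.tune(sample_size=1, n_trials=50)
print(tuner.best_exp, tuner.best_metric_val)
```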
from .index_based_tuner import RandomTuner, GridSearchTuner
# from .ga_tuner import GATuner
from .model_based_tuner import ModelBasedTuner
import atexit
import sys
from deepspeed.autotuning.constants import *
from deepspeed.autotuning.utils import write_experiments
from deepspeed.utils import logger
import json
class BaseTuner:
def __init__(self, exps, resource_manager, metric):
self.all_exps = exps
self.rm = resource_manager
self.best_iter = 0
self.best_exp = None
self.best_metric_val = None
self.metric = metric if metric else AUTOTUNING_METRIC_DEFAULT
logger.info(f"total number of exps = {len(self.all_exps)}")
def has_next(self):
"""Whether there exists more configurations for evaluation"""
if len(self.all_exps) > 0:
return True
else:
return False
def next_batch(self, sample_size):
"""Select the next batch of configurations for evaluation"""
raise NotImplementedError
def update(self):
""""Update the tuner with what configurations have been evaluated and their performance results"""
def tune(self, sample_size=1, n_trials=1000, early_stopping=None):
i = 0
try:
while i < n_trials and self.has_next():
# Select the next batch of configurations for evaluation
sampled_exps = self.next_batch(sample_size)
# Generate experiments for measurement of performance
exp_paths = write_experiments(sampled_exps, self.rm.exps_dir)
self.rm.schedule_experiments(exp_paths)
self.rm.run()
exp, metric_val = self.rm.parse_results(self.metric)
if self.best_exp is None or self.best_metric_val is None or (
metric_val and metric_val > self.best_metric_val):
# logger.info(f"tuner finds better = {exp}")
self.best_exp = exp
self.best_metric_val = metric_val
self.best_iter = i
i += len(sampled_exps)
# Update the tuner with evaluated performance results
self.update()
self.rm.clear()
# Early stop if no more promising configurations are likely to be found
if early_stopping and i >= self.best_iter + early_stopping:
logger.info(
f"Tuner early stopped at iteration {i}. Best iteration is {self.best_iter}. Early stopping threshold is {early_stopping}"
)
break
return i
except:
logger.info("Tunner Error:", sys.exc_info()[0])
return i
import numpy as np
from .utils import *
try:
import xgboost as xgb
except ImportError:
xgb = None
class XGBoostCostModel():
def __init__(self, loss_type, num_threads=None, log_interval=25, upper_model=None):
assert xgb is not None, "missing requirements, please install deepspeed w. 'autotuning_ml' extra."
self.loss_type = loss_type
if loss_type == "reg":
self.xgb_params = {
"max_depth": 3,
"gamma": 0.0001,
"min_child_weight": 1,
"subsample": 1.0,
"eta": 0.3,
"lambda": 1.0,
"alpha": 0,
"objective": "reg:linear",
}
elif loss_type == "rank":
self.xgb_params = {
"max_depth": 3,
"gamma": 0.0001,
"min_child_weight": 1,
"subsample": 1.0,
"eta": 0.3,
"lambda": 1.0,
"alpha": 0,
"objective": "rank:pairwise",
}
else:
raise RuntimeError("Invalid loss type: " + loss_type)
self.xgb_params["verbosity"] = 0
if num_threads:
self.xgb_params["nthread"] = num_threads
def fit(self, xs, ys):
x_train = np.array(xs, dtype=np.float32)
y_train = np.array(ys, dtype=np.float32)
y_max = np.max(y_train)
y_train = y_train / max(y_max, 1e-9)
index = np.random.permutation(len(x_train))
dtrain = xgb.DMatrix(x_train[index], y_train[index])
self.bst = xgb.train(self.xgb_params, dtrain)
def predict(self, xs):
features = xgb.DMatrix(xs)
return self.bst.predict(features)
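# Illustrative usage of the cost model (not part of the original file): xs are numeric
# feature vectors extracted from flattened DeepSpeed configs, ys are measured metrics.
#   model = XGBoostCostModel("rank")
#   model.fit([[1, 2, 4], [1, 4, 4], [2, 2, 8]], [10.0, 12.5, 9.0])
#   scores = model.predict([[1, 2, 4], [2, 4, 8]])  # higher score => predicted better config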
import random
from deepspeed.utils import logger
from .base_tuner import BaseTuner
class RandomTuner(BaseTuner):
"""Explore the search space in random order"""
def __init__(self, exps: list, resource_manager, metric):
super().__init__(exps, resource_manager, metric)
def next_batch(self, sample_size=1):
if sample_size > len(self.all_exps):
sample_size = len(self.all_exps)
sampled_batch = random.sample(self.all_exps, sample_size)
self.all_exps = [x for x in self.all_exps if x not in sampled_batch]
return sampled_batch
class GridSearchTuner(BaseTuner):
"""Explore the search space in sequential order"""
def __init__(self, exps: list, resource_manager, metric):
super().__init__(exps, resource_manager, metric)
def next_batch(self, sample_size=1):
if sample_size > len(self.all_exps):
sample_size = len(self.all_exps)
sampled_batch = self.all_exps[0:sample_size]
self.all_exps = [x for x in self.all_exps if x not in sampled_batch]
return sampled_batch
import hjson
import numpy as np
from deepspeed.utils import logger
from ..constants import AUTOTUNING, AUTOTUNING_METRIC_PATH, AUTOTUNING_METRIC_DEFAULT
from .base_tuner import BaseTuner
from .cost_model import XGBoostCostModel
from .utils import *
from ..utils import *
import numbers
from ..constants import AUTOTUNING_METRIC_LATENCY
INIT_NUM = 2
class ModelBasedTuner(BaseTuner):
"""Exploring the search space with a cost model"""
def __init__(self, exps: list, resource_manager, metric, tuning_space):
super().__init__(exps, resource_manager, metric)
self.tuning_space = tuning_space
self.best_iter = 0
self.all_configs = [e['ds_config'] for e in exps]
self.num_all_configs = len(self.all_configs)
self.dims = dict_to_dims(self.tuning_space)
logger.info(
f"Create config dim: {self.dims}, all configs: {self.num_all_configs}")
self.visited = set([])
self.trials = []
self.trial_pt = 0
init_num = min(INIT_NUM, self.num_all_configs)
for _ in range(init_num):
exp_feature = np.random.randint(self.num_all_configs)
exp_feature = 0
while exp_feature in self.visited:
exp_feature = np.random.randint(self.num_all_configs)
self.trials.append(exp_feature)
self.visited.add(exp_feature)
self.cost_model = XGBoostCostModel("rank")
self.evaluated_configs = []
self.evaluated_perf = []
self.train_ct = 0
self.random_exploration_ratio = 0.2 # do random exploration
def find_estimated_top_configs(self):
"""Use the cost model to predict the estimated performance of configurations and find the top ones for the next round of evaluation"""
configs = []
for c in self.all_configs:
flattened_ds_config = flatten(c)
feature_val = []
for k, v in flattened_ds_config.items():
if isinstance(v, numbers.Number):
feature_val.append(v)
configs.append(feature_val)
# print(configs)
# TODO the current implementation requires that all configs have the same shape.
configs = np.array(configs, dtype=np.float32)
estimates = self.cost_model.predict(configs)
n = len(estimates)
top_idx = np.argsort(estimates)
top_idx_ret = top_idx if self.metric == AUTOTUNING_METRIC_LATENCY else top_idx[::-1][:n]
# top_configs = [self.all_configs[i] for i in top_idx]
return top_idx_ret
def next_batch(self, sample_size):
sampled_batch = []
counter = 0
while counter < sample_size:
if len(self.visited) >= self.num_all_configs:
break
while self.trial_pt < len(self.trials):
logger.debug(f"trials: {self.trials}")
# Select top promising trials
index = self.trials[self.trial_pt]
if index not in self.visited:
break
self.trial_pt += 1
# To avoid over-exploitation, randomly select one that has not been explored.
rand = np.random.rand()
if rand < self.random_exploration_ratio:
# Randomly pick a configuration index that has not been explored yet
index = np.random.randint(self.num_all_configs)
while index in self.visited:
index = np.random.randint(self.num_all_configs)
# Need to track both the sampled configs and indices
sampled_batch.append(self.all_exps[index])
self.visited.add(index)
counter += 1
return sampled_batch
def has_next(self):
return len(self.visited) < self.num_all_configs
def update(self):
for exp_id, (exp, err) in self.rm.finished_experiments.items():
feature_val = []
if err:
logger.info(
f"Skipping exp_id = {exp_id}, exp_name = {exp['name']}, the experiment did not run successfully with error = {err}, thus a metrics.txt does not exist for it. Please check the stderr.log in {exp['result_dir']}"
)
ds_config = exp["ds_config"]
flattened_ds_config = flatten(ds_config)
for k, v in flattened_ds_config.items():
if isinstance(v, numbers.Number):
feature_val.append(v)
self.evaluated_configs.append(feature_val)
self.evaluated_perf.append(0.0)
continue
p = exp["ds_config"][AUTOTUNING][AUTOTUNING_METRIC_PATH]
with open(p, 'r') as f:
results = hjson.load(f)
curr_iter = results[self.metric]
logger.debug(f"parsing the results for {exp_id}, Result is {curr_iter}")
ds_config = exp["ds_config"]
flattened_ds_config = flatten(ds_config)
for k, v in flattened_ds_config.items():
if isinstance(v, numbers.Number):
feature_val.append(v)
self.evaluated_configs.append(feature_val)
self.evaluated_perf.append(curr_iter)
logger.debug(
f"**Evaluated configs: {len(self.evaluated_configs)}, evaluated perf: {self.evaluated_perf}"
)
self.cost_model.fit(self.evaluated_configs, self.evaluated_perf)
estimated_top_configs = self.find_estimated_top_configs()
self.trials = estimated_top_configs
self.trial_pt = 0
self.train_ct += 1
import numpy as np
import itertools
from ..utils import *
import collections.abc
def index_to_feature(p, dims):
"""convert index form (single integer) to feature form (vector)"""
feature = []
for dim in dims:
feature.append(p % dim)
p //= dim
return feature
def feature_to_index(feature, dims):
"""convert feature form (vector) to index form (single integer)"""
p = 0
for j, k in enumerate(feature):
print("j:", "k:", k, "dims", dims[:j])
p += int(np.prod(dims[:j])) * k
return p
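# Example (illustrative): with dims = [2, 3], index 5 maps to the feature [1, 2]
# (5 % 2 = 1, then 5 // 2 = 2 and 2 % 3 = 2), and feature_to_index([1, 2], [2, 3])
# maps back to 1 + 2 * 2 = 5, so the two functions are inverses of each other.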
def dict_to_dims(tuning_space):
dims = []
for key, val in tuning_space.items():
if isinstance(val, dict):
dims.extend(dict_to_dims(val))
elif isinstance(val, list):
dims.append(len(val))
else:
dims.append(1)
return dims
def gen_combinations(d: dict):
keys, values = d.keys(), d.values()
for v in values:
if not isinstance(v, list):
v = [v]
values_choices = (gen_combinations(v) if isinstance(v, dict) else get_list(v) for v in values)
for comb in itertools.product(*values_choices):
yield dict(zip(keys, comb))
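# Example (illustrative): gen_combinations({"a": [1, 2], "b": 3}) yields
# {"a": 1, "b": 3} and {"a": 2, "b": 3}; scalar values are treated as
# single-element lists via get_list.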
def flatten(d, parent_key='', sep='_'):
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, collections.abc.MutableMapping):
items.extend(flatten(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
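# Example (illustrative): flatten({"zero_optimization": {"stage": 2}}) returns
# {"zero_optimization_stage": 2}; nested keys are joined with the separator.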
def dict_to_feature(feature_dict, keys, max_value=None):
"""Extract values from dict"""
feature = []
for key, val in feature_dict.items(): # First level
if key not in keys:
continue
if val is None or val == "auto" or key == "autotuning" or val == "":
continue
if isinstance(val, dict):
feature.append(dict_to_feature(val, keys, max_value))
else:
feature.append(float(val))
# normalization, should not matter in tree models
if max_value is not None:
norm_feature = []
for f, mv in zip(feature, max_value):
norm_feature.append(f / mv)
feature = norm_feature
return feature
import re
import collections.abc
import os
import json
from deepspeed.runtime.constants import GRADIENT_ACCUMULATION_STEPS, TRAIN_MICRO_BATCH_SIZE_PER_GPU
import hjson
import sys
import itertools
import copy
from ..utils import logger
def search_error(filename):
if not os.path.exists(filename):
return "stderr.log does not exist"
with open(filename) as f:
for line in f:
for s in ["Error", "error", "ERROR"]:
idx = line.find(s)
if idx != -1:
return line[idx + len(s):].lstrip(": ")
return None
def was_interruptted(filename):
if not os.path.exists(filename):
return "stderr.log does not exist"
with open(filename) as f:
for line in f:
s = "KeyboardInterrupt"
idx = line.find(s)
if idx != -1:
return True
return False
def find_replace_str(value, replace_dict):
if not isinstance(value, str):
return str(value)
matches = re.findall(r"\$[A-Za-z0-9_]+", value)
for var in matches:
var_key = var.replace("$", "").lower()
if var_key == "nvme_path":
continue
assert var_key in replace_dict, f"unknown var key: {var_key}, in {replace_dict}"
if isinstance(replace_dict[var_key], str):
value = value.replace(var, replace_dict[var_key])
else:
assert len(matches) == 1, "unable to replace multiple non-string matches"
value = replace_dict[var_key]
return value
def find_replace(target, replace_dict):
if isinstance(target, dict):
for key, value in target.items():
if isinstance(value, str):
target[key] = find_replace_str(value, replace_dict)
if isinstance(value, list):
for i in range(len(value)):
value[i] = find_replace_str(value[i], replace_dict)
if isinstance(value, dict):
find_replace(value, replace_dict)
elif isinstance(target, list):
for i in range(len(target)):
target[i] = str(find_replace_str(target[i], replace_dict))
def get_list(val):
if not isinstance(val, list):
return [val]
else:
return val
def combine_dict(d, u):
for k, v in u.items():
if isinstance(v, collections.abc.Mapping):
d[k] = combine_dict(d.get(k, {}), v)
else:
if k not in d:
d[k] = v
else:
if not isinstance(d[k], list):
d[k] = [d[k]]
d[k].extend(i for i in get_list(v) if i not in d[k])
return d
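# Example (illustrative): combine_dict({"a": 1}, {"a": 2, "b": 3}) returns
# {"a": [1, 2], "b": 3}; conflicting scalar values are merged into a list.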
def del_if_exists(t, d):
"""Deletes a key from a dictionary if it exists.
Args:
t (string): target key to delete
d (dict): dictionary to delete from
"""
if t in d:
del d[t]
return
for k, v in d.items():
if isinstance(v, collections.abc.Mapping):
del_if_exists(t, v)
def replace_dict(d, u, ignored_keys=[]):
"""Replaces values in dict d with values in dict u.
Args:
d (dict): the target dict to overwrite
u (dict): the dict containing the values to overwrite the target dict
Returns:
dict d with values overwritten by the corresponding ones in dict u.
"""
if u is not None:
for k, v in u.items():
if k not in ignored_keys:
if v is None:
del_if_exists(k, d)
continue
if isinstance(v, collections.abc.Mapping):
d[k] = replace_dict(d.get(k, {}), v, ignored_keys)
else:
d[k] = v
return d
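# Example (illustrative): replace_dict({"a": 1, "b": {"c": 2}}, {"a": None, "b": {"c": 3}})
# returns {"b": {"c": 3}}; a None value in u deletes the key from d, and nested
# dicts are merged recursively.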
def get_val_by_key(d: dict, k):
if k in d:
return d[k]
for v in d.values():
if isinstance(v, dict):
return get_val_by_key(v, k)
return None
def set_val_by_key(d: dict, k, vv):
if k in d:
d[k] = vv
for v in d.values():
if isinstance(v, dict):
set_val_by_key(v, k, vv)
def fetch_hostfile(hostfile_path):
if not os.path.isfile(hostfile_path):
logger.warning("Unable to find hostfile, will proceed with training "
"with local resources only.")
return None
# e.g., worker-0 slots=16
with open(hostfile_path, 'r') as fd:
resource_pool = collections.OrderedDict()
for line in fd.readlines():
line = line.strip()
if line == '':
# skip empty lines
continue
try:
hostname, slots = line.split()
_, slot_count = slots.split("=")
slot_count = int(slot_count)
except ValueError as err:
logger.error("Hostfile is not formatted correctly, unable to "
"proceed with training.")
raise err
if hostname in resource_pool:
logger.error("Hostfile contains duplicate hosts, unable to "
"proceed with training.")
raise ValueError("host {} is already defined".format(hostname))
resource_pool[hostname] = slot_count
return resource_pool
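# Example (illustrative): a hostfile containing the lines
#   worker-0 slots=8
#   worker-1 slots=8
# is parsed into OrderedDict([('worker-0', 8), ('worker-1', 8)]).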
def validate_ds_config(config: dict):
def is_False(config: dict, key):
if config is None:
return False
return bool(config.get(key))
config_zero = config.get("zero_optimization", {})
if not config_zero:
return True
stage = config_zero.get("stage")
offload = False
if stage == 1:
return True
elif stage == 2:
if is_False(config_zero,
"cpu_offload") and is_False(config_zero,
"cpu_offload_params"):
return False
elif stage == 3:
offload_devices = ["cpu", "nvme"]
if config_zero.get("offload_optimizer", {}).get("device") in offload_devices:
offload = True
if config_zero.get("offload_param", {}).get("device") in offload_devices:
offload = True
else:
return True
# HF requires that "ZeRO Offload can only work with DeepSpeed optimizers"
if offload and not config.get("optimizer"):
return False
return True
def remove_dupe_dicts(l):
""" Removes duplicate dictionaries from a list. Uses list comprehension and the json library to sort and stringify each dictionary and the set data type to ensure unique values. Works with nested data structures.
Args:
l (list): a list of (nested) data structures.
Returns:
A list of unique values.
"""
list_of_strings = [json.dumps(d, sort_keys=True) for d in l]
list_of_strings = set(list_of_strings)
return [json.loads(s) for s in list_of_strings]
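# Example (illustrative): remove_dupe_dicts([{"a": 1}, {"a": 1}, {"b": 2}]) returns
# [{"a": 1}, {"b": 2}] (order is not guaranteed because a set is used internally).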
def prune_config(config, ignored_keys=[]):
""" Prunes the input configurations
Args:
configs (dict): A configuration dictionary.
ignored_keys (list, optional): the keys of the sections to delete. Defaults to [].
Returns:
A configuration dictionary.
"""
if ignored_keys:
for k in ignored_keys:
def find_del_key(d: dict, k: str):
if k in d:
del d[k]
else:
for dd in d.values():
if isinstance(dd, dict):
find_del_key(dd, k)
find_del_key(config, k)
def prune_configs(configs, ignored_keys=[]):
""" Prunes the input list of configurations
Args:
configs (list): A list of configuration dictionaries.
ignored_keys (list, optional): the keys of the sections to delete. Defaults to [].
Returns:
A list of valid and unique configuration dictionaries.
"""
pruned_list = []
for config in configs:
prune_config(config, ignored_keys)
pruned_list.append(config)
return remove_dupe_dicts(pruned_list)
def get_tuning_keys(tuning_space: dict):
"""Outputs the list of tunnable parameters in the tuning space dict.
Args:
tuning_space (dict): a configuration dictionary containing tunable parameters as lists of values.
Returns:
A list of strings
"""
tuning_keys = []
for key, val in tuning_space.items():
if isinstance(val, dict):
tuning_keys.extend(get_tuning_keys(val))
if isinstance(val, list) and len(val) > 1:
tuning_keys.append(key)
return tuning_keys
def get_all_configs(tuning_space: dict, ignore_keys=None):
""" Splits the tuning space dictionary to result in all combinations of values.
Args:
tuning_space (dict): the tuning space where tunable parameters are lists of values.
"""
def gen_combinations(d: dict):
keys, values = d.keys(), d.values()
for v in values:
if not isinstance(v, list):
v = [v]
values_choices = (gen_combinations(v) if isinstance(v, dict) else get_list(v) for v in values)
for comb in itertools.product(*values_choices):
yield dict(zip(keys, comb))
all_configs = []
ignored_key_vals = {}
for ik in ignore_keys:
ignored_key_vals[ik] = tuning_space.get(ik, {})
del_if_exists(ik, tuning_space)
for c in gen_combinations(tuning_space):
replace_dict(c, ignored_key_vals)
all_configs.append(c)
return all_configs
def canonical_name(config: dict, tuning_keys=None, prefix="", omit_val=False):
""" Generates a name from the acronyms of the tuning keys in the config dict. TRAIN_MICRO_BATCH_SIZE_PER_GPU is always included in the tuning keys.
Args:
config (dict): the config dict used to generate the name
tuning_keys (list, optional): the tuning keys used to generate the name. Defaults to None.
prefix (str, optional): a string added to the beginning of the name. Defaults to None.
"""
if TRAIN_MICRO_BATCH_SIZE_PER_GPU not in tuning_keys:
tuning_keys.append(TRAIN_MICRO_BATCH_SIZE_PER_GPU)
if GRADIENT_ACCUMULATION_STEPS not in tuning_keys:
tuning_keys.append(GRADIENT_ACCUMULATION_STEPS)
tuning_keys.sort()
def get_offload_name(offload_config):
cname = ""
if offload_config is None:
return "None_"
for key, val in offload_config.items():
key = "".join(map(lambda c: c[0], key.split('_')))
if (isinstance(val, int) or isinstance(val, float)) and val > 9000:
cname += key + '{:.1e}'.format(val) + "_"
else:
if isinstance(val, bool):
val = "T" if val else "F"
cname += f"{key}{val}_"
return cname
def get_name_by_keys(config: dict, tuning_keys=None, omit_val=False):
cname = ""
if not tuning_keys or config is None:
return cname
for key, val in config.items():
# skip the arg_mappings section when naming the exp file
if key == "arg_mappings":
continue
if key == "offload_param":
cname += "op_"
if not omit_val:
cname += get_offload_name(val)
continue
if key == "offload_optimizer":
cname += "oo_"
if not omit_val:
cname += get_offload_name(val)
continue
# recursively call the func to get name for the child dicts
if isinstance(val, dict):
n = get_name_by_keys(val, tuning_keys, omit_val=omit_val)
if n != "":
cname += n + "_"
if tuning_keys and key not in tuning_keys:
continue
key_str = "".join(map(lambda c: c[0], key.split('_')))
if not omit_val:
if (isinstance(val, int) or isinstance(val, float)) and val > 9000:
cname += key_str + '{:.1e}'.format(val) + "_"
else:
if isinstance(val, bool):
val = "T" if val else "F"
cname += f"{key_str}{val}_"
else:
cname += key_str + "_"
return cname[:-1]
name = get_name_by_keys(config, tuning_keys, omit_val=omit_val)
return prefix + (name if name != "" else "exp")
def get_first_config(config: dict):
if not config:
return None
cfg = copy.deepcopy(config)
for key, val in cfg.items():
if isinstance(val, dict):
if key == "optimizer": # use user defined optimizer which might have lists of values as params
cfg[key] = val
else:
cfg[key] = get_first_config(val)
if isinstance(val, list) and len(val) > 0:
cfg[key] = val[0]
return cfg
def write_experiments(exps: list, exps_dir: str):
exp_paths = []
for exp in exps:
exp_name = exp['name']
# write the expr config to a json file
exp_path = os.path.join(exps_dir, f'{exp_name}.json')
with open(exp_path, 'w') as fd:
json.dump(exp, fd)
exp_paths.append(exp_path)
return exp_paths
def memory_to_string(n, postfix="", units=None, precision=2):
if units is None:
if n // 10**12 > 0:
return str(round(n / 1024**4, precision)) + " T" + postfix
if n // 10**9 > 0:
return str(round(n / 1024**3, precision)) + " G" + postfix
elif n // 10**6 > 0:
return str(round(n / 1024**2, precision)) + " M" + postfix
elif n // 10**3 > 0:
return str(round(n / 1024, precision)) + " K" + postfix
else:
return str(n) + " "
else:
if units == "T":
return str(round(n / 1024**4, precision)) + " " + units
if units == "G" + postfix:
return str(round(n / 1024**3, precision)) + " " + units
elif units == "M" + postfix:
return str(round(n / 1024**2, precision)) + " " + units
elif units == "K" + postfix:
return str(round(n / 1024, precision)) + " " + units
else:
return str(n) + " "
def number_to_string(n, postfix="", units=None, precision=2):
if units is None:
if n // 10**9 > 0:
return str(round(n / 1000**3, precision)) + " B" + postfix
if n // 10**6 > 0:
return str(round(n / 1000**2, precision)) + " M" + postfix
elif n // 10**3 > 0:
return str(round(n / 1000**1, precision)) + " K" + postfix
else:
return str(n) + " "
else:
if units == "B" + postfix:
return str(round(n / 1000**3, precision)) + " " + units
elif units == "M" + postfix:
return str(round(n / 1000**2, precision)) + " " + units
elif units == "K" + postfix:
return str(round(n / 1000**1, precision)) + " " + units
else:
return str(n) + " "
'''
Various symbolic constants used for model checkpointing
'''
#########################################
# Optimizer checkpoint keys
#########################################
OPTIMIZER_STATE_DICT = "optimizer_state_dict"
FP32_GROUPS = "fp32_groups"
FP32_FLAT_GROUPS = 'fp32_flat_groups'
BASE_OPTIMIZER_STATE = 'base_optimizer_state'
SINGLE_PARTITION_OF_FP32_GROUPS = "single_partition_of_fp32_groups"
GROUPS_PADDING = 'groups_padding'
PARTITION_COUNT = 'partition_count'
ZERO_STAGE = 'zero_stage'
CLIP_GRAD = 'clip_grad'
#########################################
# Module checkpoint keys
#########################################
PARAM_SHAPES = 'param_shapes'
BUFFER_NAMES = 'buffer_names'
DS_VERSION = 'ds_version'
@@ -14,3 +14,6 @@ TORCH_DISTRIBUTED_DEFAULT_PORT = 29500
# To make an attempt at backwards compatibility with THD, we use an
# extraordinarily high default timeout, given that THD did not have timeouts.
default_pg_timeout = timedelta(minutes=30)
INFERENCE_GENERIC_MODE = 'generic'
INFERENCE_SPECIALIZED_MODE = 'specialized'
@@ -10,21 +10,18 @@ class ElasticityError(Exception):
"""
Base exception for all elasticity related errors
"""
pass
class ElasticityConfigError(ElasticityError):
"""
Elasticity configuration error
"""
pass
class ElasticityIncompatibleWorldSize(ElasticityError):
"""
Attempting to run a world size that is incompatible with a given elastic config
"""
pass
class ElasticityConfig:
@@ -16,7 +16,7 @@ Elasticity should be enabled as:
"max_train_batch_size": 2000,
"micro_batch_sizes": [2,4,6],
"min_gpus": 1,
"max_gpus" : 10000
"max_gpus" : 10000,
"min_time": 20,
"prefer_larger_batch": true,
"ignore_non_elastic_batch_info": false,
@@ -6,6 +6,8 @@ import re
import json
import numpy as np
from packaging import version as pkg_version
from .config import ElasticityConfig, ElasticityConfigError, ElasticityError, \
ElasticityIncompatibleWorldSize
from .constants import ELASTICITY, ENABLED, ENABLED_DEFAULT, LATEST_ELASTICITY_VERSION, \
@@ -60,17 +62,16 @@ HCN_LIST = [
def get_candidate_batch_sizes(base_list, max_acceptable_batch_size):
candidate_batch_size = []
#brute force is fine here. We are working with very small lists
for base in base_list:
batch_size = base
for hcn in HCN_LIST:
new_batch_size = base * hcn
if new_batch_size > max_acceptable_batch_size:
break
batch_size = new_batch_size
candidate_batch_size.append(batch_size)
return list(set(candidate_batch_size))
if base >= max_acceptable_batch_size:
candidate_batch_size.append(base)
else:
value = max_acceptable_batch_size // base
index = np.argmax(np.asarray(HCN_LIST) > value)
candidate_batch_size.append(HCN_LIST[index - 1] * base)
candidate_batch_size = list(set(candidate_batch_size))
logger.info(f"Candidate batch size: {candidate_batch_size}")
return candidate_batch_size
def get_valid_gpus(batch_size, micro_batches, min_valid_gpus, max_valid_gpus):
@@ -82,12 +83,17 @@ def get_valid_gpus(batch_size, micro_batches, min_valid_gpus, max_valid_gpus):
if max_gpus >= min_valid_gpus and max_gpus <= max_valid_gpus:
valid_gpus.append(max_gpus)
# find all factors less than max_gpus / 2
for i in range(1, max_gpus // 2 + 1):
if i > max_valid_gpus:
break
if i < min_valid_gpus:
continue
if max_gpus % i == 0:
if i >= min_valid_gpus and i <= max_valid_gpus:
valid_gpus.append(i)
valid_gpus = set(valid_gpus)
valid_gpus = sorted(list(valid_gpus))
logger.info(f"Valid GPUs: {valid_gpus}")
return valid_gpus
@@ -141,16 +147,12 @@ def _get_compatible_gpus_v01(micro_batches,
final_batch_size
valid_gpus
'''
min_gpus = min_gpus or 1
max_gpus = max_gpus or max_acceptable_batch_size // min(micro_batches)
if min_gpus is None:
min_gpus = int(1)
if max_gpus is None:
max_gpus = int(max_acceptable_batch_size / min(micro_batches))
assert all(mb <= max_acceptable_batch_size for mb in micro_batches ), \
f"All micro batches must be less than \
or equal to max_acceptable_batch_size: {max_acceptable_batch_size}"
if not all(mb <= max_acceptable_batch_size for mb in micro_batches):
raise ValueError(f"All micro batches must be less than \
or equal to max_acceptable_batch_size: {max_acceptable_batch_size}")
lcm = np.lcm.reduce(micro_batches)
@@ -171,29 +173,13 @@ def _get_compatible_gpus_v01(micro_batches,
return final_batch_size, valid_gpus
def _parse_version(version_str):
'''Parse a version string and extract the major and minor versions (and possibly patch version).'''
matched = re.search('^(\d+)\.(\d+)\.(\d+)', version_str)
if matched:
return int(matched.group(1)), int(matched.group(2)), int(matched.group(3))
else:
matched = re.search('^(\d+)\.(\d+)', version_str)
assert matched != None, "Unable to parse version number, expecting" \
f"major.minor[.patch] format but received {version_str}"
return int(matched.group(1)), int(matched.group(2)), 0
def _compatible_ds_version_check(target_deepspeed_version: str):
min_major, min_minor, min_patch = _parse_version(MINIMUM_DEEPSPEED_VERSION)
trg_major, trg_minor, trg_patch = _parse_version(target_deepspeed_version)
min_version = pkg_version.parse(MINIMUM_DEEPSPEED_VERSION)
target_version = pkg_version.parse(target_deepspeed_version)
err_str = f"Target deepspeed version of {target_deepspeed_version} is not compatible " \
f"with minimum version {MINIMUM_DEEPSPEED_VERSION} supporting elasticity."
if trg_major < min_major:
raise ElasticityError(err_str)
if trg_minor < min_minor:
raise ElasticityError(err_str)
if trg_patch < min_patch:
if target_version < min_version:
raise ElasticityError(err_str)
return True
import torch
import deepspeed
import subprocess
import argparse
from .ops.op_builder import ALL_OPS
from .git_version_info import installed_ops, torch_info
from .ops import __compatible_ops__ as compatible_ops
@@ -20,7 +21,7 @@ okay = f"{GREEN}[OKAY]{END}"
warning = f"{YELLOW}[WARNING]{END}"
def op_report():
def op_report(verbose=True):
max_dots = 23
max_dots2 = 11
h = ["op name", "installed", "compatible"]
@@ -43,7 +44,7 @@ def op_report():
no = f"{YELLOW}[NO]{END}"
for op_name, builder in ALL_OPS.items():
dots = "." * (max_dots - len(op_name))
is_compatible = OKAY if builder.is_compatible() else no
is_compatible = OKAY if builder.is_compatible(verbose) else no
is_installed = installed if installed_ops[op_name] else no
dots2 = '.' * ((len(h[1]) + (max_dots2 - len(h[1]))) -
(len(is_installed) - color_len))
@@ -78,6 +79,11 @@ def nvcc_version():
def debug_report():
max_dots = 33
hip_version = None
if hasattr(torch.version, 'hip'):
hip_version = torch.version.hip
report = [
("torch install path",
torch.__path__),
@@ -85,25 +91,51 @@ def debug_report():
torch.__version__),
("torch cuda version",
torch.version.cuda),
("torch hip version",
hip_version),
("nvcc version",
nvcc_version()),
(None if hip_version else nvcc_version())),
("deepspeed install path",
deepspeed.__path__),
("deepspeed info",
f"{deepspeed.__version__}, {deepspeed.__git_hash__}, {deepspeed.__git_branch__}"
),
("deepspeed wheel compiled w.",
f"torch {torch_info['version']}, cuda {torch_info['cuda_version']}"),
f"torch {torch_info['version']}, " +
(f"hip {torch_info['hip_version']}"
if hip_version else f"cuda {torch_info['cuda_version']}")),
]
print("DeepSpeed general environment info:")
for name, value in report:
print(name, "." * (max_dots - len(name)), value)
def main():
op_report()
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument(
'--hide_operator_status',
action='store_true',
help=
'Suppress display of installation and compatibility statuses of DeepSpeed operators. '
)
parser.add_argument('--hide_errors_and_warnings',
action='store_true',
help='Suppress warning and error messages.')
args = parser.parse_args()
return args
def main(hide_operator_status=False, hide_errors_and_warnings=False):
if not hide_operator_status:
op_report(verbose=not hide_errors_and_warnings)
debug_report()
def cli_main():
args = parse_arguments()
main(hide_operator_status=args.hide_operator_status,
hide_errors_and_warnings=args.hide_errors_and_warnings)
if __name__ == "__main__":
main()
@@ -14,4 +14,4 @@ except ModuleNotFoundError:
from .ops.op_builder import ALL_OPS
installed_ops = dict.fromkeys(ALL_OPS.keys(), False)
compatible_ops = dict.fromkeys(ALL_OPS.keys(), False)
torch_info = {'version': "0.0", "cuda_version": "0.0"}
torch_info = {'version': "0.0", "cuda_version": "0.0", "hip_version": "0.0"}
from .engine import InferenceEngine
'''
Copyright 2021 The Microsoft DeepSpeed Team
'''
import torch
import os
from torch.nn.modules import Module
import torch.distributed as dist
from ..runtime.state_dict_factory import SDLoaderFactory
from ..runtime.weight_quantizer import WeightQuantization
from ..module_inject.replace_module import replace_transformer_layer
from ..utils import logger, init_distributed
from ..pipe import PipelineModule
from ..moe.utils import has_moe_layers
from ..moe.layer import MoE
import torch.distributed as dist
import deepspeed.utils.groups as groups
DS_INFERENCE_ENABLED = False
class InferenceEngine(Module):
inference_mp_group = None
inference_ep_group = None
expert_mp_group = None
def __init__(self,
model,
triangular_masking=True,
mp_size=1,
training_mp_size=1,
ep_size=1,
mpu=None,
ep_group=None,
expert_mp_group=None,
checkpoint=None,
dtype=None,
injection_dict=None,
return_tuple=True,
replace_method='auto',
quantization_setting=None,
replace_with_kernel_inject=False,
moe=False,
moe_experts=1,
moe_type='standard',
config=None):
"""
Args:
model: torch.nn.Module
mp_size: model-parallel size
mpu: model-parallel unit (used for Megatron-type models)
checkpoint: path to a JSON file that describes the model checkpoints
Example: {type: 'Megatron', 'checkpoints': [ckpt_mp0.pt, ckpt_mp1.pt], 'version': 1.0}
dtype: data-type by which inference is executed
injection_dict: the dictionary that shows the injection policy:
Example: {BertLayer: HFBertLayerPolicy}
return_tuple: if true, inference-API returns a tuple, otherwise a tensor
replace_method: the injection method; can be set to 'auto' if no injection policy is defined, in which case injection is performed automatically based on the available policies
quantization_setting:
one of None, Tuple(mlp_extra_grouping, quantize_groups), quantize_groups
replace_with_kernel_inject: this flag needs to be set to true to inject inference kernels for models such as Bert, GPT2, GPT-Neo, and GPT-J. Otherwise,
the injection_dict provides the names of two linear layers as a tuple: (attention_output projection, transformer output projection)
"""
global DS_INFERENCE_ENABLED
DS_INFERENCE_ENABLED = True
super().__init__()
self.module = model
self._get_model_config_generate(config)
self.mp_world_size = mp_size
self.checkpoint = checkpoint
self.dtype = dtype
self.injection_dict = injection_dict
self.mp_group = None
self.mpu = mpu
self._validate_args(mpu)
self.replace_method = replace_method
self.quantize_merge_count = 1
self.quantization_scales = None
self.triangular_masking = triangular_masking
self.ep_size = ep_size
self.ep_group = ep_group
self.expert_mp_group = expert_mp_group
self._init_quantization_setting(quantization_setting)
if self.checkpoint:
self._load_checkpoint(self.checkpoint)
# convert model to intended dtype
if self.dtype:
self._convert_to_dtype()
if self.mpu:
self.mp_world_size = dist.get_world_size(
group=self.mpu.get_model_parallel_group())
self.mp_group = mpu.get_model_parallel_group()
elif self.mp_world_size > 1:
self._create_model_parallel_group()
moe, _ = has_moe_layers(self.module)
if moe and dist.get_world_size() > 1:
self._create_ep_parallel_group(moe_experts)
if self.injection_dict:
for client_module, injection_policy in self.injection_dict.items():
self._apply_injection_policy(client_module,
injection_policy,
return_tuple,
replace_with_kernel_inject,
moe,
moe_experts,
moe_type,
training_mp_size)
elif replace_method == 'auto':
self._apply_injection_policy(
return_tuple=return_tuple,
replace_with_kernel_inject=replace_with_kernel_inject,
moe=moe,
moe_experts=moe_experts,
moe_type=moe_type,
training_mp_size=training_mp_size)
device = torch.cuda.current_device()
logger.info(f"Place model to device: {device}")
self.module.to(device)
if self.mp_world_size > 1:
self.model_orig_fwd = self.module.forward
self.module.forward = self.forward
else:
self.module.register_forward_pre_hook(self._pre_forward_hook)
def _get_model_config_generate(self, config):
self.config = getattr(self.module, 'config', None) if config is None else config
self.generate = getattr(self.module, 'generate', None)
def _create_model_parallel_group(self):
# Call the init process
if InferenceEngine.inference_mp_group is None:
init_distributed()
local_rank = int(os.getenv('LOCAL_RANK', '0'))
torch.cuda.set_device(local_rank)
ranks = [i for i in range(self.mp_world_size)]
self.mp_group = dist.new_group(ranks)
InferenceEngine.inference_mp_group = self.mp_group
else:
self.mp_group = InferenceEngine.inference_mp_group
def _create_ep_parallel_group(self, moe_experts):
# Call the init process
self.ep_group = {}
self.expert_mp_group = {}
moe_experts = moe_experts if type(moe_experts) is list else [moe_experts]
for e in moe_experts:
self.ep_group.update({e: None})
self.expert_mp_group.update({e: None})
for moe_ep_size in self.ep_group.keys():
num_ep_groups = dist.get_world_size() // moe_ep_size
for i in range(num_ep_groups):
ep_cnt = i * moe_ep_size
size = dist.get_world_size(
) if moe_ep_size > dist.get_world_size() else moe_ep_size
ranks = list(range(ep_cnt, ep_cnt + size))
_ep_group = dist.new_group(ranks)
if dist.get_rank() in ranks:
self.ep_group.update({moe_ep_size: _ep_group})
if dist.get_world_size() > moe_ep_size:
num_expert_mp_groups = dist.get_world_size() // num_ep_groups
expert_mp_size = dist.get_world_size() // moe_ep_size
for i in range(num_expert_mp_groups):
expert_mp_comm_ranks = [
i + nr * moe_ep_size for nr in range(expert_mp_size)
]
_expert_mp_group = dist.new_group(expert_mp_comm_ranks)
if dist.get_rank() in expert_mp_comm_ranks:
self.expert_mp_group.update({moe_ep_size: _expert_mp_group})
def _init_quantization_setting(self, quantization_setting):
self.quantize_bits = 8
self.mlp_extra_grouping = False
self.quantize_groups = 1
if type(quantization_setting) is tuple:
self.mlp_extra_grouping, \
self.quantize_groups = quantization_setting
elif quantization_setting is not None:
self.quantize_groups = quantization_setting
logger.info(f"quantize_bits = {self.quantize_bits} "
f"mlp_extra_grouping = {self.mlp_extra_grouping}, "
f"quantize_groups = {self.quantize_groups}")
def _validate_args(self, mpu):
if not isinstance(self.module, Module):
raise ValueError(f"model must be a torch.nn.Module, got {type(self.module)}")
if not isinstance(self.mp_world_size, int) or self.mp_world_size < 1:
raise ValueError(f"mp_size must be an int >= 1, got {self.mp_world_size}")
if mpu:
methods = ["get_model_parallel_group", "get_data_parallel_group"]
for method in methods:
if not hasattr(mpu, method):
raise ValueError(f"mpu is missing {method}")
if self.checkpoint is not None and not isinstance(self.checkpoint, str):
raise ValueError(
f"checkpoint must be None or a str, got {type(self.checkpoint)}")
supported_dtypes = [None, torch.half, torch.int8, torch.float]
if self.dtype not in supported_dtypes:
raise ValueError(
f"{self.dtype} not supported, valid dtype: {supported_dtypes}")
if self.injection_dict is not None and not isinstance(self.injection_dict, dict):
raise ValueError(
f"injection_dict must be None or a dict, got: {self.injection_dict}")
def _apply_injection_policy(self,
client_module=None,
injection_policy=None,
return_tuple=True,
replace_with_kernel_inject=False,
moe=False,
moe_experts=1,
moe_type='standard',
training_mp_size=1):
replace_transformer_layer(client_module,
self.module,
triangular_masking=self.triangular_masking,
policy=injection_policy,
mp_size=self.mp_world_size,
mp_group=self.mp_group,
ep_group=self.ep_group,
expert_mp_group=self.expert_mp_group,
config=self.config,
fp16=(self.dtype == torch.half),
training=False,
return_tuple=return_tuple,
quantize=(self.dtype == torch.int8),
quantize_settings=(self.quantization_scales,
self.quantize_merge_count,
self.mlp_extra_grouping,
self.quantize_groups),
replace_with_kernel_inject=replace_with_kernel_inject,
moe=moe,
moe_experts=moe_experts,
moe_type=moe_type,
training_mp_size=training_mp_size)
def _get_all_ckpt_names(self, checkpoints_path, tag):
ckpt_file_pattern = self._get_ckpt_name(checkpoints_path,
tag,
mp_placeholder="*")
import glob
ckpt_files = glob.glob(ckpt_file_pattern)
ckpt_files.sort()
return ckpt_files
def _get_ckpt_name(self, checkpoints_path, tag, mp_placeholder=None):
if mp_placeholder is not None:
mp_rank_str = mp_placeholder
else:
mp_rank = 0 if self.mpu is None else self.mpu.get_model_parallel_rank()
mp_rank_str = "{:02d}".format(mp_rank)
ckpt_name = os.path.join(
checkpoints_path,
"mp_rank_" + mp_rank_str + "_model_states.pt",
)
return ckpt_name
def _load_checkpoint(self, load_dir, load_module_strict=True, tag=None):
is_pipe_parallel = isinstance(self.module, PipelineModule)
if is_pipe_parallel:
raise RuntimeError(
'pipeline parallelism is currently not supported in inference.')
if os.path.isdir(load_dir):
if tag is None:
latest_path = os.path.join(load_dir, "latest")
if os.path.isfile(latest_path):
with open(latest_path, "r") as fd:
tag = fd.read().strip()
ckpt_list = self._get_all_ckpt_names(load_dir, tag)
sd_loader = SDLoaderFactory.get_sd_loader(ckpt_list)
else:
sd_loader = SDLoaderFactory.get_sd_loader_json(load_dir)
mp_rank = 0 if self.mpu is None else self.mpu.get_model_parallel_rank()
load_path, checkpoint, quantize_config = sd_loader.load(self.mp_world_size,
mp_rank,
is_pipe_parallel=is_pipe_parallel,
quantize=(self.dtype is torch.int8),
quantize_groups=self.quantize_groups,
mlp_extra_grouping=self.mlp_extra_grouping)
self.quantization_scales, self.quantize_merge_count = quantize_config
moe, _ = has_moe_layers(self.module)
if moe:
from deepspeed.runtime.engine import DeepSpeedEngine
old_moe_load = False
if not isinstance(checkpoint['num_experts'], list):
old_moe_load = True
DeepSpeedEngine.load_moe_state_dict(
load_dir,
tag,
state_dict=checkpoint[self._choose_module_key(checkpoint)],
old_moe_load=old_moe_load,
model=self.module,
mpu=self.mpu)
self.module.load_state_dict(
state_dict=checkpoint[self._choose_module_key(checkpoint)],
strict=load_module_strict)
def _choose_module_key(self, sd):
assert not ('module' in sd and 'model' in sd), "checkpoint has both 'model' and 'module' keys, not sure how to proceed"
assert 'module' in sd or 'model' in sd, "checkpoint contains neither 'model' nor 'module' keys, not sure how to proceed"
if 'module' in sd:
return 'module'
elif 'model' in sd:
return 'model'
def _convert_to_dtype(self):
if self.dtype is torch.int8 and self.quantization_scales is None:
quantizer = WeightQuantization(mlp_extra_grouping=self.mlp_extra_grouping)
model, self.quantization_scales = quantizer.model_quantize(self.module,
self.injection_dict,
self.quantize_bits,
self.quantize_groups)
elif self.dtype == torch.half:
self.module.half()
elif self.dtype == torch.float:
self.module.float()
def _pre_forward_hook(self, module, *inputs, **kwargs):
for input in inputs:
if torch.is_tensor(input):
input = input.to(torch.cuda.current_device())
for k in kwargs:
if torch.is_tensor(kwargs[k]):
kwargs[k] = kwargs[k].to(torch.cuda.current_device())
def forward(self, *inputs, **kwargs):
"""Execute forward propagation
Arguments:
*inputs: Variable length input list
**kwargs: variable length keyword arguments
"""
if self.mp_world_size > 1:
if self.mpu is None:
for input in inputs:
if torch.is_tensor(input):
input = input.to(torch.cuda.current_device())
if not input.is_contiguous():
input = input.contiguous()
dist.broadcast(input, 0)
for k in kwargs:
if torch.is_tensor(kwargs[k]):
kwargs[k] = kwargs[k].to(torch.cuda.current_device())
if not kwargs[k].is_contiguous():
kwargs[k] = kwargs[k].contiguous()
dist.broadcast(kwargs[k], 0)
outputs = self.model_orig_fwd(*inputs, **kwargs)
else:
outputs = self.module(*inputs, **kwargs)
return outputs