Commit 6b33aeb8 authored by zhangqha

BladeDISC DeePMD code
import logging
from typing import Callable, Tuple
import numpy as np
from deepmd.utils.errors import OutOfMemoryError
class AutoBatchSize:
"""This class allows DeePMD-kit to automatically decide the maximum
batch size that will not cause an OOM error.
Notes
-----
    We assume all OOM errors will raise :class:`OutOfMemoryError`.
Parameters
----------
initial_batch_size : int, default: 1024
initial batch size (number of total atoms)
    factor : float, default: 2.
        factor by which the batch size is increased (and decreased after an OOM error)
Attributes
----------
current_batch_size : int
current batch size (number of total atoms)
maximum_working_batch_size : int
maximum working batch size
minimal_not_working_batch_size : int
minimal not working batch size
"""
def __init__(self, initial_batch_size: int = 1024, factor: float = 2.) -> None:
# See also PyTorchLightning/pytorch-lightning#1638
# TODO: discuss a proper initial batch size
self.current_batch_size = initial_batch_size
self.maximum_working_batch_size = 0
self.minimal_not_working_batch_size = 2**31
self.factor = factor
def execute(self, callable: Callable, start_index: int, natoms: int) -> Tuple[int, tuple]:
"""Excuate a method with given batch size.
Parameters
----------
callable : Callable
            The method should accept the batch size and start_index as parameters,
            and return the executed batch size and data.
start_index : int
start index
natoms : int
natoms
Returns
-------
int
executed batch size * number of atoms
tuple
result from callable, None if failing to execute
Raises
------
OutOfMemoryError
OOM when batch size is 1
"""
try:
n_batch, result = callable(max(self.current_batch_size // natoms, 1), start_index)
except OutOfMemoryError as e:
# TODO: it's very slow to catch OOM error; I don't know what TF is doing here
# but luckily we only need to catch once
self.minimal_not_working_batch_size = min(self.minimal_not_working_batch_size, self.current_batch_size)
if self.maximum_working_batch_size >= self.minimal_not_working_batch_size:
self.maximum_working_batch_size = int(self.minimal_not_working_batch_size / self.factor)
if self.minimal_not_working_batch_size <= natoms:
raise OutOfMemoryError("The callable still throws an out-of-memory (OOM) error even when batch size is 1!") from e
# adjust the next batch size
self._adjust_batch_size(1./self.factor)
return 0, None
else:
n_tot = n_batch * natoms
self.maximum_working_batch_size = max(self.maximum_working_batch_size, n_tot)
# adjust the next batch size
if n_tot + natoms > self.current_batch_size and self.current_batch_size * self.factor < self.minimal_not_working_batch_size:
self._adjust_batch_size(self.factor)
return n_batch, result
def _adjust_batch_size(self, factor: float):
old_batch_size = self.current_batch_size
self.current_batch_size = int(self.current_batch_size * factor)
logging.info("Adjust batch size from %d to %d" % (old_batch_size, self.current_batch_size))
def execute_all(self, callable: Callable, total_size: int, natoms: int, *args, **kwargs) -> Tuple[np.ndarray]:
"""Excuate a method with all given data.
Parameters
----------
callable : Callable
            The method should accept *args and **kwargs as input and return arrays batched along the first axis.
total_size : int
Total size
natoms : int
The number of atoms
        *args, **kwargs
            If a 2D np.ndarray, the first axis is assumed to be the batch axis and is sliced per batch; otherwise passed through unchanged.
"""
def execute_with_batch_size(batch_size: int, start_index: int) -> Tuple[int, Tuple[np.ndarray]]:
end_index = start_index + batch_size
end_index = min(end_index, total_size)
return (end_index - start_index), callable(
*[(vv[start_index:end_index] if isinstance(vv, np.ndarray) and vv.ndim > 1 else vv) for vv in args],
**{kk: (vv[start_index:end_index] if isinstance(vv, np.ndarray) and vv.ndim > 1 else vv) for kk, vv in kwargs.items()},
)
index = 0
results = []
while index < total_size:
n_batch, result = self.execute(execute_with_batch_size, index, natoms)
if not isinstance(result, tuple):
result = (result,)
index += n_batch
if n_batch:
for rr in result:
rr.reshape((n_batch, -1))
results.append(result)
r = tuple([np.concatenate(r, axis=0) for r in zip(*results)])
if len(r) == 1:
# avoid returning tuple if callable doesn't return tuple
r = r[0]
return r
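# ---------------------------------------------------------------------------
# Illustrative usage sketch, not part of the original commit. The toy model,
# array shapes, and numbers below are hypothetical; the sketch only exercises
# the public AutoBatchSize API defined above.
def _example_auto_batch_size():
    natoms = 64
    coords = np.random.rand(1000, natoms * 3)  # nframes x (natoms * 3)

    def toy_model(coord_batch):
        # stands in for an expensive per-batch evaluation that may run out of
        # memory; returns one value per frame in the batch
        return coord_batch.sum(axis=1, keepdims=True)

    auto_batch = AutoBatchSize(initial_batch_size=1024)
    # execute_all slices `coords` along the first axis and automatically grows
    # or shrinks the working batch size when OutOfMemoryError is raised
    return auto_batch.execute_all(toy_model, coords.shape[0], natoms, coords)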
"""Module providing compatibility between `0.x.x` and `1.x.x` input versions."""
import json
import warnings
from pathlib import Path
from typing import Any, Dict, Optional, Sequence, Union
import numpy as np
from deepmd.common import j_must_have
def convert_input_v0_v1(
jdata: Dict[str, Any], warning: bool = True, dump: Optional[Union[str, Path]] = None
) -> Dict[str, Any]:
"""Convert input from v0 format to v1.
Parameters
----------
jdata : Dict[str, Any]
loaded json/yaml file
warning : bool, optional
whether to show deprecation warning, by default True
dump : Optional[Union[str, Path]], optional
whether to dump converted file, by default None
Returns
-------
Dict[str, Any]
converted output
"""
output = {}
output["model"] = _model(jdata, jdata["use_smooth"])
output["learning_rate"] = _learning_rate(jdata)
output["loss"] = _loss(jdata)
output["training"] = _training(jdata)
if warning:
_warning_input_v0_v1(dump)
if dump is not None:
with open(dump, "w") as fp:
json.dump(output, fp, indent=4)
return output
def _warning_input_v0_v1(fname: Optional[Union[str, Path]]):
msg = "It seems that you are using a deepmd-kit input of version 0.x.x, " \
"which is deprecated. we have converted the input to >2.0.0 compatible"
if fname is not None:
msg += f", and output it to file {fname}"
warnings.warn(msg)
def _model(jdata: Dict[str, Any], smooth: bool) -> Dict[str, Dict[str, Any]]:
"""Convert data to v1 input for non-smooth model.
Parameters
----------
jdata : Dict[str, Any]
parsed input json/yaml data
smooth : bool
whether to use smooth or non-smooth descriptor version
Returns
-------
Dict[str, Dict[str, Any]]
dictionary with model input parameters and sub-dictionaries for descriptor and
fitting net
"""
model = {}
model["descriptor"] = (
_smth_descriptor(jdata) if smooth else _nonsmth_descriptor(jdata)
)
model["fitting_net"] = _fitting_net(jdata)
return model
def _nonsmth_descriptor(jdata: Dict[str, Any]) -> Dict[str, Any]:
"""Convert data to v1 input for non-smooth descriptor.
Parameters
----------
jdata : Dict[str, Any]
parsed input json/yaml data
Returns
-------
Dict[str, Any]
dict with descriptor parameters
"""
descriptor = {}
descriptor["type"] = "loc_frame"
_jcopy(jdata, descriptor, ("sel_a", "sel_r", "rcut", "axis_rule"))
return descriptor
def _smth_descriptor(jdata: Dict[str, Any]) -> Dict[str, Any]:
"""Convert data to v1 input for smooth descriptor.
Parameters
----------
jdata : Dict[str, Any]
parsed input json/yaml data
Returns
-------
Dict[str, Any]
dict with descriptor parameters
"""
descriptor = {}
seed = jdata.get("seed", None)
if seed is not None:
descriptor["seed"] = seed
descriptor["type"] = "se_a"
descriptor["sel"] = jdata["sel_a"]
_jcopy(jdata, descriptor, ("rcut", ))
descriptor["rcut_smth"] = jdata.get("rcut_smth", descriptor["rcut"])
descriptor["neuron"] = j_must_have(jdata, "filter_neuron")
descriptor["axis_neuron"] = j_must_have(jdata, "axis_neuron", ["n_axis_neuron"])
descriptor["resnet_dt"] = False
if "resnet_dt" in jdata:
descriptor["resnet_dt"] = jdata["filter_resnet_dt"]
return descriptor
def _fitting_net(jdata: Dict[str, Any]) -> Dict[str, Any]:
"""Convert data to v1 input for fitting net.
Parameters
----------
jdata : Dict[str, Any]
parsed input json/yaml data
Returns
-------
Dict[str, Any]
dict with fitting net parameters
"""
fitting_net = {}
seed = jdata.get("seed", None)
if seed is not None:
fitting_net["seed"] = seed
fitting_net["neuron"] = j_must_have(jdata, "fitting_neuron", ["n_neuron"])
fitting_net["resnet_dt"] = True
if "resnet_dt" in jdata:
fitting_net["resnet_dt"] = jdata["resnet_dt"]
if "fitting_resnet_dt" in jdata:
fitting_net["resnet_dt"] = jdata["fitting_resnet_dt"]
return fitting_net
def _learning_rate(jdata: Dict[str, Any]) -> Dict[str, Any]:
"""Convert data to v1 input for learning rate section.
Parameters
----------
jdata : Dict[str, Any]
parsed input json/yaml data
Returns
-------
Dict[str, Any]
dict with learning rate parameters
"""
learning_rate = {}
learning_rate["type"] = "exp"
_jcopy(jdata, learning_rate, ("decay_steps", "decay_rate", "start_lr"))
return learning_rate
def _loss(jdata: Dict[str, Any]) -> Dict[str, Any]:
"""Convert data to v1 input for loss function.
Parameters
----------
jdata : Dict[str, Any]
parsed input json/yaml data
Returns
-------
Dict[str, Any]
dict with loss function parameters
"""
loss: Dict[str, Any] = {}
_jcopy(
jdata,
loss,
(
"start_pref_e",
"limit_pref_e",
"start_pref_f",
"limit_pref_f",
"start_pref_v",
"limit_pref_v",
),
)
if "start_pref_ae" in jdata:
loss["start_pref_ae"] = jdata["start_pref_ae"]
if "limit_pref_ae" in jdata:
loss["limit_pref_ae"] = jdata["limit_pref_ae"]
return loss
def _training(jdata: Dict[str, Any]) -> Dict[str, Any]:
"""Convert data to v1 input for training.
Parameters
----------
jdata : Dict[str, Any]
parsed input json/yaml data
Returns
-------
Dict[str, Any]
dict with training parameters
"""
training = {}
seed = jdata.get("seed", None)
if seed is not None:
training["seed"] = seed
_jcopy(jdata, training, ("systems", "set_prefix", "stop_batch", "batch_size"))
training["disp_file"] = "lcurve.out"
if "disp_file" in jdata:
training["disp_file"] = jdata["disp_file"]
training["disp_freq"] = j_must_have(jdata, "disp_freq")
training["numb_test"] = j_must_have(jdata, "numb_test")
training["save_freq"] = j_must_have(jdata, "save_freq")
training["save_ckpt"] = j_must_have(jdata, "save_ckpt")
training["disp_training"] = j_must_have(jdata, "disp_training")
training["time_training"] = j_must_have(jdata, "time_training")
if "profiling" in jdata:
training["profiling"] = jdata["profiling"]
if training["profiling"]:
training["profiling_file"] = j_must_have(jdata, "profiling_file")
return training
def _jcopy(src: Dict[str, Any], dst: Dict[str, Any], keys: Sequence[str]):
"""Copy specified keys from one dict to another.
Parameters
----------
src : Dict[str, Any]
source dictionary
dst : Dict[str, Any]
destination dictionary, will be modified in place
keys : Sequence[str]
list of keys to copy
    """
for k in keys:
dst[k] = src[k]
def remove_decay_rate(jdata: Dict[str, Any]):
"""convert decay_rate to stop_lr.
Parameters
----------
jdata: Dict[str, Any]
input data
"""
lr = jdata["learning_rate"]
if "decay_rate" in lr:
decay_rate = lr["decay_rate"]
start_lr = lr["start_lr"]
stop_step = jdata["training"]["stop_batch"]
decay_steps = lr["decay_steps"]
stop_lr = np.exp(np.log(decay_rate) * (stop_step / decay_steps)) * start_lr
lr["stop_lr"] = stop_lr
lr.pop("decay_rate")
def convert_input_v1_v2(jdata: Dict[str, Any],
warning: bool = True,
dump: Optional[Union[str, Path]] = None) -> Dict[str, Any]:
tr_cfg = jdata["training"]
tr_data_keys = {
"systems",
"set_prefix",
"batch_size",
"sys_prob",
"auto_prob",
# alias included
"sys_weights",
"auto_prob_style"
}
tr_data_cfg = {k: v for k, v in tr_cfg.items() if k in tr_data_keys}
new_tr_cfg = {k: v for k, v in tr_cfg.items() if k not in tr_data_keys}
new_tr_cfg["training_data"] = tr_data_cfg
jdata["training"] = new_tr_cfg
# remove deprecated arguments
remove_decay_rate(jdata)
if warning:
_warning_input_v1_v2(dump)
if dump is not None:
with open(dump, "w") as fp:
json.dump(jdata, fp, indent=4)
return jdata
def _warning_input_v1_v2(fname: Optional[Union[str, Path]]):
msg = "It seems that you are using a deepmd-kit input of version 1.x.x, " \
"which is deprecated. we have converted the input to >2.0.0 compatible"
if fname is not None:
msg += f", and output it to file {fname}"
warnings.warn(msg)
def deprecate_numb_test(jdata: Dict[str, Any],
warning: bool = True,
dump: Optional[Union[str, Path]] = None) -> Dict[str, Any]:
"""Deprecate `numb_test` since v2.1. It has taken no effect since v2.0.
See `#1243 <https://github.com/deepmodeling/deepmd-kit/discussions/1243>`_.
Parameters
----------
jdata : Dict[str, Any]
loaded json/yaml file
warning : bool, optional
whether to show deprecation warning, by default True
dump : Optional[Union[str, Path]], optional
whether to dump converted file, by default None
Returns
-------
Dict[str, Any]
converted output
"""
try:
jdata.get("training", {}).pop("numb_test")
except KeyError:
pass
else:
if warning:
warnings.warn(
"The argument training->numb_test has been deprecated since v2.0.0. "
"Use training->validation_data->batch_size instead."
)
if dump is not None:
with open(dump, "w") as fp:
json.dump(jdata, fp, indent=4)
return jdata
def update_deepmd_input(jdata: Dict[str, Any],
warning: bool = True,
dump: Optional[Union[str, Path]] = None) -> Dict[str, Any]:
def is_deepmd_v0_input(jdata):
return "model" not in jdata.keys()
def is_deepmd_v1_input(jdata):
return "systems" in j_must_have(jdata, "training").keys()
if is_deepmd_v0_input(jdata):
jdata = convert_input_v0_v1(jdata, warning, None)
jdata = convert_input_v1_v2(jdata, False, None)
jdata = deprecate_numb_test(jdata, False, dump)
elif is_deepmd_v1_input(jdata):
jdata = convert_input_v1_v2(jdata, warning, None)
jdata = deprecate_numb_test(jdata, False, dump)
else:
jdata = deprecate_numb_test(jdata, warning, dump)
return jdata
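# ---------------------------------------------------------------------------
# Illustrative usage sketch, not part of the original commit. "input.json" and
# "input_v2_compat.json" are hypothetical file names; update_deepmd_input
# upgrades a v0 or v1 training input to the current (>2.0) layout.
def _example_update_input():
    with open("input.json") as fp:
        jdata = json.load(fp)
    # the converted input is returned and, because dump is given, also written
    # to "input_v2_compat.json"
    return update_deepmd_input(jdata, warning=True, dump="input_v2_compat.json")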
import os
import textwrap
from deepmd.env import tf
from google.protobuf import text_format
def convert_13_to_21(input_model: str, output_model: str):
"""Convert DP 1.3 graph to 2.1 graph.
Parameters
----------
input_model : str
filename of the input graph
output_model : str
filename of the output graph
"""
convert_pb_to_pbtxt(input_model, 'frozen_model.pbtxt')
convert_dp13_to_dp20('frozen_model.pbtxt')
convert_dp20_to_dp21('frozen_model.pbtxt')
convert_pbtxt_to_pb('frozen_model.pbtxt', output_model)
if os.path.isfile('frozen_model.pbtxt'):
os.remove('frozen_model.pbtxt')
print("the converted output model (2.1 support) is saved in %s" % output_model)
def convert_12_to_21(input_model: str, output_model: str):
"""Convert DP 1.2 graph to 2.1 graph.
Parameters
----------
input_model : str
filename of the input graph
output_model : str
filename of the output graph
"""
convert_pb_to_pbtxt(input_model, 'frozen_model.pbtxt')
convert_dp12_to_dp13('frozen_model.pbtxt')
convert_dp13_to_dp20('frozen_model.pbtxt')
convert_dp20_to_dp21('frozen_model.pbtxt')
convert_pbtxt_to_pb('frozen_model.pbtxt', output_model)
if os.path.isfile('frozen_model.pbtxt'):
os.remove('frozen_model.pbtxt')
print("the converted output model (2.1 support) is saved in %s" % output_model)
def convert_10_to_21(input_model: str, output_model: str):
"""Convert DP 1.0 graph to 2.1 graph.
Parameters
----------
input_model : str
filename of the input graph
output_model : str
filename of the output graph
"""
convert_pb_to_pbtxt(input_model, 'frozen_model.pbtxt')
convert_dp10_to_dp11('frozen_model.pbtxt')
convert_dp12_to_dp13('frozen_model.pbtxt')
convert_dp13_to_dp20('frozen_model.pbtxt')
convert_dp20_to_dp21('frozen_model.pbtxt')
convert_pbtxt_to_pb('frozen_model.pbtxt', output_model)
if os.path.isfile('frozen_model.pbtxt'):
os.remove('frozen_model.pbtxt')
print("the converted output model (2.1 support) is saved in %s" % output_model)
def convert_012_to_21(input_model: str, output_model: str):
"""Convert DP 0.12 graph to 2.1 graph.
Parameters
----------
input_model : str
filename of the input graph
output_model : str
filename of the output graph
"""
convert_pb_to_pbtxt(input_model, 'frozen_model.pbtxt')
convert_dp012_to_dp10('frozen_model.pbtxt')
convert_dp10_to_dp11('frozen_model.pbtxt')
convert_dp12_to_dp13('frozen_model.pbtxt')
convert_dp13_to_dp20('frozen_model.pbtxt')
convert_dp20_to_dp21('frozen_model.pbtxt')
convert_pbtxt_to_pb('frozen_model.pbtxt', output_model)
if os.path.isfile('frozen_model.pbtxt'):
os.remove('frozen_model.pbtxt')
print("the converted output model (2.1 support) is saved in %s" % output_model)
def convert_20_to_21(input_model: str, output_model: str):
"""Convert DP 2.0 graph to 2.1 graph.
Parameters
----------
input_model : str
filename of the input graph
output_model : str
filename of the output graph
"""
convert_pb_to_pbtxt(input_model, 'frozen_model.pbtxt')
convert_dp20_to_dp21('frozen_model.pbtxt')
convert_pbtxt_to_pb('frozen_model.pbtxt', output_model)
if os.path.isfile('frozen_model.pbtxt'):
os.remove('frozen_model.pbtxt')
print("the converted output model (2.1 support) is saved in %s" % output_model)
def convert_pb_to_pbtxt(pbfile: str, pbtxtfile: str):
"""Convert DP graph to graph text.
Parameters
----------
pbfile : str
filename of the input graph
pbtxtfile : str
filename of the output graph text
"""
with tf.gfile.GFile(pbfile, 'rb') as f:
graph_def = tf.GraphDef()
graph_def.ParseFromString(f.read())
tf.import_graph_def(graph_def, name='')
tf.train.write_graph(graph_def, './', pbtxtfile, as_text=True)
def convert_pbtxt_to_pb(pbtxtfile: str, pbfile: str):
"""Convert DP graph text to graph.
Parameters
----------
pbtxtfile : str
filename of the input graph text
pbfile : str
filename of the output graph
"""
with tf.gfile.GFile(pbtxtfile, 'r') as f:
graph_def = tf.GraphDef()
file_content = f.read()
# Merges the human-readable string in `file_content` into `graph_def`.
text_format.Merge(file_content, graph_def)
tf.train.write_graph(graph_def, './', pbfile, as_text=False)
def convert_dp012_to_dp10(file: str):
"""Convert DP 1.0 graph text to 1.1 graph text.
Parameters
----------
file : str
filename of the graph text
"""
with open(file) as fp:
file_content = fp.read()
# note: atom_energy must be put before energy,
# otherwise atom_energy_test -> atom_o_energy
file_content = file_content\
.replace('DescrptNorot', 'DescrptSeA') \
.replace('ProdForceNorot', 'ProdForceSeA') \
.replace('ProdVirialNorot', 'ProdVirialSeA') \
.replace('t_rcut', 'descrpt_attr/rcut') \
.replace('t_ntypes', 'descrpt_attr/ntypes') \
.replace('atom_energy_test', 'o_atom_energy') \
.replace('atom_virial_test', 'o_atom_virial') \
.replace('energy_test', 'o_energy') \
.replace('force_test', 'o_force') \
.replace('virial_test', 'o_virial')
file_content += textwrap.dedent("""\
node {
name: "fitting_attr/dfparam"
op: "Const"
attr {
key: "dtype"
value {
type: DT_INT32
}
}
attr {
key: "value"
value {
tensor {
dtype: DT_INT32
tensor_shape {
}
int_val: 0
}
}
}
}
""")
file_content += textwrap.dedent("""\
node {
name: "model_attr/model_type"
op: "Const"
attr {
key: "dtype"
value {
type: DT_STRING
}
}
attr {
key: "value"
value {
tensor {
dtype: DT_STRING
tensor_shape {
}
string_val: "ener"
}
}
}
}
""")
with open(file, 'w') as fp:
fp.write(file_content)
def convert_dp10_to_dp11(file: str):
"""Convert DP 1.0 graph text to 1.1 graph text.
Parameters
----------
file : str
filename of the graph text
"""
with open(file, 'a') as f:
f.write(textwrap.dedent("""\
node {
name: "fitting_attr/daparam"
op: "Const"
attr {
key: "dtype"
value {
type: DT_INT32
}
}
attr {
key: "value"
value {
tensor {
dtype: DT_INT32
tensor_shape {
}
int_val: 0
}
} }
}
"""))
def convert_dp12_to_dp13(file: str):
"""Convert DP 1.2 graph text to 1.3 graph text.
Parameters
----------
file : str
filename of the graph text
"""
file_data = ""
with open(file, "r", encoding="utf-8") as f:
ii = 0
lines = f.readlines()
while (ii < len(lines)):
line = lines[ii]
file_data += line
ii+=1
if 'name' in line and ('DescrptSeA' in line or 'ProdForceSeA' in line or 'ProdVirialSeA' in line):
while not('attr' in lines[ii] and '{' in lines[ii]):
file_data += lines[ii]
ii+=1
file_data += ' attr {\n'
file_data += ' key: \"T\"\n'
file_data += ' value {\n'
file_data += ' type: DT_DOUBLE\n'
file_data += ' }\n'
file_data += ' }\n'
with open(file, "w", encoding="utf-8") as f:
f.write(file_data)
def convert_dp13_to_dp20(fname: str):
"""Convert DP 1.3 graph text to 2.0 graph text.
Parameters
----------
    fname : str
        filename of the graph text
"""
with open(fname) as fp:
file_content = fp.read()
file_content += textwrap.dedent("""\
node {
name: "model_attr/model_version"
op: "Const"
attr {
key: "dtype"
value {
type: DT_STRING
}
}
attr {
key: "value"
value {
tensor {
dtype: DT_STRING
tensor_shape {
}
string_val: "1.0"
}
}
}
}
""")
file_content = file_content\
.replace('DescrptSeA', 'ProdEnvMatA')\
.replace('DescrptSeR', 'ProdEnvMatR')
with open(fname, 'w') as fp:
fp.write(file_content)
def convert_dp20_to_dp21(fname: str):
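    """Convert DP 2.0 graph text to 2.1 graph text.
    Parameters
    ----------
    fname : str
        filename of the graph text
    """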
with open(fname) as fp:
file_content = fp.read()
old_model_version_node = textwrap.dedent("""\
node {
name: "model_attr/model_version"
op: "Const"
attr {
key: "dtype"
value {
type: DT_STRING
}
}
attr {
key: "value"
value {
tensor {
dtype: DT_STRING
tensor_shape {
}
string_val: "1.0"
}
}
}
}
""")
new_model_version_node = textwrap.dedent("""\
node {
name: "model_attr/model_version"
op: "Const"
attr {
key: "dtype"
value {
type: DT_STRING
}
}
attr {
key: "value"
value {
tensor {
dtype: DT_STRING
tensor_shape {
}
string_val: "1.1"
}
}
}
}
""")
file_content = file_content\
.replace(old_model_version_node, new_model_version_node)\
.replace('TabulateFusion', 'TabulateFusionSeA')\
.replace('TabulateFusionGrad', 'TabulateFusionSeAGrad')\
.replace('TabulateFusionGradGrad', 'TabulateFusionSeAGradGrad')
with open(fname, 'w') as fp:
fp.write(file_content)
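# ---------------------------------------------------------------------------
# Illustrative usage sketch, not part of the original commit. The file names
# are hypothetical; each converter round-trips the frozen graph (.pb) through
# an intermediate pbtxt and writes a 2.1-compatible graph.
def _example_convert_models():
    # upgrade a model frozen with DeePMD-kit 1.2 so it can be loaded by 2.1
    convert_12_to_21("old_model_1.2.pb", "model_from_1.2.pb")
    # a 2.0 model only needs the model_version bump and the TabulateFusion renames
    convert_20_to_21("old_model_2.0.pb", "model_from_2.0.pb")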
#!/usr/bin/env python3
import time
import glob
import numpy as np
import os.path
from typing import Tuple, List
import logging
from deepmd.env import GLOBAL_NP_FLOAT_PRECISION
from deepmd.env import GLOBAL_ENER_FLOAT_PRECISION
from deepmd.utils import random as dp_random
from deepmd.utils.path import DPPath
log = logging.getLogger(__name__)
class DeepmdData() :
"""
Class for a data system.
    It loads data from the hard disk and maintains it as a `data_dict`
Parameters
----------
sys_path
Path to the data system
set_prefix
Prefix for the directories of different sets
shuffle_test
If the test data are shuffled
type_map
Gives the name of different atom types
modifier
Data modifier that has the method `modify_data`
trn_all_set
Use all sets as training dataset. Otherwise, if the number of sets is more than 1, the last set is left for test.
"""
def __init__ (self,
sys_path : str,
set_prefix : str = 'set',
shuffle_test : bool = True,
type_map : List[str] = None,
modifier = None,
trn_all_set : bool = False) :
"""
Constructor
"""
root = DPPath(sys_path)
self.dirs = root.glob(set_prefix + ".*")
self.dirs.sort()
self.mixed_type = self._check_mode(self.dirs[0]) # mixed_type format only has one set
# load atom type
self.atom_type = self._load_type(root)
self.natoms = len(self.atom_type)
if self.mixed_type:
# nframes x natoms
self.atom_type_mix = self._load_type_mix(self.dirs[0])
# load atom type map
self.type_map = self._load_type_map(root)
if self.type_map is not None:
assert(len(self.type_map) >= max(self.atom_type)+1)
# check pbc
self.pbc = self._check_pbc(root)
# enforce type_map if necessary
if type_map is not None and self.type_map is not None:
if not self.mixed_type:
atom_type_ = [type_map.index(self.type_map[ii]) for ii in self.atom_type]
self.atom_type = np.array(atom_type_, dtype = np.int32)
else:
sorter = np.argsort(type_map)
type_idx_map = sorter[np.searchsorted(type_map, self.type_map, sorter=sorter)]
try:
atom_type_mix_ = np.array(type_idx_map)[self.atom_type_mix].astype(np.int32)
except RuntimeError as e:
raise RuntimeError("some types in 'real_atom_types.npy' of sys {} are not contained in {} types!"
.format(self.dirs[0], self.get_ntypes())) from e
self.atom_type_mix = atom_type_mix_
self.type_map = type_map
if type_map is None and self.type_map is None and self.mixed_type:
raise RuntimeError('mixed_type format must have type_map!')
# make idx map
self.idx_map = self._make_idx_map(self.atom_type)
# train dirs
self.test_dir = self.dirs[-1]
if trn_all_set:
self.train_dirs = self.dirs
else:
if len(self.dirs) == 1 :
self.train_dirs = self.dirs
else :
self.train_dirs = self.dirs[:-1]
self.data_dict = {}
# add box and coord
self.add('box', 9, must = self.pbc)
self.add('coord', 3, atomic = True, must = True)
# set counters
self.set_count = 0
self.iterator = 0
self.shuffle_test = shuffle_test
# set modifier
self.modifier = modifier
def add(self,
key : str,
ndof : int,
atomic : bool = False,
must : bool = False,
high_prec : bool = False,
type_sel : List[int] = None,
repeat : int = 1,
default: float=0.,
) :
"""
        Add a data item to be loaded
Parameters
----------
key
The key of the item. The corresponding data is stored in `sys_path/set.*/key.npy`
ndof
The number of dof
atomic
The item is an atomic property.
If False, the size of the data should be nframes x ndof
If True, the size of data should be nframes x natoms x ndof
must
The data file `sys_path/set.*/key.npy` must exist.
If must is False and the data file does not exist, the `data_dict[find_key]` is set to 0.0
high_prec
Load the data and store in float64, otherwise in float32
type_sel
Select certain type of atoms
repeat
The data will be repeated `repeat` times.
default : float, default=0.
default value of data
"""
self.data_dict[key] = {'ndof': ndof,
'atomic': atomic,
'must': must,
'high_prec': high_prec,
'type_sel': type_sel,
'repeat': repeat,
'reduce': None,
'default': default,
}
return self
def reduce(self,
key_out : str,
key_in : str
) :
"""
        Generate a new item by reducing another atomic item
Parameters
----------
key_out
The name of the reduced item
key_in
The name of the data item to be reduced
"""
assert (key_in in self.data_dict), 'cannot find input key'
assert (self.data_dict[key_in]['atomic']), 'reduced property should be atomic'
assert (not(key_out in self.data_dict)), 'output key should not have been added'
        assert (self.data_dict[key_in]['repeat'] == 1), 'reduced properties should not have been repeated'
self.data_dict[key_out] = {'ndof': self.data_dict[key_in]['ndof'],
'atomic': False,
'must': True,
'high_prec': True,
'type_sel': None,
'repeat': 1,
'reduce': key_in,
}
return self
def get_data_dict(self) -> dict:
"""
Get the `data_dict`
"""
return self.data_dict
def check_batch_size (self, batch_size) :
"""
Check if the system can get a batch of data with `batch_size` frames.
"""
for ii in self.train_dirs :
if self.data_dict['coord']['high_prec'] :
tmpe = (ii / "coord.npy").load_numpy().astype(GLOBAL_ENER_FLOAT_PRECISION)
else:
tmpe = (ii / "coord.npy").load_numpy().astype(GLOBAL_NP_FLOAT_PRECISION)
if tmpe.ndim == 1:
tmpe = tmpe.reshape([1,-1])
if tmpe.shape[0] < batch_size :
return ii, tmpe.shape[0]
return None
def check_test_size (self, test_size) :
"""
Check if the system can get a test dataset with `test_size` frames.
"""
if self.data_dict['coord']['high_prec'] :
tmpe = (self.test_dir / "coord.npy").load_numpy().astype(GLOBAL_ENER_FLOAT_PRECISION)
else:
tmpe = (self.test_dir / "coord.npy").load_numpy().astype(GLOBAL_NP_FLOAT_PRECISION)
if tmpe.ndim == 1:
tmpe = tmpe.reshape([1,-1])
if tmpe.shape[0] < test_size :
return self.test_dir, tmpe.shape[0]
else :
return None
def get_batch(self,
batch_size : int
) -> dict :
"""
Get a batch of data with `batch_size` frames. The frames are randomly picked from the data system.
Parameters
----------
batch_size
size of the batch
"""
if hasattr(self, 'batch_set') :
set_size = self.batch_set["coord"].shape[0]
else :
set_size = 0
if self.iterator + batch_size > set_size :
self._load_batch_set (self.train_dirs[self.set_count % self.get_numb_set()])
self.set_count += 1
set_size = self.batch_set["coord"].shape[0]
if self.modifier is not None:
self.modifier.modify_data(self.batch_set)
iterator_1 = self.iterator + batch_size
if iterator_1 >= set_size :
iterator_1 = set_size
idx = np.arange (self.iterator, iterator_1)
self.iterator += batch_size
ret = self._get_subdata(self.batch_set, idx)
return ret
def get_test (self,
ntests : int = -1
) -> dict:
"""
Get the test data with `ntests` frames.
Parameters
----------
ntests
            Size of the test data set. If `ntests` is -1, all test data will be returned.
"""
if not hasattr(self, 'test_set') :
self._load_test_set(self.test_dir, self.shuffle_test)
if ntests == -1:
idx = None
else :
ntests_ = ntests if ntests < self.test_set['type'].shape[0] else self.test_set['type'].shape[0]
# print('ntest', self.test_set['type'].shape[0], ntests, ntests_)
idx = np.arange(ntests_)
ret = self._get_subdata(self.test_set, idx = idx)
if self.modifier is not None:
self.modifier.modify_data(ret)
return ret
def get_ntypes(self) -> int:
"""
Number of atom types in the system
"""
if self.type_map is not None:
return len(self.type_map)
else:
return max(self.get_atom_type()) + 1
def get_type_map(self) -> List[str]:
"""
Get the type map
"""
return self.type_map
def get_atom_type(self) -> List[int]:
"""
Get atom types
"""
return self.atom_type
def get_numb_set (self) -> int:
"""
Get number of training sets
"""
return len (self.train_dirs)
def get_numb_batch (self,
batch_size : int,
set_idx : int
) -> int:
"""
Get the number of batches in a set.
"""
data = self._load_set(self.train_dirs[set_idx])
ret = data["coord"].shape[0] // batch_size
if ret == 0:
ret = 1
return ret
def get_sys_numb_batch (self,
batch_size : int
) -> int:
"""
Get the number of batches in the data system.
"""
ret = 0
for ii in range(len(self.train_dirs)) :
ret += self.get_numb_batch(batch_size, ii)
return ret
def get_natoms (self) :
"""
Get number of atoms
"""
return len(self.atom_type)
def get_natoms_vec (self,
ntypes : int) :
"""
Get number of atoms and number of atoms in different types
Parameters
----------
ntypes
Number of types (may be larger than the actual number of types in the system).
Returns
-------
natoms
natoms[0]: number of local atoms
natoms[1]: total number of atoms held by this processor
natoms[i]: 2 <= i < Ntypes+2, number of type i atoms
"""
natoms, natoms_vec = self._get_natoms_2 (ntypes)
tmp = [natoms, natoms]
tmp = np.append (tmp, natoms_vec)
return tmp.astype(np.int32)
def avg(self, key) :
"""
Return the average value of an item.
"""
if key not in self.data_dict.keys() :
raise RuntimeError('key %s has not been added' % key)
info = self.data_dict[key]
ndof = info['ndof']
eners = np.array([])
for ii in self.train_dirs:
data = self._load_set(ii)
ei = data[key].reshape([-1, ndof])
if eners.size == 0 :
eners = ei
else :
eners = np.concatenate((eners, ei), axis = 0)
if eners.size == 0 :
return 0
else :
return np.average(eners, axis = 0)
def _idx_map_sel(self, atom_type, type_sel) :
new_types = []
for ii in atom_type :
if ii in type_sel:
new_types.append(ii)
new_types = np.array(new_types, dtype = int)
natoms = new_types.shape[0]
idx = np.arange(natoms)
idx_map = np.lexsort((idx, new_types))
return idx_map
def _get_natoms_2 (self, ntypes) :
sample_type = self.atom_type
natoms = len(sample_type)
natoms_vec = np.zeros (ntypes).astype(int)
for ii in range (ntypes) :
natoms_vec[ii] = np.count_nonzero(sample_type == ii)
return natoms, natoms_vec
def _get_subdata(self, data, idx = None) :
new_data = {}
for ii in data:
dd = data[ii]
if 'find_' in ii:
new_data[ii] = dd
else:
if idx is not None:
new_data[ii] = dd[idx]
else :
new_data[ii] = dd
return new_data
def _load_batch_set (self,
set_name: DPPath) :
self.batch_set = self._load_set(set_name)
self.batch_set, _ = self._shuffle_data(self.batch_set)
self.reset_get_batch()
def reset_get_batch(self):
self.iterator = 0
def _load_test_set (self,
set_name: DPPath,
shuffle_test) :
self.test_set = self._load_set(set_name)
if shuffle_test :
self.test_set, _ = self._shuffle_data(self.test_set)
def _shuffle_data (self,
data) :
ret = {}
nframes = data['coord'].shape[0]
idx = np.arange (nframes)
dp_random.shuffle(idx)
for kk in data :
if type(data[kk]) == np.ndarray and \
len(data[kk].shape) == 2 and \
data[kk].shape[0] == nframes and \
not('find_' in kk):
ret[kk] = data[kk][idx]
else :
ret[kk] = data[kk]
return ret, idx
def _load_set(self, set_name: DPPath) :
# get nframes
if not isinstance(set_name, DPPath):
set_name = DPPath(set_name)
path = set_name / "coord.npy"
if self.data_dict['coord']['high_prec'] :
coord = path.load_numpy().astype(GLOBAL_ENER_FLOAT_PRECISION)
else:
coord = path.load_numpy().astype(GLOBAL_NP_FLOAT_PRECISION)
if coord.ndim == 1:
coord = coord.reshape([1,-1])
nframes = coord.shape[0]
assert(coord.shape[1] == self.data_dict['coord']['ndof'] * self.natoms)
# load keys
data = {}
for kk in self.data_dict.keys():
if self.data_dict[kk]['reduce'] is None :
data['find_'+kk], data[kk] \
= self._load_data(set_name,
kk,
nframes,
self.data_dict[kk]['ndof'],
atomic = self.data_dict[kk]['atomic'],
high_prec = self.data_dict[kk]['high_prec'],
must = self.data_dict[kk]['must'],
type_sel = self.data_dict[kk]['type_sel'],
repeat = self.data_dict[kk]['repeat'],
default=self.data_dict[kk]['default'],
)
for kk in self.data_dict.keys():
if self.data_dict[kk]['reduce'] is not None :
k_in = self.data_dict[kk]['reduce']
ndof = self.data_dict[kk]['ndof']
data['find_'+kk] = data['find_'+k_in]
tmp_in = data[k_in].astype(GLOBAL_ENER_FLOAT_PRECISION)
data[kk] = np.sum(np.reshape(tmp_in, [nframes, self.natoms, ndof]), axis = 1)
if self.mixed_type:
real_type = self.atom_type_mix.reshape([nframes, self.natoms])
data['type'] = real_type
natoms = data['type'].shape[1]
# nframes x ntypes
atom_type_nums = np.array([(real_type == i).sum(axis=-1) for i in range(self.get_ntypes())],
dtype=np.int32).T
assert (atom_type_nums.sum(axis=-1) == natoms).all(), \
"some types in 'real_atom_types.npy' of sys {} are not contained in {} types!" \
.format(self.dirs[0], self.get_ntypes())
data['real_natoms_vec'] = np.concatenate((np.tile(np.array([natoms, natoms], dtype=np.int32), (nframes, 1)),
atom_type_nums), axis=-1)
else:
data['type'] = np.tile(self.atom_type[self.idx_map], (nframes, 1))
return data
def _load_data(self, set_name, key, nframes, ndof_, atomic = False, must = True, repeat = 1, high_prec = False, type_sel = None, default: float=0.):
if atomic:
natoms = self.natoms
idx_map = self.idx_map
# if type_sel, then revise natoms and idx_map
if type_sel is not None:
natoms = 0
for jj in type_sel :
natoms += np.sum(self.atom_type == jj)
idx_map = self._idx_map_sel(self.atom_type, type_sel)
ndof = ndof_ * natoms
else:
ndof = ndof_
path = set_name / (key+".npy")
if path.is_file() :
if high_prec :
data = path.load_numpy().astype(GLOBAL_ENER_FLOAT_PRECISION)
else:
data = path.load_numpy().astype(GLOBAL_NP_FLOAT_PRECISION)
try: # YWolfeee: deal with data shape error
if atomic :
data = data.reshape([nframes, natoms, -1])
data = data[:,idx_map,:]
data = data.reshape([nframes, -1])
data = np.reshape(data, [nframes, ndof])
except ValueError as err_message:
explanation = "This error may occur when your label mismatch it's name, i.e. you might store global tensor in `atomic_tensor.npy` or atomic tensor in `tensor.npy`."
log.error(str(err_message))
log.error(explanation)
raise ValueError(str(err_message) + ". " + explanation)
if repeat != 1:
data = np.repeat(data, repeat).reshape([nframes, -1])
return np.float32(1.0), data
elif must:
raise RuntimeError("%s not found!" % path)
else:
if high_prec :
data = np.full([nframes, ndof], default, dtype=GLOBAL_ENER_FLOAT_PRECISION)
else :
data = np.full([nframes, ndof], default, dtype=GLOBAL_NP_FLOAT_PRECISION)
if repeat != 1:
data = np.repeat(data, repeat).reshape([nframes, -1])
return np.float32(0.0), data
def _load_type (self, sys_path: DPPath) :
atom_type = (sys_path / "type.raw").load_txt(dtype=np.int32, ndmin=1)
return atom_type
def _load_type_mix(self, set_name: DPPath):
type_path = set_name / "real_atom_types.npy"
real_type = type_path.load_numpy().astype(np.int32).reshape([-1, self.natoms])
return real_type
def _make_idx_map(self, atom_type):
natoms = atom_type.shape[0]
idx = np.arange (natoms)
idx_map = np.lexsort ((idx, atom_type))
return idx_map
def _load_type_map(self, sys_path: DPPath) :
fname = sys_path / 'type_map.raw'
if fname.is_file() :
return fname.load_txt(dtype=str, ndmin=1).tolist()
else :
return None
def _check_pbc(self, sys_path: DPPath):
pbc = True
if (sys_path / 'nopbc').is_file() :
pbc = False
return pbc
def _check_mode(self, set_path: DPPath):
return (set_path / 'real_atom_types.npy').is_file()
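# ---------------------------------------------------------------------------
# Illustrative usage sketch, not part of the original commit. The system path
# and type map are hypothetical; it shows the typical add/reduce calls made
# before batches are requested from a DeepmdData instance.
def _example_deepmd_data():
    data = DeepmdData("water_system", set_prefix="set", type_map=["O", "H"])
    # register a per-frame energy label and a per-atom force label
    data.add("energy", 1, atomic=False, must=False, high_prec=True)
    data.add("force", 3, atomic=True, must=False)
    # derive a per-frame sum from per-atom energies when the latter exist
    data.add("atom_ener", 1, atomic=True, must=False)
    data.reduce("sum_ener", "atom_ener")
    # dict of numpy arrays keyed by item name ('coord', 'box', 'type', ...)
    return data.get_batch(batch_size=4)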
class DataSets (object):
"""
Outdated class for one data system.
.. deprecated:: 2.0.0
This class is not maintained any more.
"""
def __init__ (self,
sys_path,
set_prefix,
seed = None,
shuffle_test = True) :
self.dirs = glob.glob (os.path.join(sys_path, set_prefix + ".*"))
self.dirs.sort()
# load atom type
self.atom_type, self.idx_map, self.idx3_map = self.load_type (sys_path)
# load atom type map
self.type_map = self.load_type_map(sys_path)
if self.type_map is not None:
assert(len(self.type_map) >= max(self.atom_type)+1)
# train dirs
self.test_dir = self.dirs[-1]
if len(self.dirs) == 1 :
self.train_dirs = self.dirs
else :
self.train_dirs = self.dirs[:-1]
# check fparam
has_fparam = [ os.path.isfile(os.path.join(ii, 'fparam.npy')) for ii in self.dirs ]
if any(has_fparam) and (not all(has_fparam)) :
raise RuntimeError("system %s: if any set has frame parameter, then all sets should have frame parameter" % sys_path)
if all(has_fparam) :
self.has_fparam = 0
else :
self.has_fparam = -1
# check aparam
has_aparam = [ os.path.isfile(os.path.join(ii, 'aparam.npy')) for ii in self.dirs ]
if any(has_aparam) and (not all(has_aparam)) :
raise RuntimeError("system %s: if any set has frame parameter, then all sets should have frame parameter" % sys_path)
if all(has_aparam) :
self.has_aparam = 0
else :
self.has_aparam = -1
# energy norm
self.eavg = self.stats_energy()
# load sets
self.set_count = 0
self.load_batch_set (self.train_dirs[self.set_count % self.get_numb_set()])
self.load_test_set (self.test_dir, shuffle_test)
def check_batch_size (self, batch_size) :
for ii in self.train_dirs :
tmpe = np.load(os.path.join(ii, "coord.npy"))
if tmpe.shape[0] < batch_size :
return ii, tmpe.shape[0]
return None
def check_test_size (self, test_size) :
tmpe = np.load(os.path.join(self.test_dir, "coord.npy"))
if tmpe.shape[0] < test_size :
return self.test_dir, tmpe.shape[0]
else :
return None
def load_type (self, sys_path) :
atom_type = np.loadtxt (os.path.join(sys_path, "type.raw"), dtype=np.int32, ndmin=1)
natoms = atom_type.shape[0]
idx = np.arange (natoms)
idx_map = np.lexsort ((idx, atom_type))
atom_type3 = np.repeat(atom_type, 3)
idx3 = np.arange (natoms * 3)
idx3_map = np.lexsort ((idx3, atom_type3))
return atom_type, idx_map, idx3_map
def load_type_map(self, sys_path) :
fname = os.path.join(sys_path, 'type_map.raw')
if os.path.isfile(fname) :
with open(os.path.join(sys_path, 'type_map.raw')) as fp:
return fp.read().split()
else :
return None
def get_type_map(self) :
return self.type_map
def get_numb_set (self) :
return len (self.train_dirs)
def stats_energy (self) :
eners = np.array([])
for ii in self.train_dirs:
ener_file = os.path.join(ii, "energy.npy")
if os.path.isfile(ener_file) :
ei = np.load(ener_file)
eners = np.append(eners, ei)
if eners.size == 0 :
return 0
else :
return np.average(eners)
def load_energy(self,
set_name,
nframes,
nvalues,
energy_file,
atom_energy_file) :
"""
return : coeff_ener, ener, coeff_atom_ener, atom_ener
"""
# load atom_energy
coeff_atom_ener, atom_ener = self.load_data(set_name, atom_energy_file, [nframes, nvalues], False)
# ignore energy_file
if coeff_atom_ener == 1:
ener = np.sum(atom_ener, axis = 1)
coeff_ener = 1
# load energy_file
else:
coeff_ener, ener = self.load_data(set_name, energy_file, [nframes], False)
return coeff_ener, ener, coeff_atom_ener, atom_ener
def load_data(self, set_name, data_name, shape, is_necessary = True):
path = os.path.join(set_name, data_name+".npy")
if os.path.isfile (path) :
data = np.load(path)
data = np.reshape(data, shape)
if is_necessary:
return data
return 1, data
elif is_necessary:
raise OSError("%s not found!" % path)
else:
data = np.zeros(shape)
return 0, data
def load_set(self, set_name, shuffle = True):
data = {}
data["box"] = self.load_data(set_name, "box", [-1, 9])
nframe = data["box"].shape[0]
data["coord"] = self.load_data(set_name, "coord", [nframe, -1])
ncoord = data["coord"].shape[1]
if self.has_fparam >= 0:
data["fparam"] = self.load_data(set_name, "fparam", [nframe, -1])
if self.has_fparam == 0 :
self.has_fparam = data["fparam"].shape[1]
else :
assert self.has_fparam == data["fparam"].shape[1]
if self.has_aparam >= 0:
data["aparam"] = self.load_data(set_name, "aparam", [nframe, -1])
if self.has_aparam == 0 :
self.has_aparam = data["aparam"].shape[1] // (ncoord//3)
else :
assert self.has_aparam == data["aparam"].shape[1] // (ncoord//3)
data["prop_c"] = np.zeros(5)
data["prop_c"][0], data["energy"], data["prop_c"][3], data["atom_ener"] \
= self.load_energy (set_name, nframe, ncoord // 3, "energy", "atom_ener")
data["prop_c"][1], data["force"] = self.load_data(set_name, "force", [nframe, ncoord], False)
data["prop_c"][2], data["virial"] = self.load_data(set_name, "virial", [nframe, 9], False)
data["prop_c"][4], data["atom_pref"] = self.load_data(set_name, "atom_pref", [nframe, ncoord//3], False)
data["atom_pref"] = np.repeat(data["atom_pref"], 3, axis=1)
# shuffle data
if shuffle:
idx = np.arange (nframe)
dp_random.shuffle(idx)
for ii in data:
if ii != "prop_c":
data[ii] = data[ii][idx]
data["type"] = np.tile (self.atom_type, (nframe, 1))
# sort according to type
for ii in ["type", "atom_ener"]:
data[ii] = data[ii][:, self.idx_map]
for ii in ["coord", "force", "atom_pref"]:
data[ii] = data[ii][:, self.idx3_map]
return data
def load_batch_set (self,
set_name) :
self.batch_set = self.load_set(set_name, True)
self.reset_iter ()
def load_test_set (self,
set_name,
shuffle_test) :
self.test_set = self.load_set(set_name, shuffle_test)
def reset_iter (self) :
self.iterator = 0
self.set_count += 1
def get_set(self, data, idx = None) :
new_data = {}
for ii in data:
dd = data[ii]
if ii == "prop_c":
new_data[ii] = dd.astype(np.float32)
else:
if idx is not None:
dd = dd[idx]
if ii == "type":
new_data[ii] = dd
else:
new_data[ii] = dd.astype(GLOBAL_NP_FLOAT_PRECISION)
return new_data
def get_test (self) :
"""
        returned property prefactor [4] in order:
energy, force, virial, atom_ener
"""
return self.get_set(self.test_set)
def get_batch (self,
batch_size) :
"""
        returned property prefactor [4] in order:
energy, force, virial, atom_ener
"""
set_size = self.batch_set["energy"].shape[0]
# assert (batch_size <= set_size), "batch size should be no more than set size"
if self.iterator + batch_size > set_size :
self.load_batch_set (self.train_dirs[self.set_count % self.get_numb_set()])
set_size = self.batch_set["energy"].shape[0]
# print ("%d %d %d" % (self.iterator, self.iterator + batch_size, set_size))
iterator_1 = self.iterator + batch_size
if iterator_1 >= set_size :
iterator_1 = set_size
idx = np.arange (self.iterator, iterator_1)
self.iterator += batch_size
return self.get_set(self.batch_set, idx)
def get_natoms (self) :
sample_type = self.batch_set["type"][0]
natoms = len(sample_type)
return natoms
def get_natoms_2 (self, ntypes) :
sample_type = self.batch_set["type"][0]
natoms = len(sample_type)
natoms_vec = np.zeros (ntypes).astype(int)
for ii in range (ntypes) :
natoms_vec[ii] = np.count_nonzero(sample_type == ii)
return natoms, natoms_vec
def get_natoms_vec (self, ntypes) :
natoms, natoms_vec = self.get_natoms_2 (ntypes)
tmp = [natoms, natoms]
tmp = np.append (tmp, natoms_vec)
return tmp.astype(np.int32)
def set_numb_batch (self,
batch_size) :
return self.batch_set["energy"].shape[0] // batch_size
def get_sys_numb_batch (self, batch_size) :
return self.set_numb_batch(batch_size) * self.get_numb_set()
def get_ener (self) :
return self.eavg
def numb_fparam(self) :
return self.has_fparam
def numb_aparam(self) :
return self.has_aparam
#!/usr/bin/env python3
import logging
import os
import collections
import warnings
import numpy as np
from typing import Tuple, List
from deepmd.utils import random as dp_random
from deepmd.utils.data import DataSets
from deepmd.utils.data import DeepmdData
log = logging.getLogger(__name__)
class DeepmdDataSystem() :
"""
Class for manipulating many data systems.
It is implemented with the help of DeepmdData
"""
def __init__ (self,
systems : List[str],
batch_size : int,
test_size : int,
rcut : float,
set_prefix : str = 'set',
shuffle_test : bool = True,
type_map : List[str] = None,
modifier = None,
trn_all_set = False,
sys_probs = None,
auto_prob_style ="prob_sys_size") :
"""
Constructor
Parameters
----------
systems
Specifying the paths to systems
batch_size
The batch size
test_size
The size of test data
rcut
The cut-off radius
set_prefix
Prefix for the directories of different sets
shuffle_test
If the test data are shuffled
type_map
Gives the name of different atom types
modifier
Data modifier that has the method `modify_data`
trn_all_set
Use all sets as training dataset. Otherwise, if the number of sets is more than 1, the last set is left for test.
        sys_probs: list of float
            The probabilities of the systems used to draw batches.
            The sum of the positive elements of this list should be no greater than 1.
            Elements of this list can be negative; in that case the probability of the corresponding system is determined
            automatically from the number of batches in the system.
auto_prob_style: str
Determine the probability of systems automatically. The method is assigned by this key and can be
- "prob_uniform" : the probability all the systems are equal, namely 1.0/self.get_nsystems()
- "prob_sys_size" : the probability of a system is proportional to the number of batches in the system
- "prob_sys_size;stt_idx:end_idx:weight;stt_idx:end_idx:weight;..." :
the list of systems is devided into blocks. A block is specified by `stt_idx:end_idx:weight`,
where `stt_idx` is the starting index of the system, `end_idx` is then ending (not including) index of the system,
the probabilities of the systems in this block sums up to `weight`, and the relatively probabilities within this block is proportional
to the number of batches in the system."""
# init data
self.rcut = rcut
self.system_dirs = systems
self.nsystems = len(self.system_dirs)
self.data_systems = []
for ii in self.system_dirs :
self.data_systems.append(
DeepmdData(
ii,
set_prefix=set_prefix,
shuffle_test=shuffle_test,
type_map = type_map,
modifier = modifier,
trn_all_set = trn_all_set
))
# check mix_type format
error_format_msg = "if one of the system is of mixed_type format, " \
"then all of the systems should be of mixed_type format!"
if self.data_systems[0].mixed_type:
for data_sys in self.data_systems[1:]:
assert data_sys.mixed_type, error_format_msg
self.mixed_type = True
else:
for data_sys in self.data_systems[1:]:
assert not data_sys.mixed_type, error_format_msg
self.mixed_type = False
# batch size
self.batch_size = batch_size
if isinstance(self.batch_size, int):
self.batch_size = self.batch_size * np.ones(self.nsystems, dtype=int)
elif isinstance(self.batch_size, str):
words = self.batch_size.split(':')
if 'auto' == words[0] :
rule = 32
if len(words) == 2 :
rule = int(words[1])
else:
raise RuntimeError('unknown batch_size rule ' + words[0])
self.batch_size = self._make_auto_bs(rule)
elif isinstance(self.batch_size, list):
pass
else :
raise RuntimeError('invalid batch_size')
assert(isinstance(self.batch_size, (list,np.ndarray)))
assert(len(self.batch_size) == self.nsystems)
# natoms, nbatches
ntypes = []
for ii in self.data_systems :
ntypes.append(ii.get_ntypes())
self.sys_ntypes = max(ntypes)
self.natoms = []
self.natoms_vec = []
self.nbatches = []
type_map_list = []
for ii in range(self.nsystems) :
self.natoms.append(self.data_systems[ii].get_natoms())
self.natoms_vec.append(self.data_systems[ii].get_natoms_vec(self.sys_ntypes).astype(int))
self.nbatches.append(self.data_systems[ii].get_sys_numb_batch(self.batch_size[ii]))
type_map_list.append(self.data_systems[ii].get_type_map())
self.type_map = self._check_type_map_consistency(type_map_list)
# ! altered by Marián Rynik
# test size
        # now the test size can be set as a percentage of a system's data, or the test
        # size can be set for each system individually in the same manner as the batch
        # size. This enables one to use systems with a diverse number of
        # structures and different numbers of atoms.
self.test_size = test_size
if isinstance(self.test_size, int):
self.test_size = self.test_size * np.ones(self.nsystems, dtype=int)
elif isinstance(self.test_size, str):
words = self.test_size.split('%')
try:
percent = int(words[0])
except ValueError:
raise RuntimeError('unknown test_size rule ' + words[0])
self.test_size = self._make_auto_ts(percent)
elif isinstance(self.test_size, list):
pass
else :
raise RuntimeError('invalid test_size')
assert(isinstance(self.test_size, (list,np.ndarray)))
assert(len(self.test_size) == self.nsystems)
# prob of batch, init pick idx
self.prob_nbatches = [ float(i) for i in self.nbatches] / np.sum(self.nbatches)
self.pick_idx = 0
# derive system probabilities
self.sys_probs = None
self.set_sys_probs(sys_probs, auto_prob_style)
# check batch and test size
for ii in range(self.nsystems) :
chk_ret = self.data_systems[ii].check_batch_size(self.batch_size[ii])
if chk_ret is not None :
warnings.warn("system %s required batch size is larger than the size of the dataset %s (%d > %d)" % \
(self.system_dirs[ii], chk_ret[0], self.batch_size[ii], chk_ret[1]))
chk_ret = self.data_systems[ii].check_test_size(self.test_size[ii])
if chk_ret is not None :
warnings.warn("system %s required test size is larger than the size of the dataset %s (%d > %d)" % \
(self.system_dirs[ii], chk_ret[0], self.test_size[ii], chk_ret[1]))
def _load_test(self, ntests = -1):
self.test_data = collections.defaultdict(list)
for ii in range(self.nsystems) :
test_system_data = self.data_systems[ii].get_test(ntests = ntests)
for nn in test_system_data:
self.test_data[nn].append(test_system_data[nn])
def _make_default_mesh(self):
self.default_mesh = []
cell_size = np.max (self.rcut)
for ii in range(self.nsystems) :
if self.data_systems[ii].pbc :
test_system_data = self.data_systems[ii].get_batch(self.batch_size[ii])
self.data_systems[ii].reset_get_batch()
# test_system_data = self.data_systems[ii].get_test()
avg_box = np.average (test_system_data["box"], axis = 0)
avg_box = np.reshape (avg_box, [3,3])
ncell = (np.linalg.norm(avg_box, axis=1)/ cell_size).astype(np.int32)
ncell[ncell < 2] = 2
default_mesh = np.zeros (6, dtype = np.int32)
default_mesh[3:6] = ncell
self.default_mesh.append(default_mesh)
else:
self.default_mesh.append(np.array([], dtype = np.int32))
def compute_energy_shift(self, rcond = 1e-3, key = 'energy') :
sys_ener = np.array([])
for ss in self.data_systems :
sys_ener = np.append(sys_ener, ss.avg(key))
sys_tynatom = np.array(self.natoms_vec, dtype = float)
sys_tynatom = np.reshape(sys_tynatom, [self.nsystems,-1])
sys_tynatom = sys_tynatom[:,2:]
energy_shift,resd,rank,s_value \
= np.linalg.lstsq(sys_tynatom, sys_ener, rcond = rcond)
return energy_shift
def add_dict(self, adict: dict) -> None:
"""
Add items to the data system by a `dict`.
`adict` should have items like
.. code-block:: python
adict[key] = {
'ndof': ndof,
'atomic': atomic,
'must': must,
'high_prec': high_prec,
'type_sel': type_sel,
'repeat': repeat,
}
        For the explanation of the keys, see `add`
"""
for kk in adict :
self.add(kk,
adict[kk]['ndof'],
atomic=adict[kk]['atomic'],
must=adict[kk]['must'],
high_prec=adict[kk]['high_prec'],
type_sel=adict[kk]['type_sel'],
repeat=adict[kk]['repeat'],
default=adict[kk]['default'],
)
def add(self,
key : str,
ndof : int,
atomic : bool = False,
must : bool = False,
high_prec : bool = False,
type_sel : List[int] = None,
repeat : int = 1,
default: float=0.,
) :
"""
        Add a data item to be loaded
Parameters
----------
key
The key of the item. The corresponding data is stored in `sys_path/set.*/key.npy`
ndof
The number of dof
atomic
The item is an atomic property.
If False, the size of the data should be nframes x ndof
If True, the size of data should be nframes x natoms x ndof
must
The data file `sys_path/set.*/key.npy` must exist.
If must is False and the data file does not exist, the `data_dict[find_key]` is set to 0.0
high_prec
Load the data and store in float64, otherwise in float32
type_sel
Select certain type of atoms
repeat
The data will be repeated `repeat` times.
        default : float, default=0.
            default value of data
"""
for ii in self.data_systems:
ii.add(key, ndof, atomic=atomic, must=must, high_prec=high_prec, repeat=repeat, type_sel=type_sel, default=default)
def reduce(self, key_out, key_in):
"""
        Generate a new item by reducing another atomic item
Parameters
----------
key_out
The name of the reduced item
key_in
The name of the data item to be reduced
"""
for ii in self.data_systems:
ii.reduce(key_out, key_in)
def get_data_dict(self, ii: int = 0) -> dict:
return self.data_systems[ii].get_data_dict()
def set_sys_probs(self, sys_probs=None,
auto_prob_style: str = "prob_sys_size"):
if sys_probs is None :
if auto_prob_style == "prob_uniform":
prob_v = 1./float(self.nsystems)
probs = [prob_v for ii in range(self.nsystems)]
elif auto_prob_style == "prob_sys_size":
probs = self.prob_nbatches
elif auto_prob_style[:14] == "prob_sys_size;":
probs = self._prob_sys_size_ext(auto_prob_style)
else:
raise RuntimeError("Unknown auto prob style: " + auto_prob_style)
else:
probs = self._process_sys_probs(sys_probs)
self.sys_probs = probs
def _get_sys_probs(self,
sys_probs,
                       auto_prob_style) :  # deprecated
if sys_probs is None :
if auto_prob_style == "prob_uniform" :
prob_v = 1./float(self.nsystems)
prob = [prob_v for ii in range(self.nsystems)]
elif auto_prob_style == "prob_sys_size" :
prob = self.prob_nbatches
elif auto_prob_style[:14] == "prob_sys_size;" :
prob = self._prob_sys_size_ext(auto_prob_style)
else :
raise RuntimeError("unkown style " + auto_prob_style )
else :
prob = self._process_sys_probs(sys_probs)
return prob
def get_batch(self, sys_idx : int = None):
# batch generation style altered by Ziyao Li:
# one should specify the "sys_prob" and "auto_prob_style" params
# via set_sys_prob() function. The sys_probs this function uses is
# defined as a private variable, self.sys_probs, initialized in __init__().
        # This avoids the wasted effort of re-evaluating sys_probs for every batch.
"""
Get a batch of data from the data systems
Parameters
----------
sys_idx: int
            The index of the system from which the batch is obtained.
If sys_idx is not None, `sys_probs` and `auto_prob_style` are ignored
If sys_idx is None, automatically determine the system according to `sys_probs` or `auto_prob_style`, see the following.
"""
if not hasattr(self, 'default_mesh') :
self._make_default_mesh()
if sys_idx is not None :
self.pick_idx = sys_idx
else :
# prob = self._get_sys_probs(sys_probs, auto_prob_style)
self.pick_idx = dp_random.choice(np.arange(self.nsystems), p=self.sys_probs)
b_data = self.data_systems[self.pick_idx].get_batch(self.batch_size[self.pick_idx])
b_data["natoms_vec"] = self.natoms_vec[self.pick_idx]
b_data["default_mesh"] = self.default_mesh[self.pick_idx]
return b_data
# ! altered by Marián Rynik
def get_test (self,
sys_idx : int = None,
                  n_test : int = -1) :  # deprecated
"""
        Get test data from the data systems.
Parameters
----------
sys_idx
            The test data of the system with index `sys_idx` will be returned.
            If None, the data of the currently selected system will be returned.
n_test
            Number of test data. If set to -1, all test data will be returned.
"""
if not hasattr(self, 'default_mesh') :
self._make_default_mesh()
if not hasattr(self, 'test_data') :
self._load_test(ntests = n_test)
if sys_idx is not None :
idx = sys_idx
else :
idx = self.pick_idx
test_system_data = {}
for nn in self.test_data:
test_system_data[nn] = self.test_data[nn][idx]
test_system_data["natoms_vec"] = self.natoms_vec[idx]
test_system_data["default_mesh"] = self.default_mesh[idx]
return test_system_data
def get_sys_ntest(self, sys_idx=None):
"""
Get number of tests for the currently selected system,
or one defined by sys_idx.
"""
if sys_idx is not None :
return self.test_size[sys_idx]
else :
return self.test_size[self.pick_idx]
def get_type_map(self) -> List[str]:
"""
Get the type map
"""
return self.type_map
def get_nbatches (self) -> int:
"""
Get the total number of batches
"""
return self.nbatches
def get_ntypes (self) -> int:
"""
Get the number of types
"""
return self.sys_ntypes
def get_nsystems (self) -> int:
"""
Get the number of data systems
"""
return self.nsystems
def get_sys (self, idx : int) -> DeepmdData:
"""
Get a certain data system
"""
return self.data_systems[idx]
def get_batch_size(self) -> int:
"""
Get the batch size
"""
return self.batch_size
def _format_name_length(self, name, width) :
if len(name) <= width:
return '{: >{}}'.format(name, width)
else :
name = name[-(width-3):]
name = '-- ' + name
return name
def print_summary(self, name) :
# width 65
sys_width = 42
log.info(f"---Summary of DataSystem: {name:13s}-----------------------------------------------")
log.info("found %d system(s):" % self.nsystems)
log.info(("%s " % self._format_name_length('system', sys_width)) +
("%6s %6s %6s %5s %3s" % ('natoms', 'bch_sz', 'n_bch', 'prob', 'pbc')))
for ii in range(self.nsystems) :
log.info("%s %6d %6d %6d %5.3f %3s" %
(self._format_name_length(self.system_dirs[ii], sys_width),
self.natoms[ii],
# TODO batch size * nbatches = number of structures
self.batch_size[ii],
self.nbatches[ii],
self.sys_probs[ii],
"T" if self.data_systems[ii].pbc else "F"
) )
log.info("--------------------------------------------------------------------------------------")
def _make_auto_bs(self, rule) :
bs = []
for ii in self.data_systems:
ni = ii.get_natoms()
bsi = rule // ni
if bsi * ni < rule:
bsi += 1
bs.append(bsi)
return bs
# ! added by Marián Rynik
def _make_auto_ts(self, percent):
ts = []
for ii in range(self.nsystems):
ni = self.batch_size[ii] * self.nbatches[ii]
tsi = int(ni * percent / 100)
ts.append(tsi)
return ts
def _check_type_map_consistency(self, type_map_list):
ret = []
for ii in type_map_list:
if ii is not None:
min_len = min([len(ii), len(ret)])
for idx in range(min_len) :
if ii[idx] != ret[idx] :
raise RuntimeError('inconsistent type map: %s %s' % (str(ret), str(ii)))
if len(ii) > len(ret) :
ret = ii
return ret
def _process_sys_probs(self, sys_probs) :
sys_probs = np.array(sys_probs)
type_filter = sys_probs >= 0
assigned_sum_prob = np.sum(type_filter * sys_probs)
# 1e-8 is to handle floating point error; See #1917
assert assigned_sum_prob <= 1. + 1e-8, "the sum of assigned probabilities should be no greater than 1"
rest_sum_prob = 1. - assigned_sum_prob
if not np.isclose(rest_sum_prob, 0):
rest_nbatch = (1 - type_filter) * self.nbatches
rest_prob = rest_sum_prob * rest_nbatch / np.sum(rest_nbatch)
ret_prob = rest_prob + type_filter * sys_probs
else :
ret_prob = sys_probs
assert np.isclose(np.sum(ret_prob), 1), "sum of probs should be 1"
return ret_prob
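# Illustrative example of the convention above: negative entries in sys_probs are
# treated as "unassigned" and receive the leftover probability in proportion to
# nbatches. With nbatches = [10, 30, 60] and sys_probs = [0.5, -1., -1.], system 0
# keeps 0.5 and the remaining 0.5 is split 30:60 over systems 1 and 2, giving
# ret_prob = [0.5, 1/6, 1/3].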
def _prob_sys_size_ext(self, keywords):
block_str = keywords.split(';')[1:]
block_stt = []
block_end = []
block_weights = []
for ii in block_str:
stt = int(ii.split(':')[0])
end = int(ii.split(':')[1])
weight = float(ii.split(':')[2])
assert(weight >= 0), "the weight of a block should be no less than 0"
block_stt.append(stt)
block_end.append(end)
block_weights.append(weight)
nblocks = len(block_str)
block_probs = np.array(block_weights) / np.sum(block_weights)
sys_probs = np.zeros([self.get_nsystems()])
for ii in range(nblocks):
nbatch_block = self.nbatches[block_stt[ii]:block_end[ii]]
tmp_prob = [float(i) for i in nbatch_block] / np.sum(nbatch_block)
sys_probs[block_stt[ii]:block_end[ii]] = tmp_prob * block_probs[ii]
return sys_probs
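# Illustrative example of the keyword format parsed above (hypothetical numbers):
# "prob_sys_size;0:2:0.4;2:4:0.6" defines two blocks of systems, [0, 2) with total
# probability 0.4 and [2, 4) with total probability 0.6. Within each block the
# probability is split proportionally to nbatches, so with nbatches = [10, 30, 20, 20]
# this yields sys_probs = [0.1, 0.3, 0.3, 0.3].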
class DataSystem (object) :
"""
Outdated class for the data systems.
.. deprecated:: 2.0.0
This class is not maintained any more.
"""
def __init__ (self,
systems,
set_prefix,
batch_size,
test_size,
rcut,
run_opt = None) :
self.system_dirs = systems
self.nsystems = len(self.system_dirs)
self.batch_size = batch_size
if isinstance(self.batch_size, int) :
self.batch_size = self.batch_size * np.ones(self.nsystems, dtype=int)
assert(isinstance(self.batch_size, (list,np.ndarray)))
assert(len(self.batch_size) == self.nsystems)
self.data_systems = []
self.ntypes = []
self.natoms = []
self.natoms_vec = []
self.nbatches = []
for ii in self.system_dirs :
self.data_systems.append(DataSets(ii, set_prefix))
sys_all_types = np.loadtxt(os.path.join(ii, "type.raw")).astype(int)
self.ntypes.append(np.max(sys_all_types) + 1)
self.sys_ntypes = max(self.ntypes)
type_map = []
for ii in range(self.nsystems) :
self.natoms.append(self.data_systems[ii].get_natoms())
self.natoms_vec.append(self.data_systems[ii].get_natoms_vec(self.sys_ntypes).astype(int))
self.nbatches.append(self.data_systems[ii].get_sys_numb_batch(self.batch_size[ii]))
type_map.append(self.data_systems[ii].get_type_map())
self.type_map = self.check_type_map_consistency(type_map)
# check frame parameters
has_fparam = [ii.numb_fparam() for ii in self.data_systems]
for ii in has_fparam :
if ii != has_fparam[0] :
raise RuntimeError("if any system has frame parameter, then all systems should have the same number of frame parameter")
self.has_fparam = has_fparam[0]
# check the size of data if they satisfy the requirement of batch and test
for ii in range(self.nsystems) :
chk_ret = self.data_systems[ii].check_batch_size(self.batch_size[ii])
if chk_ret is not None :
raise RuntimeError ("system %s required batch size %d is larger than the size %d of the dataset %s" % \
(self.system_dirs[ii], self.batch_size[ii], chk_ret[1], chk_ret[0]))
chk_ret = self.data_systems[ii].check_test_size(test_size)
if chk_ret is not None :
print("WARNNING: system %s required test size %d is larger than the size %d of the dataset %s" % \
(self.system_dirs[ii], test_size, chk_ret[1], chk_ret[0]))
if run_opt is not None:
self.print_summary(run_opt)
self.prob_nbatches = [ float(i) for i in self.nbatches] / np.sum(self.nbatches)
self.test_data = collections.defaultdict(list)
self.default_mesh = []
for ii in range(self.nsystems) :
test_system_data = self.data_systems[ii].get_test ()
for nn in test_system_data:
self.test_data[nn].append(test_system_data[nn])
cell_size = np.max (rcut)
avg_box = np.average (test_system_data["box"], axis = 0)
avg_box = np.reshape (avg_box, [3,3])
ncell = (np.linalg.norm(avg_box, axis=1)/ cell_size).astype(np.int32)
ncell[ncell < 2] = 2
default_mesh = np.zeros (6, dtype = np.int32)
default_mesh[3:6] = ncell
self.default_mesh.append(default_mesh)
self.pick_idx = 0
def check_type_map_consistency(self, type_map_list):
ret = []
for ii in type_map_list:
if ii is not None:
min_len = min([len(ii), len(ret)])
for idx in range(min_len) :
if ii[idx] != ret[idx] :
raise RuntimeError('inconsistent type map: %s %s' % (str(ret), str(ii)))
if len(ii) > len(ret) :
ret = ii
return ret
def get_type_map(self):
return self.type_map
def format_name_length(self, name, width) :
if len(name) <= width:
return '{: >{}}'.format(name, width)
else :
name = name[-(width-3):]
name = '-- ' + name
return name
def print_summary(self) :
tmp_msg = ""
# width 65
sys_width = 42
tmp_msg += "---Summary of DataSystem-----------------------------------------\n"
tmp_msg += "find %d system(s):\n" % self.nsystems
tmp_msg += "%s " % self.format_name_length('system', sys_width)
tmp_msg += "%s %s %s\n" % ('natoms', 'bch_sz', 'n_bch')
for ii in range(self.nsystems) :
tmp_msg += ("%s %6d %6d %5d\n" %
(self.format_name_length(self.system_dirs[ii], sys_width),
self.natoms[ii],
self.batch_size[ii],
self.nbatches[ii]) )
tmp_msg += "-----------------------------------------------------------------\n"
log.info(tmp_msg)
def compute_energy_shift(self) :
sys_ener = np.array([])
for ss in self.data_systems :
sys_ener = np.append(sys_ener, ss.get_ener())
sys_tynatom = np.array(self.natoms_vec, dtype = float)
sys_tynatom = np.reshape(sys_tynatom, [self.nsystems,-1])
sys_tynatom = sys_tynatom[:,2:]
energy_shift,resd,rank,s_value \
= np.linalg.lstsq(sys_tynatom, sys_ener, rcond = 1e-3)
return energy_shift
def process_sys_weights(self, sys_weights) :
sys_weights = np.array(sys_weights)
type_filter = sys_weights >= 0
assigned_sum_prob = np.sum(type_filter * sys_weights)
assert assigned_sum_prob <= 1, "the sum of assigned probability should be less than 1"
rest_sum_prob = 1. - assigned_sum_prob
rest_nbatch = (1 - type_filter) * self.nbatches
rest_prob = rest_sum_prob * rest_nbatch / np.sum(rest_nbatch)
ret_prob = rest_prob + type_filter * sys_weights
assert np.sum(ret_prob) == 1, "sum of probs should be 1"
return ret_prob
def get_batch (self,
sys_idx = None,
sys_weights = None,
style = "prob_sys_size") :
if sys_idx is not None :
self.pick_idx = sys_idx
else :
if sys_weights is None :
if style == "prob_sys_size" :
prob = self.prob_nbatches
elif style == "prob_uniform" :
prob = None
else :
raise RuntimeError("unkown get_batch style")
else :
prob = self.process_sys_weights(sys_weights)
self.pick_idx = dp_random.choice(np.arange(self.nsystems), p=prob)
b_data = self.data_systems[self.pick_idx].get_batch(self.batch_size[self.pick_idx])
b_data["natoms_vec"] = self.natoms_vec[self.pick_idx]
b_data["default_mesh"] = self.default_mesh[self.pick_idx]
return b_data
def get_test (self,
sys_idx = None) :
if sys_idx is not None :
idx = sys_idx
else :
idx = self.pick_idx
test_system_data = {}
for nn in self.test_data:
test_system_data[nn] = self.test_data[nn][idx]
test_system_data["natoms_vec"] = self.natoms_vec[idx]
test_system_data["default_mesh"] = self.default_mesh[idx]
return test_system_data
def get_nbatches (self) :
return self.nbatches
def get_ntypes (self) :
return self.sys_ntypes
def get_nsystems (self) :
return self.nsystems
def get_sys (self, sys_idx) :
return self.data_systems[sys_idx]
def get_batch_size(self) :
return self.batch_size
def numb_fparam(self) :
return self.has_fparam
def _main () :
sys = ['/home/wanghan/study/deep.md/results.01/data/mos2/only_raws/20',
'/home/wanghan/study/deep.md/results.01/data/mos2/only_raws/30',
'/home/wanghan/study/deep.md/results.01/data/mos2/only_raws/38',
'/home/wanghan/study/deep.md/results.01/data/mos2/only_raws/MoS2',
'/home/wanghan/study/deep.md/results.01/data/mos2/only_raws/Pt_cluster']
set_prefix = 'set'
ds = DataSystem (sys, set_prefix, 4, 6)
r = ds.get_batch()
print(r[1][0])
if __name__ == '__main__':
_main()
class GraphTooLargeError(Exception):
"""The graph is too large, exceeding protobuf's hard limit of 2GB."""
class GraphWithoutTensorError(Exception):
"""The graph does not contain the requested tensor."""
class OutOfMemoryError(Exception):
"""This error is caused by out-of-memory (OOM)."""
import re
import numpy as np
from typing import Tuple, Dict
from deepmd.env import tf, EMBEDDING_NET_PATTERN, FITTING_NET_PATTERN, TYPE_EMBEDDING_PATTERN, ATTENTION_LAYER_PATTERN
from deepmd.utils.sess import run_sess
from deepmd.utils.errors import GraphWithoutTensorError
# TODO (JZ): I think in this file we can merge some duplicated lines into one method...
def load_graph_def(model_file: str) -> Tuple[tf.Graph, tf.GraphDef]:
"""
Load graph as well as the graph_def from the frozen model(model_file)
Parameters
----------
model_file : str
The input frozen model path
Returns
-------
tf.Graph
The graph loaded from the frozen model
tf.GraphDef
The graph_def loaded from the frozen model
"""
graph_def = tf.GraphDef()
with open(model_file, "rb") as f:
graph_def.ParseFromString(f.read())
with tf.Graph().as_default() as graph:
tf.import_graph_def(graph_def, name = "")
return graph, graph_def
def get_tensor_by_name_from_graph(graph: tf.Graph,
tensor_name: str) -> tf.Tensor:
"""
Load tensor value from the given tf.Graph object
Parameters
----------
graph : tf.Graph
The input TensorFlow graph
tensor_name : str
The name of the tensor to be loaded from the graph
Returns
-------
tf.Tensor
The tensor which was loaded from the frozen model
Raises
------
GraphWithoutTensorError
Raised if the tensor with the given name is not found in the graph
"""
try:
tensor = graph.get_tensor_by_name(tensor_name + ':0')
except KeyError as e:
raise GraphWithoutTensorError() from e
with tf.Session(graph=graph) as sess:
tensor = run_sess(sess, tensor)
return tensor
def get_tensor_by_name(model_file: str,
tensor_name: str) -> tf.Tensor:
"""
Load tensor value from the frozen model(model_file)
Parameters
----------
model_file : str
The input frozen model path
tensor_name : str
The name of the tensor to be loaded from the frozen model
Returns
-------
tf.Tensor
The tensor which was loaded from the frozen model
Raises
------
GraphWithoutTensorError
Raised if the tensor with the given name is not found in the frozen model
"""
graph, _ = load_graph_def(model_file)
return get_tensor_by_name_from_graph(graph, tensor_name)
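# Usage sketch (hypothetical model path; the tensor names below are the ones this
# module itself reads elsewhere, e.g. in DPTabulate):
#     ntypes = get_tensor_by_name("frozen_model.pb", "descrpt_attr/ntypes")
#     t_avg = get_tensor_by_name("frozen_model.pb", "descrpt_attr/t_avg")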
def get_tensor_by_type(node,
data_type : np.dtype) -> tf.Tensor:
"""
Get the tensor value within the given node according to the input data_type
Parameters
----------
node
The given tensorflow graph node
data_type
The data type of the node
Returns
----------
tf.Tensor
The tensor value of the given node
"""
if data_type == np.float64:
tensor = np.array(node.double_val)
elif data_type == np.float32:
tensor = np.array(node.float_val)
else:
raise RuntimeError('model compression does not support half precision')
return tensor
def get_pattern_nodes_from_graph_def(graph_def: tf.GraphDef, pattern: str) -> Dict:
"""
Get the pattern nodes with the given tf.GraphDef object
Parameters
----------
graph_def
The input tf.GraphDef object
pattern
The node pattern within the graph_def
Returns
----------
Dict
The nodes matching the given pattern within the tf.GraphDef object
"""
nodes = {}
pattern = re.compile(pattern)
for node in graph_def.node:
if re.fullmatch(pattern, node.name) is not None:
nodes[node.name] = node.attr["value"].tensor
return nodes
def get_embedding_net_nodes_from_graph_def(graph_def: tf.GraphDef, suffix: str = "") -> Dict:
"""
Get the embedding net nodes with the given tf.GraphDef object
Parameters
----------
graph_def
The input tf.GraphDef object
suffix : str, optional
The scope suffix
Returns
----------
Dict
The embedding net nodes within the given tf.GraphDef object
"""
# embedding_net_pattern = f"filter_type_\d+{suffix}/matrix_\d+_\d+|filter_type_\d+{suffix}/bias_\d+_\d+|filter_type_\d+{suffix}/idt_\d+_\d+|filter_type_all{suffix}/matrix_\d+_\d+|filter_type_all{suffix}/matrix_\d+_\d+_\d+|filter_type_all{suffix}/bias_\d+_\d+|filter_type_all{suffix}/bias_\d+_\d+_\d+|filter_type_all{suffix}/idt_\d+_\d+"
if suffix != "":
embedding_net_pattern = EMBEDDING_NET_PATTERN\
.replace('/idt', suffix + '/idt')\
.replace('/bias', suffix + '/bias')\
.replace('/matrix', suffix + '/matrix')
else:
embedding_net_pattern = EMBEDDING_NET_PATTERN
embedding_net_nodes = get_pattern_nodes_from_graph_def(graph_def, embedding_net_pattern)
for key in embedding_net_nodes.keys():
assert key.find('bias') > 0 or key.find(
'matrix') > 0, "currently, only support weight matrix and bias matrix at the tabulation op!"
return embedding_net_nodes
def get_embedding_net_nodes(model_file: str, suffix: str = "") -> Dict:
"""
Get the embedding net nodes with the given frozen model(model_file)
Parameters
----------
model_file
The input frozen model path
suffix : str, optional
The suffix of the scope
Returns
----------
Dict
The embedding net nodes with the given frozen model
"""
_, graph_def = load_graph_def(model_file)
return get_embedding_net_nodes_from_graph_def(graph_def, suffix=suffix)
def get_embedding_net_variables_from_graph_def(graph_def : tf.GraphDef, suffix: str = "") -> Dict:
"""
Get the embedding net variables with the given tf.GraphDef object
Parameters
----------
graph_def
The input tf.GraphDef object
suffix : str, optional
The suffix of the scope
Returns
----------
Dict
The embedding net variables within the given tf.GraphDef object
"""
embedding_net_variables = {}
embedding_net_nodes = get_embedding_net_nodes_from_graph_def(graph_def, suffix=suffix)
for item in embedding_net_nodes:
node = embedding_net_nodes[item]
dtype = tf.as_dtype(node.dtype).as_numpy_dtype
tensor_shape = tf.TensorShape(node.tensor_shape).as_list()
if (len(tensor_shape) != 1) or (tensor_shape[0] != 1):
tensor_value = np.frombuffer(node.tensor_content, dtype = tf.as_dtype(node.dtype).as_numpy_dtype)
else:
tensor_value = get_tensor_by_type(node, dtype)
embedding_net_variables[item] = np.reshape(tensor_value, tensor_shape)
return embedding_net_variables
def get_embedding_net_variables(model_file : str, suffix: str = "") -> Dict:
"""
Get the embedding net variables with the given frozen model(model_file)
Parameters
----------
model_file
The input frozen model path
suffix : str, optional
The suffix of the scope
Returns
----------
Dict
The embedding net variables within the given frozen model
"""
_, graph_def = load_graph_def(model_file)
return get_embedding_net_variables_from_graph_def(graph_def, suffix=suffix)
def get_fitting_net_nodes_from_graph_def(graph_def: tf.GraphDef, suffix: str = "") -> Dict:
"""
Get the fitting net nodes with the given tf.GraphDef object
Parameters
----------
graph_def
The input tf.GraphDef object
suffix
suffix of the scope
Returns
----------
Dict
The fitting net nodes within the given tf.GraphDef object
"""
if suffix != "":
fitting_net_pattern = FITTING_NET_PATTERN\
.replace('/idt', suffix + '/idt')\
.replace('/bias', suffix + '/bias')\
.replace('/matrix', suffix + '/matrix')
else:
fitting_net_pattern = FITTING_NET_PATTERN
fitting_net_nodes = get_pattern_nodes_from_graph_def(graph_def, fitting_net_pattern)
for key in fitting_net_nodes.keys():
assert key.find('bias') > 0 or key.find('matrix') > 0 or key.find(
'idt') > 0, "currently, only support weight matrix, bias and idt at the model compression process!"
return fitting_net_nodes
def get_fitting_net_nodes(model_file : str) -> Dict:
"""
Get the fitting net nodes with the given frozen model(model_file)
Parameters
----------
model_file
The input frozen model path
Returns
----------
Dict
The fitting net nodes with the given frozen model
"""
_, graph_def = load_graph_def(model_file)
return get_fitting_net_nodes_from_graph_def(graph_def)
def get_fitting_net_variables_from_graph_def(graph_def : tf.GraphDef, suffix: str = "") -> Dict:
"""
Get the fitting net variables with the given tf.GraphDef object
Parameters
----------
graph_def
The input tf.GraphDef object
suffix
suffix of the scope
Returns
----------
Dict
The fitting net variables within the given tf.GraphDef object
"""
fitting_net_variables = {}
fitting_net_nodes = get_fitting_net_nodes_from_graph_def(graph_def, suffix=suffix)
for item in fitting_net_nodes:
node = fitting_net_nodes[item]
dtype= tf.as_dtype(node.dtype).as_numpy_dtype
tensor_shape = tf.TensorShape(node.tensor_shape).as_list()
if (len(tensor_shape) != 1) or (tensor_shape[0] != 1):
tensor_value = np.frombuffer(node.tensor_content, dtype = tf.as_dtype(node.dtype).as_numpy_dtype)
else:
tensor_value = get_tensor_by_type(node, dtype)
fitting_net_variables[item] = np.reshape(tensor_value, tensor_shape)
return fitting_net_variables
def get_fitting_net_variables(model_file : str, suffix: str = "") -> Dict:
"""
Get the fitting net variables with the given frozen model(model_file)
Parameters
----------
model_file
The input frozen model path
suffix
suffix of the scope
Returns
----------
Dict
The fitting net variables within the given frozen model
"""
_, graph_def = load_graph_def(model_file)
return get_fitting_net_variables_from_graph_def(graph_def, suffix=suffix)
def get_type_embedding_net_nodes_from_graph_def(graph_def: tf.GraphDef, suffix: str = "") -> Dict:
"""
Get the type embedding net nodes with the given tf.GraphDef object
Parameters
----------
graph_def
The input tf.GraphDef object
suffix : str, optional
The scope suffix
Returns
----------
Dict
The type embedding net nodes within the given tf.GraphDef object
"""
if suffix != "":
type_embedding_net_pattern = TYPE_EMBEDDING_PATTERN\
.replace('/idt', suffix + '/idt')\
.replace('/bias', suffix + '/bias')\
.replace('/matrix', suffix + '/matrix')
else:
type_embedding_net_pattern = TYPE_EMBEDDING_PATTERN
type_embedding_net_nodes = get_pattern_nodes_from_graph_def(graph_def, type_embedding_net_pattern)
return type_embedding_net_nodes
def get_type_embedding_net_variables_from_graph_def(graph_def: tf.GraphDef, suffix: str = "") -> Dict:
"""
Get the type embedding net variables with the given tf.GraphDef object
Parameters
----------
graph_def : tf.GraphDef
The input tf.GraphDef object
suffix : str, optional
The suffix of the scope
Returns
----------
Dict
The type embedding net variables within the given tf.GraphDef object
"""
type_embedding_net_variables = {}
type_embedding_net_nodes = get_type_embedding_net_nodes_from_graph_def(graph_def, suffix=suffix)
for item in type_embedding_net_nodes:
node = type_embedding_net_nodes[item]
dtype = tf.as_dtype(node.dtype).as_numpy_dtype
tensor_shape = tf.TensorShape(node.tensor_shape).as_list()
if (len(tensor_shape) != 1) or (tensor_shape[0] != 1):
tensor_value = np.frombuffer(node.tensor_content, dtype = tf.as_dtype(node.dtype).as_numpy_dtype)
else:
tensor_value = get_tensor_by_type(node, dtype)
type_embedding_net_variables[item] = np.reshape(tensor_value, tensor_shape)
return type_embedding_net_variables
def get_attention_layer_nodes_from_graph_def(graph_def: tf.GraphDef, suffix: str = "") -> Dict:
"""
Get the attention layer nodes with the given tf.GraphDef object
Parameters
----------
graph_def
The input tf.GraphDef object
suffix : str, optional
The scope suffix
Returns
----------
Dict
The attention layer nodes within the given tf.GraphDef object
"""
if suffix != "":
attention_layer_pattern = ATTENTION_LAYER_PATTERN \
.replace('/c_query', suffix + '/c_query') \
.replace('/c_key', suffix + '/c_key') \
.replace('/c_value', suffix + '/c_value') \
.replace('/c_out', suffix + '/c_out') \
.replace('/layer_normalization', suffix + '/layer_normalization')
else:
attention_layer_pattern = ATTENTION_LAYER_PATTERN
attention_layer_nodes = get_pattern_nodes_from_graph_def(graph_def, attention_layer_pattern)
return attention_layer_nodes
def get_attention_layer_variables_from_graph_def(graph_def: tf.GraphDef, suffix: str = "") -> Dict:
"""
Get the attention layer variables with the given tf.GraphDef object
Parameters
----------
graph_def : tf.GraphDef
The input tf.GraphDef object
suffix : str, optional
The suffix of the scope
Returns
----------
Dict
The attention layer variables within the given tf.GraphDef object
"""
attention_layer_variables = {}
attention_layer_net_nodes = get_attention_layer_nodes_from_graph_def(graph_def, suffix=suffix)
for item in attention_layer_net_nodes:
node = attention_layer_net_nodes[item]
dtype = tf.as_dtype(node.dtype).as_numpy_dtype
tensor_shape = tf.TensorShape(node.tensor_shape).as_list()
if (len(tensor_shape) != 1) or (tensor_shape[0] != 1):
tensor_value = np.frombuffer(node.tensor_content, dtype=tf.as_dtype(node.dtype).as_numpy_dtype)
else:
tensor_value = get_tensor_by_type(node, dtype)
attention_layer_variables[item] = np.reshape(tensor_value, tensor_shape)
return attention_layer_variables
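# Putting the helpers above together (hypothetical model path; a minimal sketch):
#     graph, graph_def = load_graph_def("frozen_model.pb")
#     emb_vars = get_embedding_net_variables_from_graph_def(graph_def)
#     # keys follow EMBEDDING_NET_PATTERN, e.g. "filter_type_0/matrix_1_0",
#     # and map to numpy arrays reshaped to the stored tensor shapes.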
import numpy as np
from deepmd.env import tf
from deepmd.common import ClassArg
class LearningRateExp (object) :
r"""
The exponentially decaying learning rate.
The learning rate at step :math:`t` is given by
.. math::
\alpha(t) = \alpha_0 \lambda ^ { t / \tau }
where :math:`\alpha` is the learning rate, :math:`\alpha_0` is the starting learning rate,
:math:`\lambda` is the decay rate, and :math:`\tau` is the decay steps.
Parameters
----------
start_lr
Starting learning rate :math:`\alpha_0`
stop_lr
Stop learning rate :math:`\alpha_1`
decay_steps
Learning rate decay every this number of steps :math:`\tau`
decay_rate
The decay rate :math:`\lambda`.
If `stop_step` is provided in `build`, then it will be determined automatically and overwritten.
"""
def __init__ (self,
start_lr : float,
stop_lr : float = 5e-8,
decay_steps : int = 5000,
decay_rate : float = 0.95
) -> None :
"""
Constructor
"""
# args = ClassArg()\
# .add('decay_steps', int, must = False)\
# .add('decay_rate', float, must = False)\
# .add('start_lr', float, must = True)\
# .add('stop_lr', float, must = False)
# self.cd = args.parse(jdata)
self.cd = {}
self.cd['start_lr'] = start_lr
self.cd['stop_lr'] = stop_lr
self.cd['decay_steps'] = decay_steps
self.cd['decay_rate'] = decay_rate
self.start_lr_ = self.cd['start_lr']
def build(self,
global_step : tf.Tensor,
stop_step : int = None
) -> tf.Tensor :
"""
Build the learning rate
Parameters
----------
global_step
The tf Tensor providing the global training step
stop_step
The stop step. If provided, the decay_rate will be determined automatically and overwritten.
Returns
-------
learning_rate
The learning rate
"""
if stop_step is None:
self.decay_steps_ = self.cd['decay_steps'] if self.cd['decay_steps'] is not None else 5000
self.decay_rate_ = self.cd['decay_rate'] if self.cd['decay_rate'] is not None else 0.95
else:
self.stop_lr_ = self.cd['stop_lr'] if self.cd['stop_lr'] is not None else 5e-8
default_ds = 100 if stop_step // 10 > 100 else stop_step // 100 + 1
self.decay_steps_ = self.cd['decay_steps'] if self.cd['decay_steps'] is not None else default_ds
if self.decay_steps_ >= stop_step:
self.decay_steps_ = default_ds
self.decay_rate_ = np.exp(np.log(self.stop_lr_ / self.start_lr_) / (stop_step / self.decay_steps_))
return tf.train.exponential_decay(self.start_lr_,
global_step,
self.decay_steps_,
self.decay_rate_,
staircase=True)
def start_lr(self) -> float:
"""
Get the start lr
"""
return self.start_lr_
def value (self,
step : int
) -> float:
"""
Get the lr at a certain step
"""
return self.start_lr_ * np.power (self.decay_rate_, (step // self.decay_steps_))
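# Worked sketch of the automatic decay rate: with start_lr = 1e-3, stop_lr = 1e-8,
# decay_steps = 5000 and stop_step = 1000000, build() keeps decay_steps = 5000 and sets
#     decay_rate = exp(ln(1e-8 / 1e-3) / (1000000 / 5000)) ~= 0.9441
# so value(step) = 1e-3 * 0.9441 ** (step // 5000) reaches roughly 1e-8 at stop_step.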
import math
import logging
import numpy as np
from deepmd.env import tf
from typing import Tuple, List
from deepmd.env import op_module
from deepmd.env import default_tf_session_config
from deepmd.env import GLOBAL_NP_FLOAT_PRECISION
from deepmd.utils.data_system import DeepmdDataSystem
from deepmd.utils.parallel_op import ParallelOp
log = logging.getLogger(__name__)
class NeighborStat():
"""
Class for getting training data information.
It loads data from a DeepmdData object and measures the data info, including the nearest neighbor distance between atoms, the max neighbor size of atoms, and the output data range of the environment matrix.
Parameters
----------
ntypes
The num of atom types
rcut
The cut-off radius
one_type : bool, optional, default=False
Treat all types as a single type.
"""
def __init__(self,
ntypes : int,
rcut: float,
one_type : bool = False,
) -> None:
"""
Constructor
"""
self.rcut = rcut
self.ntypes = ntypes
self.one_type = one_type
sub_graph = tf.Graph()
def builder():
place_holders = {}
for ii in ['coord', 'box']:
place_holders[ii] = tf.placeholder(GLOBAL_NP_FLOAT_PRECISION, [None, None], name='t_'+ii)
place_holders['type'] = tf.placeholder(tf.int32, [None, None], name='t_type')
place_holders['natoms_vec'] = tf.placeholder(tf.int32, [self.ntypes+2], name='t_natoms')
place_holders['default_mesh'] = tf.placeholder(tf.int32, [None], name='t_mesh')
t_type = place_holders['type']
t_natoms = place_holders['natoms_vec']
if self.one_type:
# all types = 0, natoms_vec = [natoms, natoms, natoms]
t_type = tf.zeros_like(t_type, dtype=tf.int32)
t_natoms = tf.repeat(t_natoms[0], 3)
_max_nbor_size, _min_nbor_dist \
= op_module.neighbor_stat(place_holders['coord'],
t_type,
t_natoms,
place_holders['box'],
place_holders['default_mesh'],
rcut = self.rcut)
place_holders['dir'] = tf.placeholder(tf.string)
return place_holders, (_max_nbor_size, _min_nbor_dist, place_holders['dir'])
with sub_graph.as_default():
self.p = ParallelOp(builder, config=default_tf_session_config)
self.sub_sess = tf.Session(graph = sub_graph, config=default_tf_session_config)
def get_stat(self,
data : DeepmdDataSystem) -> Tuple[float, List[int]]:
"""
Get the data statistics of the training data, including the nearest neighbor distance between atoms and the max neighbor size of atoms.
Parameters
----------
data
Class for manipulating many data systems. It is implemented with the help of DeepmdData.
Returns
-------
min_nbor_dist
The nearest distance between neighbor atoms
max_nbor_size
A list with ntypes integers, denotes the actual achieved max sel
"""
self.min_nbor_dist = 100.0
self.max_nbor_size = [0]
if not self.one_type:
self.max_nbor_size *= self.ntypes
def feed():
for ii in range(len(data.system_dirs)):
for jj in data.data_systems[ii].dirs:
data_set = data.data_systems[ii]._load_set(jj)
for kk in range(np.array(data_set['type']).shape[0]):
yield {
'coord': np.array(data_set['coord'])[kk].reshape([-1, data.natoms[ii] * 3]),
'type': np.array(data_set['type'])[kk].reshape([-1, data.natoms[ii]]),
'natoms_vec': np.array(data.natoms_vec[ii]),
'box': np.array(data_set['box'])[kk].reshape([-1, 9]),
'default_mesh': np.array(data.default_mesh[ii]),
'dir': str(jj),
}
for mn, dt, jj in self.p.generate(self.sub_sess, feed()):
if dt.size != 0:
dt = np.min(dt)
else:
dt = self.rcut
log.warning("Atoms with no neighbors found in %s. Please make sure it's what you expected." % jj)
if dt < self.min_nbor_dist:
if math.isclose(dt, 0., rel_tol=1e-6):
# it's unexpected that the distance between two atoms is zero
# zero distance will cause nan (#874)
raise RuntimeError(
"Some atoms are overlapping in %s. Please check your"
" training data to remove duplicated atoms." % jj
)
self.min_nbor_dist = dt
var = np.max(mn, axis=0)
self.max_nbor_size = np.maximum(var, self.max_nbor_size)
log.info('training data with min nbor dist: ' + str(self.min_nbor_dist))
log.info('training data with max nbor size: ' + str(self.max_nbor_size))
return self.min_nbor_dist, self.max_nbor_size
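# Usage sketch (hypothetical; `data` is a DeepmdDataSystem loaded elsewhere):
#     neistat = NeighborStat(ntypes=2, rcut=6.0)
#     min_nbor_dist, max_nbor_size = neistat.get_stat(data)
#     # max_nbor_size has ntypes entries (a single entry if one_type=True)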
import numpy as np
from deepmd.env import tf
from deepmd.env import GLOBAL_TF_FLOAT_PRECISION
from deepmd.common import get_precision
def one_layer_rand_seed_shift():
return 3
def one_layer(inputs,
outputs_size,
activation_fn=tf.nn.tanh,
precision = GLOBAL_TF_FLOAT_PRECISION,
stddev=1.0,
bavg=0.0,
name='linear',
scope='',
reuse=None,
seed=None,
use_timestep = False,
trainable = True,
useBN = False,
uniform_seed = False,
initial_variables = None,
mixed_prec = None,
final_layer = False):
# For good accuracy, the last layer of the fitting network is kept in a higher precision.
if mixed_prec is not None and final_layer:
inputs = tf.cast(inputs, get_precision(mixed_prec['output_prec']))
with tf.variable_scope(name, reuse=reuse):
shape = inputs.get_shape().as_list()
w_initializer = tf.random_normal_initializer(
stddev=stddev / np.sqrt(shape[1] + outputs_size),
seed=seed if (seed is None or uniform_seed) else seed + 0)
b_initializer = tf.random_normal_initializer(
stddev=stddev,
mean=bavg,
seed=seed if (seed is None or uniform_seed) else seed + 1)
if initial_variables is not None:
w_initializer = tf.constant_initializer(initial_variables[scope + name + '/matrix'])
b_initializer = tf.constant_initializer(initial_variables[scope + name + '/bias'])
w = tf.get_variable('matrix',
[shape[1], outputs_size],
precision,
w_initializer,
trainable = trainable)
variable_summaries(w, 'matrix')
b = tf.get_variable('bias',
[outputs_size],
precision,
b_initializer,
trainable = trainable)
variable_summaries(b, 'bias')
if mixed_prec is not None and not final_layer:
inputs = tf.cast(inputs, get_precision(mixed_prec['compute_prec']))
w = tf.cast(w, get_precision(mixed_prec['compute_prec']))
b = tf.cast(b, get_precision(mixed_prec['compute_prec']))
hidden = tf.nn.bias_add(tf.matmul(inputs, w), b)
if activation_fn is not None and use_timestep :
idt_initializer = tf.random_normal_initializer(
stddev=0.001,
mean=0.1,
seed=seed if (seed is None or uniform_seed) else seed + 2)
if initial_variables is not None:
idt_initializer = tf.constant_initializer(initial_variables[scope + name + '/idt'])
idt = tf.get_variable('idt',
[outputs_size],
precision,
idt_initializer,
trainable = trainable)
variable_summaries(idt, 'idt')
if activation_fn is not None:
if useBN:
None
# hidden_bn = self._batch_norm(hidden, name=name+'_normalization', reuse=reuse)
# return activation_fn(hidden_bn)
else:
if use_timestep :
if mixed_prec is not None and not final_layer:
idt = tf.cast(idt, get_precision(mixed_prec['compute_prec']))
hidden = tf.reshape(activation_fn(hidden), [-1, outputs_size]) * idt
else :
hidden = tf.reshape(activation_fn(hidden), [-1, outputs_size])
if mixed_prec is not None:
hidden = tf.cast(hidden, get_precision(mixed_prec['output_prec']))
return hidden
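# Mixed-precision sketch (illustrative, not the authoritative configuration): with
# mixed_prec = {"compute_prec": "float16", "output_prec": "float32"}, hidden layers
# cast inputs, weights and biases to the compute precision before the matmul, while
# a call with final_layer=True keeps the fitting output in the higher output precision.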
def embedding_net_rand_seed_shift(
network_size
):
shift = 3 * (len(network_size) + 1)
return shift
def embedding_net(xx,
network_size,
precision,
activation_fn = tf.nn.tanh,
resnet_dt = False,
name_suffix = '',
stddev = 1.0,
bavg = 0.0,
seed = None,
trainable = True,
uniform_seed = False,
initial_variables = None,
mixed_prec = None):
r"""The embedding network.
The embedding network function :math:`\mathcal{N}` is constructed by the
composition of multiple layers :math:`\mathcal{L}^{(i)}`:
.. math::
\mathcal{N} = \mathcal{L}^{(n)} \circ \mathcal{L}^{(n-1)}
\circ \cdots \circ \mathcal{L}^{(1)}
A layer :math:`\mathcal{L}` is given by one of the following forms,
depending on the number of nodes: [1]_
.. math::
\mathbf{y}=\mathcal{L}(\mathbf{x};\mathbf{w},\mathbf{b})=
\begin{cases}
\boldsymbol{\phi}(\mathbf{x}^T\mathbf{w}+\mathbf{b}) + \mathbf{x}, & N_2=N_1 \\
\boldsymbol{\phi}(\mathbf{x}^T\mathbf{w}+\mathbf{b}) + (\mathbf{x}, \mathbf{x}), & N_2 = 2N_1\\
\boldsymbol{\phi}(\mathbf{x}^T\mathbf{w}+\mathbf{b}), & \text{otherwise} \\
\end{cases}
where :math:`\mathbf{x} \in \mathbb{R}^{N_1}` is the input vector and :math:`\mathbf{y} \in \mathbb{R}^{N_2}`
is the output vector. :math:`\mathbf{w} \in \mathbb{R}^{N_1 \times N_2}` and
:math:`\mathbf{b} \in \mathbb{R}^{N_2}` are weights and biases, respectively,
both of which are trainable if `trainable` is `True`. :math:`\boldsymbol{\phi}`
is the activation function.
Parameters
----------
xx : Tensor
Input tensor :math:`\mathbf{x}` of shape [-1,1]
network_size: list of int
Size of the embedding network. For example [16,32,64]
precision:
Precision of network weights. For example, tf.float64
activation_fn:
Activation function :math:`\boldsymbol{\phi}`
resnet_dt: boolean
Using time-step in the ResNet construction
name_suffix: str
The name suffix appended to each variable.
stddev: float
Standard deviation of initializing network parameters
bavg: float
Mean of network initial bias
seed: int
Random seed for initializing network parameters
trainable: boolean
If the network is trainable
uniform_seed : boolean
Only for the purpose of backward compatibility, retrieves the old behavior of using the random seed
initial_variables : dict
The input dict which stores the embedding net variables
mixed_prec
The input dict which stores the mixed precision setting for the embedding net
References
----------
.. [1] Kaiming He, Xiangyu Zhang, Shaoqing Ren, and Jian Sun. Identity mappings
   in deep residual networks. In Computer Vision – ECCV 2016, pages 630–645. Springer
   International Publishing, 2016.
"""
input_shape = xx.get_shape().as_list()
outputs_size = [input_shape[1]] + network_size
for ii in range(1, len(outputs_size)):
w_initializer = tf.random_normal_initializer(
stddev=stddev/np.sqrt(outputs_size[ii]+outputs_size[ii-1]),
seed = seed if (seed is None or uniform_seed) else seed + ii*3+0
)
b_initializer = tf.random_normal_initializer(
stddev=stddev,
mean = bavg,
seed = seed if (seed is None or uniform_seed) else seed + 3*ii+1
)
if initial_variables is not None:
scope = tf.get_variable_scope().name
w_initializer = tf.constant_initializer(initial_variables[scope+'/matrix_'+str(ii)+name_suffix])
b_initializer = tf.constant_initializer(initial_variables[scope+'/bias_'+str(ii)+name_suffix])
w = tf.get_variable('matrix_'+str(ii)+name_suffix,
[outputs_size[ii - 1], outputs_size[ii]],
precision,
w_initializer,
trainable = trainable)
variable_summaries(w, 'matrix_'+str(ii)+name_suffix)
b = tf.get_variable('bias_'+str(ii)+name_suffix,
[outputs_size[ii]],
precision,
b_initializer,
trainable = trainable)
variable_summaries(b, 'bias_'+str(ii)+name_suffix)
if mixed_prec is not None:
xx = tf.cast(xx, get_precision(mixed_prec['compute_prec']))
w = tf.cast(w, get_precision(mixed_prec['compute_prec']))
b = tf.cast(b, get_precision(mixed_prec['compute_prec']))
if activation_fn is not None:
hidden = tf.reshape(activation_fn(tf.nn.bias_add(tf.matmul(xx, w), b)), [-1, outputs_size[ii]])
else:
hidden = tf.reshape(tf.nn.bias_add(tf.matmul(xx, w), b), [-1, outputs_size[ii]])
if resnet_dt :
idt_initializer = tf.random_normal_initializer(
stddev=0.001,
mean = 1.0,
seed = seed if (seed is None or uniform_seed) else seed + 3*ii+2
)
if initial_variables is not None:
scope = tf.get_variable_scope().name
idt_initializer = tf.constant_initializer(initial_variables[scope+'/idt_'+str(ii)+name_suffix])
idt = tf.get_variable('idt_'+str(ii)+name_suffix,
[1, outputs_size[ii]],
precision,
idt_initializer,
trainable = trainable)
variable_summaries(idt, 'idt_'+str(ii)+name_suffix)
if mixed_prec is not None:
idt = tf.cast(idt, get_precision(mixed_prec['compute_prec']))
if outputs_size[ii] == outputs_size[ii-1]:
if resnet_dt :
xx += hidden * idt
else :
xx += hidden
elif outputs_size[ii] == outputs_size[ii-1] * 2:
if resnet_dt :
xx = tf.concat([xx,xx], 1) + hidden * idt
else :
xx = tf.concat([xx,xx], 1) + hidden
else:
xx = hidden
if mixed_prec is not None:
xx = tf.cast(xx, get_precision(mixed_prec['output_prec']))
return xx
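# Shape sketch (illustrative): with network_size = [16, 32, 64] and input xx of shape
# [-1, 1], the layers map [-1, 1] -> [-1, 16] -> [-1, 32] -> [-1, 64]; the second and
# third layers satisfy N_2 = 2 * N_1, so tf.concat([xx, xx], 1) forms the residual
# connection described in the docstring above.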
def variable_summaries(var: tf.Variable, name: str):
"""Attach a lot of summaries to a Tensor (for TensorBoard visualization).
Parameters
----------
var : tf.Variable
the variable to which the summaries are attached
name : str
variable name
"""
with tf.name_scope(name):
mean = tf.reduce_mean(var)
tf.summary.scalar('mean', mean)
with tf.name_scope('stddev'):
stddev = tf.sqrt(tf.reduce_mean(tf.square(var - mean)))
tf.summary.scalar('stddev', stddev)
tf.summary.scalar('max', tf.reduce_max(var))
tf.summary.scalar('min', tf.reduce_min(var))
tf.summary.histogram('histogram', var)
#!/usr/bin/env python3
import numpy as np
from typing import Tuple, List
from scipy.interpolate import CubicSpline
class PairTab (object):
"""
Parameters
----------
filename
File name for the short-range tabulated potential.
The table is a text data file with (N_t + 1) * N_t / 2 + 1 columns.
The first column is the distance between atoms.
The second to the last columns are energies for pairs of certain types.
For example, if we have two atom types, 0 and 1,
the columns from 2nd to 4th are for 0-0, 0-1 and 1-1 correspondingly.
"""
def __init__(self,
filename : str
) -> None:
"""
Constructor
"""
self.reinit(filename)
def reinit(self,
filename : str
) -> None:
"""
Initialize the tabulated interaction
Parameters
----------
filename
File name for the short-range tabulated potential.
The table is a text data file with (N_t + 1) * N_t / 2 + 1 columns.
The first column is the distance between atoms.
The second to the last columns are energies for pairs of certain types.
For example, if we have two atom types, 0 and 1,
the columns from 2nd to 4th are for 0-0, 0-1 and 1-1 correspondingly.
"""
self.vdata = np.loadtxt(filename)
self.rmin = self.vdata[0][0]
self.hh = self.vdata[1][0] - self.vdata[0][0]
self.nspline = self.vdata.shape[0] - 1
ncol = self.vdata.shape[1] - 1
n0 = (-1 + np.sqrt(1 + 8 * ncol)) * 0.5
self.ntypes = int(n0 + 0.1)
assert(self.ntypes * (self.ntypes+1) // 2 == ncol),\
"number of volumes provided in %s does not match guessed number of types %d" % (filename, self.ntypes)
self.tab_info = np.array([self.rmin, self.hh, self.nspline, self.ntypes])
self.tab_data = self._make_data()
def get(self) -> Tuple[np.array, np.array]:
"""
Get the serialized table.
"""
return self.tab_info, self.tab_data
def _make_data(self) :
data = np.zeros([self.ntypes * self.ntypes * 4 * self.nspline])
stride = 4 * self.nspline
idx_iter = 0
xx = self.vdata[:,0]
for t0 in range(self.ntypes) :
for t1 in range(t0, self.ntypes) :
vv = self.vdata[:,1+idx_iter]
cs = CubicSpline(xx, vv)
dd = cs(xx, 1)
dd *= self.hh
dtmp = np.zeros(stride)
for ii in range(self.nspline) :
dtmp[ii*4+0] = 2 * vv[ii] - 2 * vv[ii+1] + dd[ii] + dd[ii+1]
dtmp[ii*4+1] =-3 * vv[ii] + 3 * vv[ii+1] - 2 * dd[ii] - dd[ii+1]
dtmp[ii*4+2] = dd[ii]
dtmp[ii*4+3] = vv[ii]
data[(t0 * self.ntypes + t1) * stride : (t0 * self.ntypes + t1) * stride + stride] \
= dtmp
data[(t1 * self.ntypes + t0) * stride : (t1 * self.ntypes + t0) * stride + stride] \
= dtmp
idx_iter += 1
return data
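# Illustrative table layout (hypothetical numbers; two atom types, hence
# (2 + 1) * 2 / 2 + 1 = 4 columns: r, E_00, E_01, E_11):
#     0.10   0.50   0.80   1.20
#     0.12   0.45   0.75   1.10
# PairTab("pair.txt") would read such a file and expose (tab_info, tab_data) via get().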
from typing import Callable, Generator, Tuple, Dict, Any
from deepmd.env import tf
from deepmd.utils.sess import run_sess
class ParallelOp:
"""Run an op with data parallelism.
Parameters
----------
builder : Callable[..., Tuple[Dict[str, tf.Tensor], Tuple[tf.Tensor]]]
returns two objects: a dict which stores placeholders by key, and a tuple with the final op(s)
nthreads : int, optional
the number of threads
config : tf.ConfigProto, optional
tf.ConfigProto
Examples
--------
>>> from deepmd.env import tf
>>> from deepmd.utils.parallel_op import ParallelOp
>>> def builder():
... x = tf.placeholder(tf.int32, [1])
... return {"x": x}, (x + 1)
...
>>> p = ParallelOp(builder, nthreads=4)
>>> def feed():
... for ii in range(10):
... yield {"x": [ii]}
...
>>> print(*p.generate(tf.Session(), feed()))
[1] [2] [3] [4] [5] [6] [7] [8] [9] [10]
"""
def __init__(self, builder: Callable[..., Tuple[Dict[str, tf.Tensor], Tuple[tf.Tensor]]], nthreads: int = None, config: tf.ConfigProto = None) -> None:
if nthreads is not None:
self.nthreads = nthreads
elif config is not None:
self.nthreads = max(config.inter_op_parallelism_threads, 1)
else:
self.nthreads = 1
self.placeholders = []
self.ops = []
for ii in range(self.nthreads):
with tf.name_scope("task_%d" % ii) as scope:
placeholder, op = builder()
self.placeholders.append(placeholder)
self.ops.append(op)
def generate(self, sess: tf.Session, feed: Generator[Dict[str, Any], None, None]) -> Generator[Tuple, None, None]:
"""Returns a generator.
Parameters
----------
feed : Generator[dict, None, None]
generator which yields feed_dict
Yields
------
Generator[Tuple, None, None]
generator which yields session returns
"""
nn = self.nthreads
while True:
feed_dict = {}
for ii in range(self.nthreads):
try:
fd = next(feed)
except StopIteration:
if ii == 0:
return
nn = ii
break
for kk, vv in fd.items():
feed_dict[self.placeholders[ii][kk]] = vv
ops = self.ops[:nn]
for yy in run_sess(sess, ops, feed_dict=feed_dict):
yield yy
import os
from abc import ABC, abstractmethod
from typing import List
from pathlib import Path
from functools import lru_cache
import numpy as np
import h5py
from wcmatch.glob import globfilter
class DPPath(ABC):
"""The path class to data system (DeepmdData).
Parameters
----------
path : str
path
"""
def __new__(cls, path: str):
if cls is DPPath:
if os.path.isdir(path):
return super().__new__(DPOSPath)
elif os.path.isfile(path.split("#")[0]):
# assume h5 if it is not dir
# TODO: check if it is a real h5? or just check suffix?
return super().__new__(DPH5Path)
raise FileNotFoundError("%s not found" % path)
return super().__new__(cls)
@abstractmethod
def load_numpy(self) -> np.ndarray:
"""Load NumPy array.
Returns
-------
np.ndarray
loaded NumPy array
"""
@abstractmethod
def load_txt(self, **kwargs) -> np.ndarray:
"""Load NumPy array from text.
Returns
-------
np.ndarray
loaded NumPy array
"""
@abstractmethod
def glob(self, pattern: str) -> List["DPPath"]:
"""Search path using the glob pattern.
Parameters
----------
pattern : str
glob pattern
Returns
-------
List[DPPath]
list of paths
"""
@abstractmethod
def rglob(self, pattern: str) -> List["DPPath"]:
"""This is like calling :meth:`DPPath.glob()` with `**/` added in front
of the given relative pattern.
Parameters
----------
pattern : str
glob pattern
Returns
-------
List[DPPath]
list of paths
"""
@abstractmethod
def is_file(self) -> bool:
"""Check if self is file."""
@abstractmethod
def is_dir(self) -> bool:
"""Check if self is directory."""
@abstractmethod
def __truediv__(self, key: str) -> "DPPath":
"""Used for / operator."""
@abstractmethod
def __lt__(self, other: "DPPath") -> bool:
"""whether this DPPath is less than other for sorting"""
@abstractmethod
def __str__(self) -> str:
"""Represent string"""
def __repr__(self) -> str:
return "%s (%s)" % (type(self), str(self))
def __eq__(self, other) -> bool:
return str(self) == str(other)
def __hash__(self):
return hash(str(self))
class DPOSPath(DPPath):
"""The OS path class to data system (DeepmdData) for real directories.
Parameters
----------
path : str
path
"""
def __init__(self, path: str) -> None:
super().__init__()
if isinstance(path, Path):
self.path = path
else:
self.path = Path(path)
def load_numpy(self) -> np.ndarray:
"""Load NumPy array.
Returns
-------
np.ndarray
loaded NumPy array
"""
return np.load(str(self.path))
def load_txt(self, **kwargs) -> np.ndarray:
"""Load NumPy array from text.
Returns
-------
np.ndarray
loaded NumPy array
"""
return np.loadtxt(str(self.path), **kwargs)
def glob(self, pattern: str) -> List["DPPath"]:
"""Search path using the glob pattern.
Parameters
----------
pattern : str
glob pattern
Returns
-------
List[DPPath]
list of paths
"""
# currently DPOSPath will only produce DPOSPath
# TODO: discuss if we want to mix DPOSPath and DPH5Path?
return list([type(self)(p) for p in self.path.glob(pattern)])
def rglob(self, pattern: str) -> List["DPPath"]:
"""This is like calling :meth:`DPPath.glob()` with `**/` added in front
of the given relative pattern.
Parameters
----------
pattern : str
glob pattern
Returns
-------
List[DPPath]
list of paths
"""
return list([type(self)(p) for p in self.path.rglob(pattern)])
def is_file(self) -> bool:
"""Check if self is file."""
return self.path.is_file()
def is_dir(self) -> bool:
"""Check if self is directory."""
return self.path.is_dir()
def __truediv__(self, key: str) -> "DPPath":
"""Used for / operator."""
return type(self)(self.path / key)
def __lt__(self, other: "DPOSPath") -> bool:
"""whether this DPPath is less than other for sorting"""
return self.path < other.path
def __str__(self) -> str:
"""Represent string"""
return str(self.path)
class DPH5Path(DPPath):
"""The path class to data system (DeepmdData) for HDF5 files.
Notes
-----
OS - HDF5 relationship:
directory - Group
file - Dataset
Parameters
----------
path : str
path
"""
def __init__(self, path: str) -> None:
super().__init__()
# we use "#" to split path
# so we do not support file names containing #...
s = path.split("#")
self.root_path = s[0]
self.root = self._load_h5py(s[0])
# h5 path: default is the root path
self.name = s[1] if len(s) > 1 else "/"
@classmethod
@lru_cache(None)
def _load_h5py(cls, path: str) -> h5py.File:
"""Load hdf5 file.
Parameters
----------
path : str
path to hdf5 file
"""
# this method has cache to avoid duplicated
# loading from different DPH5Path
# However, the file will never be closed?
return h5py.File(path, 'r')
def load_numpy(self) -> np.ndarray:
"""Load NumPy array.
Returns
-------
np.ndarray
loaded NumPy array
"""
return self.root[self.name][:]
def load_txt(self, dtype: np.dtype = None, **kwargs) -> np.ndarray:
"""Load NumPy array from text.
Returns
-------
np.ndarray
loaded NumPy array
"""
arr = self.load_numpy()
if dtype:
arr = arr.astype(dtype)
return arr
def glob(self, pattern: str) -> List["DPPath"]:
"""Search path using the glob pattern.
Parameters
----------
pattern : str
glob pattern
Returns
-------
List[DPPath]
list of paths
"""
# get paths that start with the current path first, which is faster
subpaths = [ii for ii in self._keys if ii.startswith(self.name)]
return list([type(self)("%s#%s"%(self.root_path, pp)) for pp in globfilter(subpaths, self._connect_path(pattern))])
def rglob(self, pattern: str) -> List["DPPath"]:
"""This is like calling :meth:`DPPath.glob()` with `**/` added in front
of the given relative pattern.
Parameters
----------
pattern : str
glob pattern
Returns
-------
List[DPPath]
list of paths
"""
return self.glob("**" + pattern)
@property
def _keys(self) -> List[str]:
"""Walk all groups and dataset"""
return self._file_keys(self.root)
@classmethod
@lru_cache(None)
def _file_keys(cls, file: h5py.File) -> List[str]:
"""Walk all groups and dataset"""
l = []
file.visit(lambda x: l.append("/" + x))
return l
def is_file(self) -> bool:
"""Check if self is file."""
if self.name not in self._keys:
return False
return isinstance(self.root[self.name], h5py.Dataset)
def is_dir(self) -> bool:
"""Check if self is directory."""
if self.name not in self._keys:
return False
return isinstance(self.root[self.name], h5py.Group)
def __truediv__(self, key: str) -> "DPPath":
"""Used for / operator."""
return type(self)("%s#%s" % (self.root_path, self._connect_path(key)))
def _connect_path(self, path: str) -> str:
"""Connect self with path"""
if self.name.endswith("/"):
return "%s%s" % (self.name, path)
return "%s/%s" % (self.name, path)
def __lt__(self, other: "DPH5Path") -> bool:
"""whether this DPPath is less than other for sorting"""
if self.root_path == other.root_path:
return self.name < other.name
return self.root_path < other.root_path
def __str__(self) -> str:
"""returns path of self"""
return "%s#%s" % (self.root_path, self.name)
"""Base of plugin systems."""
# copied from https://github.com/deepmodeling/dpdata/blob/a3e76d75de53f6076254de82d18605a010dc3b00/dpdata/plugin.py
from abc import ABCMeta
from typing import Callable
class Plugin:
"""A class to register and restore plugins.
Attributes
----------
plugins : Dict[str, object]
plugins
Examples
--------
>>> plugin = Plugin()
>>> @plugin.register("xx")
... def xxx():
...     pass
>>> print(plugin.plugins['xx'])
"""
def __init__(self):
self.plugins = {}
def __add__(self, other) -> "Plugin":
self.plugins.update(other.plugins)
return self
def register(self, key : str) -> Callable[[object], object]:
"""Register a plugin.
Parameters
----------
key : str
key of the plugin
Returns
-------
Callable[[object], object]
decorator
"""
def decorator(object : object) -> object:
self.plugins[key] = object
return object
return decorator
def get_plugin(self, key) -> object:
"""Visit a plugin by key.
Parameters
----------
key : str
key of the plugin
Returns
-------
object
the plugin
"""
return self.plugins[key]
class VariantMeta:
def __call__(cls, *args, **kwargs):
"""Remove `type` and keys that starts with underline."""
obj = cls.__new__(cls, *args, **kwargs)
kwargs.pop('type', None)
to_pop = []
for kk in kwargs:
if kk[0] == '_':
to_pop.append(kk)
for kk in to_pop:
kwargs.pop(kk, None)
obj.__init__(*args, **kwargs)
return obj
class VariantABCMeta(VariantMeta, ABCMeta):
pass
class PluginVariant(metaclass=VariantABCMeta):
"""A class to remove `type` from input arguments."""
pass
import numpy as np
_RANDOM_GENERATOR = np.random.RandomState()
def choice(a: np.ndarray, p: np.ndarray = None):
"""Generates a random sample from a given 1-D array.
Parameters
----------
a : np.ndarray
A random sample is generated from its elements.
p : np.ndarray
The probabilities associated with each entry in a.
Returns
-------
np.ndarray
the generated random sample
"""
return _RANDOM_GENERATOR.choice(a, p=p)
def random(size=None):
"""Return random floats in the half-open interval [0.0, 1.0).
Parameters
----------
size
Output shape.
Returns
-------
np.ndarray
Arrays with results and their shapes.
"""
return _RANDOM_GENERATOR.random_sample(size)
def seed(val: int = None):
"""Seed the generator.
Parameters
----------
val : int
Seed.
"""
_RANDOM_GENERATOR.seed(val)
def shuffle(x: np.ndarray):
"""Modify a sequence in-place by shuffling its contents.
Parameters
----------
x : np.ndarray
The array or list to be shuffled.
"""
_RANDOM_GENERATOR.shuffle(x)
__all__ = ['choice', 'random', 'seed', 'shuffle']
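# Usage sketch: seed the module-level generator once for reproducible sampling
# (the import path below is illustrative; use wherever this module lives in the package):
#     from deepmd.utils import random as dp_random
#     dp_random.seed(1)
#     idx = dp_random.choice(np.arange(4), p=[0.1, 0.2, 0.3, 0.4])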
import os
from deepmd.env import tf
from deepmd.utils.errors import OutOfMemoryError
def run_sess(sess: tf.Session, *args, **kwargs):
"""Run session with erorrs caught.
Parameters
----------
sess: tf.Session
TensorFlow Session
Returns
-------
the result of sess.run()
"""
try:
# https://www.tensorflow.org/api_docs/python/tf/compat/v1/Session#run
return sess.run(*args, **kwargs)
except tf.errors.ResourceExhaustedError as e:
MESSAGE = (
"Your memory may be not enough, thus an error has been raised "
"above. You need to take the following actions:\n"
"1. Check if the network size of the model is too large.\n"
"2. Check if the batch size of training or testing is too large."
" You can set the training batch size to `auto`.\n"
"3. Check if the number of atoms is too large.\n"
)
if tf.test.is_built_with_cuda():
MESSAGE += (
"4. Check if another program is using the same GPU by "
"execuating `nvidia-smi`. The usage of GPUs is "
"controlled by `CUDA_VISIBLE_DEVICES` environment "
"variable (current value: %s).\n" % (
os.getenv("CUDA_VISIBLE_DEVICES", None),
))
raise OutOfMemoryError(MESSAGE) from e
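# Usage sketch: call run_sess(sess, fetches, feed_dict=...) wherever sess.run() would
# be used, so that TensorFlow resource-exhaustion errors are re-raised as
# OutOfMemoryError together with the troubleshooting hints above.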
import logging
import numpy as np
import deepmd
from typing import Callable
from typing import Tuple, List, Dict
from functools import lru_cache
from scipy.special import comb
from deepmd.env import tf
from deepmd.env import op_module
from deepmd.common import ACTIVATION_FN_DICT
from deepmd.utils.graph import get_tensor_by_name_from_graph, load_graph_def
from deepmd.utils.graph import get_embedding_net_nodes_from_graph_def
from deepmd.descriptor import Descriptor
log = logging.getLogger(__name__)
class DPTabulate():
"""
Class for tabulation.
Compress a model, which includes tabulating the embedding net.
The table is composed of fifth-order polynomial coefficients and is assembled from two sub-tables. The first table takes the stride (parameter) as its uniform stride, while the second table takes 10 * stride as its uniform stride.
The range of the first table is automatically detected by deepmd-kit, while the second table ranges from the first table's upper boundary (upper) to extrapolate (parameter) * upper.
Parameters
----------
descrpt
Descriptor of the original model
neuron
Number of neurons in each hidden layer of the embedding net :math:`\mathcal{N}`
model_file
The frozen model
type_one_side
Try to build N_types tables. Otherwise, build N_types^2 tables
exclude_types : List[List[int]]
The excluded pairs of types which have no interaction with each other.
For example, `[[0, 1]]` means no interaction between type 0 and type 1.
activation_function
The activation function in the embedding net. Supported options are {"tanh", "gelu", "relu", "relu6", "softplus", "sigmoid"} in common.ACTIVATION_FN_DICT.
suffix : str, optional
The suffix of the scope
"""
def __init__(self,
descrpt : Descriptor,
neuron : List[int],
model_file : str,
type_one_side : bool = False,
exclude_types : List[List[int]] = [],
activation_fn : Callable[[tf.Tensor], tf.Tensor] = tf.nn.tanh,
suffix : str = "",
) -> None:
"""
Constructor
"""
self.descrpt = descrpt
self.neuron = neuron
self.model_file = model_file
self.type_one_side = type_one_side
self.exclude_types = exclude_types
self.suffix = suffix
# functype
if activation_fn == ACTIVATION_FN_DICT["tanh"]:
self.functype = 1
elif activation_fn == ACTIVATION_FN_DICT["gelu"]:
self.functype = 2
elif activation_fn == ACTIVATION_FN_DICT["relu"]:
self.functype = 3
elif activation_fn == ACTIVATION_FN_DICT["relu6"]:
self.functype = 4
elif activation_fn == ACTIVATION_FN_DICT["softplus"]:
self.functype = 5
elif activation_fn == ACTIVATION_FN_DICT["sigmoid"]:
self.functype = 6
else:
raise RuntimeError("Unknown actication function type!")
self.activation_fn = activation_fn
self.graph, self.graph_def = load_graph_def(self.model_file)
#self.sess = tf.Session(graph = self.graph)
self.sub_graph, self.sub_graph_def = self._load_sub_graph()
self.sub_sess = tf.Session(graph = self.sub_graph)
if isinstance(self.descrpt, deepmd.descriptor.DescrptSeR):
self.sel_a = self.descrpt.sel_r
self.rcut = self.descrpt.rcut
self.rcut_smth = self.descrpt.rcut_smth
elif isinstance(self.descrpt, deepmd.descriptor.DescrptSeA):
self.sel_a = self.descrpt.sel_a
self.rcut = self.descrpt.rcut_r
self.rcut_smth = self.descrpt.rcut_r_smth
elif isinstance(self.descrpt, deepmd.descriptor.DescrptSeT):
self.sel_a = self.descrpt.sel_a
self.rcut = self.descrpt.rcut_r
self.rcut_smth = self.descrpt.rcut_r_smth
else:
raise RuntimeError("Unsupported descriptor")
self.davg = get_tensor_by_name_from_graph(self.graph, f'descrpt_attr{self.suffix}/t_avg')
self.dstd = get_tensor_by_name_from_graph(self.graph, f'descrpt_attr{self.suffix}/t_std')
self.ntypes = get_tensor_by_name_from_graph(self.graph, 'descrpt_attr/ntypes')
self.embedding_net_nodes = get_embedding_net_nodes_from_graph_def(self.graph_def, suffix=self.suffix)
# move it to the descriptor class
# for tt in self.exclude_types:
# if (tt[0] not in range(self.ntypes)) or (tt[1] not in range(self.ntypes)):
# raise RuntimeError("exclude types" + str(tt) + " must within the number of atomic types " + str(self.ntypes) + "!")
# if (self.ntypes * self.ntypes - len(self.exclude_types) == 0):
# raise RuntimeError("empty embedding-net are not supported in model compression!")
self.layer_size = self._get_layer_size()
self.table_size = self._get_table_size()
self.bias = self._get_bias()
self.matrix = self._get_matrix()
self.data_type = self._get_data_type()
self.last_layer_size = self._get_last_layer_size()
self.data = {}
self.upper = {}
self.lower = {}
def build(self,
min_nbor_dist : float,
extrapolate : float,
stride0 : float,
stride1 : float) -> Tuple[Dict[str, int], Dict[str, int]]:
"""
Build the tables for model compression
Parameters
----------
min_nbor_dist
The nearest distance between neighbor atoms
extrapolate
The scale of model extrapolation
stride0
The uniform stride of the first table
stride1
The uniform stride of the second table
Returns
----------
lower : dict[str, int]
The lower boundary of environment matrix by net
upper : dict[str, int]
The upper boundary of environment matrix by net
"""
# tabulate the range [lower, upper] with stride 'stride0' and [upper, extrapolate * upper] with stride 'stride1'
lower, upper = self._get_env_mat_range(min_nbor_dist)
if isinstance(self.descrpt, deepmd.descriptor.DescrptSeA):
for ii in range(self.table_size):
if (self.type_one_side and not self._all_excluded(ii)) or (not self.type_one_side and (ii // self.ntypes, ii % self.ntypes) not in self.exclude_types):
if self.type_one_side:
net = "filter_-1_net_" + str(ii)
# upper and lower should consider all types that are not excluded and have sel > 0
idx = [(type_i, ii) not in self.exclude_types and self.sel_a[type_i] > 0 for type_i in range(self.ntypes)]
uu = np.max(upper[idx])
ll = np.min(lower[idx])
else:
ielement = ii // self.ntypes
net = "filter_" + str(ielement) + "_net_" + str(ii % self.ntypes)
uu = upper[ielement]
ll = lower[ielement]
xx = np.arange(ll, uu, stride0, dtype = self.data_type)
xx = np.append(xx, np.arange(uu, extrapolate * uu, stride1, dtype = self.data_type))
xx = np.append(xx, np.array([extrapolate * uu], dtype = self.data_type))
nspline = ((uu - ll) / stride0 + (extrapolate * uu - uu) / stride1).astype(int)
self._build_lower(net, xx, ii, uu, ll, stride0, stride1, extrapolate, nspline)
elif isinstance(self.descrpt, deepmd.descriptor.DescrptSeT):
xx_all = []
for ii in range(self.ntypes):
xx = np.arange(extrapolate * lower[ii], lower[ii], stride1, dtype = self.data_type)
xx = np.append(xx, np.arange(lower[ii], upper[ii], stride0, dtype = self.data_type))
xx = np.append(xx, np.arange(upper[ii], extrapolate * upper[ii], stride1, dtype = self.data_type))
xx = np.append(xx, np.array([extrapolate * upper[ii]], dtype = self.data_type))
xx_all.append(xx)
nspline = ((upper - lower) / stride0 + 2 * ((extrapolate * upper - upper) / stride1)).astype(int)
idx = 0
for ii in range(self.ntypes):
for jj in range(ii, self.ntypes):
net = "filter_" + str(ii) + "_net_" + str(jj)
self._build_lower(net, xx_all[ii], idx, upper[ii], lower[ii], stride0, stride1, extrapolate, nspline[ii])
idx += 1
elif isinstance(self.descrpt, deepmd.descriptor.DescrptSeR):
for ii in range(self.table_size):
if (self.type_one_side and not self._all_excluded(ii)) or (not self.type_one_side and (ii // self.ntypes, ii % self.ntypes) not in self.exclude_types):
if self.type_one_side:
net = "filter_-1_net_" + str(ii)
# upper and lower should consider all types that are not excluded and have sel > 0
idx = [(type_i, ii) not in self.exclude_types and self.sel_a[type_i] > 0 for type_i in range(self.ntypes)]
uu = np.max(upper[idx])
ll = np.min(lower[idx])
else:
ielement = ii // self.ntypes
net = "filter_" + str(ielement) + "_net_" + str(ii % self.ntypes)
uu = upper[ielement]
ll = lower[ielement]
xx = np.arange(ll, uu, stride0, dtype = self.data_type)
xx = np.append(xx, np.arange(uu, extrapolate * uu, stride1, dtype = self.data_type))
xx = np.append(xx, np.array([extrapolate * uu], dtype = self.data_type))
nspline = ((uu - ll) / stride0 + (extrapolate * uu - uu) / stride1).astype(int)
self._build_lower(net, xx, ii, uu, ll, stride0, stride1, extrapolate, nspline)
else:
raise RuntimeError("Unsupported descriptor")
self._convert_numpy_to_tensor()
return self.lower, self.upper
def _build_lower(self, net, xx, idx, upper, lower, stride0, stride1, extrapolate, nspline):
vv, dd, d2 = self._make_data(xx, idx)
self.data[net] = np.zeros([nspline, 6 * self.last_layer_size], dtype = self.data_type)
# tt.shape: [nspline, self.last_layer_size]
if isinstance(self.descrpt, deepmd.descriptor.DescrptSeA):
tt = np.full((nspline, self.last_layer_size), stride1)
tt[:int((upper - lower) / stride0), :] = stride0
elif isinstance(self.descrpt, deepmd.descriptor.DescrptSeT):
tt = np.full((nspline, self.last_layer_size), stride1)
tt[int((lower - extrapolate * lower) / stride1) + 1:(int((lower - extrapolate * lower) / stride1) + int((upper - lower) / stride0)), :] = stride0
elif isinstance(self.descrpt, deepmd.descriptor.DescrptSeR):
tt = np.full((nspline, self.last_layer_size), stride1)
tt[:int((upper - lower) / stride0), :] = stride0
else:
raise RuntimeError("Unsupported descriptor")
# hh.shape: [nspline, self.last_layer_size]
hh = vv[1:nspline+1, :self.last_layer_size] - vv[:nspline, :self.last_layer_size]
self.data[net][:, :6 * self.last_layer_size:6] = vv[:nspline, :self.last_layer_size]
self.data[net][:, 1:6 * self.last_layer_size:6] = dd[:nspline, :self.last_layer_size]
self.data[net][:, 2:6 * self.last_layer_size:6] = 0.5 * d2[:nspline, :self.last_layer_size]
self.data[net][:, 3:6 * self.last_layer_size:6] = (1 / (2 * tt * tt * tt)) * (20 * hh - (8 * dd[1:nspline+1, :self.last_layer_size] + 12 * dd[:nspline, :self.last_layer_size]) * tt - (3 * d2[:nspline, :self.last_layer_size] - d2[1:nspline+1, :self.last_layer_size]) * tt * tt)
self.data[net][:, 4:6 * self.last_layer_size:6] = (1 / (2 * tt * tt * tt * tt)) * (-30 * hh + (14 * dd[1:nspline+1, :self.last_layer_size] + 16 * dd[:nspline, :self.last_layer_size]) * tt + (3 * d2[:nspline, :self.last_layer_size] - 2 * d2[1:nspline+1, :self.last_layer_size]) * tt * tt)
self.data[net][:, 5:6 * self.last_layer_size:6] = (1 / (2 * tt * tt * tt * tt * tt)) * (12 * hh - 6 * (dd[1:nspline+1, :self.last_layer_size] + dd[:nspline, :self.last_layer_size]) * tt + (d2[1:nspline+1, :self.last_layer_size] - d2[:nspline, :self.last_layer_size]) * tt * tt)
self.upper[net] = upper
self.lower[net] = lower
def _load_sub_graph(self):
sub_graph_def = tf.GraphDef()
with tf.Graph().as_default() as sub_graph:
tf.import_graph_def(sub_graph_def, name = "")
return sub_graph, sub_graph_def
def _get_bias(self):
bias = {}
for layer in range(1, self.layer_size + 1):
bias["layer_" + str(layer)] = []
if isinstance(self.descrpt, deepmd.descriptor.DescrptSeA):
if self.type_one_side:
for ii in range(0, self.ntypes):
if not self._all_excluded(ii):
node = self.embedding_net_nodes[f"filter_type_all{self.suffix}/bias_{layer}_{ii}"]
bias["layer_" + str(layer)].append(tf.make_ndarray(node))
else:
bias["layer_" + str(layer)].append(np.array([]))
else:
for ii in range(0, self.ntypes * self.ntypes):
if (ii // self.ntypes, ii % self.ntypes) not in self.exclude_types:
node = self.embedding_net_nodes[f"filter_type_{ii // self.ntypes}{self.suffix}/bias_{layer}_{ii % self.ntypes}"]
bias["layer_" + str(layer)].append(tf.make_ndarray(node))
else:
bias["layer_" + str(layer)].append(np.array([]))
elif isinstance(self.descrpt, deepmd.descriptor.DescrptSeT):
for ii in range(self.ntypes):
for jj in range(ii, self.ntypes):
node = self.embedding_net_nodes[f"filter_type_all{self.suffix}/bias_{layer}_{ii}_{jj}"]
bias["layer_" + str(layer)].append(tf.make_ndarray(node))
elif isinstance(self.descrpt, deepmd.descriptor.DescrptSeR):
if self.type_one_side:
for ii in range(0, self.ntypes):
if not self._all_excluded(ii):
node = self.embedding_net_nodes[f"filter_type_all{self.suffix}/bias_{layer}_{ii}"]
bias["layer_" + str(layer)].append(tf.make_ndarray(node))
else:
bias["layer_" + str(layer)].append(np.array([]))
else:
for ii in range(0, self.ntypes * self.ntypes):
if (ii // self.ntypes, ii % self.ntypes) not in self.exclude_types:
node = self.embedding_net_nodes[f"filter_type_{ii // self.ntypes}{self.suffix}/bias_{layer}_{ii % self.ntypes}"]
bias["layer_" + str(layer)].append(tf.make_ndarray(node))
else:
bias["layer_" + str(layer)].append(np.array([]))
else:
raise RuntimeError("Unsupported descriptor")
return bias
def _get_matrix(self):
matrix = {}
for layer in range(1, self.layer_size + 1):
matrix["layer_" + str(layer)] = []
if isinstance(self.descrpt, deepmd.descriptor.DescrptSeA):
if self.type_one_side:
for ii in range(0, self.ntypes):
if not self._all_excluded(ii):
node = self.embedding_net_nodes[f"filter_type_all{self.suffix}/matrix_{layer}_{ii}"]
matrix["layer_" + str(layer)].append(tf.make_ndarray(node))
else:
matrix["layer_" + str(layer)].append(np.array([]))
else:
for ii in range(0, self.ntypes * self.ntypes):
if (ii // self.ntypes, ii % self.ntypes) not in self.exclude_types:
node = self.embedding_net_nodes[f"filter_type_{ii // self.ntypes}{self.suffix}/matrix_{layer}_{ii % self.ntypes}"]
matrix["layer_" + str(layer)].append(tf.make_ndarray(node))
else:
matrix["layer_" + str(layer)].append(np.array([]))
elif isinstance(self.descrpt, deepmd.descriptor.DescrptSeT):
for ii in range(self.ntypes):
for jj in range(ii, self.ntypes):
node = self.embedding_net_nodes[f"filter_type_all{self.suffix}/matrix_{layer}_{ii}_{jj}"]
matrix["layer_" + str(layer)].append(tf.make_ndarray(node))
elif isinstance(self.descrpt, deepmd.descriptor.DescrptSeR):
if self.type_one_side:
for ii in range(0, self.ntypes):
if not self._all_excluded(ii):
node = self.embedding_net_nodes[f"filter_type_all{self.suffix}/matrix_{layer}_{ii}"]
matrix["layer_" + str(layer)].append(tf.make_ndarray(node))
else:
matrix["layer_" + str(layer)].append(np.array([]))
else:
for ii in range(0, self.ntypes * self.ntypes):
if (ii // self.ntypes, ii % self.ntypes) not in self.exclude_types:
node = self.embedding_net_nodes[f"filter_type_{ii // self.ntypes}{self.suffix}/matrix_{layer}_{ii % self.ntypes}"]
matrix["layer_" + str(layer)].append(tf.make_ndarray(node))
else:
matrix["layer_" + str(layer)].append(np.array([]))
else:
raise RuntimeError("Unsupported descriptor")
return matrix
# one-by-one executions
def _make_data(self, xx, idx):
with self.sub_graph.as_default():
with self.sub_sess.as_default():
xx = tf.reshape(xx, [xx.size, -1])
for layer in range(self.layer_size):
if layer == 0:
xbar = tf.matmul(
xx, self.matrix["layer_" + str(layer + 1)][idx]) + self.bias["layer_" + str(layer + 1)][idx]
if self.neuron[0] == 1:
yy = self._layer_0(
xx, self.matrix["layer_" + str(layer + 1)][idx], self.bias["layer_" + str(layer + 1)][idx]) + xx
dy = op_module.unaggregated_dy_dx_s(
yy, self.matrix["layer_" + str(layer + 1)][idx], xbar, tf.constant(self.functype)) + tf.ones([1, 1], yy.dtype)
dy2 = op_module.unaggregated_dy2_dx_s(
yy, dy, self.matrix["layer_" + str(layer + 1)][idx], xbar, tf.constant(self.functype))
elif self.neuron[0] == 2:
tt, yy = self._layer_1(
xx, self.matrix["layer_" + str(layer + 1)][idx], self.bias["layer_" + str(layer + 1)][idx])
dy = op_module.unaggregated_dy_dx_s(
yy - tt, self.matrix["layer_" + str(layer + 1)][idx], xbar, tf.constant(self.functype)) + tf.ones([1, 2], yy.dtype)
dy2 = op_module.unaggregated_dy2_dx_s(
yy - tt, dy, self.matrix["layer_" + str(layer + 1)][idx], xbar, tf.constant(self.functype))
else:
yy = self._layer_0(
xx, self.matrix["layer_" + str(layer + 1)][idx], self.bias["layer_" + str(layer + 1)][idx])
dy = op_module.unaggregated_dy_dx_s(
yy, self.matrix["layer_" + str(layer + 1)][idx], xbar, tf.constant(self.functype))
dy2 = op_module.unaggregated_dy2_dx_s(
yy, dy, self.matrix["layer_" + str(layer + 1)][idx], xbar, tf.constant(self.functype))
else:
ybar = tf.matmul(
yy, self.matrix["layer_" + str(layer + 1)][idx]) + self.bias["layer_" + str(layer + 1)][idx]
tt, zz = self._layer_1(
yy, self.matrix["layer_" + str(layer + 1)][idx], self.bias["layer_" + str(layer + 1)][idx])
dz = op_module.unaggregated_dy_dx(
zz - tt, self.matrix["layer_" + str(layer + 1)][idx], dy, ybar, tf.constant(self.functype))
dy2 = op_module.unaggregated_dy2_dx(
zz - tt, self.matrix["layer_" + str(layer + 1)][idx], dy, dy2, ybar, tf.constant(self.functype))
dy = dz
yy = zz
vv = zz.eval()
dd = dy.eval()
d2 = dy2.eval()
return vv, dd, d2
def _layer_0(self, x, w, b):
return self.activation_fn(tf.matmul(x, w) + b)
def _layer_1(self, x, w, b):
t = tf.concat([x, x], axis=1)
return t, self.activation_fn(tf.matmul(x, w) + b) + t
# Change the embedding net range to sw / min_nbor_dist
def _get_env_mat_range(self,
min_nbor_dist):
sw = self._spline5_switch(min_nbor_dist, self.rcut_smth, self.rcut)
if isinstance(self.descrpt, deepmd.descriptor.DescrptSeA):
lower = -self.davg[:, 0] / self.dstd[:, 0]
upper = ((1 / min_nbor_dist) * sw - self.davg[:, 0]) / self.dstd[:, 0]
elif isinstance(self.descrpt, deepmd.descriptor.DescrptSeT):
var = np.square(sw / (min_nbor_dist * self.dstd[:, 1:4]))
lower = np.min(-var, axis=1)
upper = np.max(var, axis=1)
elif isinstance(self.descrpt, deepmd.descriptor.DescrptSeR):
lower = -self.davg[:, 0] / self.dstd[:, 0]
upper = ((1 / min_nbor_dist) * sw - self.davg[:, 0]) / self.dstd[:, 0]
else:
raise RuntimeError("Unsupported descriptor")
log.info('training data with lower boundary: ' + str(lower))
log.info('training data with upper boundary: ' + str(upper))
# returns element-wise lower and upper
return np.floor(lower), np.ceil(upper)
def _spline5_switch(self,
xx,
rmin,
rmax):
if xx < rmin:
vv = 1
elif xx < rmax:
uu = (xx - rmin) / (rmax - rmin)
vv = uu*uu*uu * (-6 * uu*uu + 15 * uu - 10) + 1
else:
vv = 0
return vv
def _get_layer_size(self):
layer_size = 0
if isinstance(self.descrpt, deepmd.descriptor.DescrptSeA):
layer_size = len(self.embedding_net_nodes) // ((self.ntypes * self.ntypes - len(self.exclude_types)) * 2)
if self.type_one_side :
layer_size = len(self.embedding_net_nodes) // ((self.ntypes - self._n_all_excluded) * 2)
elif isinstance(self.descrpt, deepmd.descriptor.DescrptSeT):
layer_size = len(self.embedding_net_nodes) // int(comb(self.ntypes + 1, 2) * 2)
elif isinstance(self.descrpt, deepmd.descriptor.DescrptSeR):
layer_size = len(self.embedding_net_nodes) // ((self.ntypes * self.ntypes - len(self.exclude_types)) * 2)
if self.type_one_side :
layer_size = len(self.embedding_net_nodes) // ((self.ntypes - self._n_all_excluded) * 2)
else:
raise RuntimeError("Unsupported descriptor")
return layer_size
@property
@lru_cache()
def _n_all_excluded(self) -> int:
"""Then number of types excluding all types."""
return sum((int(self._all_excluded(ii)) for ii in range(0, self.ntypes)))
@lru_cache()
def _all_excluded(self, ii: int) -> bool:
"""Check if type ii excluds all types.
Parameters
----------
ii : int
type index
Returns
-------
bool
True if type ii excludes all types
"""
return all([(ii, type_i) in self.exclude_types for type_i in range(self.ntypes)])
def _get_table_size(self):
table_size = 0
if isinstance(self.descrpt, deepmd.descriptor.DescrptSeA):
table_size = self.ntypes * self.ntypes
if self.type_one_side :
table_size = self.ntypes
elif isinstance(self.descrpt, deepmd.descriptor.DescrptSeT):
table_size = int(comb(self.ntypes + 1, 2))
elif isinstance(self.descrpt, deepmd.descriptor.DescrptSeR):
table_size = self.ntypes * self.ntypes
if self.type_one_side :
table_size = self.ntypes
else:
raise RuntimeError("Unsupported descriptor")
return table_size
def _get_data_type(self):
for item in self.matrix["layer_" + str(self.layer_size)]:
if len(item) != 0:
return type(item[0][0])
return None
def _get_last_layer_size(self):
for item in self.matrix["layer_" + str(self.layer_size)]:
if len(item) != 0:
return item.shape[1]
return 0
def _convert_numpy_to_tensor(self):
"""Convert self.data from np.ndarray to tf.Tensor."""
for ii in self.data:
self.data[ii] = tf.constant(self.data[ii])
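# ---------------------------------------------------------------------------
# A minimal sketch (not part of the class above, for illustration only) of how
# the six numbers stored per spline interval are assumed to be read back: they
# act as the coefficients of a quintic polynomial in the offset dx from the
# interval's left endpoint, which is how the tabulated embedding net would be
# evaluated. `eval_quintic` and the toy coefficients below are made up.
if __name__ == "__main__":
    import numpy as np

    def eval_quintic(coeffs, dx):
        """Evaluate c0 + c1*dx + ... + c5*dx**5 with Horner's scheme."""
        c0, c1, c2, c3, c4, c5 = coeffs
        return c0 + dx * (c1 + dx * (c2 + dx * (c3 + dx * (c4 + dx * c5))))

    # one interval of a toy table: value, first derivative, 0.5 * second
    # derivative, and three higher-order coefficients fitted to the next node
    toy_coeffs = np.array([0.10, 0.50, 0.02, -0.01, 0.003, -0.0001])
    print(eval_quintic(toy_coeffs, dx=0.05))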
import numpy as np
from typing import Tuple, List, Union
from deepmd.env import tf
from deepmd.utils.network import one_layer
from deepmd.env import GLOBAL_TF_FLOAT_PRECISION
from deepmd.env import GLOBAL_NP_FLOAT_PRECISION
from deepmd.env import op_module
from deepmd.env import default_tf_session_config
from deepmd.utils.network import embedding_net
from deepmd.utils.graph import get_type_embedding_net_variables_from_graph_def
from deepmd.common import get_activation_func, get_precision
def embed_atom_type(
ntypes : int,
natoms : tf.Tensor,
type_embedding : tf.Tensor,
):
"""
Make the embedded type for the atoms in system.
The atoms are assumed to be sorted by type, so the per-type atom counts are described by the `tf.Tensor` natoms; see the explanation below.
Parameters
----------
ntypes:
Number of types.
natoms:
The number of atoms. This tensor has the length of Ntypes + 2
natoms[0]: number of local atoms
natoms[1]: total number of atoms held by this processor
natoms[i]: 2 <= i < Ntypes+2, number of type i atoms
type_embedding:
The type embedding.
It has the shape of [ntypes, embedding_dim]
Returns
-------
atom_embedding
The embedded type of each atom.
It has the shape of [numb_atoms, embedding_dim]
"""
te_out_dim = type_embedding.get_shape().as_list()[-1]
atype = []
for ii in range(ntypes):
atype.append(tf.tile([ii], [natoms[2+ii]]))
atype = tf.concat(atype, axis = 0)
atm_embed = tf.nn.embedding_lookup(type_embedding,tf.cast(atype,dtype=tf.int32)) #(nf*natom)*nchnl
atm_embed = tf.reshape(atm_embed,[-1,te_out_dim])
return atm_embed
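# A minimal NumPy sketch (illustrative only, not used by the class below) of
# what embed_atom_type computes: repeat each type index by its atom count and
# gather the matching rows of the type embedding. Unlike the real function,
# `natoms` here is a plain Python list and the toy values are made up.
def _embed_atom_type_numpy_sketch(ntypes, natoms, type_embedding):
    atype = np.concatenate([np.full(natoms[2 + ii], ii) for ii in range(ntypes)])
    return type_embedding[atype]  # shape: (total atoms, embedding_dim)
# e.g. _embed_atom_type_numpy_sketch(2, [3, 3, 2, 1], np.eye(2)) -> 3 rows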
class TypeEmbedNet():
"""
Parameters
----------
neuron : list[int]
Number of neurons in each hidden layers of the embedding net
resnet_dt
Time-step `dt` in the resnet construction:
y = x + dt * \phi (Wx + b)
activation_function
The activation function in the embedding net. Supported options are |ACTIVATION_FN|
precision
The precision of the embedding net parameters. Supported options are |PRECISION|
trainable
If the weights of embedding net are trainable.
seed
Random seed for initializing the network parameters.
uniform_seed
Only for the purpose of backward compatibility, retrieves the old behavior of using the random seed
padding
Concatenate zero padding to the output, as the default embedding of the empty type.
"""
def __init__(
self,
neuron: List[int]=[],
resnet_dt: bool = False,
activation_function: Union[str, None] = 'tanh',
precision: str = 'default',
trainable: bool = True,
seed: int = None,
uniform_seed: bool = False,
padding: bool = False,
)->None:
"""
Constructor
"""
self.neuron = neuron
self.seed = seed
self.filter_resnet_dt = resnet_dt
self.filter_precision = get_precision(precision)
self.filter_activation_fn = get_activation_func(activation_function)
self.trainable = trainable
self.uniform_seed = uniform_seed
self.type_embedding_net_variables = None
self.padding = padding
def build(
self,
ntypes: int,
reuse = None,
suffix = '',
):
"""
Build the computational graph for the descriptor
Parameters
----------
ntypes
Number of atom types.
reuse
Whether the weights in the network should be reused when getting the variable.
suffix
Name suffix to identify this descriptor
Returns
-------
embedded_types
The computational graph for embedded types
"""
types = tf.convert_to_tensor(
[ii for ii in range(ntypes)],
dtype = tf.int32
)
ebd_type = tf.cast(tf.one_hot(tf.cast(types,dtype=tf.int32),int(ntypes)), self.filter_precision)
ebd_type = tf.reshape(ebd_type, [-1, ntypes])
name = 'type_embed_net' + suffix
with tf.variable_scope(name, reuse=reuse):
ebd_type = embedding_net(
ebd_type,
self.neuron,
activation_fn = self.filter_activation_fn,
precision = self.filter_precision,
resnet_dt = self.filter_resnet_dt,
seed = self.seed,
trainable = self.trainable,
initial_variables = self.type_embedding_net_variables,
uniform_seed = self.uniform_seed)
ebd_type = tf.reshape(ebd_type, [-1, self.neuron[-1]]) # ntypes * neuron[-1]
if self.padding:
last_type = tf.cast(tf.zeros([1, self.neuron[-1]]), self.filter_precision)
ebd_type = tf.concat([ebd_type, last_type], 0) # (ntypes + 1) * neuron[-1]
self.ebd_type = tf.identity(ebd_type, name ='t_typeebd')
return self.ebd_type
def init_variables(self,
graph: tf.Graph,
graph_def: tf.GraphDef,
suffix = '',
) -> None:
"""
Init the type embedding net variables with the given dict
Parameters
----------
graph : tf.Graph
The input frozen model graph
graph_def : tf.GraphDef
The input frozen model graph_def
suffix
Name suffix to identify this descriptor
"""
self.type_embedding_net_variables = get_type_embedding_net_variables_from_graph_def(graph_def, suffix = suffix)
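# A minimal NumPy sketch (assumed, for illustration only) of the padding
# behaviour of TypeEmbedNet.build: each type starts from its one-hot vector,
# is mapped through a small dense layer, and a zero row is appended as the
# default embedding of the empty type. The random weights stand in for the
# embedding net and are not part of the class above.
def _type_embed_padding_sketch(ntypes: int, out_dim: int) -> np.ndarray:
    rng = np.random.default_rng(0)
    one_hot = np.eye(ntypes)                       # (ntypes, ntypes)
    w = rng.standard_normal((ntypes, out_dim))     # stand-in for embedding_net
    ebd = np.tanh(one_hot @ w)                     # (ntypes, out_dim)
    return np.concatenate([ebd, np.zeros((1, out_dim))], axis=0)  # padded row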
from typing import TYPE_CHECKING, List, Dict, Optional, Tuple
import numpy as np
def weighted_average(
errors: List[Dict[str, Tuple[float, float]]]
) -> Dict:
"""Compute wighted average of prediction errors for model.
Parameters
----------
errors : List[Dict[str, Tuple[float, float]]]
List: the error of systems
Dict: the error of quantities, name given by the key
Tuple: (error, weight)
Returns
-------
Dict
weighted averages
"""
sum_err = {}
sum_siz = {}
for err in errors:
for kk, (ee, ss) in err.items():
if kk in sum_err:
sum_err[kk] += ee * ee * ss
sum_siz[kk] += ss
else :
sum_err[kk] = ee * ee * ss
sum_siz[kk] = ss
for kk in sum_err.keys():
sum_err[kk] = np.sqrt(sum_err[kk] / sum_siz[kk])
return sum_err
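# A small usage sketch (made-up numbers) showing how weighted_average combines
# per-system errors: each entry is an (error, weight) pair and the result is
# the square root of the weight-averaged squared error.
if __name__ == "__main__":
    errs = [
        {"energy": (0.10, 100)},   # system 1: RMSE 0.10 over 100 samples
        {"energy": (0.20, 300)},   # system 2: RMSE 0.20 over 300 samples
    ]
    # sqrt((0.10**2 * 100 + 0.20**2 * 300) / 400) ~= 0.1803
    print(weighted_average(errs))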