Commit 47af8be9 authored by yuhai

Initial commit

import time
import torch
import numpy as np
from torch import nn
from pyscf import lib
from pyscf.lib import logger
from pyscf import gto
from pyscf import scf, dft
from deepks.scf.scf import t_make_eig, t_make_grad_eig_dm
def t_ele_grad(bfock, c_vir, c_occ, n_occ):
g = torch.einsum("pa,qi,...pq->...ai", c_vir, c_occ*n_occ, bfock)
return g.flatten(-2)
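# note: t_ele_grad contracts a (possibly batched) Fock-like matrix onto the
# virtual-occupied block,
#   g[..., a, i] = sum_pq c_vir[p, a] * (c_occ[q, i] * n_occ[i]) * bfock[..., p, q],
# then flattens the last two axes; this mirrors the layout of the electronic
# gradient returned by pyscf's get_grad.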
def make_grad_eig_egrad(dscf, mo_coeff=None, mo_occ=None, gfock=None):
if mo_occ is None:
mo_occ = dscf.mo_occ
if mo_coeff is None:
mo_coeff = dscf.mo_coeff
if gfock is None:
dm = dscf.make_rdm1(mo_coeff, mo_occ)
if dm.ndim >= 3 and isinstance(dscf, scf.uhf.UHF):
dm = dm.sum(0)
gfock = t_make_grad_eig_dm(torch.from_numpy(dm), dscf._t_ovlp_shells).numpy()
if mo_coeff.ndim >= 3 and mo_occ.ndim >= 2:
return np.concatenate([make_grad_eig_egrad(dscf, mc, mo, gfock)
for mc, mo in zip(mo_coeff, mo_occ)], axis=-1)
iocc = mo_occ>0
t_no = torch.from_numpy(mo_occ[iocc]).to(dscf.device)
t_co = torch.from_numpy(mo_coeff[:, iocc]).to(dscf.device)
t_cv = torch.from_numpy(mo_coeff[:, ~iocc]).to(dscf.device)
t_gfock = torch.from_numpy(gfock).to(dscf.device)
return t_ele_grad(t_gfock, t_cv, t_co, t_no).cpu().numpy()
def gen_coul_loss(dscf, fock=None, ovlp=None, mo_occ=None):
nao = dscf.mol.nao
fock = (fock if fock is not None else dscf.get_fock()).reshape(-1, nao, nao)
s1e = ovlp if ovlp is not None else dscf.get_ovlp()
mo_occ = (mo_occ if mo_occ is not None else dscf.mo_occ).reshape(-1, nao)
def _coul_loss_grad(v, target_dm):
# return the coulomb loss and its gradient with respect to the fock matrix
# only supports a single dm; do not use directly for UHF
a_loss = 0.
a_grad = 0.
target_dm = target_dm.reshape(fock.shape)
for tdm, f1e, nocc in zip(target_dm, fock, mo_occ):
iocc = nocc>0
moe, moc = dscf._eigh(f1e+v, s1e)
eo, ev = moe[iocc], moe[~iocc]
co, cv = moc[:, iocc], moc[:, ~iocc]
dm = (co * nocc[iocc]) @ co.T
# calc loss
ddm = dm - tdm
dvj = dscf.get_j(dm=ddm)
loss = 0.5 * np.einsum("ij,ji", ddm, dvj)
a_loss += loss
# calc grad with respect to fock matrix
ie_mn = 1. / (-ev.reshape(-1, 1) + eo)
temp_mn = cv.T @ dvj @ co * nocc[iocc] * ie_mn
dldv = cv @ temp_mn @ co.T
dldv = dldv + dldv.T
a_grad += dldv
return a_loss, a_grad
return _coul_loss_grad
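# a short derivation sketch for _coul_loss_grad above: the loss is the Coulomb
# energy of the density error,
#   loss = 1/2 Tr[(dm - tdm) J(dm - tdm)],  so  d(loss)/d(dm) = J(dm - tdm) = dvj.
# First-order perturbation theory keeps only the occupied-virtual response of
# the density to a Fock perturbation,
#   d(dm) = sum_ai |a> <a|dF|i> n_i / (e_i - e_a) <i| + h.c.,
# which is exactly the chain cv.T @ dvj @ co * n_occ * ie_mn, back-transformed
# by cv @ ... @ co.T and symmetrized via dldv + dldv.T.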
def make_grad_coul_veig(dscf, target_dm):
clfn = gen_coul_loss(dscf)
dm = dscf.make_rdm1()
if dm.ndim == 3 and isinstance(dscf, scf.uhf.UHF):
dm = dm.sum(0)
t_dm = torch.from_numpy(dm).requires_grad_()
t_eig = t_make_eig(t_dm, dscf._t_ovlp_shells).requires_grad_()
loss, dldv = clfn(np.zeros_like(dm), target_dm)
t_veig = torch.zeros_like(t_eig).requires_grad_()
[t_vc] = torch.autograd.grad(t_eig, t_dm, t_veig, create_graph=True)
[t_ghead] = torch.autograd.grad(t_vc, t_veig, torch.from_numpy(dldv))
return t_ghead.detach().cpu().numpy()
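# note: make_grad_coul_veig chains two backward passes; t_vc is the potential
# produced by a descriptor perturbation veig (zero here, only the linear map
# matters), and t_ghead back-propagates the Coulomb-loss gradient dldv through
# that map, yielding d(loss)/d(veig) for optimizing veig against a target density.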
def calc_optim_veig(dscf, target_dm,
target_dec=None, gvx=None,
nstep=1, force_factor=1., **optim_args):
clfn = gen_coul_loss(dscf, fock=dscf.get_fock(vhf=dscf.get_veff0()))
dm = dscf.make_rdm1()
if dm.ndim == 3 and isinstance(dscf, scf.uhf.UHF):
dm = dm.sum(0)
t_dm = torch.from_numpy(dm).requires_grad_()
t_eig = t_make_eig(t_dm, dscf._t_ovlp_shells).requires_grad_()
t_ec = dscf.net(t_eig.to(dscf.device))
t_veig = torch.autograd.grad(t_ec, t_eig)[0].requires_grad_()
t_lde = torch.from_numpy(target_dec) if target_dec is not None else None
t_gvx = torch.from_numpy(gvx) if gvx is not None else None
# build closure
def closure():
[t_vc] = torch.autograd.grad(
t_eig, t_dm, t_veig, retain_graph=True, create_graph=True)
loss, dldv = clfn(t_vc.detach().numpy(), target_dm)
grad = torch.autograd.grad(
t_vc, t_veig, torch.from_numpy(dldv), only_inputs=True)[0]
# build closure for force loss
if t_lde is not None and t_gvx is not None:
t_pde = torch.tensordot(t_gvx, t_veig)
lossde = force_factor * torch.sum((t_pde - t_lde)**2)
grad = grad + torch.autograd.grad(lossde, t_veig, only_inputs=True)[0]
loss = loss + lossde
t_veig.grad = grad
return loss
# do the optimization
optim = torch.optim.LBFGS([t_veig], **optim_args)
tic = (time.process_time(), time.perf_counter())
for _ in range(nstep):
optim.step(closure)
tic = logger.timer(dscf, 'LBFGS step', *tic)
logger.note(dscf, f"optimized loss for veig = {closure()}")
return t_veig.detach().numpy()
def gcalc_optim_veig(gdscf, target_dm, target_grad,
nstep=1, force_factor=1., **optim_args):
target_dec = target_grad - gdscf.de0
gvx = gdscf.make_grad_eig_x()
return calc_optim_veig(
gdscf.base,
target_dm=target_dm,
target_dec=target_dec, gvx=gvx,
nstep=nstep, force_factor=force_factor, **optim_args)
import numpy as np
from typing import List, Callable
from dataclasses import dataclass, field
# Field = namedtuple("Field", ["name", "alias", "calc", "shape"])
# LabelField = namedtuple("LabelField", ["name", "alias", "calc", "shape", "required_labels"])
@dataclass
class Field:
name: str
alias: List[str]
calc: Callable
shape: str
required_labels: List[str] = field(default_factory=list)
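# sketch of a Field entry (illustrative, mirroring the entries of SCF_FIELDS below):
#   Field("e_tot", ["energy", "e"], lambda mf: mf.e_tot, "(nframe, 1)")
# `shape` is kept as a string and eval'ed later in collect_fields with
# nframe/natom/natom_raw/nao/nproj in scope to reshape the stacked results.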
def select_fields(names):
names = [n.lower() for n in names]
scfs = [fd for fd in SCF_FIELDS
if fd.name in names
or any(al in names for al in fd.alias)]
grads = [fd for fd in GRAD_FIELDS
if fd.name in names
or any(al in names for al in fd.alias)]
return {"scf": scfs, "grad": grads}
BOHR = 0.52917721092
def isinbohr(mol):
return mol.unit.upper().startswith(("B", "AU"))
def _Lunit(mol):
return (1. if isinbohr(mol) else BOHR)
def atom_data(mol):
raw_data = np.concatenate(
[mol.atom_charges().reshape(-1,1), mol.atom_coords(unit='Bohr')],
axis=1)
non_ghost = [ii for ii in range(mol.natm)
if not mol.elements[ii].startswith("X")]
return raw_data[non_ghost]
SCF_FIELDS = [
Field("atom",
["atoms", "mol", "molecule"],
lambda mf: atom_data(mf.mol),
"(nframe, natom, 4)"),
Field("e_base",
["ebase", "ene_base", "e0",
"e_hf", "ehf", "ene_hf",
"e_ks", "eks", "ene_ks"],
lambda mf: mf.energy_tot0(),
"(nframe, 1)"),
Field("e_tot",
["e_cf", "ecf", "ene_cf", "etot", "ene", "energy", "e"],
lambda mf: mf.e_tot,
"(nframe, 1)"),
Field("rdm",
["dm"],
lambda mf: mf.make_rdm1(),
"(nframe, nao, nao)"),
Field("proj_dm",
["pdm"],
lambda mf: mf.make_pdm(flatten=True),
"(nframe, natom, -1)"),
Field("dm_eig",
["eig"],
lambda mf: mf.make_eig(),
"(nframe, natom, nproj)"),
Field("hcore_eig",
["heig"],
lambda mf: mf.make_eig(mf.get_hcore()),
"(nframe, natom, nproj)"),
Field("ovlp_eig",
["oeig"],
lambda mf: mf.make_eig(mf.get_ovlp()),
"(nframe, natom, nproj)"),
Field("veff_eig",
["veig"],
lambda mf: mf.make_eig(mf.get_veff()),
"(nframe, natom, nproj)"),
Field("fock_eig",
["feig"],
lambda mf: mf.make_eig(mf.get_fock()),
"(nframe, natom, nproj)"),
Field("conv",
["converged", "convergence"],
lambda mf: mf.converged,
"(nframe, 1)"),
Field("mo_coef_occ", # do not support UHF
["mo_coeff_occ, orbital_coeff_occ"],
lambda mf: mf.mo_coeff[:,mf.mo_occ>0].T,
"(nframe, -1, nao)"),
Field("mo_ene_occ", # do not support UHF
["mo_energy_occ, orbital_ene_occ"],
lambda mf: mf.mo_energy[mf.mo_occ>0],
"(nframe, -1)"),
# below are fields that require labels
Field("l_e_ref",
["e_ref", "lbl_e_ref", "label_e_ref", "le_ref"],
lambda mf, **lbl: lbl["energy"],
"(nframe, 1)",
["energy"]),
Field("l_e_delta",
["le_delta", "lbl_e_delta", "label_e_delta", "lbl_ed"],
lambda mf, **lbl: lbl["energy"] - mf.energy_tot0(),
"(nframe, 1)",
["energy"]),
Field("err_e",
["e_err", "err_e_tot", "err_e_cf"],
lambda mf, **lbl: lbl["energy"] - mf.e_tot,
"(nframe, 1)",
["energy"]),
]
GRAD_FIELDS = [
Field("f_base",
["fbase", "force_base", "f0",
"f_hf", "fhf", "force_hf",
"f_ks", "fks", "force_ks"],
lambda grad: - grad.get_base() / _Lunit(grad.mol),
"(nframe, natom_raw, 3)"),
Field("f_tot",
["f_cf", "fcf", "force_cf", "ftot", "force", "f"],
lambda grad: - grad.de / _Lunit(grad.mol),
"(nframe, natom_raw, 3)"),
Field("grad_dmx",
["grad_dm_x", "grad_pdm_x"],
lambda grad: grad.make_grad_pdm_x(flatten=True) / _Lunit(grad.mol),
"(nframe, natom_raw, 3, natom, -1)"),
Field("grad_vx",
["grad_eig_x", "geigx", "gvx"],
lambda grad: grad.make_grad_eig_x() / _Lunit(grad.mol),
"(nframe, natom_raw, 3, natom, nproj)"),
# below are fields that require labels
Field("l_f_ref",
["f_ref", "lbl_f_ref", "label_f_ref", "lf_ref"],
lambda grad, **lbl: lbl["force"],
"(nframe, natom_raw, 3)",
["force"]),
Field("l_f_delta",
["lf_delta", "lbl_f_delta", "label_f_delta", "lbl_fd"],
lambda grad, **lbl: lbl["force"] - (-grad.get_base() / _Lunit(grad.mol)),
"(nframe, natom_raw, 3)",
["force"]),
Field("err_f",
["f_err", "err_f_tot", "err_f_cf"],
lambda grad, **lbl: lbl["force"] - (-grad.de / _Lunit(grad.mol)),
"(nframe, natom_raw, 3)",
["force"]),
]
# below are additional methods from addons
from deepks.scf import addons
SCF_FIELDS.extend([
# the following two are used for regularizing the potential
Field("grad_veg",
["grad_eig_egrad", "jac_eig_egrad"],
lambda mf: addons.make_grad_eig_egrad(mf),
"(nframe, natom, nproj, -1)"),
Field("eg_base",
["ele_grad_base", "egrad0", "egrad_base"],
lambda mf: mf.get_grad0(),
"(nframe, -1)"),
# the following one is used for coulomb loss optimization
Field("grad_ldv",
["grad_coul_dv", "grad_coul_deig", "coulomb_grad"],
lambda mf, **lbl: addons.make_grad_coul_veig(mf, target_dm=lbl["dm"]),
"(nframe, natom, nproj)",
["dm"]),
Field("l_veig_raw",
["optim_veig_raw", "l_opt_v_raw", "l_optim_veig_raw"],
lambda mf, **lbl: addons.calc_optim_veig(mf, lbl["dm"], nstep=1),
"(nframe, natom, nproj)",
["dm"]),
])
GRAD_FIELDS.extend([
# the following one is used for coulomb loss optimization from grad class
Field("l_veig",
["optim_veig", "l_opt_v", "l_optim_veig"],
lambda grad, **lbl: addons.gcalc_optim_veig(
grad, lbl["dm"], -_Lunit(grad.mol)*lbl["force"], nstep=1),
"(nframe, natom, nproj)",
["dm", "force"]),
Field("l_veig_nof",
["optim_veig_nof", "l_opt_v_nof", "l_optim_veig_nof"],
lambda grad, **lbl: addons.gcalc_optim_veig(
grad, lbl["dm"], grad.de, nstep=1),
"(nframe, natom, nproj)",
["dm"]),
])
import abc
import time
import torch
import numpy as np
from pyscf import gto, scf, lib
from pyscf.lib import logger
from pyscf.grad import rks as rks_grad
from pyscf.grad import uks as uks_grad
from deepks.scf.scf import t_make_pdm, t_shell_eig, t_batch_jacobian
# see ./_old_grad.py for a clearer (but maybe slower) implementation
# all variables and functions starting with "t_" are torch related.
# convention in einsum:
# i,j: orbital
# a,b: atom
# p,q: projected basis on atom
# r,s: mol basis in pyscf
# x : space component of gradient
# v : eigen values of projected dm
# parameter shapes:
# ovlp_shells: [nao x natom x nsph] list
# pdm_shells: [natom x nsph x nsph] list
# eig_shells: [natom x nsph] list
# ipov_shells: [3 x nao x natom x nsph] list
# gdmx_shells: [natm (deriv atom) x 3 x natm (proj atom) x nsph x nsph] list
# gedm_shells: [natom x nsph x nsph] list
def t_make_grad_e_pdm(model, dm, ovlp_shells):
"""return gradient of energy w.r.t projected density matrix"""
# calculate \partial E / \partial (D^I_rl)_mm' by shells
pdm_shells = [dm.requires_grad_(True) for dm in t_make_pdm(dm, ovlp_shells)]
eig_shells = [t_shell_eig(dm) for dm in pdm_shells]
ceig = torch.cat(eig_shells, dim=-1).unsqueeze(0) # 1 x natoms x nproj
_dref = next(model.parameters())
ec = model(ceig.to(_dref)) # no batch dim here, unsqueeze(0) if needed
gedm_shells = torch.autograd.grad(ec, pdm_shells)
return gedm_shells
def t_make_grad_pdm_x(mol, dm, ovlp_shells, ipov_shells):
"""return jacobian of projected density matrix w.r.t atomic coordinates"""
natm = mol.natm
ralst = [ii for ii in range(mol.natm) if not mol.elements[ii].startswith("X")]
nratm = len(ralst)
shell_sec = [ov.shape[-1] for ov in ovlp_shells]
# [natm (deriv atom) x 3 (xyz) x nratm (proj atom) x nsph (1|3|5) x nsph] list
gdmx_shells = [torch.zeros([natm, 3, nratm, ss, ss], dtype=float)
for ss in shell_sec]
for gdmx, govx, ovlp in zip(gdmx_shells, ipov_shells, ovlp_shells):
# contribution of projection for all I
gproj = torch.einsum('xrap,rs,saq->xapq', govx, dm, ovlp)
for ira, ia in enumerate(ralst):
bg, ed = mol.aoslice_by_atom()[ia, 2:]
# contribution of < \nabla mol_ao |
gdmx[ia] -= torch.einsum('xrap,rs,saq->xapq', govx[:,bg:ed], dm[bg:ed], ovlp)
# contribution of | \nabla alpha^I_rlm >
gdmx[ia,:,ira] += gproj[:, ira]
# symmetrize p and q
gdmx += gdmx.clone().transpose(-1,-2)
return gdmx_shells
def t_make_grad_eig_x(mol, dm, ovlp_shells, ipov_shells):
"""return jacobian of decriptor eigenvalues w.r.t atomic coordinates"""
# v stands for eigen values
pdm_shells = [dm.requires_grad_(True) for dm in t_make_pdm(dm, ovlp_shells)]
gvdm_shells = [t_batch_jacobian(t_shell_eig, dm, dm.shape[-1])
for dm in pdm_shells]
gdmx_shells = t_make_grad_pdm_x(mol, dm, ovlp_shells, ipov_shells)
gvx_shells = [torch.einsum("bxapq,avpq->bxav", gdmx, gvdm)
for gdmx, gvdm in zip(gdmx_shells, gvdm_shells)]
return torch.cat(gvx_shells, dim=-1)
def t_grad_corr(mol, model, dm, ovlp_shells, ipov_shells, atmlst=None):
if atmlst is None:
atmlst = list(range(mol.natm))
ralst = [ii for ii in range(mol.natm) if not mol.elements[ii].startswith("X")]
dec = torch.zeros([len(atmlst), 3], dtype=float)
# \partial E / \partial (D^I_rl)_mm' by shells
gedm_shells = t_make_grad_e_pdm(model, dm, ovlp_shells)
for gedm, govx, ovlp in zip(gedm_shells, ipov_shells, ovlp_shells):
# contribution of projection orbitals for all atom
ginner = torch.einsum('xrap,rs,saq->xapq', govx, dm, ovlp) * 2
# contribution of atomic orbitals for all atom
gouter = -torch.einsum('xrap,apq,saq->xrs', govx, gedm, ovlp) * 2
for k, ia in enumerate(atmlst):
if ia not in ralst:
continue
bg, ed = mol.aoslice_by_atom()[ia, 2:]
ira = ralst.index(ia)
# contribution of | \nabla alpha^I_rlm > and < \nabla alpha^I_rlm |
dec[k] += torch.einsum('xpq,pq->x', ginner[:,ira], gedm[ira])
# contribution of < \nabla mol_ao | and | \nabla mol_ao >
dec[k] += torch.einsum('xrs,rs->x', gouter[:,bg:ed], dm[bg:ed])
return dec
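# note on the two contributions above: `ginner` collects the derivative of the
# projector functions |alpha^I_rlm>, which move with their host atom, and is
# contracted with gedm of that atom; `gouter` collects the derivative of the
# molecular AOs and is contracted with the density-matrix rows of the displaced
# atom. With gedm = dE_corr/d(pdm) from t_make_grad_e_pdm, the sum is the
# Pulay-like force of the NN correction.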
class CorrGradMixin(abc.ABC):
def __init__(self, *args, **kwargs):
# add a field to memorize the pulay term in ec
self.dec = None
def grad_elec(self, mo_energy=None, mo_coeff=None, mo_occ=None, atmlst=None):
de = super().grad_elec(mo_energy, mo_coeff, mo_occ, atmlst)
cput0 = (time.process_time(), time.perf_counter())
dec = self.grad_corr(self.base.make_rdm1(mo_coeff, mo_occ), atmlst)
logger.timer(self, 'gradients of NN pulay part', *cput0)
# memorize the result to save time in get_base
self.dec = self.symmetrize(dec, atmlst) if self.mol.symmetry else dec
return de + dec
def get_base(self):
"""return the grad given by raw base method Hamiltonian under current dm"""
assert self.de is not None and self.dec is not None
return self.de - self.dec
de0 = property(get_base)
@abc.abstractmethod
def grad_corr(self, dm=None, atmlst=None):
"""additional contribution of NN "correction" term resulted from projection"""
if atmlst is None:
atmlst = range(self.mol.natm)
return np.zeros([len(atmlst), 3])
class NetGradMixin(CorrGradMixin):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# prepare integrals for projection and derivative
self.prepare_integrals()
def prepare_integrals(self):
mf = self.base
# < mol_ao | alpha^I_rlm > by shells
self._t_ovlp_shells = mf._t_ovlp_shells
# < \nabla mol_ao | alpha^I_rlm >
t_proj_ipovlp = torch.from_numpy(mf.proj_intor("int1e_ipovlp")).double()
# < \nabla mol_ao | alpha^I_rlm > by shells
self._t_ipov_shells = torch.split(
t_proj_ipovlp.reshape(3, self.mol.nao, self.base._pmol.natm, -1),
self.base._shell_sec, -1)
def grad_corr(self, dm=None, atmlst=None):
"""additional contribution of NN "correction" term resulted from projection"""
if atmlst is None:
atmlst = range(self.mol.natm)
if self.base.net is None:
return np.zeros([len(atmlst), 3])
if dm is None:
dm = self.base.make_rdm1()
if dm.ndim > 2: # for uhf case
dm = dm.sum(0)
t_dm = torch.from_numpy(dm).double()
t_dec = t_grad_corr(self.mol, self.base.net, t_dm,
self._t_ovlp_shells, self._t_ipov_shells, atmlst)
return t_dec.detach().cpu().numpy()
def make_grad_pdm_x(self, dm=None, flatten=False):
"""return jacobian of projected density matrix w.r.t atomic coordinates"""
if dm is None:
dm = self.base.make_rdm1()
if dm.ndim > 2: # for uhf case
dm = dm.sum(0)
t_dm = torch.from_numpy(dm).double()
t_gdmx_shells = t_make_grad_pdm_x(self.mol, t_dm,
self._t_ovlp_shells, self._t_ipov_shells)
if not flatten:
return [s.detach().cpu().numpy() for s in t_gdmx_shells]
else:
return torch.cat([s.flatten(-2) for s in t_gdmx_shells],
dim=-1).detach().cpu().numpy()
def make_grad_eig_x(self, dm=None):
"""return jacobian of decriptor eigenvalues w.r.t atomic coordinates"""
if dm is None:
dm = self.base.make_rdm1()
if dm.ndim > 2: # for uhf case
dm = dm.sum(0)
t_dm = torch.from_numpy(dm).double()
t_gvx = t_make_grad_eig_x(self.mol, t_dm,
self._t_ovlp_shells, self._t_ipov_shells)
return t_gvx.detach().cpu().numpy()
def as_scanner(self):
scanner = super().as_scanner()
# make a new version of call method
class NewScanner(type(scanner)):
def __call__(self, mol_or_geom, **kwargs):
if isinstance(mol_or_geom, gto.Mole):
mol = mol_or_geom
else:
mol = self.mol.set_geom_(mol_or_geom, inplace=False)
mf_scanner = self.base
e_tot = mf_scanner(mol)
self.mol = mol
if getattr(self, 'grids', None):
self.grids.reset(mol)
# adding the following line to refresh integrals
self.prepare_integrals()
de = self.kernel(**kwargs)
return e_tot, de
# patch the old scanner's class to bind the new call method
scanner.__class__ = NewScanner
return scanner
# additional methods for dm training impl'd in addons
# from deepks.scf.addons import gcalc_optim_veig as calc_optim_veig
def build_grad(mf):
if isinstance(mf, scf.uhf.UHF):
return UGradients(mf)
else:
return Gradients(mf)
class Gradients(NetGradMixin, rks_grad.Gradients):
"""Analytical nuclear gradient for restricted DeePKS model"""
def __init__(self, mf):
# make sure the base method is initialized first
rks_grad.Gradients.__init__(self, mf)
NetGradMixin.__init__(self)
self._keys.update(self.__dict__.keys())
Grad = Gradients
RGrad = RGradients = Gradients
from deepks.scf.scf import DSCF
# Inject to SCF class
DSCF.Gradients = lib.class_as_method(Gradients)
class UGradients(NetGradMixin, uks_grad.Gradients):
"""Analytical nuclear gradient for unrestricted DeePKS model"""
def __init__(self, mf):
# make sure the base method is initialized first
uks_grad.Gradients.__init__(self, mf)
NetGradMixin.__init__(self)
self._keys.update(self.__dict__.keys())
UGrad = UGradients
from deepks.scf.scf import UDSCF
# Inject to SCF class
UDSCF.Gradients = lib.class_as_method(UGradients)
# # legacy method, kept for reference
# def make_mask(mol1, mol2, atom_id):
# mask = np.zeros((mol1.nao, mol2.nao))
# bg1, ed1 = mol1.aoslice_by_atom()[atom_id, 2:]
# bg2, ed2 = mol2.aoslice_by_atom()[atom_id, 2:]
# mask[bg1:ed1, :] -= 1
# mask[:, bg2:ed2] += 1
# return mask
# # only for testing purpose, not used in code
# def finite_difference(f, x, delta=1e-6):
# in_shape = x.shape
# y0 = f(x)
# out_shape = y0.shape
# res = np.empty(in_shape + out_shape)
# for idx in np.ndindex(*in_shape):
# diff = np.zeros(in_shape)
# diff[idx] += delta
# y1 = f(x+diff)
# res[idx] = (y1-y0) / delta
# return res
import time
import numpy as np
from pyscf.dft import numint, gen_grid
from pyscf.lib import logger
from deepks.utils import check_list
def select_penalty(name):
name = name.lower()
if name == "density":
return DensityPenalty
if name == "coulomb":
return CoulombPenalty
raise ValueError(f"unknown penalty type: {name}")
class PenaltyMixin(object):
"""Mixin class to add penalty potential in Fock matrix"""
def __init__(self, penalties=None):
self.penalties = check_list(penalties)
for pnt in self.penalties:
pnt.init_hook(self)
def get_fock(self, h1e=None, s1e=None, vhf=None, dm=None, cycle=-1,
diis=None, diis_start_cycle=None,
level_shift_factor=None, damp_factor=None):
"""modified get_fock method to apply penalty terms onto vhf"""
if dm is None:
dm = self.make_rdm1()
if h1e is None:
h1e = self.get_hcore()
if vhf is None:
vhf = self.get_veff(dm=dm)
vp = sum(pnt.fock_hook(self, dm=dm, h1e=h1e, vhf=vhf, cycle=cycle)
for pnt in self.penalties)
vhf = vhf + vp
return super().get_fock(h1e=h1e, s1e=s1e, vhf=vhf, dm=dm, cycle=cycle,
diis=diis, diis_start_cycle=diis_start_cycle,
level_shift_factor=level_shift_factor, damp_factor=damp_factor)
class AbstructPenalty(object):
"""
Abstract class for a penalty term in the scf hamiltonian.
To implement a penalty one needs to implement
fock_hook and (optional) init_hook methods.
"""
required_labels = [] # these labels will be loaded and passed to __init__
def init_hook(self, mf, **envs):
"""
Method to be called when initializing the scf object.
Used to initialize the penalty with molecule info.
"""
pass
def fock_hook(self, mf, dm=None, h1e=None, vhf=None, cycle=-1, **envs):
"""
Method to be called before get_fock is called.
The returned matrix will be added to the vhf matrix.
"""
raise NotImplementedError("fock_hook method is not implemented")
class DummyPenalty(AbstructPenalty):
def fock_hook(self, mf, dm=None, h1e=None, vhf=None, cycle=-1, **envs):
return 0
class DensityPenalty(AbstructPenalty):
r"""
penalty on the difference w.r.t target density
E_p = \lambda / 2 * \int dx (\rho(x) - \rho_target(x))^2
V_p = \lambda * \int dx <ao_i|x> (\rho(x) - \rho_target(x)) <x|ao_j>
The target density should be given as density matrix in ao basis
"""
required_labels = ["dm"]
def __init__(self, target_dm, strength=1, random=False, start_cycle=0):
if isinstance(target_dm, str):
target_dm = np.load(target_dm)
self.dm_t = target_dm
self.init_strength = strength
self.strength = strength * np.random.rand() if random else strength
self.start_cycle = start_cycle
# below are values to be initialized later in init_hook
self.grids = None
self.ao_value = None
def init_hook(self, mf, **envs):
if hasattr(mf, "grids"):
self.grids = mf.grids
else:
self.grids = gen_grid.Grids(mf.mol)
def fock_hook(self, mf, dm=None, h1e=None, vhf=None, cycle=-1, **envs):
# cycle >= 0 means we are inside the scf iteration
if 0 <= cycle < self.start_cycle:
return 0
if self.grids.coords is None:
self.grids.build()
if self.ao_value is None:
self.ao_value = numint.eval_ao(mf.mol, self.grids.coords, deriv=0)
tic = (time.process_time(), time.perf_counter())
rho_diff = numint.eval_rho(mf.mol, self.ao_value, dm - self.dm_t)
v_p = numint.eval_mat(mf.mol, self.ao_value, self.grids.weights, rho_diff, rho_diff)
# cycle < 0 means it is just checking, we only print here
if cycle < 0 and mf.verbose >=4:
diff_norm = np.sum(np.abs(rho_diff)*self.grids.weights)
logger.info(mf, f" Density Penalty: |diff| = {diff_norm}")
logger.timer(mf, "dens_pnt", *tic)
return self.strength * v_p
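# illustrative construction (hypothetical target file):
#   pnt = DensityPenalty(np.load("target_dm.npy"), strength=0.1, start_cycle=2)
# or equivalently a dict {"type": "density", "strength": 0.1} resolved through
# select_penalty/build_penalty; the instance is attached to the SCF object via
# its `penalties` argument.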
class CoulombPenalty(AbstructPenalty):
r"""
penalty given by the coulomb energy of density difference
"""
required_labels = ["dm"]
def __init__(self, target_dm, strength=1, random=False, start_cycle=0):
if isinstance(target_dm, str):
target_dm = np.load(target_dm)
self.dm_t = target_dm
self.init_strength = strength
self.strength = strength * np.random.rand() if random else strength
self.start_cycle = start_cycle
def fock_hook(self, mf, dm=None, h1e=None, vhf=None, cycle=-1, **envs):
# cycle >= 0 means we are inside the scf iteration
if 0 <= cycle < self.start_cycle:
return 0
tic = (time.process_time(), time.perf_counter())
ddm = dm - self.dm_t
v_p = mf.get_j(dm=ddm)
# cycle < 0 means it is just checking, we only print here
if cycle < 0 and mf.verbose >=4:
diff_norm = np.sum(ddm * v_p)
logger.info(mf, f" Coulomb Penalty: |diff| = {diff_norm}")
logger.timer(mf, "coul_pnt", *tic)
return self.strength * v_p
import os
import sys
import time
import numpy as np
import torch
from pyscf import gto, lib
try:
import deepks
except ImportError as e:
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + "/../../")
from deepks.scf.scf import DSCF, UDSCF
from deepks.scf.fields import select_fields
from deepks.scf.penalty import select_penalty
from deepks.model.model import CorrNet
from deepks.utils import check_list, flat_file_list
from deepks.utils import is_xyz, load_sys_paths
from deepks.utils import load_yaml, load_array
from deepks.utils import get_sys_name, get_with_prefix
DEFAULT_UNIT = "Bohr"
DEFAULT_FNAMES = {"e_tot", "e_base", "dm_eig", "conv"}
DEFAULT_HF_ARGS = {
"conv_tol": 1e-9
}
DEFAULT_SCF_ARGS = {
"conv_tol": 1e-7,
# "level_shift": 0.1,
# "diis_space": 20
}
MOL_ATTRIBUTE = {"charge", "basis", "unit"} # other molecule properties
def solve_mol(mol, model, fields, labels=None,
proj_basis=None, penalties=None, device=None,
chkfile=None, verbose=0,
**scf_args):
tic = time.time()
SCFcls = DSCF if mol.spin == 0 else UDSCF
cf = SCFcls(mol, model,
proj_basis=proj_basis,
penalties=penalties,
device=device)
cf.set(chkfile=chkfile, verbose=verbose)
grid_args = scf_args.pop("grids", {})
cf.set(**scf_args)
cf.grids.set(**grid_args)
cf.kernel()
natom_raw = mol.natm
natom = cf._pmol.natm
nao = mol.nao
nproj = cf.nproj
meta = np.array([natom, natom_raw, nao, nproj])
res = {}
if labels is None:
labels = {}
for fd in fields["scf"]:
fls = {k:labels[k] for k in fd.required_labels}
res[fd.name] = fd.calc(cf, **fls)
if fields["grad"]:
gd = cf.nuc_grad_method().run()
for fd in fields["grad"]:
fls = {k:labels[k] for k in fd.required_labels}
res[fd.name] = fd.calc(gd, **fls)
tac = time.time()
if verbose:
print(f"time of scf: {tac - tic:6.2f}s, converged: {cf.converged}")
return meta, res
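# minimal usage sketch (hypothetical file names):
#   mol = build_mol([["H", (0., 0., 0.)], ["H", (0., 0., 0.74)]],
#                   basis="ccpvdz", unit="Angstrom")
#   meta, res = solve_mol(mol, CorrNet.load("model.pth").double(),
#                         select_fields(["e_tot", "dm_eig", "conv"]),
#                         verbose=1, conv_tol=1e-7)
#   # res["e_tot"], res["dm_eig"], res["conv"] hold the requested quantities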
def get_required_labels(fields=None, penalty_dicts=None):
field_labels = [check_list(f.required_labels)
for f in check_list(fields)]
penalty_labels = [check_list(p.get("required_labels",
select_penalty(p["type"]).required_labels))
for p in check_list(penalty_dicts)]
return set(sum(field_labels + penalty_labels, []))
def system_iter(path, labels=None):
"""
return an iterator that gives atoms and required labels each time
path: either an xyz file, or a folder containing (atom.npy | (coord.npy & type.raw))
labels: a set of required label names; each will be loaded from $base[.|/]$label.npy
$base is the basename of the xyz file (followed by .) or the folder (followed by /)
"""
if labels is None:
labels = set()
base = get_sys_name(path)
attr_paths = {at: get_with_prefix(at, base, ".npy", True) for at in MOL_ATTRIBUTE}
attr_paths = {k: v for k, v in attr_paths.items() if v is not None}
attrs = attr_paths.keys()
label_paths = {lb: get_with_prefix(lb, base, prefer=".npy") for lb in labels}
# if xyz, will yield single frame. Assume all labels are single frame
if is_xyz(path):
atom = path
attr_dict = {at: load_array(attr_paths[at]) for at in attrs}
if "unit" not in attr_dict:
attr_dict["unit"] = "Angstrom"
label_dict = {lb: load_array(label_paths[lb]) for lb in labels}
yield atom, attr_dict, label_dict
return
# a folder contains multiple frames data, yield one by one
else:
assert os.path.isdir(path), f"system {path} is neither .xyz nor dir"
all_attrs = {at: load_array(attr_paths[at]) for at in attrs}
all_labels = {lb: load_array(label_paths[lb]) for lb in labels}
try:
atom_array = load_array(get_with_prefix("atom", path, prefer=".npy"))
assert len(atom_array.shape) == 3 and atom_array.shape[2] == 4, atom_array.shape
nframes = atom_array.shape[0]
elements = np.rint(atom_array[:, :, 0]).astype(int)
coords = atom_array[:, :, 1:]
except FileNotFoundError:
coords = load_array(get_with_prefix("coord", path, prefer=".npy"))
assert len(coords.shape) == 3 and coords.shape[2] == 3, coords.shape
nframes = coords.shape[0]
elements = np.loadtxt(os.path.join(path, "type.raw"), dtype=str)\
.reshape(1,-1).repeat(nframes, axis=0)
for i in range(nframes):
atom = [[e,c] for e,c in zip(elements[i], coords[i])]
attr_dict = {at: (all_attrs[at][i]
if all_attrs[at].ndim > 0
and all_attrs[at].shape[0] == nframes
else all_attrs[at])
for at in attrs}
label_dict = {lb: all_labels[lb][i] for lb in labels}
yield atom, attr_dict, label_dict
return
def build_mol(atom, basis='ccpvdz', unit=DEFAULT_UNIT, verbose=0, **kwargs):
# build a molecule using the given atom input
# the default basis is cc-pVDZ and the default unit is DEFAULT_UNIT ("Bohr") unless overridden
mol = gto.Mole()
# change minimum max memory to 16G
# mol.max_memory = max(16000, mol.max_memory)
if isinstance(unit, np.ndarray):
unit = unit.tolist()
mol.unit = unit
mol.atom = atom
mol.basis = basis
mol.verbose = verbose
mol.set(**kwargs)
mol.spin = mol.nelectron % 2
mol.build(0,0)
return mol
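# illustrative call (hypothetical geometry):
#   mol = build_mol([["O", (0., 0., 0.)], ["H", (0., 0.76, 0.59)], ["H", (0., -0.76, 0.59)]],
#                   basis="ccpvdz", unit="Angstrom", charge=0)
# the spin is set from the electron-count parity, so odd-electron systems are
# treated as doublets and dispatched to UDSCF by solve_mol.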
def build_penalty(pnt_dict, label_dict={}):
pnt_dict = pnt_dict.copy()
pnt_type = pnt_dict.pop("type")
PenaltyClass = select_penalty(pnt_type)
label_names = pnt_dict.pop("required_labels", PenaltyClass.required_labels)
label_arrays = [label_dict[lb] for lb in check_list(label_names)]
return PenaltyClass(*label_arrays, **pnt_dict)
def collect_fields(fields, meta, res_list):
if isinstance(fields, dict):
fields = sum(fields.values(), [])
if isinstance(res_list, dict):
res_list = [res_list]
nframe = len(res_list)
natom, natom_raw, nao, nproj = meta
res_dict = {}
for fd in fields:
fd_res = np.array([res[fd.name] for res in res_list])
if fd.shape:
fd_shape = eval(fd.shape, {}, locals())
fd_res = fd_res.reshape(fd_shape)
res_dict[fd.name] = fd_res
return res_dict
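# example of the shape mechanism: with meta = (natom, natom_raw, nao, nproj)
# unpacked into local names and nframe = len(res_list), a shape string such as
# "(nframe, natom, nproj)" is eval'ed into a concrete tuple and used to reshape
# the stacked per-frame results; a -1 entry (as in proj_dm) lets numpy infer
# that dimension.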
def dump_meta(dir_name, meta):
os.makedirs(dir_name, exist_ok = True)
np.savetxt(os.path.join(dir_name, 'system.raw'),
np.reshape(meta, (1,-1)),
fmt = '%d', header = 'natom natom_raw nao nproj')
def dump_data(dir_name, **data_dict):
os.makedirs(dir_name, exist_ok = True)
for name, value in data_dict.items():
np.save(os.path.join(dir_name, f'{name}.npy'), value)
def main(systems, model_file="model.pth", basis='ccpvdz',
proj_basis=None, penalty_terms=None, device=None,
dump_dir=".", dump_fields=DEFAULT_FNAMES, group=False,
mol_args=None, scf_args=None, verbose=0):
if model_file is None or model_file.upper() == "NONE":
model = None
default_scf_args = DEFAULT_HF_ARGS
else:
model = CorrNet.load(model_file).double()
default_scf_args = DEFAULT_SCF_ARGS
# check arguments
penalty_terms = check_list(penalty_terms)
if mol_args is None: mol_args = {}
if scf_args is None: scf_args = {}
scf_args = {**default_scf_args, **scf_args}
fields = select_fields(dump_fields)
# check label names from label fields and penalties
label_names = get_required_labels(fields["scf"]+fields["grad"], penalty_terms)
if verbose:
print(f"starting calculation with OMP threads: {lib.num_threads()}",
f"and max memory: {lib.param.MAX_MEMORY}")
if verbose > 1:
print(f"basis: {basis}")
print(f"specified scf args:\n {scf_args}")
meta = old_meta = None
res_list = []
systems = load_sys_paths(systems)
for fl in systems:
fl = fl.rstrip(os.path.sep)
for atom, attrs, labels in system_iter(fl, label_names):
mol_input = {**mol_args, "verbose":verbose,
"atom": atom, "basis": basis, **attrs}
mol = build_mol(**mol_input)
penalties = [build_penalty(pd, labels) for pd in penalty_terms]
try:
meta, result = solve_mol(mol, model, fields, labels,
proj_basis=proj_basis, penalties=penalties,
device=device, verbose=verbose, **scf_args)
except Exception as e:
print(fl, 'failed! error:', e, file=sys.stderr)
# continue
raise
if group and old_meta is not None and np.any(meta != old_meta):
break
res_list.append(result)
if not group:
sub_dir = os.path.join(dump_dir, get_sys_name(os.path.basename(fl)))
dump_meta(sub_dir, meta)
dump_data(sub_dir, **collect_fields(fields, meta, res_list))
res_list = []
elif old_meta is not None and np.any(meta != old_meta):
print(fl, 'meta does not match! saving previous results only.', file=sys.stderr)
break
old_meta = meta
if verbose:
print(fl, 'finished')
if group:
dump_meta(dump_dir, meta)
dump_data(dump_dir, **collect_fields(fields, meta, res_list))
if verbose:
print('group finished')
if __name__ == "__main__":
from deepks.main import scf_cli as cli
cli()
import abc
import time
import torch
import numpy as np
from torch import nn
from pyscf import lib
from pyscf.lib import logger
from pyscf import gto
from pyscf import scf, dft
from deepks.utils import load_basis, get_shell_sec
from deepks.model.model import CorrNet
from deepks.scf.penalty import PenaltyMixin
DEVICE = 'cpu'#torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# all variables and functions starting with "t_" are torch based.
# all variables and functions ending with "0" are original base-method results
# convention in einsum:
# i,j: orbital
# a,b: atom
# p,q: projected basis on atom
# r,s: mol basis in pyscf
# parameter shapes:
# ovlp_shells: [nao x natom x nsph] list
# pdm_shells: [natom x nsph x nsph] list
# eig_shells: [natom x nsph] list
def t_make_pdm(dm, ovlp_shells):
"""return projected density matrix by shell"""
# (D^I_rl)_mm' = \sum_i < alpha^I_rlm | phi_i >< phi_i | alpha^I_rlm' >
pdm_shells = [torch.einsum('rap,...rs,saq->...apq', po, dm, po)
for po in ovlp_shells]
return pdm_shells
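# shape sketch: for a shell carrying nsph spherical components, po has shape
# [nao, natom, nsph] and dm has shape [..., nao, nao], so each entry of
# pdm_shells has shape [..., natom, nsph, nsph]; per atom this is simply
#   pdm[..., a, :, :] = po[:, a, :].T @ dm @ po[:, a, :]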
if hasattr(torch, "linalg"):
def t_shell_eig(pdm):
return torch.linalg.eigvalsh(pdm)
else:
def t_shell_eig(pdm):
return torch.symeig(pdm, eigenvectors=True)[0]
def t_make_eig(dm, ovlp_shells):
"""return eigenvalues of projected density matrix"""
pdm_shells = t_make_pdm(dm, ovlp_shells)
eig_shells = [t_shell_eig(dm) for dm in pdm_shells]
ceig = torch.cat(eig_shells, dim=-1)
return ceig
def t_get_corr(model, dm, ovlp_shells, with_vc=True):
"""return the "correction" energy (and potential) given by a NN model"""
dm.requires_grad_(True)
ceig = t_make_eig(dm, ovlp_shells) # natoms x nproj
_dref = next(model.parameters()) if isinstance(model, nn.Module) else DEVICE
ec = model(ceig.to(_dref)) # no batch dim here, unsqueeze(0) if needed
if not with_vc:
return ec.to(ceig)
[vc] = torch.autograd.grad(ec, dm, torch.ones_like(ec))
return ec.to(ceig), vc
def t_batch_jacobian(f, x, noutputs):
nindim = len(x.shape)-1
x = x.unsqueeze(1) # b, 1 ,*in_dim
n = x.shape[0]
x = x.repeat(1, noutputs, *[1]*nindim) # b, out_dim, *in_dim
x.requires_grad_(True)
y = f(x)
input_val = torch.eye(noutputs).reshape(1,noutputs, noutputs).repeat(n, 1, 1)
return torch.autograd.grad(y, x, input_val)[0]
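# usage note: t_batch_jacobian duplicates the batched input along a new
# "output" axis and back-propagates an identity seed, so
#   jac = t_batch_jacobian(t_shell_eig, pdm, pdm.shape[-1])
# returns jac[a, v, p, q] = d eig[a, v] / d pdm[a, p, q] for every atom a in a
# single backward pass.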
def t_make_grad_eig_dm(dm, ovlp_shells):
"""return jacobian of decriptor eigenvalues w.r.t 1-rdm"""
# using the sparsity, much faster than naive torch version
# v stands for eigen values
pdm_shells = [dm.requires_grad_(True) for dm in t_make_pdm(dm, ovlp_shells)]
gvdm_shells = [t_batch_jacobian(t_shell_eig, dm, dm.shape[-1])
for dm in pdm_shells]
vjac_shells = [torch.einsum('rap,avpq,saq->avrs', po, gdm, po)
for po, gdm in zip(ovlp_shells, gvdm_shells)]
return torch.cat(vjac_shells, dim=1)
def gen_proj_mol(mol, basis) :
mole_coords = mol.atom_coords(unit="Ang")
mole_ele = mol.elements
test_mol = gto.Mole()
test_mol.atom = [["X", coord] for coord, ele in zip(mole_coords, mole_ele)
if not ele.startswith("X")]
test_mol.basis = basis
test_mol.build(0,0,unit="Ang")
return test_mol
class CorrMixin(abc.ABC):
"""Abstruct mixin class to add "correction" term to the mean-field Hamiltionian"""
def get_veff0(self, *args, **kwargs):
return super().get_veff(*args, **kwargs)
def get_grad0(self, mo_coeff=None, mo_occ=None):
if mo_occ is None: mo_occ = self.mo_occ
if mo_coeff is None: mo_coeff = self.mo_coeff
return super().get_grad(mo_coeff, mo_occ,
fock=self.get_fock(vhf=self.get_veff0()))
def energy_elec0(self, dm=None, h1e=None, vhf=None):
if vhf is None: vhf = self.get_veff0(dm=dm)
return super().energy_elec(dm, h1e, vhf)
def energy_tot0(self, dm=None, h1e=None, vhf=None):
return self.energy_elec0(dm, h1e, vhf)[0] + self.energy_nuc()
def nuc_grad_method0(self):
return super().nuc_grad_method()
def get_veff(self, mol=None, dm=None, dm_last=0, vhf_last=0, hermi=1):
"""original mean field potential + correction potential"""
if mol is None:
mol = self.mol
if dm is None:
dm = self.make_rdm1()
tic = (time.process_time(), time.perf_counter())
# base method part
v0_last = getattr(vhf_last, 'v0', 0)
v0 = self.get_veff0(mol, dm, dm_last, v0_last, hermi)
tic = logger.timer(self, 'v0', *tic)
# Correlation (or correction) part
ec, vc = self.get_corr(dm)
tic = logger.timer(self, 'vc', *tic)
# make total effective potential
vtot = v0 + vc
vtot = lib.tag_array(vtot, ec=ec, v0=v0)
return vtot
def energy_elec(self, dm=None, h1e=None, vhf=None):
"""return electronic energy and the 2-electron part contribution"""
if dm is None:
dm = self.make_rdm1()
if h1e is None:
h1e = self.get_hcore()
if vhf is None or getattr(vhf, 'ec', None) is None:
vhf = self.get_veff(dm=dm)
etot, e2 = self.energy_elec0(dm, h1e, vhf.v0)
ec = vhf.ec
logger.debug(self, f'Emodel = {ec}')
return (etot+ec).real, e2+ec
@abc.abstractmethod
def get_corr(self, dm=None):
"""return "correction" energy and corresponding potential"""
if dm is None:
dm = self.make_rdm1()
return 0., np.zeros_like(dm)
@abc.abstractmethod
def nuc_grad_method(self):
return self.nuc_grad_method0()
class NetMixin(CorrMixin):
"""Mixin class to add correction term given by a neural network model"""
def __init__(self, model, proj_basis=None, device=DEVICE):
# make sure you call this method after the base SCF class init
# otherwise it would throw an error due to the lack of mol attr
self.device = device
if isinstance(model, str):
model = CorrNet.load(model).double()
if isinstance(model, torch.nn.Module):
model = model.to(self.device).eval()
self.net = model
# try load basis from model file
if proj_basis is None:
proj_basis = getattr(model, "_pbas", None)
# should be a list here, follow pyscf convention
self._pbas = load_basis(proj_basis)
# [1,1,1,...,3,3,3,...,5,5,5,...]
self._shell_sec = get_shell_sec(self._pbas)
# total number of projected basis per atom
self.nproj = sum(self._shell_sec)
# prepare overlap integrals used in projection
self.prepare_integrals()
def prepare_integrals(self):
# a virtual molecule to be projected on
self._pmol = gen_proj_mol(self.mol, self._pbas)
# < mol_ao | alpha^I_rlm >, shape=[nao x natom x nproj]
t_proj_ovlp = torch.from_numpy(self.proj_ovlp()).double()
# split the projected coeffs by shell (different r and l)
self._t_ovlp_shells = torch.split(t_proj_ovlp, self._shell_sec, -1)
def get_corr(self, dm=None):
"""return "correction" energy and corresponding potential"""
if dm is None:
dm = self.make_rdm1()
if self.net is None:
return 0., np.zeros_like(dm)
dm = np.asanyarray(dm)
if dm.ndim >= 3 and isinstance(self, scf.uhf.UHF):
dm = dm.sum(0)
t_dm = torch.from_numpy(dm).double()
t_ec, t_vc = t_get_corr(self.net, t_dm, self._t_ovlp_shells, with_vc=True)
ec = t_ec.item() if t_ec.nelement()==1 else t_ec.detach().cpu().numpy()
vc = t_vc.detach().cpu().numpy()
ec = ec + self.net.get_elem_const(filter(None, self.mol.atom_charges()))
return ec, vc
def nuc_grad_method(self):
from deepks.scf.grad import build_grad
return build_grad(self)
def reset(self, mol=None):
super().reset(mol)
self.prepare_integrals()
return self
def make_pdm(self, dm=None, flatten=False):
"""return projected density matrix by shell"""
if dm is None:
dm = self.make_rdm1()
t_dm = torch.from_numpy(dm).double()
t_pdm_shells = t_make_pdm(t_dm, self._t_ovlp_shells)
if not flatten:
return [s.detach().cpu().numpy() for s in t_pdm_shells]
else:
return torch.cat([s.flatten(-2) for s in t_pdm_shells],
dim=-1).detach().cpu().numpy()
def make_eig(self, dm=None):
"""return eigenvalues of projected density matrix"""
if dm is None:
dm = self.make_rdm1()
dm = np.asanyarray(dm)
if dm.ndim >= 3 and isinstance(self, scf.uhf.UHF):
dm = dm.sum(0)
t_dm = torch.from_numpy(dm).double()
t_eig = t_make_eig(t_dm, self._t_ovlp_shells)
return t_eig.detach().cpu().numpy()
def proj_intor(self, intor):
"""1-electron integrals between origin and projected basis"""
proj = gto.intor_cross(intor, self.mol, self._pmol)
return proj
def proj_ovlp(self):
"""overlap between origin and projected basis, reshaped"""
nao = self.mol.nao
natm = self._pmol.natm
pnao = self._pmol.nao
proj = self.proj_intor("int1e_ovlp")
# return shape [nao x natom x nproj]
return proj.reshape(nao, natm, pnao // natm)
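# note: the reshape assumes every (non-ghost) atom carries the same projection
# basis, so pnao == natm * nproj and the cross overlap can be viewed as
# [nao x natom x nproj]; prepare_integrals then splits the last axis into
# shells according to _shell_sec.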
# additional methods for dm training impl'd in addons
# from deepks.scf.addons import make_grad_eig_egrad
# from deepks.scf.addons import make_grad_coul_veig
# from deepks.scf.addons import calc_optim_veig
class DSCF(NetMixin, PenaltyMixin, dft.rks.RKS):
"""Restricted SCF solver for given NN energy model"""
def __init__(self, mol, model, xc="HF", proj_basis=None, penalties=None, device=DEVICE):
# base method must be initialized first
dft.rks.RKS.__init__(self, mol, xc=xc)
# correction mixin initialization
NetMixin.__init__(self, model, proj_basis=proj_basis, device=device)
# penalty term initialization
PenaltyMixin.__init__(self, penalties=penalties)
# update keys to avoid pyscf warning
self._keys.update(self.__dict__.keys())
DeepSCF = RDSCF = DSCF
class UDSCF(NetMixin, PenaltyMixin, dft.uks.UKS):
"""Unrestricted SCF solver for given NN energy model"""
def __init__(self, mol, model, xc="HF", proj_basis=None, penalties=None, device=DEVICE):
# base method must be initialized first
dft.uks.UKS.__init__(self, mol, xc=xc)
# correction mixin initialization
NetMixin.__init__(self, model, proj_basis=proj_basis, device=device)
# penalty term initialization
PenaltyMixin.__init__(self, penalties=penalties)
# update keys to avoid pyscf warning
self._keys.update(self.__dict__.keys())
# if __name__ == '__main__':
# mol = gto.Mole()
# mol.verbose = 5
# mol.output = None
# mol.atom = [['He', (0, 0, 0)], ]
# mol.basis = 'ccpvdz'
# mol.build(0, 0)
# def test_model(eigs):
# assert eigs.shape[-1] == _zeta.size * 9
# return 1e-3 * torch.sum(eigs, axis=(1,2))
# # SCF Procedure
# dscf = DSCF(mol, test_model)
# energy = dscf.kernel()
# print(energy)
import os
import sys
import glob
import numpy as np
import shutil
try:
import deepks
except ImportError as e:
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + "/../../")
from deepks.utils import check_list, check_array
from deepks.utils import load_array, load_yaml
from deepks.utils import get_sys_name, get_with_prefix
def concat_data(systems=None, sys_dir=".", dump_dir=".", pattern="*"):
if systems is None:
systems = sorted(filter(os.path.isdir,
map(os.path.abspath, glob.glob(f"{sys_dir}/{pattern}"))))
npy_names = list(map(os.path.basename, glob.glob(f"{systems[0]}/*.npy")))
os.makedirs(dump_dir, exist_ok=True)
for nm in npy_names:
tmp_array = np.concatenate([np.load(f"{sys}/{nm}") for sys in systems])
np.save(f"{dump_dir}/{nm}", tmp_array)
if os.path.exists(f'{systems[0]}/system.raw'):
shutil.copy(f'{systems[0]}/system.raw', dump_dir)
def print_stats(systems=None, test_sys=None,
dump_dir=None, test_dump=None, group=False,
with_conv=True, with_e=True, e_name="e_tot",
with_f=True, f_name="f_tot"):
load_func = load_stat if not group else load_stat_grouped
if dump_dir is None:
dump_dir = "."
if test_dump is None:
test_dump = dump_dir
shift = None
if systems is not None:
tr_c, tr_e, tr_f = load_func(systems, dump_dir, with_conv,
with_e, e_name, with_f, f_name)
print("Training:")
if tr_c is not None:
print_stats_conv(tr_c, indent=2)
if tr_e is not None:
shift = tr_e.mean()
print_stats_e(tr_e, shift=shift, indent=2)
if tr_f is not None:
print_stats_f(tr_f, indent=2)
if test_sys is not None:
ts_c, ts_e, ts_f = load_func(test_sys, test_dump, with_conv,
with_e, e_name, with_f, f_name)
print("Testing:")
if ts_c is not None:
print_stats_conv(ts_c, indent=2)
if ts_e is not None:
print_stats_e(ts_e, shift=shift, indent=2)
if ts_f is not None:
print_stats_f(ts_f, indent=2)
def print_stats_conv(conv, indent=0):
nsys = conv.shape[0]
ind = " "*indent
print(ind+f'Convergence:')
print(ind+f' {np.sum(conv)} / {nsys} = \t {np.mean(conv):.5f}')
def print_stats_e(e_err, shift=None, indent=0):
ind = " "*indent
print(ind+"Energy:")
print(ind+f' ME: \t {e_err.mean()}')
print(ind+f' MAE: \t {np.abs(e_err).mean()}')
if shift:
print(ind+f' MARE: \t {np.abs(e_err-shift).mean()}')
def print_stats_f(f_err, indent=0):
ind = " "*indent
print(ind+"Force:")
print(ind+f' MAE: \t {np.abs(f_err).mean()}')
def load_stat(systems, dump_dir,
with_conv=True, with_e=True, e_name="e_tot",
with_f=True, f_name="f_tot"):
systems = check_list(systems)
c_res = []
e_err = []
f_err = []
for fl in systems:
lbase = get_sys_name(fl)
rbase = os.path.join(dump_dir, os.path.basename(lbase))
if with_conv:
try:
c_res.append(load_array(get_with_prefix("conv", rbase, ".npy")))
except FileNotFoundError as e:
print("Warning! conv.npy not found:", e, file=sys.stderr)
if with_e:
try:
re = load_array(get_with_prefix(e_name, rbase, ".npy")).reshape(-1,1)
le = load_array(get_with_prefix("energy", lbase, ".npy")).reshape(-1,1)
e_err.append(le - re)
except FileNotFoundError as e:
print("Warning! energy file not found:", e, file=sys.stderr)
if with_f:
try:
rf = load_array(get_with_prefix(f_name, rbase, ".npy"))
lf = load_array(get_with_prefix("force", lbase, ".npy")).reshape(rf.shape)
f_err.append(np.abs(lf - rf).mean((-1,-2)))
except FileNotFoundError as e:
print("Warning! force file not found:", e, file=sys.stderr)
return np.concatenate(c_res, 0) if c_res else None, \
np.concatenate(e_err, 0) if e_err else None, \
np.concatenate(f_err, 0) if f_err else None
def load_stat_grouped(systems, dump_dir=".",
with_conv=True, with_e=True, e_name="e_tot",
with_f=True, f_name="f_tot"):
systems = check_list(systems)
lbases = [get_sys_name(fl) for fl in systems]
c_res = e_err = f_err = None
if with_conv:
c_res = load_array(get_with_prefix("conv", dump_dir, ".npy"))
if with_e:
e_res = load_array(get_with_prefix(e_name, dump_dir, ".npy"))
e_lbl = np.concatenate([
load_array(get_with_prefix("energy", lb, ".npy")) for lb in lbases
], 0).reshape(-1,1)
e_err = e_lbl - e_res
if with_f:
f_res = load_array(get_with_prefix(f_name, dump_dir, ".npy"))
f_lbl = np.concatenate([
load_array(get_with_prefix("force", lb, ".npy")) for lb in lbases
], 0).reshape(f_res.shape)
f_err = f_lbl - f_res
return c_res, e_err, f_err
# Below are legacy tools, kept for old examples
def print_stats_per_sys(err, conv=None, train_idx=None, test_idx=None):
err = np.array(err).reshape(-1)
nsys = err.shape[0]
if conv is not None:
assert len(conv) == nsys
print(f'converged calculation: {np.sum(conv)} / {nsys} = {np.mean(conv):.3f}')
print(f'mean error: {err.mean()}')
print(f'mean absolute error: {np.abs(err).mean()}')
if train_idx is not None:
if test_idx is None:
test_idx = np.setdiff1d(np.arange(nsys), train_idx, assume_unique=True)
print(f' training: {np.abs(err[train_idx]).mean()}')
print(f' testing: {np.abs(err[test_idx]).mean()}')
print(f'mean absolute error after shift: {np.abs(err - err[train_idx].mean()).mean()}')
print(f' training: {np.abs(err[train_idx] - err[train_idx].mean()).mean()}')
print(f' testing: {np.abs(err[test_idx] - err[train_idx].mean()).mean()}')
def make_label(sys_dir, eref, fref=None):
eref = eref.reshape(-1,1)
nmol = eref.shape[0]
ehf = np.load(f'{sys_dir}/e_base.npy')
assert ehf.shape[0] == nmol
ecc = eref - ehf
np.save(f'{sys_dir}/l_e_delta.npy', ecc)
if fref is not None:
fref = fref.reshape(nmol, -1, 3)
fhf = np.load(f'{sys_dir}/f_base.npy')
assert fhf.shape == fref.shape
fcc = fref - fhf
np.save(f'{sys_dir}/l_f_delta.npy', fcc)
def collect_data(train_idx, test_idx=None,
sys_dir="results", ene_ref="e_ref.npy", force_ref=None,
dump_dir=".", verbose=True):
erefs = check_array(ene_ref).reshape(-1)
nsys = erefs.shape[0]
if nsys == 1 and "e_tot.npy" in os.listdir(sys_dir):
systems = [os.path.abspath(sys_dir)]
else:
systems = sorted(map(os.path.abspath, glob.glob(f"{sys_dir}/*")))
assert nsys == len(systems)
convs = []
ecfs = []
for sys_i, ec_i in zip(systems, erefs):
e0_i = np.load(os.path.join(sys_i, "e_base.npy"))
ecc_i = ec_i - e0_i
np.save(os.path.join(sys_i, "l_e_delta.npy"), ecc_i)
convs.append(np.load(os.path.join(sys_i, "conv.npy")))
ecfs.append(np.load(os.path.join(sys_i, "e_tot.npy")))
convs = np.array(convs).reshape(-1)
ecfs = np.array(ecfs).reshape(-1)
err = erefs - ecfs
if test_idx is None:
test_idx = np.setdiff1d(np.arange(nsys), train_idx, assume_unique=True)
if verbose:
print(sys_dir)
print_stats_per_sys(err, convs, train_idx, test_idx)
np.savetxt(f'{dump_dir}/train_paths.raw', np.array(systems)[train_idx], fmt='%s')
np.savetxt(f'{dump_dir}/test_paths.raw', np.array(systems)[test_idx], fmt='%s')
np.savetxt(f'{dump_dir}/e_result.out', np.stack([erefs, ecfs], axis=-1), header="real pred")
def collect_data_grouped(train_idx, test_idx=None,
sys_dir="results", ene_ref="e_ref.npy", force_ref=None,
dump_dir=".", append=True, verbose=True):
eref = check_array(ene_ref).reshape(-1, 1)
fref = check_array(force_ref)
nmol = eref.shape[0]
if not os.path.exists(f'{sys_dir}/e_tot.npy'):
concat_data(sys_dir, dump_dir=sys_dir)
ecf = np.load(f'{sys_dir}/e_tot.npy').reshape(-1, 1)
assert ecf.shape[0] == nmol, f"{ene_ref} ref size: {nmol}, {sys_dir} data size: {ecf.shape[0]}"
make_label(sys_dir, eref, fref)
# ehf = np.load(f'{sys_dir}/e_base.npy')
# np.save(f'{sys_dir}/l_e_delta.npy', eref - ehf)
err = eref - ecf
conv = np.load(f'{sys_dir}/conv.npy').reshape(-1)
if test_idx is None:
test_idx = np.setdiff1d(np.arange(nmol), train_idx, assume_unique=True)
if verbose:
print(sys_dir)
print_stats_per_sys(err.reshape(-1), conv.reshape(-1), train_idx, test_idx)
dd = [name for name in os.listdir(sys_dir) if ".npy" in name]
os.makedirs(f'{sys_dir}/train', exist_ok=True)
os.makedirs(f'{sys_dir}/test', exist_ok=True)
for d in dd:
np.save(f"{sys_dir}/train/{d}", np.load(f'{sys_dir}/{d}')[train_idx])
for d in dd:
np.save(f"{sys_dir}/test/{d}", np.load(f'{sys_dir}/{d}')[test_idx])
shutil.copy(f'{sys_dir}/system.raw', f'{sys_dir}/train')
shutil.copy(f'{sys_dir}/system.raw', f'{sys_dir}/test')
# np.savetxt(f'{dump_dir}/train_paths.raw', [os.path.abspath(f'{dump_dir}/train')], fmt='%s')
# np.savetxt(f'{dump_dir}/test_paths.raw', [os.path.abspath(f'{dump_dir}/test')], fmt='%s')
# Path(f'{dump_dir}/train_paths.raw').write_text(str(Path(f'{dump_dir}/train').absolute()))
# Path(f'{dump_dir}/test_paths.raw').write_text(str(Path(f'{dump_dir}/test').absolute()))
mode = "a" if append else "w"
with open(f'{dump_dir}/train_paths.raw', mode) as fp:
fp.write(os.path.abspath(f'{sys_dir}/train') + "\n")
with open(f'{dump_dir}/test_paths.raw', mode) as fp:
fp.write(os.path.abspath(f'{sys_dir}/test') + "\n")
if __name__ == "__main__":
from deepks.main import stats_cli as cli
cli()
__all__ = [
"task",
"workflow",
"job"
]
from .task import *
from .workflow import *
# this sub package is borrowed and modified from dpgen project
# https://github.com/deepmodeling/dpgen/tree/master/dpgen/dispatcher
import os,sys,time
from itertools import zip_longest
from .job_status import JobStatus
class Batch(object) :
def __init__ (self,
context,
uuid_names = True) :
self.context = context
self.uuid_names = uuid_names
if uuid_names:
self.upload_tag_name = '%s_tag_upload' % self.context.job_uuid
self.finish_tag_name = '%s_tag_finished' % self.context.job_uuid
self.sub_script_name = '%s.sub' % self.context.job_uuid
self.job_id_name = '%s_job_id' % self.context.job_uuid
else:
self.upload_tag_name = 'tag_upload'
self.finish_tag_name = 'tag_finished'
self.sub_script_name = 'run.sub'
self.job_id_name = 'job_id'
def check_status(self) :
raise NotImplementedError('abstract method check_status should be implemented by derived class')
def default_resources(self, res) :
raise NotImplementedError('abstract method default_resources should be implemented by derived class')
def sub_script_head(self, res) :
raise NotImplementedError('abstract method sub_script_head should be implemented by derived class')
def sub_script_cmd(self, cmd, arg, res):
raise NotImplementedError('abstract method sub_script_cmd should be implemented by derived class')
def exec_sub_script(self, script_str):
raise NotImplementedError('abstract method exec_sub_script should be implemented by derived class')
def check_before_sub(self, res):
pass
def sub_step_head(self, step_res=None):
return ""
def do_submit(self,
job_dirs,
cmds,
args = None,
res = None,
outlog = 'log',
errlog = 'err',
para_deg = 1,
para_res = None):
'''
submit a single job, assuming that no job is running there.
'''
if res is None:
res = self.default_resources(res)
self.check_before_sub(res)
script_str = self.sub_script(job_dirs, cmds, args=args, res=res,
outlog=outlog, errlog=errlog,
para_deg=para_deg, para_res=para_res)
self.exec_sub_script(script_str=script_str)
def sub_script(self,
job_dirs,
cmds,
args = None,
res = None,
outlog = 'log',
errlog = 'err',
para_deg = 1,
para_res = None) :
"""
make submit script
job_dirs(list): directories of jobs. size: n_job
cmds(list of list): commands to be executed in each dir. size: n_job x n_cmd
args(list of list): args of commands. size: n_job x n_cmd
can be None
res(dict): resources available
outlog(str): file name for output
errlog(str): file name for error
"""
res = self.default_resources(res)
ret = self.sub_script_head(res)
if not isinstance(job_dirs, list):
job_dirs = [job_dirs]
if not isinstance(cmds, list):
cmds = [cmds]
if not isinstance(cmds[0], list):
cmds = [cmds for d in job_dirs]
if args is None:
args = [['' for c in jcmd] for jcmd in cmds]
if not isinstance(para_res, list):
para_res = [para_res for d in job_dirs]
# loop over cmds
for jj, (jcmds, jargs) in enumerate(zip(zip_longest(*cmds), zip_longest(*args))):
# for one cmd per dir
ret += self._sub_script_inner(job_dirs,
jcmds,
jargs,
res,
jj,
outlog=outlog,
errlog=errlog,
para_deg=para_deg,
para_res=para_res)
ret += '\nwait\n'
ret += '\ntouch %s\n' % self.finish_tag_name
return ret
def submit(self,
job_dirs,
cmds,
args = None,
res = None,
restart = False,
sleep = 0,
outlog = 'log',
errlog = 'err',
para_deg = 1,
para_res = None):
if restart:
status = self.check_status()
if status in [ JobStatus.unsubmitted, JobStatus.unknown, JobStatus.terminated ]:
# dlog.debug('task restart point !!!')
self.do_submit(job_dirs, cmds, args, res, outlog=outlog, errlog=errlog)
elif status==JobStatus.waiting:
pass
# dlog.debug('task is waiting')
elif status==JobStatus.running:
pass
# dlog.debug('task is running')
elif status==JobStatus.finished:
pass
# dlog.debug('task is finished')
else:
raise RuntimeError('unknown job status, must be wrong')
else:
# dlog.debug('new task')
self.do_submit(job_dirs, cmds, args, res,
outlog=outlog, errlog=errlog,
para_deg=para_deg, para_res=para_res)
if res is not None:
sleep = res.get('submit_wait_time', sleep)
time.sleep(sleep) # to prevent tasks from crashing while being submitted
def check_finish_tag(self) :
return self.context.check_file_exists(self.finish_tag_name)
def _sub_script_inner(self,
job_dirs,
cmds,
args,
res,
step = 0,
outlog = 'log',
errlog = 'err',
para_deg = 1,
para_res = None) :
# job_dirs: a list of dirs
# cmds: a list of cmds, `cmds[i]` will be run in directory `job_dirs[i]`
# args: a list of args, `args[i]` will be passed to `cmd[i]` in `job_dirs[i]`
# res: common resources to be used
# para_res: a list of resources for each cmd, used to make sub-steps
try:
allow_failure = res['allow_failure']
except:
allow_failure = False
ret = ""
# additional checker for ingroup parallel
if para_deg > 1:
ret += 'pids=""; FAIL=0\n\n'
# iter over job dirs
for idx, (idir, icmd, iarg) in enumerate(zip(job_dirs, cmds, args)) :
ret += 'cd %s\n' % idir
ret += 'test $? -ne 0 && exit\n'
# check if finished
sub = "\n"
sub += 'if [ ! -f tag_%d_finished ] ;then\n' % step
# build command
tmp_cmd = self.sub_script_cmd(icmd, iarg, res)
if para_deg > 1 and not res.get("with_mpi", False) and para_res:
tmp_cmd = self.sub_step_head(para_res[idx]) + tmp_cmd
sub += ' %s 1>> %s 2>> %s \n' % (tmp_cmd, outlog, errlog)
# check failure
if not allow_failure:
sub += ' if test $? -ne 0; then exit 1; else touch tag_%d_finished; fi \n' % step
else :
sub += ' if test $? -ne 0; then touch tag_failure_%d; fi \n' % step
sub += ' touch tag_%d_finished \n' % step
sub += 'fi\n'
# if parallel put step into subshell
if para_deg > 1:
sub = f'\n({sub})&\n'
sub += 'pids+=" $!"\n'
sub += "\n"
ret += sub
ret += 'cd %s\n' % self.context.remote_root
ret += 'test $? -ne 0 && exit\n'
if para_deg > 1 and ((idx+1) % para_deg == 0 or idx + 1 == len(job_dirs)):
ret += '\n\nfor p in $pids; do wait $p || let "FAIL+=1"; done\n'
ret += 'test $FAIL -ne 0 && exit\n'
ret += 'pids=""; FAIL=0\n'
ret += "\n\n"
return ret
import os,sys,time,random,json,glob
from hashlib import sha1
from copy import deepcopy
from .local_context import LocalSession
from .local_context import LocalContext
from .lazy_local_context import LazyLocalContext
from .ssh_context import SSHSession
from .ssh_context import SSHContext
from .slurm import Slurm
from .shell import Shell
from .job_status import JobStatus
def _split_tasks(tasks,
group_size):
ntasks = len(tasks)
ngroups = ntasks // group_size
if ngroups * group_size < ntasks:
ngroups += 1
chunks = [[]] * ngroups
tot = 0
for ii in range(ngroups) :
chunks[ii] = (tasks[ii::ngroups])
tot += len(chunks[ii])
assert(tot == len(tasks))
return chunks
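# worked example: tasks are distributed round-robin over ceil(ntasks/group_size)
# chunks, e.g. _split_tasks([t0, t1, t2, t3, t4], 2) yields
# [[t0, t3], [t1, t4], [t2]], so no chunk exceeds group_size.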
def _hash_task_chunk(task_chunk):
task_chunk_str = '+'.join(t['_label'] for t in task_chunk)
task_hash = sha1(task_chunk_str.encode('utf-8')).hexdigest()
return task_hash
class Dispatcher(object):
def __init__ (self,
context='local',
batch='slurm',
remote_profile=None,
job_record = 'jr.json'):
if remote_profile is None:
assert 'local' in context
context = 'lazy-local'
self.remote_profile = remote_profile
if context == 'local':
self.session = LocalSession(remote_profile)
self.context_fn = LocalContext
self.uuid_names = False
elif context == 'lazy-local':
self.session = None
self.context_fn = LazyLocalContext
self.uuid_names = True
elif context == 'ssh':
self.session = SSHSession(remote_profile)
self.context_fn = SSHContext
self.uuid_names = False
else :
raise RuntimeError('unknown context')
if batch == 'slurm':
self.batch_fn = Slurm
elif batch == 'shell':
self.batch_fn = Shell
else :
raise RuntimeError('unknown batch ' + batch)
self.jrname = job_record
    # customize deepcopy so that copied instances share the same session
    # (a short illustration follows the method below)
def __deepcopy__(self, memo):
d = id(self)
if d in memo:
return memo[d]
cls = self.__class__
result = cls.__new__(cls)
memo[d] = result
for k, v in self.__dict__.items():
if k == "session":
setattr(result, k, v)
else:
setattr(result, k, deepcopy(v, memo))
return result
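
A small illustration of the behaviour this gives, using a plain object as a stand-in for a real LocalSession or SSHSession (with the lazy-local context the session is simply None):

disp = Dispatcher(context='lazy-local', batch='shell')
disp.session = object()                      # illustrative stand-in for a real session
twin = deepcopy(disp)
assert twin is not disp                      # a genuine copy ...
assert twin.session is disp.session          # ... but the session is shared by reference
assert twin.jrname == disp.jrname            # other attributes are copied as usual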
def run_jobs(self,
tasks,
group_size=1,
para_deg=1,
work_path='.',
resources=None,
forward_task_deref=True,
forward_common_files=[],
backward_common_files=[],
mark_failure = False,
outlog='log',
errlog='err') :
        # tasks is a list of dicts [t1, t2, t3, ...]
        # with each element t = {
        #    'dir': job_dir,
        #    'cmds': list of cmds,
        #    'forward_files': list of files to be forwarded,
        #    'backward_files': list of files to be pulled back,
        #    'resources': dict of resources used for the sub-step, when para_deg > 1
        # }
        # see the usage sketch after this method
job_handler = self.submit_jobs(
tasks,
group_size,
para_deg,
work_path,
resources,
forward_task_deref,
forward_common_files,
backward_common_files,
outlog,
errlog
)
while not self.all_finished(job_handler, mark_failure) :
time.sleep(60)
# delete path map file when job finish
# _pmap.delete()
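
A hedged usage sketch of run_jobs; the directories, commands and file names below are illustrative:

disp = Dispatcher(context='lazy-local', batch='shell')
tasks = [{'dir': 'task.000',
          'cmds': ['python run.py'],
          'forward_files': ['input.json'],
          'backward_files': ['output.json'],
          'resources': None},
         {'dir': 'task.001',
          'cmds': ['python run.py'],
          'forward_files': ['input.json'],
          'backward_files': ['output.json'],
          'resources': None}]
# blocks until every chunk is recorded as finished, polling once per minute
disp.run_jobs(tasks, group_size=2, resources={'allow_failure': False})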
def submit_jobs(self,
tasks,
group_size=1,
para_deg=1,
work_path='.',
resources=None,
forward_task_deref=True,
forward_common_files=[],
backward_common_files=[],
outlog='log',
errlog='err') :
if not isinstance(tasks, list):
tasks = [tasks]
for task in tasks:
assert isinstance(task, dict)
if isinstance(task['cmds'], str):
task['cmds'] = [task['cmds']]
            for field in ('forward_files', 'backward_files'):
                if task.get(field) is None:
                    task[field] = []
            # 'resources' is optional and only used for sub-steps when para_deg > 1
            task.setdefault('resources', None)
task['_label'] = f'{{dir:{task["dir"]}, cmds:{task["cmds"]}}}'
task_chunks = _split_tasks(tasks, group_size)
task_hashes = [_hash_task_chunk(chunk) for chunk in task_chunks]
job_record = JobRecord(work_path, task_chunks, fname = self.jrname)
job_record.dump()
nchunks = len(task_chunks)
job_list = []
for ii in range(nchunks) :
cur_chunk = task_chunks[ii]
cur_hash = task_hashes[ii]
if not job_record.check_finished(cur_hash):
# chunk is not finished
# check if chunk is submitted
submitted = job_record.check_submitted(cur_hash)
if not submitted:
job_uuid = None
else :
job_uuid = job_record.get_uuid(cur_hash)
# dlog.debug("load uuid %s for chunk %s" % (job_uuid, cur_hash))
                # communication context, batch system
context = self.context_fn(work_path, self.session, job_uuid)
batch = self.batch_fn(context, uuid_names = self.uuid_names)
rjob = {'context':context, 'batch':batch}
# upload files
if (not isinstance(rjob['context'], LazyLocalContext) and
not rjob['context'].check_file_exists(rjob['batch'].upload_tag_name)):
rjob['context'].upload('.',
forward_common_files)
for task in cur_chunk:
rjob['context'].upload([task['dir']],
task['forward_files'],
dereference = forward_task_deref)
rjob['context'].write_file(rjob['batch'].upload_tag_name, '')
# submit new or recover old submission
dirs = [task['dir'] for task in cur_chunk]
commands = [task['cmds'] for task in cur_chunk]
para_res = [task['resources'] for task in cur_chunk]
if not submitted:
rjob['batch'].submit(dirs, commands, res = resources,
outlog=outlog, errlog=errlog,
para_deg=para_deg, para_res=para_res)
job_uuid = rjob['context'].job_uuid
# dlog.debug('assigned uuid %s for %s ' % (job_uuid, task_chunks_str[ii]))
print('# new submission of %s for chunk %s' % (job_uuid, cur_hash))
else:
rjob['batch'].submit(dirs, commands, res = resources,
outlog=outlog, errlog=errlog,
para_deg=para_deg, para_res=para_res,
restart = True)
print('# restart from old submission %s for chunk %s' % (job_uuid, cur_hash))
# record job and its remote context
job_list.append(rjob)
ip = None
instance_id = None
if (self.remote_profile is not None and
'cloud_resources' in self.remote_profile):
ip = self.remote_profile['hostname']
instance_id = self.remote_profile['instance_id']
job_record.record_remote_context(cur_hash,
context.local_root,
context.remote_root,
job_uuid,
ip,
instance_id)
job_record.dump()
else :
# finished job, append a None to list
job_list.append(None)
assert(len(job_list) == nchunks)
job_handler = {
'task_chunks': task_chunks,
'job_list': job_list,
'job_record': job_record,
'resources': resources,
'para_deg': para_deg,
'outlog': outlog,
'errlog': errlog,
'backward_common_files': backward_common_files
}
return job_handler
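
When blocking is not wanted, the handler returned here can be polled directly, roughly as run_jobs itself does (sketch; disp and tasks as in the example above):

handler = disp.submit_jobs(tasks, group_size=2)
while not disp.all_finished(handler, mark_failure=False):
    time.sleep(60)   # keep polling until every chunk is recorded as finished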
def all_finished(self,
job_handler,
mark_failure,
clean=True):
task_chunks = job_handler['task_chunks']
task_hashes = [_hash_task_chunk(chunk) for chunk in task_chunks]
job_list = job_handler['job_list']
job_record = job_handler['job_record']
tag_failure_list = ['tag_failure_%d' % ii
for ii in range(max(len(t['cmds']) for c in task_chunks for t in c))]
resources = job_handler['resources']
para_deg = job_handler['para_deg']
outlog = job_handler['outlog']
errlog = job_handler['errlog']
backward_common_files = job_handler['backward_common_files']
# dlog.debug('checking jobs')
nchunks = len(task_chunks)
for idx in range(nchunks) :
cur_chunk = task_chunks[idx]
cur_hash = task_hashes[idx]
rjob = job_list[idx]
if not job_record.check_finished(cur_hash) :
# chunk not finished according to record
status = rjob['batch'].check_status()
job_uuid = rjob['context'].job_uuid
# dlog.debug('checked job %s' % job_uuid)
if status == JobStatus.terminated :
job_record.increase_nfail(cur_hash)
if job_record.check_nfail(cur_hash) > 3:
raise RuntimeError('Job %s failed for more than 3 times' % job_uuid)
print('# job %s terminated, submit again'% job_uuid)
# dlog.debug('try %s times for %s'% (job_record.check_nfail(cur_hash), job_uuid))
dirs = [task['dir'] for task in cur_chunk]
commands = [task['cmds'] for task in cur_chunk]
para_res = [task['resources'] for task in cur_chunk]
rjob['batch'].submit(dirs, commands, res = resources,
outlog=outlog, errlog=errlog,
para_deg=para_deg, para_res=para_res,
restart = True)
elif status == JobStatus.finished :
print('# job %s finished' % job_uuid)
rjob['context'].download('.', backward_common_files)
for task in cur_chunk:
if mark_failure:
rjob['context'].download([task['dir']], tag_failure_list,
check_exists = True, mark_failure = False)
rjob['context'].download([task['dir']], task['backward_files'],
check_exists = True, mark_failure = True)
else:
rjob['context'].download([task['dir']], task['backward_files'])
if clean:
rjob['context'].clean()
job_record.record_finish(cur_hash)
job_record.dump()
job_record.dump()
return job_record.check_all_finished()
class JobRecord(object):
def __init__ (self, path, task_chunks, fname = 'job_record.json', ip=None):
self.path = os.path.abspath(path)
self.fname = os.path.join(self.path, fname)
self.task_chunks = task_chunks
if not os.path.exists(self.fname):
self._new_record()
else :
self.load()
def check_submitted(self, chunk_hash):
self.valid_hash(chunk_hash)
return self.record[chunk_hash]['context'] is not None
def record_remote_context(self,
chunk_hash,
local_root,
remote_root,
job_uuid,
ip=None,
instance_id=None):
self.valid_hash(chunk_hash)
# self.record[chunk_hash]['context'] = [local_root, remote_root, job_uuid, ip, instance_id]
self.record[chunk_hash]['context'] = {}
self.record[chunk_hash]['context']['local_root'] = local_root
self.record[chunk_hash]['context']['remote_root'] = remote_root
self.record[chunk_hash]['context']['job_uuid'] = job_uuid
self.record[chunk_hash]['context']['ip'] = ip
self.record[chunk_hash]['context']['instance_id'] = instance_id
def get_uuid(self, chunk_hash):
self.valid_hash(chunk_hash)
return self.record[chunk_hash]['context']['job_uuid']
def check_finished(self, chunk_hash):
self.valid_hash(chunk_hash)
return self.record[chunk_hash]['finished']
def check_all_finished(self):
flist = [self.record[ii]['finished'] for ii in self.record]
return all(flist)
def record_finish(self, chunk_hash):
self.valid_hash(chunk_hash)
self.record[chunk_hash]['finished'] = True
def check_nfail(self,chunk_hash):
self.valid_hash(chunk_hash)
return self.record[chunk_hash]['fail_count']
def increase_nfail(self,chunk_hash):
self.valid_hash(chunk_hash)
self.record[chunk_hash]['fail_count'] += 1
    def valid_hash(self, chunk_hash):
        if chunk_hash not in self.record:
            raise RuntimeError('chunk hash %s not in record; an invalid record may be in use, please check file %s' % (chunk_hash, self.fname))
def dump(self):
with open(self.fname, 'w') as fp:
json.dump(self.record, fp, indent=4)
def load(self):
with open(self.fname) as fp:
self.record = json.load(fp)
def _new_record(self):
task_hashes = [_hash_task_chunk(chunk) for chunk in self.task_chunks]
self.record = {}
for ii,jj in zip(task_hashes, self.task_chunks):
self.record[ii] = {
'context': None,
'finished': False,
'fail_count': 0,
'task_chunk': [{"dir": t["dir"],
"cmds":t["cmds"]} for t in jj],
}
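
A minimal sketch of the JobRecord lifecycle as Dispatcher drives it; the directory, label, roots and uuid below are illustrative:

chunks = [[{'dir': 'task.000', 'cmds': ['echo hi'],
            '_label': "{dir:task.000, cmds:['echo hi']}"}]]
jr = JobRecord('.', chunks, fname='jr.json')
h = _hash_task_chunk(chunks[0])
assert not jr.check_submitted(h) and not jr.check_finished(h)
jr.record_remote_context(h, local_root='.', remote_root='/tmp/run', job_uuid='abc123')
assert jr.check_submitted(h)
jr.record_finish(h)
jr.dump()   # persists jr.json; constructing JobRecord again later loads and resumes this state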
from enum import Enum
class JobStatus (Enum) :
unsubmitted = 1
waiting = 2
running = 3
terminated = 4
finished = 5
completing = 6
unknown = 100
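
For orientation, a hedged sketch of how a batch backend might translate a scheduler state string into these values; the mapping below is illustrative and not the one used by the Slurm or Shell classes.

_EXAMPLE_STATE_MAP = {
    'PD': JobStatus.waiting,      # pending in the queue
    'R':  JobStatus.running,
    'CG': JobStatus.completing,
    'CD': JobStatus.finished,
    'F':  JobStatus.terminated,
}

def example_map_state(code):
    # anything unrecognised falls back to JobStatus.unknown
    return _EXAMPLE_STATE_MAP.get(code, JobStatus.unknown)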