# NOTE(review): the lines below are GitHub commit-page residue (not code);
# commented out so the file parses.
# Unverified Commit 1be78103 authored by zcxzcx1, committed by GitHub
# Browse files
# Add files via upload
# parent f675ef76
import json
import os
from pathlib import Path
from typing import List, Optional
import pandas as pd
import pytest
import torch
from ase import build
from mace import data as mace_data
from mace.calculators.foundations_models import mace_mp
from mace.tools import AtomicNumberTable, torch_geometric, torch_tools
def is_mace_full_bench():
    """Return True when the full benchmark suite is requested.

    The switch is the environment variable ``MACE_FULL_BENCH``: only the
    exact string "1" enables it; any other value (or an unset variable)
    leaves the long-running variants skipped.
    """
    flag = os.environ.get("MACE_FULL_BENCH", "0")
    return flag == "1"
@pytest.mark.skipif(not torch.cuda.is_available(), reason="cuda is not available")
@pytest.mark.benchmark(warmup=True, warmup_iterations=4, min_rounds=8)
@pytest.mark.parametrize("size", (3, 5, 7, 9))
@pytest.mark.parametrize("dtype", ["float32", "float64"])
@pytest.mark.parametrize("compile_mode", [None, "default"])
def test_inference(
    benchmark, size: int, dtype: str, compile_mode: Optional[str], device: str = "cuda"
):
    """Benchmark one forward pass of the MACE-MP medium model on a diamond supercell.

    Parametrized over supercell size, float precision and torch.compile mode.
    Compiled variants only run when MACE_FULL_BENCH=1 (compilation warm-up is slow).
    """
    if not is_mace_full_bench() and compile_mode is not None:
        pytest.skip("Skipping long running benchmark, set MACE_FULL_BENCH=1 to execute")
    with torch_tools.default_dtype(dtype):
        model = load_mace_mp_medium(dtype, compile_mode, device)
        batch = create_batch(size, model, device)
        log_bench_info(benchmark, dtype, compile_mode, batch)

        def func():
            # Synchronize first so each timed call starts with an idle GPU.
            torch.cuda.synchronize()
            model(batch, training=compile_mode is not None, compute_force=True)

        torch.cuda.empty_cache()
        benchmark(func)
def load_mace_mp_medium(dtype, compile_mode, device):
    """Load the "medium" MACE-MP foundation model and move it to ``device``.

    Returns the bare torch module (``calc.models[0]``), not the ASE
    calculator wrapper that ``mace_mp`` constructs.
    """
    calc = mace_mp(
        model="medium",
        default_dtype=dtype,
        device=device,
        compile_mode=compile_mode,
        fullgraph=False,
    )
    model = calc.models[0].to(device)
    return model
def create_batch(size: int, model: torch.nn.Module, device: str) -> dict:
    """Build a single-graph batch for a (size x size x size) diamond-carbon supercell.

    The cutoff and element table are read from the model so the generated
    graph is consistent with the model's own hyperparameters.
    """
    cutoff = model.r_max.item()
    z_table = AtomicNumberTable([int(z) for z in model.atomic_numbers])
    atoms = build.bulk("C", "diamond", a=3.567, cubic=True)
    atoms = atoms.repeat((size, size, size))
    config = mace_data.config_from_atoms(atoms)
    dataset = [mace_data.AtomicData.from_config(config, z_table=z_table, cutoff=cutoff)]
    data_loader = torch_geometric.dataloader.DataLoader(
        dataset=dataset,
        batch_size=1,
        shuffle=False,
        drop_last=False,
    )
    batch = next(iter(data_loader))
    # NOTE(review): the return value of .to() is ignored here — this assumes
    # the batch is moved in place; verify against the torch_geometric fork used.
    batch.to(device)
    return batch.to_dict()
def log_bench_info(benchmark, dtype, compile_mode, batch):
    """Attach run metadata (system size, precision, compile flag, GPU name) to the benchmark record."""
    info = benchmark.extra_info
    info["num_atoms"] = int(batch["positions"].shape[0])
    info["num_edges"] = int(batch["edge_index"].shape[1])
    info["dtype"] = dtype
    info["is_compiled"] = compile_mode is not None
    info["device_name"] = torch.cuda.get_device_name()
def process_benchmark_file(bench_file: Path) -> pd.DataFrame:
    """Load one pytest-benchmark JSON file and return a tidy metrics table.

    Adds two derived throughput columns: steps/day (ops * 86400 s) and
    ns/day assuming a 1 fs MD timestep (86400 s/day * 1e-6 ns/fs = 0.0864).
    """
    with open(bench_file, "r", encoding="utf-8") as f:
        payload = json.load(f)
    # One row per benchmark: user-supplied metadata merged with timing stats.
    frame = pd.DataFrame(
        [{**entry["extra_info"], **entry["stats"]} for entry in payload["benchmarks"]]
    )
    frame["ns/day (1 fs/step)"] = 0.086400 / frame["median"]
    frame["Steps per day"] = frame["ops"] * 86400
    ordered = [
        "num_atoms",
        "num_edges",
        "dtype",
        "is_compiled",
        "device_name",
        "median",
        "Steps per day",
        "ns/day (1 fs/step)",
    ]
    return frame[ordered]
def read_bench_results(result_files: List[str]) -> pd.DataFrame:
    """Concatenate the per-file metric tables of several benchmark JSON files."""
    frames = (process_benchmark_file(Path(name)) for name in result_files)
    return pd.concat(frames)
if __name__ == "__main__":
    # Print to stdout a csv of the benchmark metrics
    import subprocess

    # `pytest-benchmark list` prints one saved result-file path per line.
    result = subprocess.run(
        ["pytest-benchmark", "list"], capture_output=True, text=True, check=True
    )
    bench_files = result.stdout.strip().split("\n")
    bench_results = read_bench_results(bench_files)
    print(bench_results.to_csv(index=False))
# NOTE(review): "This diff is collapsed." — GitHub page residue, commented out.
from e3nn import o3
from mace.tools import cg
def test_U_matrix():
    """Check the shape of the highest-order real U matrix for a small irrep pair."""
    irreps_in = o3.Irreps("1x0e + 1x1o + 1x2e")
    irreps_out = o3.Irreps("1x0e + 1x1o")
    # U_matrix_real returns results per correlation order; [-1] takes order 3.
    u_matrix = cg.U_matrix_real(
        irreps_in=irreps_in, irreps_out=irreps_out, correlation=3
    )[-1]
    assert u_matrix.shape == (3, 9, 9, 9, 21)
import os
from functools import wraps
from typing import Callable
import numpy as np
import pytest
import torch
import torch.nn.functional as F
from e3nn import o3
from torch.testing import assert_close
from mace import data, modules, tools
from mace.tools import compile as mace_compile
from mace.tools import torch_geometric
# Shared fixtures for the compile tests: a single-element (carbon) system.
table = tools.AtomicNumberTable([6])
atomic_energies = np.array([1.0], dtype=float)
cutoff = 5.0
def create_mace(device: str, seed: int = 1702):
    """Build a small, deterministic ScaleShiftMACE model on ``device``.

    The RNG is seeded before construction so repeated calls produce
    identical weights (needed to compare eager vs. compiled variants).
    """
    torch_geometric.seed_everything(seed)

    model_config = {
        "r_max": cutoff,
        "num_bessel": 8,
        "num_polynomial_cutoff": 6,
        "max_ell": 3,
        "interaction_cls": modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        "interaction_cls_first": modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        "num_interactions": 2,
        "num_elements": 1,
        "hidden_irreps": o3.Irreps("128x0e + 128x1o"),
        "MLP_irreps": o3.Irreps("16x0e"),
        "gate": F.silu,
        "atomic_energies": atomic_energies,
        "avg_num_neighbors": 8,
        "atomic_numbers": table.zs,
        "correlation": 3,
        "radial_type": "bessel",
        "atomic_inter_scale": 1.0,
        "atomic_inter_shift": 0.0,
    }
    model = modules.ScaleShiftMACE(**model_config)
    return model.to(device)
def create_batch(device: str):
    """Create a single-graph batch dict for a 2x2x2 diamond-carbon supercell."""
    from ase import build

    size = 2
    atoms = build.bulk("C", "diamond", a=3.567, cubic=True)
    atoms_list = [atoms.repeat((size, size, size))]
    print("Number of atoms", len(atoms_list[0]))
    configs = [data.config_from_atoms(atoms) for atoms in atoms_list]
    data_loader = torch_geometric.dataloader.DataLoader(
        dataset=[
            data.AtomicData.from_config(config, z_table=table, cutoff=cutoff)
            for config in configs
        ],
        batch_size=1,
        shuffle=False,
        drop_last=False,
    )
    batch = next(iter(data_loader))
    batch = batch.to(device)
    batch = batch.to_dict()
    return batch
def time_func(func: Callable):
    """Wrap ``func`` so each call marks a cudagraph step and syncs the GPU after.

    The synchronize ensures an outer timer (e.g. pytest-benchmark) measures
    completed GPU work rather than just kernel launches.
    """

    @wraps(func)
    def timed(*args, **kwargs):
        torch._inductor.cudagraph_mark_step_begin()  # pylint: disable=W0212
        result = func(*args, **kwargs)
        torch.cuda.synchronize()
        return result

    return timed
@pytest.fixture(params=[torch.float32, torch.float64], ids=["fp32", "fp64"])
def default_dtype(request):
    """Run the test body under each torch default dtype, restoring it afterwards."""
    with tools.torch_tools.default_dtype(request.param):
        yield torch.get_default_dtype()
# skip if on windows
@pytest.mark.skipif(os.name == "nt", reason="Not supported on Windows")
@pytest.mark.parametrize("device", ["cpu", "cuda"])
def test_mace(device, default_dtype):  # pylint: disable=W0621
    """Eager and torch.compile'd models must agree on energies and forces."""
    print(f"using default dtype = {default_dtype}")
    if device == "cuda" and not torch.cuda.is_available():
        pytest.skip(reason="cuda is not available")

    model_defaults = create_mace(device)
    # NOTE(review): mace_compile.prepare presumably readies the factory/model
    # for dynamo tracing — confirm against mace.tools.compile.
    tmp_model = mace_compile.prepare(create_mace)(device)
    model_compiled = torch.compile(tmp_model, mode="default")

    batch = create_batch(device)
    output1 = model_defaults(batch, training=True)
    output2 = model_compiled(batch, training=True)
    assert_close(output1["energy"], output2["energy"])
    assert_close(output1["forces"], output2["forces"])
@pytest.mark.skipif(os.name == "nt", reason="Not supported on Windows")
@pytest.mark.skipif(not torch.cuda.is_available(), reason="cuda is not available")
def test_eager_benchmark(benchmark, default_dtype):  # pylint: disable=W0621
    """Benchmark an uncompiled (eager) training-mode forward pass on CUDA."""
    print(f"using default dtype = {default_dtype}")
    batch = create_batch("cuda")
    model = create_mace("cuda")
    # time_func adds a CUDA synchronize so the benchmark measures GPU work.
    model = time_func(model)
    benchmark(model, batch, training=True)
@pytest.mark.skipif(os.name == "nt", reason="Not supported on Windows")
@pytest.mark.skipif(not torch.cuda.is_available(), reason="cuda is not available")
@pytest.mark.parametrize("compile_mode", ["default", "reduce-overhead", "max-autotune"])
@pytest.mark.parametrize("enable_amp", [False, True], ids=["fp32", "mixed"])
def test_compile_benchmark(benchmark, compile_mode, enable_amp):
    """Benchmark compiled forward passes across torch.compile modes (optionally autocast)."""
    if enable_amp:
        # Known failure when compiling under autocast; skipped until fixed upstream.
        pytest.skip(reason="autocast compiler assertion aten.slice_scatter.default")

    with tools.torch_tools.default_dtype(torch.float32):
        batch = create_batch("cuda")
        # Drop compiler caches so the parametrized modes don't contaminate each other.
        torch.compiler.reset()
        model = mace_compile.prepare(create_mace)("cuda")
        model = torch.compile(model, mode=compile_mode)
        model = time_func(model)
        with torch.autocast("cuda", enabled=enable_amp):
            benchmark(model, batch, training=True)
@pytest.mark.skipif(os.name == "nt", reason="Not supported on Windows")
@pytest.mark.skipif(not torch.cuda.is_available(), reason="cuda is not available")
def test_graph_breaks():
    """The prepared model must trace into a single dynamo graph (zero graph breaks)."""
    import torch._dynamo as dynamo

    batch = create_batch("cuda")
    model = mace_compile.prepare(create_mace)("cuda")
    explanation = dynamo.explain(model)(batch, training=False)
    # these clutter the output but might be useful for investigating graph breaks
    explanation.ops_per_graph = None
    explanation.out_guards = None
    print(explanation)
    assert explanation.graph_break_count == 0
# pylint: disable=wrong-import-position
import os
from copy import deepcopy
from typing import Any, Dict

# Set before any checkpoint is loaded: lets torch.load unpickle full model
# objects (weights_only=False behaviour).
os.environ["TORCH_FORCE_NO_WEIGHTS_ONLY_LOAD"] = "1"

import pytest
import torch
import torch.nn.functional as F
from e3nn import o3

from mace import data, modules, tools
from mace.cli.convert_cueq_e3nn import run as run_cueq_to_e3nn
from mace.cli.convert_e3nn_cueq import run as run_e3nn_to_cueq
from mace.tools import torch_geometric

# Optional-dependency probe: the CuEq conversion tests need cuequivariance.
try:
    import cuequivariance as cue  # pylint: disable=unused-import

    CUET_AVAILABLE = True
except ImportError:
    CUET_AVAILABLE = False

CUDA_AVAILABLE = torch.cuda.is_available()
@pytest.mark.skipif(not CUET_AVAILABLE, reason="cuequivariance not installed")
class TestCueq:
    """Round-trip conversion tests between e3nn-based and cuequivariance-based MACE models."""

    @pytest.fixture
    def model_config(self, interaction_cls_first, hidden_irreps) -> Dict[str, Any]:
        """Small single-element ScaleShiftMACE config, parametrized via test arguments."""
        table = tools.AtomicNumberTable([6])
        return {
            "r_max": 5.0,
            "num_bessel": 8,
            "num_polynomial_cutoff": 6,
            "max_ell": 3,
            "interaction_cls": modules.interaction_classes[
                "RealAgnosticResidualInteractionBlock"
            ],
            "interaction_cls_first": interaction_cls_first,
            "num_interactions": 2,
            "num_elements": 1,
            "hidden_irreps": hidden_irreps,
            "MLP_irreps": o3.Irreps("16x0e"),
            "gate": F.silu,
            "atomic_energies": torch.tensor([1.0]),
            "avg_num_neighbors": 8,
            "atomic_numbers": table.zs,
            "correlation": 3,
            "radial_type": "bessel",
            "atomic_inter_scale": 1.0,
            "atomic_inter_shift": 0.0,
        }

    @pytest.fixture
    def batch(self, device: str, default_dtype: torch.dtype) -> Dict[str, torch.Tensor]:
        """One randomly perturbed 2x2x2 diamond-carbon batch on ``device``."""
        from ase import build

        torch.set_default_dtype(default_dtype)
        table = tools.AtomicNumberTable([6])
        atoms = build.bulk("C", "diamond", a=3.567, cubic=True)
        import numpy as np

        # Break the lattice symmetry so forces and stress are non-trivial.
        displacement = np.random.uniform(-0.1, 0.1, size=atoms.positions.shape)
        atoms.positions += displacement
        atoms_list = [atoms.repeat((2, 2, 2))]
        configs = [data.config_from_atoms(atoms) for atoms in atoms_list]
        data_loader = torch_geometric.dataloader.DataLoader(
            dataset=[
                data.AtomicData.from_config(config, z_table=table, cutoff=5.0)
                for config in configs
            ],
            batch_size=1,
            shuffle=False,
            drop_last=False,
        )
        batch = next(iter(data_loader))
        return batch.to(device).to_dict()

    @pytest.mark.parametrize(
        "device",
        ["cpu"] + (["cuda"] if CUDA_AVAILABLE else []),
    )
    @pytest.mark.parametrize(
        "interaction_cls_first",
        [
            modules.interaction_classes["RealAgnosticResidualInteractionBlock"],
            modules.interaction_classes["RealAgnosticInteractionBlock"],
            modules.interaction_classes["RealAgnosticDensityInteractionBlock"],
        ],
    )
    @pytest.mark.parametrize(
        "hidden_irreps",
        [
            o3.Irreps("32x0e + 32x1o"),
            o3.Irreps("32x0e + 32x1o + 32x2e"),
            o3.Irreps("32x0e"),
        ],
    )
    @pytest.mark.parametrize("default_dtype", [torch.float32, torch.float64])
    def test_bidirectional_conversion(
        self,
        model_config: Dict[str, Any],
        batch: Dict[str, torch.Tensor],
        device: str,
        default_dtype: torch.dtype,
    ):
        """E3nn -> CuEq -> E3nn conversion must preserve outputs and gradients."""
        if device == "cuda" and not CUDA_AVAILABLE:
            pytest.skip("CUDA not available")
        torch.manual_seed(42)

        # Create original E3nn model
        model_e3nn = modules.ScaleShiftMACE(**model_config).to(device)

        # Convert E3nn to CuEq
        model_cueq = run_e3nn_to_cueq(model_e3nn).to(device)

        # Convert CuEq back to E3nn
        model_e3nn_back = run_cueq_to_e3nn(model_cueq).to(device)

        # Test forward pass equivalence (deepcopy: models may mutate the batch).
        out_e3nn = model_e3nn(deepcopy(batch), training=True, compute_stress=True)
        out_cueq = model_cueq(deepcopy(batch), training=True, compute_stress=True)
        out_e3nn_back = model_e3nn_back(
            deepcopy(batch), training=True, compute_stress=True
        )

        # Check outputs match for both conversions
        torch.testing.assert_close(out_e3nn["energy"], out_cueq["energy"])
        torch.testing.assert_close(out_cueq["energy"], out_e3nn_back["energy"])
        torch.testing.assert_close(out_e3nn["forces"], out_cueq["forces"])
        torch.testing.assert_close(out_cueq["forces"], out_e3nn_back["forces"])
        torch.testing.assert_close(out_e3nn["stress"], out_cueq["stress"])
        torch.testing.assert_close(out_cueq["stress"], out_e3nn_back["stress"])

        # Test backward pass equivalence
        loss_e3nn = out_e3nn["energy"].sum()
        loss_cueq = out_cueq["energy"].sum()
        loss_e3nn_back = out_e3nn_back["energy"].sum()

        loss_e3nn.backward()
        loss_cueq.backward()
        loss_e3nn_back.backward()

        # Compare gradients for all conversions; tolerance depends on precision.
        tol = 1e-4 if default_dtype == torch.float32 else 1e-7

        def print_gradient_diff(name1, p1, name2, p2, conv_type):
            # Only compare parameters that plausibly correspond: same gradient
            # shape and same first two dotted name components.
            if p1.grad is not None and p1.grad.shape == p2.grad.shape:
                if name1.split(".", 2)[:2] == name2.split(".", 2)[:2]:
                    error = torch.abs(p1.grad - p2.grad)
                    print(
                        f"{conv_type} - Parameter {name1}/{name2}, Max error: {error.max()}"
                    )
                    torch.testing.assert_close(p1.grad, p2.grad, atol=tol, rtol=tol)

        # E3nn to CuEq gradients
        for (name_e3nn, p_e3nn), (name_cueq, p_cueq) in zip(
            model_e3nn.named_parameters(), model_cueq.named_parameters()
        ):
            print_gradient_diff(name_e3nn, p_e3nn, name_cueq, p_cueq, "E3nn->CuEq")

        # CuEq to E3nn gradients
        for (name_cueq, p_cueq), (name_e3nn_back, p_e3nn_back) in zip(
            model_cueq.named_parameters(), model_e3nn_back.named_parameters()
        ):
            print_gradient_diff(
                name_cueq, p_cueq, name_e3nn_back, p_e3nn_back, "CuEq->E3nn"
            )

        # Full circle comparison (E3nn -> E3nn)
        for (name_e3nn, p_e3nn), (name_e3nn_back, p_e3nn_back) in zip(
            model_e3nn.named_parameters(), model_e3nn_back.named_parameters()
        ):
            print_gradient_diff(
                name_e3nn, p_e3nn, name_e3nn_back, p_e3nn_back, "Full circle"
            )
from copy import deepcopy
from pathlib import Path
import ase.build
import h5py
import numpy as np
import torch
from mace.data import (
AtomicData,
Configuration,
HDF5Dataset,
config_from_atoms,
get_neighborhood,
save_configurations_as_HDF5,
)
from mace.tools import AtomicNumberTable, torch_geometric
mace_path = Path(__file__).parent.parent
class TestAtomicData:
    """Construction, batching and HDF5 round-tripping of AtomicData graphs."""

    # Reference configuration: a bent three-atom (O, H, H) system with
    # energy and force labels.
    config = Configuration(
        atomic_numbers=np.array([8, 1, 1]),
        positions=np.array(
            [
                [0.0, -2.0, 0.0],
                [1.0, 0.0, 0.0],
                [0.0, 1.0, 0.0],
            ]
        ),
        properties={
            "forces": np.array(
                [
                    [0.0, -1.3, 0.0],
                    [1.0, 0.2, 0.0],
                    [0.0, 1.1, 0.3],
                ]
            ),
            "energy": -1.5,
        },
        property_weights={
            "forces": 1.0,
            "energy": 1.0,
        },
    )
    # Slightly displaced copy so the HDF5 dataset holds two distinct structures.
    config_2 = deepcopy(config)
    config_2.positions = config.positions + 0.01
    table = AtomicNumberTable([1, 8])

    def test_atomic_data(self):
        """A single config yields the expected edge/force/one-hot shapes."""
        data = AtomicData.from_config(self.config, z_table=self.table, cutoff=3.0)

        assert data.edge_index.shape == (2, 4)
        assert data.forces.shape == (3, 3)
        assert data.node_attrs.shape == (3, 2)

    def test_data_loader(self):
        """Two identical configs batch into a single 6-atom graph batch."""
        data1 = AtomicData.from_config(self.config, z_table=self.table, cutoff=3.0)
        data2 = AtomicData.from_config(self.config, z_table=self.table, cutoff=3.0)

        data_loader = torch_geometric.dataloader.DataLoader(
            dataset=[data1, data2],
            batch_size=2,
            shuffle=True,
            drop_last=False,
        )

        for batch in data_loader:
            assert batch.batch.shape == (6,)
            assert batch.edge_index.shape == (2, 8)
            assert batch.shifts.shape == (8, 3)
            assert batch.positions.shape == (6, 3)
            assert batch.node_attrs.shape == (6, 2)
            assert batch.energy.shape == (2,)
            assert batch.forces.shape == (6, 3)

    def test_to_atomic_data_dict(self):
        """Batch.to_dict exposes the same tensors under string keys."""
        data1 = AtomicData.from_config(self.config, z_table=self.table, cutoff=3.0)
        data2 = AtomicData.from_config(self.config, z_table=self.table, cutoff=3.0)

        data_loader = torch_geometric.dataloader.DataLoader(
            dataset=[data1, data2],
            batch_size=2,
            shuffle=True,
            drop_last=False,
        )

        for batch in data_loader:
            batch_dict = batch.to_dict()
            assert batch_dict["batch"].shape == (6,)
            assert batch_dict["edge_index"].shape == (2, 8)
            assert batch_dict["shifts"].shape == (8, 3)
            assert batch_dict["positions"].shape == (6, 3)
            assert batch_dict["node_attrs"].shape == (6, 2)
            assert batch_dict["energy"].shape == (2,)
            assert batch_dict["forces"].shape == (6, 3)

    def test_hdf5_dataloader(self):
        """Configs round-tripped through HDF5 must batch identically to direct loading."""
        datasets = [self.config, self.config_2] * 5
        # Fix: the original code concatenated str(mace_path) + "test.h5",
        # fusing the directory name with the file name (".../repotest.h5").
        # Join as a path component instead.
        h5_file = mace_path / "test.h5"
        with h5py.File(str(h5_file), "w") as f:
            save_configurations_as_HDF5(datasets, 0, f)
        train_dataset = HDF5Dataset(str(h5_file), z_table=self.table, r_max=3.0)
        train_loader = torch_geometric.dataloader.DataLoader(
            dataset=train_dataset,
            batch_size=2,
            shuffle=False,
            drop_last=False,
        )
        batch_count = 0
        for batch in train_loader:
            batch_count += 1
            assert batch.batch.shape == (6,)
            assert batch.edge_index.shape == (2, 8)
            assert batch.shifts.shape == (8, 3)
            assert batch.positions.shape == (6, 3)
            assert batch.node_attrs.shape == (6, 2)
            assert batch.energy.shape == (2,)
            assert batch.forces.shape == (6, 3)
        print(batch_count, len(train_loader), len(train_dataset))
        assert batch_count == len(train_loader) == len(train_dataset) / 2

        # The HDF5-backed loader must reproduce the directly built batches.
        train_loader_direct = torch_geometric.dataloader.DataLoader(
            dataset=[
                AtomicData.from_config(config, z_table=self.table, cutoff=3.0)
                for config in datasets
            ],
            batch_size=2,
            shuffle=False,
            drop_last=False,
        )
        for batch_direct, batch in zip(train_loader_direct, train_loader):
            assert torch.all(batch_direct.edge_index == batch.edge_index)
            assert torch.all(batch_direct.shifts == batch.shifts)
            assert torch.all(batch_direct.positions == batch.positions)
            assert torch.all(batch_direct.node_attrs == batch.node_attrs)
            assert torch.all(batch_direct.energy == batch.energy)
            assert torch.all(batch_direct.forces == batch.forces)
class TestNeighborhood:
    """Shape checks for the neighbor-list builder ``get_neighborhood``."""

    def test_basic(self):
        """Three collinear atoms, no PBC: two bonds give four directed edges."""
        positions = np.array(
            [
                [-1.0, 0.0, 0.0],
                [+0.0, 0.0, 0.0],
                [+1.0, 0.0, 0.0],
            ]
        )

        indices, shifts, unit_shifts, _ = get_neighborhood(positions, cutoff=1.5)
        assert indices.shape == (2, 4)
        assert shifts.shape == (4, 3)
        assert unit_shifts.shape == (4, 3)

    def test_signs(self):
        """Periodicity along x only yields the expected edge count and shift shapes."""
        positions = np.array(
            [
                [+0.5, 0.5, 0.0],
                [+1.0, 1.0, 0.0],
            ]
        )
        cell = np.array([[2.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]])

        edge_index, shifts, unit_shifts, _ = get_neighborhood(
            positions, cutoff=3.5, pbc=(True, False, False), cell=cell
        )
        num_edges = 10
        assert edge_index.shape == (2, num_edges)
        assert shifts.shape == (num_edges, 3)
        assert unit_shifts.shape == (num_edges, 3)
# Based on mir-group/nequip
def test_periodic_edge():
    """In fcc bulk with a cutoff just above the first shell, every edge has the nn distance."""
    atoms = ase.build.bulk("Cu", "fcc")
    dist = np.linalg.norm(atoms.cell[0]).item()
    config = config_from_atoms(atoms)
    edge_index, shifts, _, _ = get_neighborhood(
        config.positions, cutoff=1.05 * dist, pbc=(True, True, True), cell=config.cell
    )
    sender, receiver = edge_index
    # Minimum-image edge vectors: receiver - sender plus the periodic shift.
    vectors = (
        config.positions[receiver] - config.positions[sender] + shifts
    )  # [n_edges, 3]
    assert vectors.shape == (12, 3)  # 12 neighbors in close-packed bulk
    assert np.allclose(
        np.linalg.norm(vectors, axis=-1),
        dist,
    )
def test_half_periodic():
    """A 2D-periodic slab must have in-plane neighbors only (no z-component edges)."""
    atoms = ase.build.fcc111("Al", size=(3, 3, 1), vacuum=0.0)
    assert all(atoms.pbc == (True, True, False))
    config = config_from_atoms(atoms)  # first shell dist is 2.864A
    edge_index, shifts, _, _ = get_neighborhood(
        config.positions, cutoff=2.9, pbc=(True, True, False), cell=config.cell
    )
    sender, receiver = edge_index
    vectors = (
        config.positions[receiver] - config.positions[sender] + shifts
    )  # [n_edges, 3]
    # Check number of neighbors:
    _, neighbor_count = np.unique(edge_index[0], return_counts=True)
    assert (neighbor_count == 6).all()  # 6 neighbors
    # Check not periodic in z
    assert np.allclose(
        vectors[:, 2],
        np.zeros(vectors.shape[0]),
    )
import ase.io as aio
import numpy as np
import pytest
from ase import Atoms
from ase.build import molecule
from mace.cli.fine_tuning_select import (
FilteringType,
SelectionSettings,
SubselectType,
_filter_pretraining_data,
_load_descriptors,
_maybe_save_descriptors,
filter_atoms,
select_samples,
)
@pytest.fixture(name="train_atoms_fixture")
def train_atoms():
    """A small, chemically varied pool of pretraining structures for the filter tests."""
    bare_formulas = ["Fe2O3", "C", "FeON", "Fe"]
    return [molecule("H2O"), molecule("CH4")] + [Atoms(f) for f in bare_formulas]
@pytest.fixture(name="train_atom_descriptors_fixture")
def train_atom_descriptors(train_atoms_fixture):
    """Synthetic descriptors: structure ``i`` maps each of its symbols to a length-5 vector of ``i``s."""
    descriptors = []
    for index, atoms in enumerate(train_atoms_fixture):
        descriptors.append(
            {symbol: np.zeros(5) + index for symbol in atoms.symbols}
        )
    return descriptors
@pytest.mark.parametrize(
    "filtering_type, passes_filter, element_sublist",
    [
        # NONE ignores the element sublist entirely.
        (FilteringType.NONE, [True] * 6, []),
        (FilteringType.NONE, [True] * 6, ["C", "U", "Anything really"]),
        # COMBINATIONS / INCLUSIVE / EXCLUSIVE apply increasingly strict
        # element-membership rules against ["O", "Fe"].
        (
            FilteringType.COMBINATIONS,
            [False, False, True, False, False, True],
            ["O", "Fe"],
        ),
        (
            FilteringType.INCLUSIVE,
            [False, False, True, False, True, False],
            ["O", "Fe"],
        ),
        (
            FilteringType.EXCLUSIVE,
            [False, False, True, False, False, False],
            ["O", "Fe"],
        ),
    ],
)
def test_filter_data(
    train_atoms_fixture, filtering_type, passes_filter, element_sublist
):
    """_filter_pretraining_data keeps exactly the structures flagged by the expected mask."""
    filtered, _, passes = _filter_pretraining_data(
        train_atoms_fixture, filtering_type, element_sublist
    )
    assert passes == passes_filter
    assert len(filtered) == sum(passes_filter)
@pytest.mark.parametrize(
    "passes_filter", [[True] * 6, [False, True, False, True, False, True]]
)
def test_load_descriptors(
    train_atoms_fixture, train_atom_descriptors_fixture, passes_filter, tmp_path
):
    """Descriptors saved to disk are re-attached to the filtered subset by _load_descriptors."""
    for i, atoms in enumerate(train_atoms_fixture):
        atoms.info["mace_descriptors"] = train_atom_descriptors_fixture[i]

    save_path = tmp_path / "test.xyz"
    _maybe_save_descriptors(train_atoms_fixture, save_path.as_posix())
    # Saving moves the descriptors out of the atoms objects.
    assert all("mace_descriptors" not in atoms.info for atoms in train_atoms_fixture)

    filtered_atoms = [
        x for x, passes in zip(train_atoms_fixture, passes_filter) if passes
    ]
    descriptors_path = save_path.as_posix().replace(".xyz", "_descriptors.npy")
    _load_descriptors(
        filtered_atoms,
        passes_filter,
        descriptors_path=descriptors_path,
        calc=None,
        full_data_length=len(train_atoms_fixture),
    )

    # Each surviving structure must get back exactly its original descriptors.
    expected_descriptors = [
        train_atom_descriptors_fixture[i]
        for i, passes in enumerate(passes_filter)
        if passes
    ]
    for i, atoms in enumerate(filtered_atoms):
        assert "mace_descriptors" in atoms.info
        for key, value in expected_descriptors[i].items():
            assert np.allclose(atoms.info["mace_descriptors"][key], value)
def test_select_samples_random(train_atoms_fixture, tmp_path):
    """Random subselection writes the requested number of samples plus a combined file."""
    input_file_path = tmp_path / "input.xyz"
    aio.write(input_file_path, train_atoms_fixture, format="extxyz")
    output_file_path = tmp_path / "output.xyz"
    settings = SelectionSettings(
        configs_pt=input_file_path.as_posix(),
        output=output_file_path.as_posix(),
        num_samples=2,
        subselect=SubselectType.RANDOM,
        filtering_type=FilteringType.NONE,
    )
    select_samples(settings)

    # Check if output file is created
    assert output_file_path.exists()
    combined_output_file_path = tmp_path / "output_combined.xyz"
    assert combined_output_file_path.exists()

    output_atoms = aio.read(output_file_path, index=":")
    assert isinstance(output_atoms, list)
    assert len(output_atoms) == 2

    combined_output_atoms = aio.read(combined_output_file_path, index=":")
    assert isinstance(combined_output_atoms, list)
    assert (
        len(combined_output_atoms) == 2
    )  # combined same as output since no FT data provided
def test_select_samples_ft_provided(train_atoms_fixture, tmp_path):
    """With fine-tuning data present, the combined file is selection + FT structures."""
    input_file_path = tmp_path / "input.xyz"
    aio.write(input_file_path, train_atoms_fixture, format="extxyz")
    output_file_path = tmp_path / "output.xyz"
    ft_file_path = tmp_path / "ft_data.xyz"
    ft_data = [Atoms("FeO")]
    aio.write(ft_file_path.as_posix(), ft_data, format="extxyz")
    settings = SelectionSettings(
        configs_pt=input_file_path.as_posix(),
        output=output_file_path.as_posix(),
        num_samples=2,
        subselect=SubselectType.RANDOM,
        configs_ft=ft_file_path.as_posix(),
    )
    select_samples(settings)

    # Check if output file is created
    assert output_file_path.exists()
    combined_output_file_path = tmp_path / "output_combined.xyz"
    assert combined_output_file_path.exists()

    output_atoms = aio.read(output_file_path, index=":")
    assert isinstance(output_atoms, list)
    assert len(output_atoms) == 2
    # Defaulted filtering must only keep structures sharing elements with the FT data.
    assert all(filter_atoms(x, ["Fe", "O"]) for x in output_atoms)

    combined_atoms = aio.read(combined_output_file_path, index=":")
    assert isinstance(combined_atoms, list)
    assert len(combined_atoms) == len(output_atoms) + len(ft_data)
from pathlib import Path
import numpy as np
import pytest
import torch
import torch.nn.functional
from ase.build import molecule
from e3nn import o3
from e3nn.util import jit
from scipy.spatial.transform import Rotation as R
from mace import data, modules, tools
from mace.calculators import mace_mp, mace_off
from mace.tools import torch_geometric
from mace.tools.finetuning_utils import load_foundations_elements
from mace.tools.scripts_utils import extract_config_mace_model, remove_pt_head
from mace.tools.utils import AtomicNumberTable
# Path to the bundled 2023-12-03 MACE-MP checkpoint used by the foundation tests.
MODEL_PATH = (
    Path(__file__).parent.parent
    / "mace"
    / "calculators"
    / "foundations_models"
    / "2023-12-03-mace-mp.model"
)

# All tests in this module run under float64.
torch.set_default_dtype(torch.float64)
# NOTE(review): pytest.skip(..., allow_module_level=True) used as a decorator
# raises Skipped when this line executes at import time — presumably skipping
# the whole module, not just this test; confirm intent.
@pytest.skip("Problem with the float type", allow_module_level=True)
def test_foundations():
    """Weights transplanted from the foundation model must reproduce its forces."""
    # Create MACE model
    config = data.Configuration(
        atomic_numbers=molecule("H2COH").numbers,
        positions=molecule("H2COH").positions,
        properties={
            "forces": molecule("H2COH").positions,
            "energy": -1.5,
            "charges": molecule("H2COH").numbers,
            "dipole": np.array([-1.5, 1.5, 2.0]),
        },
        property_weights={
            "forces": 1.0,
            "energy": 1.0,
            "charges": 1.0,
            "dipole": 1.0,
        },
    )
    # Created the rotated environment
    rot = R.from_euler("z", 60, degrees=True).as_matrix()
    positions_rotated = np.array(rot @ config.positions.T).T
    config_rotated = data.Configuration(
        atomic_numbers=molecule("H2COH").numbers,
        positions=positions_rotated,
        properties={
            "forces": molecule("H2COH").positions,
            "energy": -1.5,
            "charges": molecule("H2COH").numbers,
            "dipole": np.array([-1.5, 1.5, 2.0]),
        },
        property_weights={
            "forces": 1.0,
            "energy": 1.0,
            "charges": 1.0,
            "dipole": 1.0,
        },
    )
    table = tools.AtomicNumberTable([1, 6, 8])
    atomic_energies = np.array([0.0, 0.0, 0.0], dtype=float)

    # Target architecture the foundation weights are loaded into.
    model_config = dict(
        r_max=6,
        num_bessel=10,
        num_polynomial_cutoff=5,
        max_ell=3,
        interaction_cls=modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        interaction_cls_first=modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        num_interactions=2,
        num_elements=3,
        hidden_irreps=o3.Irreps("128x0e + 128x1o"),
        MLP_irreps=o3.Irreps("16x0e"),
        gate=torch.nn.functional.silu,
        atomic_energies=atomic_energies,
        avg_num_neighbors=3,
        atomic_numbers=table.zs,
        correlation=3,
        radial_type="bessel",
        atomic_inter_scale=0.1,
        atomic_inter_shift=0.0,
    )
    model = modules.ScaleShiftMACE(**model_config)
    calc_foundation = mace_mp(model="medium", device="cpu", default_dtype="float64")
    model_loaded = load_foundations_elements(
        model,
        calc_foundation.models[0],
        table=table,
        load_readout=True,
        use_shift=False,
        max_L=1,
    )
    atomic_data = data.AtomicData.from_config(config, z_table=table, cutoff=6.0)
    atomic_data2 = data.AtomicData.from_config(
        config_rotated, z_table=table, cutoff=6.0
    )
    data_loader = torch_geometric.dataloader.DataLoader(
        dataset=[atomic_data, atomic_data2],
        batch_size=2,
        shuffle=True,
        drop_last=False,
    )
    batch = next(iter(data_loader))
    forces_loaded = model_loaded(batch.to_dict())["forces"]
    forces = model(batch.to_dict())["forces"]
    assert torch.allclose(forces, forces_loaded)
def test_multi_reference():
    """A two-head model loaded from the foundation model must match the ASE calculator's forces."""
    config_multi = data.Configuration(
        atomic_numbers=molecule("H2COH").numbers,
        positions=molecule("H2COH").positions,
        properties={
            "forces": molecule("H2COH").positions,
            "energy": -1.5,
            "charges": molecule("H2COH").numbers,
            "dipole": np.array([-1.5, 1.5, 2.0]),
        },
        property_weights={
            "forces": 1.0,
            "energy": 1.0,
            "charges": 1.0,
            "dipole": 1.0,
        },
        head="MP2",
    )
    table_multi = tools.AtomicNumberTable([1, 6, 8])
    # One atomic-energy row per head.
    atomic_energies_multi = np.array([[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]], dtype=float)
    table = tools.AtomicNumberTable([1, 6, 8])

    # Create MACE model
    model_config = dict(
        r_max=6,
        num_bessel=10,
        num_polynomial_cutoff=5,
        max_ell=3,
        interaction_cls=modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        interaction_cls_first=modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        num_interactions=2,
        num_elements=3,
        hidden_irreps=o3.Irreps("128x0e + 128x1o"),
        MLP_irreps=o3.Irreps("16x0e"),
        gate=torch.nn.functional.silu,
        atomic_energies=atomic_energies_multi,
        avg_num_neighbors=61,
        atomic_numbers=table.zs,
        correlation=3,
        radial_type="bessel",
        atomic_inter_scale=[1.0, 1.0],
        atomic_inter_shift=[0.0, 0.0],
        heads=["MP2", "DFT"],
    )
    model = modules.ScaleShiftMACE(**model_config)
    calc_foundation = mace_mp(model="medium", device="cpu", default_dtype="float64")
    model_loaded = load_foundations_elements(
        model,
        calc_foundation.models[0],
        table=table,
        load_readout=True,
        use_shift=False,
        max_L=1,
    )
    atomic_data = data.AtomicData.from_config(
        config_multi, z_table=table_multi, cutoff=6.0, heads=["MP2", "DFT"]
    )
    data_loader = torch_geometric.dataloader.DataLoader(
        dataset=[atomic_data, atomic_data],
        batch_size=2,
        shuffle=True,
        drop_last=False,
    )
    batch = next(iter(data_loader))
    forces_loaded = model_loaded(batch.to_dict())["forces"]

    # Reference: the stock ASE calculator on the same molecule with the MP2 head.
    calc_foundation = mace_mp(model="medium", device="cpu", default_dtype="float64")
    atoms = molecule("H2COH")
    atoms.info["head"] = "MP2"
    atoms.calc = calc_foundation
    forces = atoms.get_forces()
    # Only compare the first graph's atoms (batch holds two copies).
    assert np.allclose(
        forces, forces_loaded.detach().numpy()[:5, :], atol=1e-5, rtol=1e-5
    )
@pytest.mark.parametrize(
    "calc",
    [
        # NOTE: these calculators are built at collection time, so every
        # foundation model is loaded before any test runs.
        mace_mp(device="cpu", default_dtype="float64"),
        mace_mp(model="small", device="cpu", default_dtype="float64"),
        mace_mp(model="medium", device="cpu", default_dtype="float64"),
        mace_mp(model="large", device="cpu", default_dtype="float64"),
        mace_mp(model=MODEL_PATH, device="cpu", default_dtype="float64"),
        mace_off(model="small", device="cpu", default_dtype="float64"),
        mace_off(model="medium", device="cpu", default_dtype="float64"),
        mace_off(model="large", device="cpu", default_dtype="float64"),
        mace_off(model=MODEL_PATH, device="cpu", default_dtype="float64"),
    ],
)
def test_compile_foundation(calc):
    """TorchScript compilation (e3nn jit) must not change model outputs."""
    model = calc.models[0]
    atoms = molecule("CH4")
    # Perturb positions to avoid a fully symmetric geometry.
    atoms.positions += np.random.randn(*atoms.positions.shape) * 0.1
    batch = calc._atoms_to_batch(atoms)  # pylint: disable=protected-access
    output_1 = model(batch.to_dict())
    model_compiled = jit.compile(model)
    output = model_compiled(batch.to_dict())
    for key in output_1.keys():
        if isinstance(output_1[key], torch.Tensor):
            assert torch.allclose(output_1[key], output[key], atol=1e-5)
@pytest.mark.parametrize(
    "model",
    [
        mace_mp(model="small", device="cpu", default_dtype="float64").models[0],
        mace_mp(model="medium", device="cpu", default_dtype="float64").models[0],
        mace_mp(model="large", device="cpu", default_dtype="float64").models[0],
        mace_mp(model=MODEL_PATH, device="cpu", default_dtype="float64").models[0],
        mace_off(model="small", device="cpu", default_dtype="float64").models[0],
        mace_off(model="medium", device="cpu", default_dtype="float64").models[0],
        mace_off(model="large", device="cpu", default_dtype="float64").models[0],
        mace_off(model=MODEL_PATH, device="cpu", default_dtype="float64").models[0],
    ],
)
def test_extract_config(model):
    """A config extracted from a model must rebuild an output-equivalent model."""
    assert isinstance(model, modules.ScaleShiftMACE)
    config = data.Configuration(
        atomic_numbers=molecule("H2COH").numbers,
        positions=molecule("H2COH").positions,
        properties={
            "forces": molecule("H2COH").positions,
            "energy": -1.5,
            "charges": molecule("H2COH").numbers,
            "dipole": np.array([-1.5, 1.5, 2.0]),
        },
        property_weights={
            "forces": 1.0,
            "energy": 1.0,
            "charges": 1.0,
            "dipole": 1.0,
        },
    )
    # Rebuild from the extracted config, then copy the weights across.
    model_copy = modules.ScaleShiftMACE(**extract_config_mace_model(model))
    model_copy.load_state_dict(model.state_dict())
    z_table = AtomicNumberTable([int(z) for z in model.atomic_numbers])
    atomic_data = data.AtomicData.from_config(config, z_table=z_table, cutoff=6.0)
    data_loader = torch_geometric.dataloader.DataLoader(
        dataset=[atomic_data, atomic_data],
        batch_size=2,
        shuffle=True,
        drop_last=False,
    )
    batch = next(iter(data_loader))
    output = model(batch.to_dict())
    output_copy = model_copy(batch.to_dict())
    # assert all items of the output dicts are equal
    for key in output.keys():
        if isinstance(output[key], torch.Tensor):
            assert torch.allclose(output[key], output_copy[key], atol=1e-5)
def test_remove_pt_head():
    """remove_pt_head() must collapse a two-head (pt_head + DFT) model into a
    single-head DFT model that reproduces the original DFT energies/forces.
    """
    # Set up test data
    torch.manual_seed(42)
    # One row per element (H, O), one column per head.
    atomic_energies_pt_head = np.array([[1.0, 2.0], [3.0, 4.0]], dtype=float)
    z_table = AtomicNumberTable([1, 8])  # H and O
    # Create multihead model
    model_config = {
        "r_max": 5.0,
        "num_bessel": 8,
        "num_polynomial_cutoff": 5,
        "max_ell": 2,
        "interaction_cls": modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        "interaction_cls_first": modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        "num_interactions": 2,
        "num_elements": len(z_table),
        "hidden_irreps": o3.Irreps("32x0e + 32x1o"),
        "MLP_irreps": o3.Irreps("16x0e"),
        "gate": torch.nn.functional.silu,
        "atomic_energies": atomic_energies_pt_head,
        "avg_num_neighbors": 8,
        "atomic_numbers": z_table.zs,
        "correlation": 3,
        "heads": ["pt_head", "DFT"],
        "atomic_inter_scale": [1.0, 1.0],
        "atomic_inter_shift": [0.0, 0.1],
    }
    model = modules.ScaleShiftMACE(**model_config)
    # Create test molecule
    mol = molecule("H2O")
    config_pt_head = data.Configuration(
        atomic_numbers=mol.numbers,
        positions=mol.positions,
        properties={"energy": 1.0, "forces": np.random.randn(len(mol), 3)},
        property_weights={"forces": 1.0, "energy": 1.0},
        head="DFT",
    )
    atomic_data = data.AtomicData.from_config(
        config_pt_head, z_table=z_table, cutoff=5.0, heads=["pt_head", "DFT"]
    )
    dataloader = torch_geometric.dataloader.DataLoader(
        dataset=[atomic_data], batch_size=1, shuffle=False
    )
    batch = next(iter(dataloader))
    # Test original mode
    output_orig = model(batch.to_dict())
    # Convert to single head model
    new_model = remove_pt_head(model, head_to_keep="DFT")
    # Basic structure tests
    assert len(new_model.heads) == 1
    assert new_model.heads[0] == "DFT"
    assert new_model.atomic_energies_fn.atomic_energies.shape[0] == 1
    assert len(torch.atleast_1d(new_model.scale_shift.scale)) == 1
    assert len(torch.atleast_1d(new_model.scale_shift.shift)) == 1
    # Test output consistency: rebuild the data with only the DFT head
    atomic_data = data.AtomicData.from_config(
        config_pt_head, z_table=z_table, cutoff=5.0, heads=["DFT"]
    )
    dataloader = torch_geometric.dataloader.DataLoader(
        dataset=[atomic_data], batch_size=1, shuffle=False
    )
    batch = next(iter(dataloader))
    output_new = new_model(batch.to_dict())
    torch.testing.assert_close(
        output_orig["energy"], output_new["energy"], rtol=1e-5, atol=1e-5
    )
    torch.testing.assert_close(
        output_orig["forces"], output_new["forces"], rtol=1e-5, atol=1e-5
    )
def test_remove_pt_head_multihead():
    """For a four-head model, remove_pt_head() must produce, for every head,
    a single-head model reproducing that head's energies and forces; it must
    reject unknown head names and default to the first non-PT head.

    Cleanups vs. the previous version: removed a leftover debug ``print`` and
    the write-only ``atomic_datas``/``dataloaders`` dicts that were never read.
    """
    # Set up test data
    torch.manual_seed(42)
    atomic_energies_pt_head = np.array(
        [
            [1.0, 2.0],  # H energies for each head
            [3.0, 4.0],  # O energies for each head
        ]
        * 2
    )
    z_table = AtomicNumberTable([1, 8])  # H and O
    # Create multihead model
    model_config = {
        "r_max": 5.0,
        "num_bessel": 8,
        "num_polynomial_cutoff": 5,
        "max_ell": 2,
        "interaction_cls": modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        "interaction_cls_first": modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        "num_interactions": 2,
        "num_elements": len(z_table),
        "hidden_irreps": o3.Irreps("32x0e + 32x1o"),
        "MLP_irreps": o3.Irreps("16x0e"),
        "gate": torch.nn.functional.silu,
        "atomic_energies": atomic_energies_pt_head,
        "avg_num_neighbors": 8,
        "atomic_numbers": z_table.zs,
        "correlation": 3,
        "heads": ["pt_head", "DFT", "MP2", "CCSD"],
        "atomic_inter_scale": [1.0, 1.0, 1.0, 1.0],
        "atomic_inter_shift": [0.0, 0.1, 0.2, 0.3],
    }
    model = modules.ScaleShiftMACE(**model_config)
    # Record reference outputs of the multihead model, one configuration
    # (with its own random forces) per head.
    mol = molecule("H2O")
    configs = {}
    original_outputs = {}
    for head in model.heads:
        config_pt_head = data.Configuration(
            atomic_numbers=mol.numbers,
            positions=mol.positions,
            properties={"energy": 1.0, "forces": np.random.randn(len(mol), 3)},
            property_weights={"forces": 1.0, "energy": 1.0},
            head=head,
        )
        configs[head] = config_pt_head
        atomic_data = data.AtomicData.from_config(
            config_pt_head, z_table=z_table, cutoff=5.0, heads=model.heads
        )
        dataloader = torch_geometric.dataloader.DataLoader(
            dataset=[atomic_data], batch_size=1, shuffle=False
        )
        batch = next(iter(dataloader))
        original_outputs[head] = model(batch.to_dict())
    # Now test each head separately
    for i, head in enumerate(model.heads):
        # Convert to single head model
        new_model = remove_pt_head(model, head_to_keep=head)
        # Basic structure tests
        assert len(new_model.heads) == 1, f"Failed for head {head}"
        assert new_model.heads[0] == head, f"Failed for head {head}"
        assert (
            new_model.atomic_energies_fn.atomic_energies.shape[0] == 1
        ), f"Failed for head {head}"
        assert (
            len(torch.atleast_1d(new_model.scale_shift.scale)) == 1
        ), f"Failed for head {head}"
        assert (
            len(torch.atleast_1d(new_model.scale_shift.shift)) == 1
        ), f"Failed for head {head}"
        # Verify scale and shift values were sliced from the right position
        assert torch.allclose(
            new_model.scale_shift.scale, model.scale_shift.scale[i : i + 1]
        ), f"Failed for head {head}"
        assert torch.allclose(
            new_model.scale_shift.shift, model.scale_shift.shift[i : i + 1]
        ), f"Failed for head {head}"
        # Test output consistency against the recorded multihead outputs
        single_head_data = data.AtomicData.from_config(
            configs[head], z_table=z_table, cutoff=5.0, heads=[head]
        )
        single_head_loader = torch_geometric.dataloader.DataLoader(
            dataset=[single_head_data], batch_size=1, shuffle=False
        )
        batch = next(iter(single_head_loader))
        new_output = new_model(batch.to_dict())
        torch.testing.assert_close(
            original_outputs[head]["energy"],
            new_output["energy"],
            rtol=1e-5,
            atol=1e-5,
            msg=f"Energy mismatch for head {head}",
        )
        torch.testing.assert_close(
            original_outputs[head]["forces"],
            new_output["forces"],
            rtol=1e-5,
            atol=1e-5,
            msg=f"Forces mismatch for head {head}",
        )
    # Test error cases
    with pytest.raises(ValueError, match="Head non_existent not found in model"):
        remove_pt_head(model, head_to_keep="non_existent")
    # Test default behavior (first non-PT head)
    default_model = remove_pt_head(model)
    assert default_model.heads[0] == "DFT"
    # Additional test: check if each model's computation graph is independent
    models = {head: remove_pt_head(model, head_to_keep=head) for head in model.heads}
    results = {}
    for head, head_model in models.items():
        single_head_data = data.AtomicData.from_config(
            configs[head], z_table=z_table, cutoff=5.0, heads=[head]
        )
        single_head_loader = torch_geometric.dataloader.DataLoader(
            dataset=[single_head_data], batch_size=1, shuffle=False
        )
        batch = next(iter(single_head_loader))
        results[head] = head_model(batch.to_dict())
    # Verify each model produces different outputs
    energies = torch.stack([results[head]["energy"] for head in model.heads])
    assert not torch.allclose(
        energies[0], energies[1], rtol=1e-3
    ), "Different heads should produce different outputs"
import numpy as np
import pytest
from ase.build import fcc111
from mace.calculators import mace_mp
@pytest.fixture(name="setup_calculator_")
def setup_calculator():
    """Provide a CPU, float64 MACE-MP medium calculator without dispersion."""
    return mace_mp(
        model="medium",
        dispersion=False,
        default_dtype="float64",
        device="cpu",
    )
@pytest.fixture(name="setup_structure_")
def setup_structure(setup_calculator_):
    """Pt(111) 4x4x1 orthogonal slab with the shared calculator attached."""
    slab = fcc111("Pt", size=(4, 4, 1), vacuum=10.0, orthogonal=True)
    slab.calc = setup_calculator_
    return slab
def test_potential_energy_and_hessian(setup_structure_):
    """The autograd Hessian has shape (3N, N, 3) for an N-atom structure."""
    slab = setup_structure_
    n_atoms = len(slab)
    hessian = slab.calc.get_hessian(atoms=slab)
    assert hessian.shape == (n_atoms * 3, n_atoms, 3)
def test_finite_difference_hessian(setup_structure_):
    """Central finite differences of the forces match the autograd Hessian."""
    structure = setup_structure_
    n_atoms = len(structure)
    indices = list(range(n_atoms))
    step = 1e-4
    ndim = 3
    fd_hessian = np.zeros((n_atoms * ndim, n_atoms * ndim))
    reference = structure.copy()
    for col, atom_idx in enumerate(indices):
        for axis in range(ndim):
            # Forces at +step along this coordinate.
            displaced_plus = reference.copy()
            displaced_plus.positions[atom_idx, axis] += step
            displaced_plus.calc = structure.calc
            forces_plus = displaced_plus.get_forces()
            # Forces at -step along this coordinate.
            displaced_minus = reference.copy()
            displaced_minus.positions[atom_idx, axis] -= step
            displaced_minus.calc = structure.calc
            forces_minus = displaced_minus.get_forces()
            # One Hessian column: -dF/dx via the central difference.
            column = -(forces_plus - forces_minus)[indices].flatten()
            fd_hessian[:, col * ndim + axis] = column / (2 * step)
    fd_hessian = fd_hessian.reshape((-1, n_atoms, 3))
    autograd_hessian = structure.calc.get_hessian(atoms=structure)
    assert np.allclose(autograd_hessian, fd_hessian, atol=1e-6)
import os
import tempfile
import numpy as np
import torch
from ase.build import molecule
from ase.calculators.singlepoint import SinglePointCalculator
from mace.data.lmdb_dataset import LMDBDataset
from mace.tools import AtomicNumberTable, torch_geometric
from mace.tools.fairchem_dataset.lmdb_dataset_tools import LMDBDatabase
def test_lmdb_dataset():
    """Test the LMDBDataset by creating a fake database and verifying batch creation.

    Builds 3 folders x 2 .aselmdb files x 2 perturbed-water configurations
    (12 entries total), then checks item access, batching, and the dict
    interface a MACE model expects.
    """
    # Set default dtype to match typical MACE usage
    torch.set_default_dtype(torch.float64)
    # Set random seed for reproducibility
    np.random.seed(42)
    # Create temporary directories for the databases
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create 3 folders for databases
        db_paths = []
        for i in range(3):
            folder_path = os.path.join(tmpdir, f"folder_{i}")
            os.makedirs(folder_path, exist_ok=True)
            # Create LMDB database files in each folder (2 per folder)
            for j in range(2):
                db_path = os.path.join(folder_path, f"data_{j}.aselmdb")
                db = LMDBDatabase(db_path, readonly=False)
                # Add 2 configurations to each database
                for _ in range(2):
                    # Create a water molecule using ASE's build functionality
                    atoms = molecule("H2O")
                    # Apply small random displacements to the positions
                    displacement = np.random.rand(*atoms.positions.shape) * 0.1
                    atoms.positions += displacement
                    # Set cell and PBC
                    atoms.set_cell(np.eye(3) * 5.0)
                    atoms.set_pbc(True)
                    # Add random energy, forces, and stress
                    energy = np.random.uniform(
                        -15.0, -5.0
                    )  # Random energy between -15 and -5 eV
                    forces = (
                        np.random.randn(*atoms.positions.shape) * 0.5
                    )  # Random forces
                    stress = np.random.randn(6) * 0.2  # Random stress in Voigt notation
                    # Add calculator to atoms with results
                    calc = SinglePointCalculator(
                        atoms, energy=energy, forces=forces, stress=stress
                    )
                    atoms.calc = calc
                    # Store in database
                    db.write(atoms)
                db.close()
            # Add folder path to our list
            db_paths.append(folder_path)
        # Create the dataset using paths joined with colons
        paths_str = ":".join(db_paths)
        z_table = AtomicNumberTable([1, 8])  # H and O
        dataset = LMDBDataset(file_path=paths_str, r_max=5.0, z_table=z_table)
        # Check dataset size (3 folders * 2 files * 2 configs = 12 entries)
        assert len(dataset) == 12
        # Test retrieving a single item
        item = dataset[0]
        print(item)
        assert item.positions.shape == (3, 3)  # 3 atoms, 3 coordinates
        assert hasattr(item, "energy")
        assert hasattr(item, "forces")
        assert hasattr(item, "stress")
        # Create a dataloader
        dataloader = torch_geometric.dataloader.DataLoader(
            dataset=dataset, batch_size=4, shuffle=False, drop_last=False
        )
        # Get a batch and validate it
        batch = next(iter(dataloader))
        # Verify batch properties - should have 12 atoms (4 configs * 3 atoms per water)
        assert batch.positions.shape == (12, 3)  # 12 atoms, 3 coordinates
        assert batch.energy.shape[0] == 4  # 4 energies (one per config)
        assert batch.forces.shape == (12, 3)  # Forces for each atom
        print(batch.stress.shape)
        assert batch.stress.shape == (4, 3, 3)  # Stress for each config
        # Check batch has required attributes for MACE model processing
        assert hasattr(batch, "batch")  # Batch indices
        assert batch.batch.shape[0] == 12  # One index per atom
        assert hasattr(batch, "ptr")  # Pointer for batch processing
        assert batch.ptr.shape[0] == 5  # One pointer per config + 1
        # Check that batch indices are correctly assigned
        # First 3 atoms should be from config 0, next 3 from config 1, etc.
        expected_batch = torch.tensor([0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3])
        assert torch.all(batch.batch == expected_batch)
        # Check ptr correctly points to start of each configuration
        assert batch.ptr.tolist() == [0, 3, 6, 9, 12]
        # Create a batch dictionary that can be passed to a MACE model
        batch_dict = batch.to_dict()
        assert "positions" in batch_dict
        assert "energy" in batch_dict
        assert "forces" in batch_dict
        assert "stress" in batch_dict
        assert "batch" in batch_dict
        assert "ptr" in batch_dict
        # Verify additional properties required by MACE
        assert hasattr(batch, "edge_index")  # Connectivity information
        assert hasattr(batch, "shifts")  # For periodic boundary conditions
        assert hasattr(batch, "cell")  # Unit cell information
        # Test that a full batch can be processed (without errors)
        all_batches = list(dataloader)
        assert (
            len(all_batches) == 3
        )  # Should have 3 batches (12 configs with batch size 4)
import numpy as np
import torch
import torch.nn.functional
from ase import build
from e3nn import o3
from e3nn.util import jit
from scipy.spatial.transform import Rotation as R
from mace import data, modules, tools
from mace.tools import torch_geometric
# All tests in this module run in double precision.
torch.set_default_dtype(torch.float64)
# Module-level reference configuration: an O-H-H triatomic with reference
# forces, energy, charges and dipole used by the tests below.
config = data.Configuration(
    atomic_numbers=np.array([8, 1, 1]),
    positions=np.array(
        [
            [0.0, -2.0, 0.0],
            [1.0, 0.0, 0.0],
            [0.0, 1.0, 0.0],
        ]
    ),
    properties={
        "forces": np.array(
            [
                [0.0, -1.3, 0.0],
                [1.0, 0.2, 0.0],
                [0.0, 1.1, 0.3],
            ]
        ),
        "energy": -1.5,
        "charges": np.array([-2.0, 1.0, 1.0]),
        "dipole": np.array([-1.5, 1.5, 2.0]),
    },
    property_weights={
        "forces": 1.0,
        "energy": 1.0,
        "charges": 1.0,
        "dipole": 1.0,
    },
)
# Created the rotated environment
rot = R.from_euler("z", 60, degrees=True).as_matrix()
positions_rotated = np.array(rot @ config.positions.T).T
# Same configuration with positions rotated 60 degrees about z; the reference
# properties are copied unrotated from `config` above.
config_rotated = data.Configuration(
    atomic_numbers=np.array([8, 1, 1]),
    positions=positions_rotated,
    properties={
        "forces": np.array(
            [
                [0.0, -1.3, 0.0],
                [1.0, 0.2, 0.0],
                [0.0, 1.1, 0.3],
            ]
        ),
        "energy": -1.5,
        "charges": np.array([-2.0, 1.0, 1.0]),
        "dipole": np.array([-1.5, 1.5, 2.0]),
    },
    property_weights={
        "forces": 1.0,
        "energy": 1.0,
        "charges": 1.0,
        "dipole": 1.0,
    },
)
# Shared element table (H, O) and per-element reference energies.
table = tools.AtomicNumberTable([1, 8])
atomic_energies = np.array([1.0, 3.0], dtype=float)
def test_mace():
    """A jit-compiled MACE model matches its eager version, and the rotated
    configuration receives the same (invariant) energy."""
    model_config = {
        "r_max": 5,
        "num_bessel": 8,
        "num_polynomial_cutoff": 6,
        "max_ell": 2,
        "interaction_cls": modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        "interaction_cls_first": modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        "num_interactions": 5,
        "num_elements": 2,
        "hidden_irreps": o3.Irreps("32x0e + 32x1o"),
        "MLP_irreps": o3.Irreps("16x0e"),
        "gate": torch.nn.functional.silu,
        "atomic_energies": atomic_energies,
        "avg_num_neighbors": 8,
        "atomic_numbers": table.zs,
        "correlation": 3,
        "radial_type": "bessel",
    }
    eager_model = modules.MACE(**model_config)
    scripted_model = jit.compile(eager_model)
    graph_original = data.AtomicData.from_config(config, z_table=table, cutoff=3.0)
    graph_rotated = data.AtomicData.from_config(
        config_rotated, z_table=table, cutoff=3.0
    )
    loader = torch_geometric.dataloader.DataLoader(
        dataset=[graph_original, graph_rotated],
        batch_size=2,
        shuffle=True,
        drop_last=False,
    )
    batch = next(iter(loader))
    eager_out = eager_model(batch.to_dict(), training=True)
    scripted_out = scripted_model(batch.to_dict(), training=True)
    # Compiled and eager models agree on the first configuration ...
    assert torch.allclose(eager_out["energy"][0], scripted_out["energy"][0])
    # ... and rotating the structure leaves the energy unchanged.
    assert torch.allclose(scripted_out["energy"][0], scripted_out["energy"][1])
def test_dipole_mace():
    """AtomicDipolesMACE returns dipoles of the expected shape, and the
    predicted dipole rotates with the input structure (equivariance)."""
    model_config = {
        "r_max": 5,
        "num_bessel": 8,
        "num_polynomial_cutoff": 5,
        "max_ell": 2,
        "interaction_cls": modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        "interaction_cls_first": modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        "num_interactions": 2,
        "num_elements": 2,
        "hidden_irreps": o3.Irreps("16x0e + 16x1o + 16x2e"),
        "MLP_irreps": o3.Irreps("16x0e"),
        "gate": torch.nn.functional.silu,
        "atomic_energies": None,
        "avg_num_neighbors": 3,
        "atomic_numbers": table.zs,
        "correlation": 3,
        "radial_type": "gaussian",
    }
    dipole_model = modules.AtomicDipolesMACE(**model_config)
    graph = data.AtomicData.from_config(config, z_table=table, cutoff=3.0)
    graph_rotated = data.AtomicData.from_config(
        config_rotated, z_table=table, cutoff=3.0
    )
    loader = torch_geometric.dataloader.DataLoader(
        dataset=[graph, graph_rotated],
        batch_size=2,
        shuffle=False,
        drop_last=False,
    )
    batch = next(iter(loader))
    output = dipole_model(
        batch,
        training=True,
    )
    # Sanity check: the predicted dipole matches the reference's shape.
    assert output["dipole"][0].unsqueeze(0).shape == graph.dipole.shape
    # Equivariance: rotating the structure rotates the predicted dipole.
    assert np.allclose(
        np.array(rot @ output["dipole"][0].detach().numpy()),
        output["dipole"][1].detach().numpy(),
    )
def test_energy_dipole_mace():
    """EnergyDipolesMACE: the dipole has the expected shape and rotates with
    the structure, while the energy is invariant under the rotation."""
    model_config = {
        "r_max": 5,
        "num_bessel": 8,
        "num_polynomial_cutoff": 5,
        "max_ell": 2,
        "interaction_cls": modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        "interaction_cls_first": modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        "num_interactions": 2,
        "num_elements": 2,
        "hidden_irreps": o3.Irreps("16x0e + 16x1o + 16x2e"),
        "MLP_irreps": o3.Irreps("16x0e"),
        "gate": torch.nn.functional.silu,
        "atomic_energies": atomic_energies,
        "avg_num_neighbors": 3,
        "atomic_numbers": table.zs,
        "correlation": 3,
    }
    energy_dipole_model = modules.EnergyDipolesMACE(**model_config)
    graph = data.AtomicData.from_config(config, z_table=table, cutoff=3.0)
    graph_rotated = data.AtomicData.from_config(
        config_rotated, z_table=table, cutoff=3.0
    )
    loader = torch_geometric.dataloader.DataLoader(
        dataset=[graph, graph_rotated],
        batch_size=2,
        shuffle=False,
        drop_last=False,
    )
    batch = next(iter(loader))
    output = energy_dipole_model(
        batch,
        training=True,
    )
    # Sanity check: the predicted dipole matches the reference's shape.
    assert output["dipole"][0].unsqueeze(0).shape == graph.dipole.shape
    # Energy is invariant under the rotation.
    assert torch.allclose(output["energy"][0], output["energy"][1])
    # The dipole is equivariant: it rotates with the structure.
    assert np.allclose(
        np.array(rot @ output["dipole"][0].detach().numpy()),
        output["dipole"][1].detach().numpy(),
    )
def test_mace_multi_reference():
    """A two-head ScaleShiftMACE and its jit-compiled version agree on a
    batch containing one configuration per head."""
    atomic_energies_multi = np.array([[1.0, 3.0], [0.0, 0.0]], dtype=float)
    model_config = {
        "r_max": 5,
        "num_bessel": 8,
        "num_polynomial_cutoff": 6,
        "max_ell": 3,
        "interaction_cls": modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        "interaction_cls_first": modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        "num_interactions": 2,
        "num_elements": 2,
        "hidden_irreps": o3.Irreps("96x0e + 96x1o"),
        "MLP_irreps": o3.Irreps("16x0e"),
        "gate": torch.nn.functional.silu,
        "atomic_energies": atomic_energies_multi,
        "avg_num_neighbors": 8,
        "atomic_numbers": table.zs,
        "distance_transform": True,
        "pair_repulsion": True,
        "correlation": 3,
        "heads": ["Default", "dft"],
        "atomic_inter_scale": [1.0, 1.0],
        "atomic_inter_shift": [0.0, 0.1],
    }
    eager_model = modules.ScaleShiftMACE(**model_config)
    scripted_model = jit.compile(eager_model)
    # Assign one head to each of the module-level configurations.
    config.head = "Default"
    config_rotated.head = "dft"
    heads = ["Default", "dft"]
    graph_default = data.AtomicData.from_config(
        config, z_table=table, cutoff=3.0, heads=heads
    )
    graph_dft = data.AtomicData.from_config(
        config_rotated, z_table=table, cutoff=3.0, heads=heads
    )
    loader = torch_geometric.dataloader.DataLoader(
        dataset=[graph_default, graph_dft],
        batch_size=2,
        shuffle=True,
        drop_last=False,
    )
    batch = next(iter(loader))
    eager_out = eager_model(batch.to_dict(), training=True)
    scripted_out = scripted_model(batch.to_dict(), training=True)
    assert torch.allclose(eager_out["energy"][0], scripted_out["energy"][0])
    assert scripted_out["energy"].shape[0] == 2
def test_atomic_virials_stresses():
    """
    Test that atomic virials and stresses sum to the total virials and stress.
    """
    # Set default dtype for reproducibility
    torch.set_default_dtype(torch.float64)
    # Create a periodic cell with ASE
    atoms = build.bulk("Si", "diamond", a=5.43)
    # Apply strain to ensure non-zero stress
    strain_tensor = np.eye(3) * 1.02  # 2% strain
    atoms.set_cell(np.dot(atoms.get_cell(), strain_tensor), scale_atoms=True)
    # Add forces and energy for completeness
    atoms.arrays["REF_forces"] = np.random.normal(0, 0.1, size=atoms.positions.shape)
    atoms.info["REF_energy"] = np.random.normal(0, 1)
    atoms.info["REF_stress"] = np.random.normal(0, 0.1, size=6)
    # Setup MACE model configuration
    stress_z_table = tools.AtomicNumberTable([14])  # Silicon
    stress_atomic_energies = np.array([0.0])
    model_config = dict(
        r_max=5.0,
        num_bessel=8,
        num_polynomial_cutoff=6,
        max_ell=2,
        interaction_cls=modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        interaction_cls_first=modules.interaction_classes[
            "RealAgnosticResidualInteractionBlock"
        ],
        num_interactions=3,
        num_elements=1,
        hidden_irreps=o3.Irreps("32x0e + 32x1o"),
        MLP_irreps=o3.Irreps("16x0e"),
        gate=torch.nn.functional.silu,
        atomic_energies=stress_atomic_energies,
        avg_num_neighbors=4.0,
        # FIX: use the silicon table defined above. The previous value,
        # table.zs, was the module-level H/O table (two elements), which
        # disagreed with num_elements=1 and with the z_table used for the
        # data below.
        atomic_numbers=stress_z_table.zs,
        correlation=3,
        atomic_inter_scale=1.0,
        atomic_inter_shift=0.0,
    )
    # Create the model
    model = modules.ScaleShiftMACE(**model_config)
    # Create atomic data
    atomic_data = data.AtomicData.from_config(
        data.config_from_atoms(
            atoms, key_specification=data.KeySpecification.from_defaults()
        ),
        z_table=stress_z_table,
        cutoff=5.0,
    )
    data_loader = torch_geometric.dataloader.DataLoader(
        dataset=[atomic_data],
        batch_size=2,
        shuffle=True,
        drop_last=False,
    )
    batch = next(iter(data_loader))
    batch_dict = batch.to_dict()
    # Run the model with compute_atomic_stresses=True
    output = model(
        batch_dict,
        compute_force=True,
        compute_virials=True,
        compute_stress=True,
        compute_atomic_stresses=True,
    )
    # Get total virials/stress and atomic virials/stresses
    total_virials = output["virials"]
    atomic_virials = output["atomic_virials"]
    total_stress = output["stress"]
    atomic_stresses = output["atomic_stresses"]
    # Test that atomic values are not None
    assert atomic_virials is not None, "Atomic virials were not computed"
    assert atomic_stresses is not None, "Atomic stresses were not computed"
    # Test shape of atomic values
    assert atomic_virials.shape[0] == len(atoms), "Wrong shape for atomic virials"
    assert atomic_virials.shape[1:] == (3, 3), "Atomic virials should be 3x3 matrices"
    assert atomic_stresses.shape[0] == len(atoms), "Wrong shape for atomic stresses"
    assert atomic_stresses.shape[1:] == (3, 3), "Atomic stresses should be 3x3 matrices"
    # Compute sum of atomic values
    summed_atomic_virials = torch.sum(atomic_virials, dim=0)
    summed_atomic_stresses = torch.sum(atomic_stresses, dim=0)
    # Test that sums match total values
    assert torch.allclose(
        summed_atomic_virials, total_virials.squeeze(0), atol=1e-6
    ), f"Sum of atomic virials {summed_atomic_virials} does not match total virials {total_virials.squeeze(0)}"
    assert torch.allclose(
        summed_atomic_stresses, total_stress.squeeze(0), atol=1e-6
    ), f"Sum of atomic stresses (normalized by volume) {summed_atomic_stresses} does not match total stress {total_stress.squeeze(0)}"
import numpy as np
import pytest
import torch
import torch.nn.functional
from e3nn import o3
from mace.data import AtomicData, Configuration
from mace.modules import (
AtomicEnergiesBlock,
BesselBasis,
PolynomialCutoff,
SymmetricContraction,
WeightedEnergyForcesLoss,
WeightedHuberEnergyForcesStressLoss,
compute_mean_rms_energy_forces,
compute_statistics,
)
from mace.tools import AtomicNumberTable, scatter, to_numpy, torch_geometric
from mace.tools.scripts_utils import dict_to_array
@pytest.fixture(name="config")
def _config():
    """Three-atom O-H-H configuration with reference forces/energy/stress."""
    positions = np.array(
        [
            [0.0, -2.0, 0.0],
            [1.0, 0.0, 0.0],
            [0.0, 1.0, 0.0],
        ]
    )
    forces = np.array(
        [
            [0.0, -1.3, 0.0],
            [1.0, 0.2, 0.0],
            [0.0, 1.1, 0.3],
        ]
    )
    stress = np.array([1.0, 0.0, 0.5, 0.0, -1.0, 0.0])
    return Configuration(
        atomic_numbers=np.array([8, 1, 1]),
        positions=positions,
        properties={"forces": forces, "energy": -1.5, "stress": stress},
        property_weights={"forces": 1.0, "energy": 1.0, "stress": 1.0},
    )
@pytest.fixture(name="table")
def _table():
    """Atomic-number table covering H and O."""
    return AtomicNumberTable([1, 8])
@pytest.fixture(name="config1")
def _config1():
    """O-H-H configuration assigned to the DFT head."""
    positions = np.array(
        [
            [0.0, -2.0, 0.0],
            [1.0, 0.0, 0.0],
            [0.0, 1.0, 0.0],
        ]
    )
    forces = np.array(
        [
            [0.0, -1.3, 0.0],
            [1.0, 0.2, 0.0],
            [0.0, 1.1, 0.3],
        ]
    )
    return Configuration(
        atomic_numbers=np.array([8, 1, 1]),
        positions=positions,
        properties={"forces": forces, "energy": -1.5},
        property_weights={"forces": 1.0, "energy": 1.0},
        head="DFT",
    )
@pytest.fixture(name="config2")
def _config2():
    """Slightly displaced O-H-H configuration assigned to the MP2 head."""
    positions = np.array(
        [
            [0.1, -1.9, 0.1],
            [1.1, 0.1, 0.1],
            [0.1, 1.1, 0.1],
        ]
    )
    forces = np.array(
        [
            [0.1, -1.2, 0.1],
            [1.1, 0.3, 0.1],
            [0.1, 1.2, 0.4],
        ]
    )
    return Configuration(
        atomic_numbers=np.array([8, 1, 1]),
        positions=positions,
        properties={"forces": forces, "energy": -1.4},
        property_weights={"forces": 1.0, "energy": 1.0},
        head="MP2",
    )
@pytest.fixture(name="atomic_data")
def _atomic_data(config1, config2, table):
    """Two AtomicData graphs (DFT and MP2 heads) built from the fixtures."""
    heads = ["DFT", "MP2"]
    return [
        AtomicData.from_config(cfg, z_table=table, cutoff=3.0, heads=heads)
        for cfg in (config1, config2)
    ]
@pytest.fixture(name="data_loader")
def _data_loader(atomic_data):
    """Single-batch loader containing both configurations."""
    return torch_geometric.dataloader.DataLoader(
        dataset=atomic_data, batch_size=2, shuffle=False, drop_last=False
    )
@pytest.fixture(name="atomic_energies")
def _atomic_energies():
    """Per-head atomic energies as an array, ordered DFT then MP2."""
    per_head = {
        "DFT": np.array([0.0, 0.0]),
        "MP2": np.array([0.1, 0.1]),
    }
    return dict_to_array(per_head, ["DFT", "MP2"])
@pytest.fixture(autouse=True)
def _set_torch_default_dtype():
    """Run every test in this module with float64 as the torch default."""
    torch.set_default_dtype(torch.float64)
def test_weighted_loss(config, table):
    """Both weighted losses are exactly zero when predictions equal targets."""
    energy_forces_loss = WeightedEnergyForcesLoss(energy_weight=1, forces_weight=10)
    huber_loss = WeightedHuberEnergyForcesStressLoss(energy_weight=1, forces_weight=10)
    graph = AtomicData.from_config(config, z_table=table, cutoff=3.0)
    loader = torch_geometric.dataloader.DataLoader(
        dataset=[graph, graph],
        batch_size=2,
        shuffle=True,
        drop_last=False,
    )
    batch = next(iter(loader))
    # Use the reference values themselves as "predictions".
    perfect_pred = {
        "energy": batch.energy,
        "forces": batch.forces,
        "stress": batch.stress,
    }
    assert energy_forces_loss(batch, perfect_pred) == 0.0
    assert huber_loss(batch, perfect_pred) == 0.0
def test_symmetric_contraction():
    """SymmetricContraction maps (30, 16, 9) features to (30, 64) outputs."""
    contraction = SymmetricContraction(
        irreps_in=o3.Irreps("16x0e + 16x1o + 16x2e"),
        irreps_out=o3.Irreps("16x0e + 16x1o"),
        correlation=3,
        num_elements=2,
    )
    torch.manual_seed(123)
    features = torch.randn(30, 16, 9)
    # Alternate the two element types across the 30 nodes.
    element_one_hots = torch.nn.functional.one_hot(torch.arange(0, 30) % 2).to(
        torch.get_default_dtype()
    )
    result = contraction(features, element_one_hots)
    assert result.shape == (30, 64)
    assert contraction.contractions[0].weights_max.shape == (2, 11, 16)
def test_bessel_basis():
    """BesselBasis expands 10 distances into 5 basis values each."""
    distances = torch.linspace(start=0.5, end=5.5, steps=10)
    basis = BesselBasis(r_max=6.0, num_basis=5)
    expansion = basis(distances.unsqueeze(-1))
    assert expansion.shape == (10, 5)
def test_polynomial_cutoff():
    """PolynomialCutoff maps distances elementwise, preserving the shape."""
    distances = torch.linspace(start=0.5, end=5.5, steps=10)
    cutoff = PolynomialCutoff(r_max=5.0)
    values = cutoff(distances)
    assert values.shape == (10,)
def test_atomic_energies(config, table):
    """Per-graph atomic energy sums equal 5.0 (the asserted reference)."""
    energies_block = AtomicEnergiesBlock(atomic_energies=np.array([1.0, 3.0]))
    graph = AtomicData.from_config(config, z_table=table, cutoff=3.0)
    loader = torch_geometric.dataloader.DataLoader(
        dataset=[graph, graph],
        batch_size=2,
        shuffle=True,
        drop_last=False,
    )
    batch = next(iter(loader))
    node_energies = energies_block(batch.node_attrs).squeeze(-1)
    totals = scatter.scatter_sum(
        src=node_energies, index=batch.batch, dim=-1, reduce="sum"
    )
    assert np.allclose(to_numpy(totals), np.array([5.0, 5.0]))
def test_atomic_energies_multireference(config, table):
    """With per-head energy tables, the MP2 head's per-graph sum is 8.0."""
    energies_block = AtomicEnergiesBlock(
        atomic_energies=np.array([[1.0, 3.0], [2.0, 4.0]])
    )
    config.head = "MP2"
    graph = AtomicData.from_config(
        config, z_table=table, cutoff=3.0, heads=["DFT", "MP2"]
    )
    loader = torch_geometric.dataloader.DataLoader(
        dataset=[graph, graph],
        batch_size=2,
        shuffle=True,
        drop_last=False,
    )
    batch = next(iter(loader))
    atom_indices = torch.arange(batch["positions"].shape[0])
    # Head index per node; fall back to head 0 when the batch has no heads.
    node_heads = (
        batch["head"][batch["batch"]]
        if "head" in batch
        else torch.zeros_like(batch["batch"])
    )
    node_energies = energies_block(batch.node_attrs).squeeze(-1)
    node_energies = node_energies[atom_indices, node_heads]
    totals = scatter.scatter_sum(
        src=node_energies, index=batch.batch, dim=-1, reduce="sum"
    )
    assert np.allclose(to_numpy(totals), np.array([8.0, 8.0]))
def test_compute_mean_rms_energy_forces_multi_head(data_loader, atomic_energies):
    """Per-head mean/RMS arrays: shape (2,), non-negative RMS, distinct heads."""
    means, rms_values = compute_mean_rms_energy_forces(data_loader, atomic_energies)
    assert isinstance(means, np.ndarray)
    assert isinstance(rms_values, np.ndarray)
    assert means.shape == (2,)
    assert rms_values.shape == (2,)
    assert np.all(rms_values >= 0)
    assert rms_values[0] != rms_values[1]
def test_compute_statistics(data_loader, atomic_energies):
    """compute_statistics yields a positive neighbor count plus per-head
    mean/std arrays that are nonzero and differ between heads."""
    avg_neighbors, means, stds = compute_statistics(data_loader, atomic_energies)
    assert isinstance(avg_neighbors, float)
    assert isinstance(means, np.ndarray)
    assert isinstance(stds, np.ndarray)
    assert means.shape == (2,)
    assert stds.shape == (2,)
    assert avg_neighbors > 0
    assert np.all(means != 0)
    assert np.all(stds > 0)
    assert means[0] != means[1]
    assert stds[0] != stds[1]
# This diff is collapsed. (artifact of the original diff view; kept as a comment so the file parses)
import os
import subprocess
import sys
from pathlib import Path
import ase.io
import numpy as np
import pytest
import yaml
from ase.atoms import Atoms
# Repository root (one level above this test directory) and the path to the
# preprocess_data CLI script exercised by the tests below.
pytest_mace_dir = Path(__file__).parent.parent
preprocess_data = Path(__file__).parent.parent / "mace" / "cli" / "preprocess_data.py"
@pytest.fixture(name="sample_configs")
def fixture_sample_configs():
    """Two isolated-atom references (O, H) followed by ten randomly
    perturbed water cells carrying REF_ energy/forces/stress."""
    water = Atoms(
        numbers=[8, 1, 1],
        positions=[[0, -2.0, 0], [1, 0, 0], [0, 1, 0]],
        cell=[4] * 3,
        pbc=[True] * 3,
    )
    configs = [
        Atoms(numbers=[8], positions=[[0, 0, 0]], cell=[6] * 3),
        Atoms(numbers=[1], positions=[[0, 0, 0]], cell=[6] * 3),
    ]
    for isolated in configs:
        isolated.info["REF_energy"] = 0.0
        isolated.info["config_type"] = "IsolatedAtom"
    np.random.seed(5)
    for _ in range(10):
        perturbed = water.copy()
        perturbed.positions += np.random.normal(0.1, size=perturbed.positions.shape)
        perturbed.info["REF_energy"] = np.random.normal(0.1)
        perturbed.new_array(
            "REF_forces", np.random.normal(0.1, size=perturbed.positions.shape)
        )
        perturbed.info["REF_stress"] = np.random.normal(0.1, size=6)
        configs.append(perturbed)
    return configs
def test_preprocess_data(tmp_path, sample_configs):
    """End-to-end check of the preprocess_data CLI: run it in a subprocess on
    a small xyz file, then verify the produced train/val HDF5 shards and the
    statistics JSON, and round-trip energies/forces against the inputs.

    Fix vs. the previous version: the CLI argv is built as a list instead of
    a space-joined string passed through ``cmd.split()``, which broke when
    ``tmp_path`` contained spaces.
    """
    import json  # local imports: only this test needs them
    import h5py

    ase.io.write(tmp_path / "sample.xyz", sample_configs)
    preprocess_params = {
        "train_file": tmp_path / "sample.xyz",
        "r_max": 5.0,
        "config_type_weights": "{'Default':1.0}",
        "num_process": 2,
        "valid_fraction": 0.1,
        "h5_prefix": tmp_path / "preprocessed_",
        "compute_statistics": None,  # flag-style option (no value)
        "seed": 42,
        "energy_key": "REF_energy",
        "forces_key": "REF_forces",
        "stress_key": "REF_stress",
    }
    run_env = os.environ.copy()
    sys.path.insert(0, str(Path(__file__).parent.parent))
    run_env["PYTHONPATH"] = ":".join(sys.path)
    print("DEBUG subprocess PYTHONPATH", run_env["PYTHONPATH"])
    # Build argv directly as a list so arguments survive intact even when
    # tmp_path contains whitespace.
    cmd = [sys.executable, str(preprocess_data)] + [
        (f"--{k}={v}" if v is not None else f"--{k}")
        for k, v in preprocess_params.items()
    ]
    p = subprocess.run(cmd, env=run_env, check=True)
    assert p.returncode == 0  # redundant with check=True; kept as documentation
    # Check if the output files are created
    assert (tmp_path / "preprocessed_train").is_dir()
    assert (tmp_path / "preprocessed_val").is_dir()
    assert (tmp_path / "preprocessed_statistics.json").is_file()
    # Check if the correct number of files are created
    train_files = list((tmp_path / "preprocessed_train").glob("*.h5"))
    val_files = list((tmp_path / "preprocessed_val").glob("*.h5"))
    assert len(train_files) == preprocess_params["num_process"]
    assert len(val_files) == preprocess_params["num_process"]
    # The statistics file must contain every expected field.
    with open(tmp_path / "preprocessed_statistics.json", "r", encoding="utf-8") as f:
        statistics = json.load(f)
    for stat_key in (
        "atomic_energies",
        "avg_num_neighbors",
        "mean",
        "std",
        "atomic_numbers",
        "r_max",
    ):
        assert stat_key in statistics
    # Spot-check the structure of the first train shard.
    with h5py.File(train_files[0], "r") as f:
        assert "config_batch_0" in f
        config = f["config_batch_0"]["config_0"]
        assert "atomic_numbers" in config
        assert "positions" in config
        assert "energy" in config["properties"]
        assert "forces" in config["properties"]
    # Collect reference energies/forces, skipping the two isolated atoms.
    original_energies = [
        config.info["REF_energy"]
        for config in sample_configs[2:]
        if "REF_energy" in config.info
    ]
    original_forces = [
        config.arrays["REF_forces"]
        for config in sample_configs[2:]
        if "REF_forces" in config.arrays
    ]
    h5_energies = []
    h5_forces = []
    for train_file in train_files:
        with h5py.File(train_file, "r") as f:
            for _, batch in f.items():
                for config_key in batch.keys():
                    config = batch[config_key]
                    assert "atomic_numbers" in config
                    assert "positions" in config
                    assert "energy" in config["properties"]
                    assert "forces" in config["properties"]
                    h5_energies.append(config["properties"]["energy"][()])
                    h5_forces.append(config["properties"]["forces"][()])
    for val_file in val_files:
        with h5py.File(val_file, "r") as f:
            for _, batch in f.items():
                for config_key in batch.keys():
                    config = batch[config_key]
                    h5_energies.append(config["properties"]["energy"][()])
                    h5_forces.append(config["properties"]["forces"][()])
    print("Original energies", original_energies)
    print("H5 energies", h5_energies)
    print("Original forces", original_forces)
    print("H5 forces", h5_forces)
    # The train/val split may reorder configs, so compare as sorted multisets.
    original_energies.sort()
    h5_energies.sort()
    original_forces = np.concatenate(original_forces).flatten()
    h5_forces = np.concatenate(h5_forces).flatten()
    original_forces.sort()
    h5_forces.sort()
    # Compare energies and forces
    np.testing.assert_allclose(original_energies, h5_energies, rtol=1e-5, atol=1e-8)
    np.testing.assert_allclose(original_forces, h5_forces, rtol=1e-5, atol=1e-8)
    print("All checks passed successfully!")
def test_preprocess_config(tmp_path, sample_configs):
    """Smoke-test the preprocess_data CLI driven by a YAML config file."""
    ase.io.write(tmp_path / "sample.xyz", sample_configs)
    # Same options as the flag-driven test, but serialized to YAML instead.
    params = {
        "train_file": str(tmp_path / "sample.xyz"),
        "r_max": 5.0,
        "config_type_weights": "{'Default':1.0}",
        "num_process": 2,
        "valid_fraction": 0.1,
        "h5_prefix": str(tmp_path / "preprocessed_"),
        "compute_statistics": None,
        "seed": 42,
        "energy_key": "REF_energy",
        "forces_key": "REF_forces",
        "stress_key": "REF_stress",
    }
    config_path = tmp_path / "config.yaml"
    with open(config_path, "w", encoding="utf-8") as fh:
        yaml.dump(params, fh)
    # Expose the in-repo mace package to the child interpreter.
    env = os.environ.copy()
    sys.path.insert(0, str(Path(__file__).parent.parent))
    env["PYTHONPATH"] = ":".join(sys.path)
    print("DEBUG subprocess PYTHONPATH", env["PYTHONPATH"])
    cmd = " ".join(
        [sys.executable, str(preprocess_data), "--config", str(config_path)]
    )
    proc = subprocess.run(cmd.split(), env=env, check=True)
    assert proc.returncode == 0
This diff is collapsed.
import os
import subprocess
import sys
from copy import deepcopy
from pathlib import Path
import ase.io
import numpy as np
import pytest
from ase.atoms import Atoms
from mace.calculators.mace import MACECalculator
from mace.cli.run_train import run as run_mace_train
from mace.data.utils import KeySpecification
from mace.tools import build_default_arg_parser
# Path to the run_train CLI script, executed below via subprocess.
run_train = Path(__file__).parent.parent / "mace" / "cli" / "run_train.py"
# Baseline command-line parameters shared by the tests in this file;
# individual tests copy and tweak this dict before building the command
# line. A value of None means the option is passed as a bare flag.
_mace_params = {
    "name": "MACE",
    "valid_fraction": 0.05,
    "energy_weight": 1.0,
    "forces_weight": 10.0,
    "stress_weight": 1.0,
    "model": "MACE",
    "hidden_irreps": "128x0e",
    "max_num_epochs": 10,
    "swa": None,
    "start_swa": 5,
    "ema": None,
    "ema_decay": 0.99,
    "amsgrad": None,
    "device": "cpu",
    "seed": 5,
    "loss": "weighted",
    "energy_key": "REF_energy",
    "forces_key": "REF_forces",
    "stress_key": "REF_stress",
    "interaction_first": "RealAgnosticResidualInteractionBlock",
    "batch_size": 1,
    "valid_batch_size": 1,
    "num_samples_pt": 50,
    "subselect_pt": "random",
    "eval_interval": 2,
    "num_radial_basis": 10,
    "r_max": 6.0,
    "default_dtype": "float64",
}
def configs_numbered_keys():
    """Build 15 rattled water cells whose energy/force key names vary.

    Config 0 uses "REF_*" keys; then n configs use "n_energy"/"n_forces"
    for n = 2..5, to exercise per-head key-specification handling.
    """
    np.random.seed(0)
    water = Atoms(
        numbers=[8, 1, 1],
        positions=[[0, -2.0, 0], [1, 0, 0], [0, 1, 0]],
        cell=[4] * 3,
        pbc=[True] * 3,
    )
    energies = list(np.random.normal(0.1, size=15))
    forces = list(np.random.normal(0.1, size=(15, 3, 3)))
    # ["REF_energy", "2_energy" x2, "3_energy" x3, "4_energy" x4, "5_energy" x5]
    energy_keys = ["REF_energy"] + [
        f"{n}_energy" for n in range(2, 6) for _ in range(n)
    ]
    forces_keys = ["REF_forces"] + [
        f"{n}_forces" for n in range(2, 6) for _ in range(n)
    ]
    configs = []
    for e_key, f_key, energy, force in zip(
        energy_keys, forces_keys, energies, forces
    ):
        c = deepcopy(water)
        c.info[e_key] = energy
        c.arrays[f_key] = force
        c.positions += np.random.normal(0.1, size=(3, 3))
        configs.append(c)
    return configs
def trial_yamls_and_and_expected():
    """Enumerate key-specification YAML variants with reference energies.

    Returns a list of triples
    ``(yaml_contents, (outer_name, inner_name), expected_energies)`` where
    ``yaml_contents`` is the dict written to config.yaml, the name pair labels
    the case ("with_command_line"/"without_command_line" x head layout), and
    ``expected_energies`` are hard-coded per-config reference values
    (presumably regenerated via ``make_output()`` below — confirm).
    """
    yamls = {}
    # Extra kwargs merged in for the "with_command_line" variants only.
    command_line_kwargs = {"energy_key": "2_energy", "forces_key": "2_forces"}
    yamls["no_heads"] = {}
    # Head-level flat keys (energy_key) vs nested info_keys/arrays_keys dicts,
    # single- and two-head layouts, plus a mix of both styles.
    yamls["one_head_no_dicts"] = {
        "heads": {
            "Default": {
                "energy_key": "3_energy",
            }
        }
    }
    yamls["one_head_with_dicts"] = {
        "heads": {
            "Default": {
                "info_keys": {
                    "energy": "3_energy",
                },
                "arrays_keys": {
                    "forces": "3_forces",
                },
            }
        }
    }
    yamls["two_heads_no_dicts"] = {
        "heads": {
            "dft": {
                "train_file": "fit_multihead_dft.xyz",
                "energy_key": "3_energy",
            },
            "mp2": {
                "train_file": "fit_multihead_mp2.xyz",
                "energy_key": "4_energy",
            },
        }
    }
    yamls["two_heads_mixed"] = {
        "heads": {
            "dft": {
                "train_file": "fit_multihead_dft.xyz",
                "info_keys": {
                    "energy": "3_energy",
                },
                "arrays_keys": {
                    "forces": "3_forces",
                },
                "forces_key": "4_forces",
            },
            "mp2": {
                "train_file": "fit_multihead_mp2.xyz",
                "energy_key": "4_energy",
            },
        }
    }
    all_arg_sets = {
        "with_command_line": {
            key: {**command_line_kwargs, **value} for key, value in yamls.items()
        },
        "without_command_line": yamls,
    }
    # Hard-coded reference energies, one 15-element list per (variant, case).
    all_expected_outputs = {
        "with_command_line": {
            "no_heads": [
                1.0037831178668188,
                1.0183291323603265,
                1.0120784084221528,
                0.9935695881012243,
                1.0021641561865526,
                0.9999135609205868,
                0.9809440616323108,
                1.0025784765050076,
                1.0017901145495376,
                1.0136913185404515,
                1.006798563238269,
                1.0187758397828384,
                1.0180201540775071,
                1.0132368725061702,
                0.9998734173248169,
            ],
            "one_head_no_dicts": [
                1.0028437510688613,
                1.0514693378041775,
                1.059933403321331,
                1.034719940573569,
                1.0438040675561824,
                1.019719477728329,
                0.9841759692947915,
                1.0435266573857496,
                1.0339501989779065,
                1.0501795448530264,
                1.0402594216704781,
                1.0604998765679152,
                1.0633411200246015,
                1.0539071190201297,
                1.0393496428177804,
            ],
            "one_head_with_dicts": [
                0.8638341551096959,
                1.0078341354784144,
                1.0149701178418595,
                0.9945723048460148,
                1.0184158011731292,
                0.9992135295205004,
                0.8943420783639198,
                1.0327920054084088,
                0.9905731198078909,
                0.9838325204450648,
                1.0018725575620482,
                1.007263052421034,
                1.0335213929231966,
                1.0033503312511205,
                1.0174433894759563,
            ],
            "two_heads_no_dicts": [
                0.9836377578288774,
                1.0196844186291318,
                1.0151628222871238,
                0.957307281711648,
                0.985574141310865,
                0.9629670134047853,
                0.9242583185138095,
                0.9807770070311039,
                0.9973679440479541,
                1.0221127246963275,
                1.0031807967874216,
                1.0358701219543687,
                1.0434208761164758,
                1.0235606028124515,
                0.9797494630655053,
            ],
            "two_heads_mixed": [
                0.8664108574741868,
                0.9907166576278023,
                1.0051969372365164,
                0.978702477000018,
                1.025500166764692,
                0.9940095566375018,
                0.9034029726954119,
                1.0391739502744488,
                0.9717327061183668,
                0.972292103670355,
                1.0012510461663253,
                0.9978051155885286,
                1.0378611651753475,
                1.0003207628186224,
                1.0209509292189651,
            ],
        },
        "without_command_line": {
            "no_heads": [
                0.9352605307451007,
                0.991084559389268,
                0.9940350095024881,
                0.9953849198103668,
                0.9954705498032904,
                0.9964815693808411,
                0.9663142667436776,
                0.9947223808739147,
                0.9897776682803257,
                0.989027769690667,
                0.9910280920241263,
                0.992067980667518,
                0.9917276132506404,
                0.9902848752169671,
                0.9928585982942544,
            ],
            "one_head_no_dicts": [
                0.9425342207393083,
                1.0149788456087416,
                1.0249228965652788,
                1.0247924743285792,
                1.02732103964481,
                1.0168852937950326,
                0.9771283495170653,
                1.0261776335561517,
                1.0130461033368028,
                1.0162619153561783,
                1.019995179866916,
                1.0209512298344965,
                1.0219971755636952,
                1.0195791901659124,
                1.0234662527729408,
            ],
            "one_head_with_dicts": [
                0.8638341551096959,
                1.0078341354784144,
                1.0149701178418595,
                0.9945723048460148,
                1.0184158011731292,
                0.9992135295205004,
                0.8943420783639198,
                1.0327920054084088,
                0.9905731198078909,
                0.9838325204450648,
                1.0018725575620482,
                1.007263052421034,
                1.0335213929231966,
                1.0033503312511205,
                1.0174433894759563,
            ],
            "two_heads_no_dicts": [
                0.9933763730233168,
                0.9986480398559268,
                1.0042486164355315,
                1.0025568793877726,
                1.0032598081704625,
                0.9926714183717912,
                0.9920385249670881,
                1.0020278841030676,
                1.0012474150830537,
                1.0039289677261019,
                1.0022718878661814,
                1.003586385624809,
                1.003436450009097,
                1.003805673887942,
                1.001450261102316,
            ],
            "two_heads_mixed": [
                0.8781767864616707,
                0.9843563603794138,
                1.0145197579049248,
                0.9835060778675391,
                1.0419060462994596,
                0.9917393978520056,
                0.9091521032773944,
                1.0605463095070453,
                0.9685381713826684,
                0.9866493058823766,
                1.00305061187164,
                1.0051273128414386,
                1.037964258398104,
                1.0106663924241408,
                1.0274351814133602,
            ],
        },
    }
    # Flatten into (yaml_contents, (outer, inner), expected_array) triples.
    list_of_all = []
    for key, value in all_arg_sets.items():
        for key2, value2 in value.items():
            list_of_all.append(
                (value2, (key, key2), np.asarray(all_expected_outputs[key][key2]))
            )
    return list_of_all
def dict_to_yaml_str(data, indent=0):
    """Serialize a (possibly nested) dict to a minimal YAML string.

    Each key becomes ``"key:"`` at the current indent; dict values recurse
    with two extra spaces of indentation, any other value is rendered with
    ``str()`` on the same line. No quoting or escaping is performed.

    :param data: dict to serialize (values are dicts or plain scalars).
    :param indent: number of leading spaces for this nesting level.
    :return: YAML-formatted string (empty string for an empty dict).
    """
    # Collect fragments and join once instead of quadratic string +=.
    parts = []
    for key, value in data.items():
        prefix = " " * indent + str(key) + ":"
        if isinstance(value, dict):
            parts.append(prefix + "\n" + dict_to_yaml_str(value, indent + 2))
        else:
            parts.append(prefix + " " + str(value) + "\n")
    return "".join(parts)
# Materialize the parameter list once; also reused by make_output() below.
_trial_yamls_and_and_expected = trial_yamls_and_and_expected()
@pytest.mark.parametrize(
    "yaml_contents, name, expected_value", _trial_yamls_and_and_expected
)
def test_key_specification_methods(tmp_path, yaml_contents, name, expected_value):
    """Train one epoch via the run_train CLI for each key-spec YAML variant
    and check the resulting model's per-config energies against the
    hard-coded reference values.
    """
    fitting_configs = configs_numbered_keys()
    # The two-head YAMLs reference these file names relative to cwd=tmp_path.
    ase.io.write(tmp_path / "fit_multihead_dft.xyz", fitting_configs)
    ase.io.write(tmp_path / "fit_multihead_mp2.xyz", fitting_configs)
    ase.io.write(tmp_path / "duplicated_fit_multihead_dft.xyz", fitting_configs)
    mace_params = _mace_params.copy()
    mace_params["valid_fraction"] = 0.1
    mace_params["checkpoints_dir"] = str(tmp_path)
    mace_params["model_dir"] = str(tmp_path)
    mace_params["train_file"] = "fit_multihead_dft.xyz"
    mace_params["E0s"] = "{1:0.0,8:1.0}"
    mace_params["valid_file"] = "duplicated_fit_multihead_dft.xyz"
    # An explicit valid_file replaces the fraction-based split.
    del mace_params["valid_fraction"]
    mace_params["max_num_epochs"] = 1  # many tests to do
    # Key selection must come only from the YAML / parametrized kwargs.
    del mace_params["energy_key"]
    del mace_params["forces_key"]
    del mace_params["stress_key"]
    mace_params["name"] = "MACE_"
    filename = tmp_path / "config.yaml"
    with open(filename, "w", encoding="utf-8") as file:
        file.write(dict_to_yaml_str(yaml_contents))
    # Only pass --config when the variant actually has YAML content.
    if len(yaml_contents) > 0:
        mace_params["config"] = str(tmp_path / "config.yaml")
    # Make the in-repo mace package importable from the child interpreter.
    run_env = os.environ.copy()
    sys.path.insert(0, str(Path(__file__).parent.parent))
    run_env["PYTHONPATH"] = ":".join(sys.path)
    print("DEBUG subprocess PYTHONPATH", run_env["PYTHONPATH"])
    cmd = (
        sys.executable
        + " "
        + str(run_train)
        + " "
        + " ".join(
            [
                (f"--{k}={v}" if v is not None else f"--{k}")
                for k, v in mace_params.items()
            ]
        )
    )
    p = subprocess.run(cmd.split(), env=run_env, cwd=tmp_path, check=True)
    assert p.returncode == 0
    # Evaluate with the first declared head (or the implicit "Default" head).
    if "heads" in yaml_contents:
        headname = list(yaml_contents["heads"].keys())[0]
    else:
        headname = "Default"
    calc = MACECalculator(
        tmp_path / "MACE_.model", device="cpu", default_dtype="float64", head=headname
    )
    Es = []
    for at in fitting_configs:
        at.calc = calc
        Es.append(at.get_potential_energy())
    print(name)
    print("Es", Es)
    assert np.allclose(
        np.asarray(Es), expected_value, rtol=1e-8, atol=1e-8
    ), f"Expected {expected_value} but got {Es} with error {np.max(np.abs(Es - expected_value))}"
def test_multihead_finetuning_does_not_modify_default_keyspec(tmp_path):
    """Fine-tuning with --energy_key must not mutate the default KeySpecification."""
    configs = configs_numbered_keys()
    train_path = tmp_path / "fit_multihead_dft.xyz"
    ase.io.write(train_path, configs)
    argv = [
        "--name",
        "_MACE_",
        "--train_file",
        str(train_path),
        "--foundation_model",
        "small",
        "--device",
        "cpu",
        "--E0s",
        "{1:0.0,8:1.0}",
        "--energy_key",
        "2_energy",
        "--dry_run",
    ]
    args = build_default_arg_parser().parse_args(argv)
    # Expected final key spec: defaults plus the override given on the CLI.
    expected_spec = KeySpecification.from_defaults()
    expected_spec.info_keys["energy"] = "2_energy"
    run_mace_train(args)
    assert args.key_specification == expected_spec
# for creating values
def make_output():
    """Dev helper: regenerate the expected-energy tables used above.

    Runs test_key_specification_methods for every parametrized case in the
    current directory and prints the nested {outer: {inner: energies}} dict,
    which can be pasted back into trial_yamls_and_and_expected().
    """
    outputs = {}
    for yaml_contents, name, expected_value in _trial_yamls_and_and_expected:
        # Bug fix: the previous call passed debug_test=False, which is not a
        # parameter of test_key_specification_methods and raised TypeError.
        expected = test_key_specification_methods(
            Path("."), yaml_contents, name, expected_value
        )
        # name is (outer_key, inner_key); group results by the outer key.
        outputs.setdefault(name[0], {})[name[1]] = expected
    print(outputs)
import tempfile
from unittest.mock import MagicMock
import numpy as np
import pytest
import torch
import torch.nn.functional as F
from e3nn import o3
from mace import data, modules, tools
from mace.tools import scripts_utils, torch_geometric
try:
    import schedulefree
except ImportError:
    # schedulefree is an optional dependency; skip this whole module without it.
    pytest.skip(
        "Skipping schedulefree tests due to ImportError", allow_module_level=True
    )
torch.set_default_dtype(torch.float64)
# Shared fixtures: a single-element (carbon) table with a dummy reference
# energy, and the radial cutoff used by both model and neighbor lists.
table = tools.AtomicNumberTable([6])
atomic_energies = np.array([1.0], dtype=float)
cutoff = 5.0
def create_mace(device: str, seed: int = 1702):
    """Construct a small, seeded two-interaction MACE model on *device*."""
    torch_geometric.seed_everything(seed)
    residual_block = modules.interaction_classes[
        "RealAgnosticResidualInteractionBlock"
    ]
    config = dict(
        r_max=cutoff,
        num_bessel=8,
        num_polynomial_cutoff=6,
        max_ell=3,
        interaction_cls=residual_block,
        interaction_cls_first=residual_block,
        num_interactions=2,
        num_elements=1,
        hidden_irreps=o3.Irreps("8x0e + 8x1o"),
        MLP_irreps=o3.Irreps("16x0e"),
        gate=F.silu,
        atomic_energies=atomic_energies,
        avg_num_neighbors=8,
        atomic_numbers=table.zs,
        correlation=3,
        radial_type="bessel",
    )
    return modules.MACE(**config).to(device)
def create_batch(device: str):
    """Build a one-item batch dict from a 2x2x2 diamond-carbon supercell."""
    from ase import build

    repeat = 2
    supercell = build.bulk("C", "diamond", a=3.567, cubic=True).repeat(
        (repeat, repeat, repeat)
    )
    print("Number of atoms", len(supercell))
    config = data.config_from_atoms(supercell)
    loader = torch_geometric.dataloader.DataLoader(
        dataset=[data.AtomicData.from_config(config, z_table=table, cutoff=cutoff)],
        batch_size=1,
        shuffle=False,
        drop_last=False,
    )
    return next(iter(loader)).to(device).to_dict()
def do_optimization_step(
    model,
    optimizer,
    device,
):
    """Run one schedule-free training step, then return both to eval mode."""
    batch = create_batch(device)
    # Schedule-free optimizers need explicit train()/eval() mode switches.
    model.train()
    optimizer.train()
    optimizer.zero_grad()
    loss = model(batch, training=True, compute_force=False)["energy"].mean()
    loss.backward()
    optimizer.step()
    model.eval()
    optimizer.eval()
@pytest.mark.parametrize("device", ["cpu", "cuda"])
def test_can_load_checkpoint(device):
    """Saving then loading a checkpoint must restore model output exactly.

    Trains a few schedule-free steps, records the predicted energy, saves a
    checkpoint, reloads it into a fresh CheckpointState, and checks the
    prediction is unchanged.
    """
    # Bug fix: the "cuda" parametrization previously errored on CPU-only
    # hosts; skip it instead (consistent with the cuda-gated tests elsewhere).
    if device == "cuda" and not torch.cuda.is_available():
        pytest.skip("cuda is not available")
    model = create_mace(device)
    optimizer = schedulefree.adamw_schedulefree.AdamWScheduleFree(model.parameters())
    # Minimal stand-in for parsed CLI args, just enough for LRScheduler.
    args = MagicMock()
    args.optimizer = "schedulefree"
    args.scheduler = "ExponentialLR"
    args.lr_scheduler_gamma = 0.9
    lr_scheduler = scripts_utils.LRScheduler(optimizer, args)
    with tempfile.TemporaryDirectory() as d:
        checkpoint_handler = tools.CheckpointHandler(
            directory=d, keep=False, tag="schedulefree"
        )
        for _ in range(10):
            do_optimization_step(model, optimizer, device)
        batch = create_batch(device)
        output = model(batch)
        energy = output["energy"].detach().cpu().numpy()
        state = tools.CheckpointState(
            model=model, optimizer=optimizer, lr_scheduler=lr_scheduler
        )
        checkpoint_handler.save(state, epochs=0, keep_last=False)
        checkpoint_handler.load_latest(
            state=tools.CheckpointState(model, optimizer, lr_scheduler),
            swa=False,
        )
        batch = create_batch(device)
        output = model(batch)
        new_energy = output["energy"].detach().cpu().numpy()
        # Predictions before save and after reload must agree to tight tolerance.
        assert np.allclose(energy, new_energy, atol=1e-9)
import tempfile
import numpy as np
import torch
import torch.nn.functional
from torch import nn, optim
from mace.tools import (
AtomicNumberTable,
CheckpointHandler,
CheckpointState,
atomic_numbers_to_indices,
)
def test_atomic_number_table():
    """Atomic numbers map to their index within the table's z list."""
    z_table = AtomicNumberTable(zs=[1, 8])
    numbers = np.array([8, 8, 1])
    mapped = atomic_numbers_to_indices(numbers, z_table=z_table)
    assert np.allclose(np.array([1, 1, 0], dtype=int), mapped)
class MyModel(nn.Module):
    """Tiny 3-in / 4-out linear+ReLU module for checkpoint round-trip tests."""

    def __init__(self):
        super().__init__()
        # Keep the attribute name "linear": checkpoint state-dict keys use it.
        self.linear = nn.Linear(3, 4)

    def forward(self, x):
        return self.linear(x).relu()
def test_save_load():
    """Loading a checkpoint must roll the optimizer/scheduler LR back."""
    model = MyModel()
    base_lr = 0.001
    optimizer = optim.SGD(model.parameters(), lr=base_lr, momentum=0.9)
    scheduler = optim.lr_scheduler.ExponentialLR(optimizer=optimizer, gamma=0.99)
    with tempfile.TemporaryDirectory() as directory:
        handler = CheckpointHandler(directory=directory, tag="test", keep=True)
        handler.save(state=CheckpointState(model, optimizer, scheduler), epochs=50)
        # Advance the schedule so the LR decays away from its initial value.
        optimizer.step()
        scheduler.step()
        assert not np.isclose(optimizer.param_groups[0]["lr"], base_lr)
        # Restoring the checkpoint must bring the LR back to base_lr.
        handler.load_latest(state=CheckpointState(model, optimizer, scheduler))
        assert np.isclose(optimizer.param_groups[0]["lr"], base_lr)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment