# Copyright (c) 2022-2025, Yongchao Wu in Aalto University
# This file is from the mdapy project, released under the BSD 3-Clause License.
from __future__ import annotations
from typing import List, Optional, Tuple, TYPE_CHECKING, Dict, Union
from pathlib import Path
import numpy as np
import polars as pl
from mdapy.calculator import CalculatorMP
from mdapy.build_lattice import build_hea, build_crystal
from mdapy.system import System
import os
if TYPE_CHECKING:
from matplotlib.figure import Figure
from matplotlib.axes import Axes
[docs]
def run_gpumd(
system: System,
dirname: str,
runin: str,
nep_file: str,
gpumd_path: str = "gpumd",
) -> None:
"""Warpper to run GPUMD.
Parameters
----------
system : System
Use this to generate model.xyz.
dirname : str
Run MD in this dirname. If it has been existing, raise error.
runin : str
Use this to generate run.in file. Do not specify potential here, using `nep_file` parameter.
nep_file : str
The path of nep.txt. Defaults to nep.txt.
gpumd_path : str
The path of gpumd. Defaults to gpumd.
"""
if os.path.exists(dirname):
raise FileExistsError(f"{dirname} is existing.")
if not os.path.exists(nep_file):
raise FileNotFoundError(f"{nep_file} is not found.")
os.makedirs(dirname)
system.write_xyz(f"{dirname}/model.xyz")
with open(f"{dirname}/run.in", "w") as op:
op.write(f"potential {os.path.abspath(nep_file)}\n" + runin)
cwd = os.getcwd()
os.chdir(dirname)
try:
os.system(gpumd_path)
finally:
os.chdir(cwd)
[docs]
def rmse(predictions: np.ndarray, targets: np.ndarray) -> float:
"""Compute Root-Mean-Square Error (RMSE).
Parameters
----------
predictions : np.ndarray
Predicted values.
targets : np.ndarray
Target reference values.
Returns
-------
float
RMSE value.
"""
return np.sqrt(((predictions - targets) ** 2).mean())
[docs]
def read_thermo(path: str) -> pl.DataFrame:
"""Load GPUMD thermo.out file into a Polars DataFrame.
The file contains 18 columns:
T, K, U, Pxx, Pyy, Pzz, Pyz, Pxz, Pxy, ax, ay, az, bx, by, bz, cx, cy, cz.
Parameters
----------
path : str
Directory path containing thermo.out.
Returns
-------
pl.DataFrame
Thermo data with 18 columns.
"""
return pl.from_numpy(
np.loadtxt(Path(path, "thermo.out")),
schema="T K U Pxx Pyy Pzz Pyz Pxz Pxy ax ay az bx by bz cx cy cz".split(),
)
[docs]
def plot_nep_train(
path: str,
outname: Optional[str] = None,
figdpi: Optional[int] = 300,
**kargs,
) -> Tuple[Figure, List[List[Axes]]]:
"""Plot NEP training results, including energy/force/stress scatter plots and loss curves.
Parameters
----------
path : str
Path containing NEP output files: loss.out, energy_train.out, force_train.out, stress_train.out.
outname : Optional[str], optional
Filename to save figure, by default None.
figdpi : Optional[int], optional
DPI of generated figure, by default 300.
**kargs :
Extra arguments passed to set_figure().
Returns
-------
Tuple[Figure, List[List[Axes]]]
The figure and 2×2 axes list.
"""
from mdapy.plotset import set_figure, save_figure
fig, axes = set_figure(figsize=(16, 14), figdpi=figdpi, nrow=2, ncol=2, **kargs)
loss = np.loadtxt(Path(path, "loss.out"))
e_train = np.loadtxt(Path(path, "energy_train.out"))
f_train = np.loadtxt(Path(path, "force_train.out"))
s_train = np.loadtxt(Path(path, "stress_train.out"))
# --- Energy ---
x, y = e_train[:, 1], e_train[:, 0]
axes[0][0].plot(x, y, "o", label=f"RMSE={rmse(x, y) * 1000:.1f} meV")
axes[0][0].legend()
axes[0][0].set_xlabel("DFT energy (eV/atom)")
axes[0][0].set_ylabel("NEP energy (eV/atom)")
# --- Force ---
x, y = f_train[:, 3:].flatten(), f_train[:, :3].flatten()
axes[0][1].plot(
x, y, "o", label=f"RMSE={rmse(x, y) * 1000:.1f} meV/" + r"$\mathregular{\AA}$"
)
axes[0][1].legend(handletextpad=0.01, loc="upper left")
axes[0][1].set_xlabel(r"DFT force (eV/$\mathregular{\AA}$)")
axes[0][1].set_ylabel(r"NEP force (eV/$\mathregular{\AA}$)")
# --- Stress ---
x, y = s_train[:, 6:].flatten(), s_train[:, :6].flatten()
axes[1][0].plot(x, y, "o", label=f"RMSE={rmse(x, y):.2f} GPa")
axes[1][0].legend(handletextpad=0.01, loc="upper left")
axes[1][0].set_xlabel("DFT stress (GPa)")
axes[1][0].set_ylabel("NEP stress (GPa)")
# --- Loss ---
for i, j in zip([1, 4, 5, 6], "Total E-train F-train V-train".split()):
axes[1][1].plot(loss[:, 0], loss[:, i], label=j)
axes[1][1].legend()
axes[1][1].set_xlabel("Generation")
axes[1][1].set_ylabel("Loss")
axes[1][1].set_yscale("log")
axes[1][1].set_xscale("log")
# --- diagonal reference lines ---
for i in [0, 1]:
for j in [0, 1]:
if i == 1 and j == 1:
continue
xlim = axes[i][j].get_xlim()
ylim = axes[i][j].get_ylim()
lo = min(xlim[0], ylim[0])
hi = max(xlim[1], ylim[1])
delta = 0.05 * abs(hi - lo)
lo -= delta
hi += delta
lim = [lo, hi]
axes[i][j].plot(lim, lim, "grey")
axes[i][j].set_xlim(lim)
axes[i][j].set_ylim(lim)
if outname is not None:
save_figure(fig, outname)
return fig, axes
[docs]
def get_sfe_fcc(name: str, a: float, calc: CalculatorMP) -> float:
"""Compute stacking fault energy (SFE) for an FCC crystal.
Parameters
----------
name : str
Element name.
a : float
Lattice constant.
calc : CalculatorMP
MDAPY calculator.
Returns
-------
float
Stacking fault energy in mJ/m².
"""
distance = a / 6**0.5
system = build_crystal(
name,
"fcc",
a,
nx=3,
ny=3,
nz=4,
miller1=[1, 1, 2],
miller2=[1, -1, 0],
miller3=[1, 1, -1],
)
calc.results = {}
system.calc = calc
system.box.boundary[2] = 0
e1 = system.get_energy()
LZ = system.data["z"].max() - system.data["z"].min()
system.update_data(
system.data.with_columns(
pl.when(pl.col("z") > LZ / 2)
.then(pl.col("x") + distance)
.otherwise(pl.col("x"))
.alias("x")
),
reset_calcolator=True,
)
system.wrap_pos()
e2 = system.get_energy()
factor = system.box.box[0, 0] * system.box.box[1, 1] / 16021.766200000002
return (e2 - e1) / factor
[docs]
def get_average_sfe_fcc_hea(
N: int,
element_list: List[str],
element_ratio: List[float],
a: float,
calc: CalculatorMP,
) -> np.ndarray:
"""Compute averaged SFE for random FCC HEA configurations.
Parameters
----------
N : int
Number of random samples.
element_list : List[str]
Element species.
element_ratio : List[float]
Element ratios.
a : float
Lattice constant.
calc : CalculatorMP
MD calculator.
Returns
-------
np.ndarray
Array of: [i, mean_sfe up to sample i]
"""
sfe = []
distance = a / 6**0.5
for seed in range(1, N + 1):
system = build_hea(
element_list,
element_ratio,
"fcc",
a,
nx=3,
ny=3,
nz=4,
miller1=[1, 1, 2],
miller2=[1, -1, 0],
miller3=[1, 1, -1],
random_seed=seed,
)
calc.results = {}
system.calc = calc
system.box.boundary[2] = 0
e1 = system.get_energy()
LZ = system.data["z"].max() - system.data["z"].min()
system.update_data(
system.data.with_columns(
pl.when(pl.col("z") > LZ / 2)
.then(pl.col("x") + distance)
.otherwise(pl.col("x"))
.alias("x")
),
reset_calcolator=True,
)
system.wrap_pos()
e2 = system.get_energy()
factor = system.box.box[0, 0] * system.box.box[1, 1] / 16021.766200000002
sfe.append((e2 - e1) / factor)
ave_sfe = []
for i in range(1, len(sfe)):
ave_sfe.append([i, np.mean(sfe[:i])])
return np.array(ave_sfe)
[docs]
def get_eos(
system: System, scale_start: float, scale_end: float, num: int
) -> np.ndarray:
"""Compute equation of state (EOS) by uniformly scaling volume.
Parameters
----------
system : System
Input structure.
scale_start : float
Initial scale factor (>0).
scale_end : float
Final scale factor (> scale_start).
num : int
Number of sampling points.
Returns
-------
np.ndarray
(num, 2) array of [volume_per_atom, energy_per_atom]
"""
assert scale_start < scale_end
assert scale_start > 0
scale_list = np.linspace(scale_start, scale_end, num)
eos = []
for i in scale_list:
cur = System(
box=system.box.box * i,
data=system.data.with_columns(
pl.col("x") * i, pl.col("y") * i, pl.col("z") * i
),
)
cur.calc = system.calc
cur.calc.results = {}
e = cur.get_energy() / cur.N
vol = cur.box.volume / cur.N
eos.append([vol, e])
return np.array(eos)
[docs]
class PCA:
"""Simple PCA implementation (similar to sklearn PCA)."""
def __init__(self, n_components: int):
"""
Parameters
----------
n_components : int
Number of principal components to keep.
"""
self.n_components = n_components
self.explained_variance: Optional[np.ndarray] = None
[docs]
def fps_sample(
n_sample: int,
descriptors: np.ndarray,
start_idx: int = 0,
) -> np.ndarray:
"""This function is used to sample the configurations using farthest point sampling method, based
on the descriptors. It is helpful to select the structures during active learning process.
Parameters
----------
n_sample : int
Number of structures one wants to select.
descriptors : np.ndarray
Two dimensional ndarray, it can be any descriptors.
start_idx : int
For deterministic results, fix the first sampled point index.
Defaults to 0.
Returns
-------
sampled_indices : ndarray, shape (n_sample,)
"""
assert descriptors.ndim == 2, "Only support 2-D ndarray."
n_points = descriptors.shape[0]
assert n_sample <= n_points, f"n_sample must <= {n_points}."
assert n_sample > 0, "n_sample must be a positive number."
assert start_idx >= 0 and start_idx < n_points, (
f"start_idx must belong [0, {n_points - 1}]."
)
sampled_indices = [start_idx]
min_distances = np.full(n_points, np.inf)
farthest_point_idx = start_idx
for _ in range(n_sample - 1):
current_point = descriptors[farthest_point_idx]
dist_to_current_point = np.linalg.norm(descriptors - current_point, axis=1)
min_distances = np.minimum(min_distances, dist_to_current_point)
farthest_point_idx = np.argmax(min_distances)
sampled_indices.append(farthest_point_idx)
return np.array(sampled_indices, np.int32)
[docs]
def cfg2xyz(
file_list: Union[List[str], str],
type_dict: Dict[str, int],
output_name: str = "train.xyz",
f_max: float = 25.0,
) -> None:
"""Convert cfg file for MTP to xyz file for GPUMD, including energy, force and virial.
Parameters
----------
file_list : List[str] or str
Single or multi cfg file.
type_dict : Dict[str, int]
Map type from number to element, such as {'Al':0, 'C':1}.
output_name : str
Output filename with append mode. Defaults to train.xyz.
f_max : float
Force absolute maximum larger than this value will be filtered. Defaults to 25.0 eV/A.
"""
for cfg in file_list:
with open(cfg) as op:
file = op.read()
res = file.split("BEGIN_CFG")[1:]
for f in range(len(res)):
frame_content = res[f].split("\n")
N = int(frame_content[2].strip())
box = []
for i in frame_content[4:7]:
box.extend(i.split())
type_pos_force = [i.split()[1:] for i in frame_content[8 : 8 + N]]
_f_max = np.abs(np.array(type_pos_force)[:, -3:].astype(float)).max()
if _f_max > f_max:
continue
energy = frame_content[8 + N + 1].strip()
vxx, vyy, vzz, vyz, vxz, vxy = frame_content[8 + N + 1 + 2].strip().split()
vyx = vxy
vzx = vxz
vzy = vyz
with open(output_name, "a") as op:
op.write(f"{N}\n")
box_str = (
"Lattice=" + '"' + "{} {} {} {} {} {} {} {} {}".format(*box) + '"'
)
op.write(
f'{box_str} energy={energy} virial="{vxx} {vxy} {vxz} {vyx} {vyy} {vyz} {vzx} {vzy} {vzz}" properties=species:S:1:pos:R:3:force:R:3\n'
)
for i in range(N):
op.write(
"{} {} {} {} {} {} {}\n".format(
type_dict[int(type_pos_force[i][0])], *type_pos_force[i][1:]
)
)
if __name__ == "__main__":
pass