# Copyright (c) 2022-2026, Yongchao Wu in Aalto University
# This file is from the mdapy project, released under the BSD 3-Clause License.
"""
Finite-temperature elastic constants from MD stress-strain
==========================================================
Implements the Aidan Thompson finite-T recipe (LAMMPS examples/ELASTIC/T):
1. (Optional) NPT pre-relax to equilibrium volume V(T) at target pressure.
2. Equilibrate the cell at T (NVT, langevin); average reference stress sigma_0.
3. For each Voigt direction d in {1..6} and each sign s in {-1,+1}:
- Reset the cell from a saved restart.
- change_box delta = s*delta*L0 in direction d (with proper xy/xz/yz coupling).
- NVT (isothermal) or NVE (adiabatic) for n_equil + n_run steps.
- Time-average the pressure tensor -> sigma_pm.
4. C_id = -(sigma_pos[i] - sigma_neg[i]) / (2*delta) for i,d in 1..6.
5. Symmetrize: C_ij = (C_ij + C_ji) / 2.
Each LAMMPS instance is a single subprocess; the 12 deformations within one
temperature run on a `ProcessPoolExecutor` so a serial-Kokkos LAMMPS build
parallelises to N cores. ``MDElastic.scan_parallel`` further runs all
temperatures concurrently (1 NPT + 12 deformations per T, 13*N tasks total).
Driver uses the LAMMPS Python module (``from lammps import lammps``).
``pair_style`` / ``pair_coeff`` are passed by the caller — for serial CPU
runs use ``-k on -sf kk`` only (no ``g 1`` and no ``-pk kokkos`` GPU args).
"""
from __future__ import annotations
import os
import sys
if sys.platform == "darwin":
os.environ.setdefault("OMP_NUM_THREADS", "1")
os.environ.setdefault("KMP_DUPLICATE_LIB_OK", "TRUE")
import tempfile
import multiprocessing as _mp
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple, Union, TYPE_CHECKING
import numpy as np
import polars as pl
from mdapy.system import System
if TYPE_CHECKING:
from matplotlib.figure import Figure
from matplotlib.axes import Axes
try:
from lammps import lammps as _lammps # noqa: F401 (used in subprocess fns)
except ImportError:
raise ImportError(
"lammps Python module is required for MDElastic. "
"Install via conda: `conda install -c conda-forge lammps`, or build "
"LAMMPS with -DBUILD_SHARED_LIBS=ON -DPKG_PYTHON=ON."
)
_EAA_PER_BAR = 1.0e-4 # 1 bar = 1e-4 GPa
# ============================================================
# Result container
# ============================================================
[docs]
class MDElasticResult:
"""Output of one MDElastic.run() call.
Attributes
----------
cij_voigt : (6,6) ndarray
Full 6x6 elastic stiffness tensor in GPa (already symmetrized).
V_eq : float
Equilibrium cell volume at T (Angstrom^3).
T_actual : float
Time-averaged temperature during reference run (K).
stress_ref : (6,) ndarray
Time-averaged reference stress in GPa (should be ~0 if relax_volume).
ensemble : str
"isothermal" (NVT) or "adiabatic" (NVE after equilibration).
temperature : float
Target temperature (K).
"""
def __init__(
self,
cij_voigt: np.ndarray,
V_eq: float,
T_actual: float,
stress_ref: np.ndarray,
ensemble: str,
temperature: float,
) -> None:
self.cij_voigt = np.asarray(cij_voigt, dtype=float)
self.V_eq = float(V_eq)
self.T_actual = float(T_actual)
self.stress_ref = np.asarray(stress_ref, dtype=float)
self.ensemble = str(ensemble)
self.temperature = float(temperature)
[docs]
def cubic_average(self) -> Tuple[float, float, float]:
"""Cubic-symmetry-averaged (C11, C12, C44) in GPa.
SQS / defected cells are not strictly cubic; this is the standard
averaging convention (used by the LAMMPS reference script).
"""
C = self.cij_voigt
C11 = (C[0, 0] + C[1, 1] + C[2, 2]) / 3.0
C12 = (C[0, 1] + C[0, 2] + C[1, 2]) / 3.0
C44 = (C[3, 3] + C[4, 4] + C[5, 5]) / 3.0
return C11, C12, C44
[docs]
def vrh(self) -> dict:
"""Voigt-Reuss-Hill polycrystalline averages computed on the full 6x6."""
from mdapy.elastic import ElasticTensor
return ElasticTensor(self.cij_voigt).vrh()
[docs]
def born_stable_cubic(self) -> bool:
"""Born stability for cubic (uses cubic_average)."""
c11, c12, c44 = self.cubic_average()
return (c11 > abs(c12)) and (c44 > 0.0) and (c11 + 2.0 * c12 > 0.0)
[docs]
def print(self) -> None:
c11, c12, c44 = self.cubic_average()
K, G = (c11 + 2 * c12) / 3.0, (c11 - c12 + 3 * c44) / 5.0
E = 9 * K * G / (3 * K + G) if (3 * K + G) > 0 else 0.0
lines = [
f"MDElastic @ T={self.temperature:.0f} K ({self.ensemble}):",
f" V_eq = {self.V_eq:.3f} A^3",
f" T_actual = {self.T_actual:.1f} K",
f" reference sigma= {np.array2string(self.stress_ref, precision=3)} GPa",
f" C11 (cubic avg)= {c11:.2f} GPa",
f" C12 (cubic avg)= {c12:.2f} GPa",
f" C44 (cubic avg)= {c44:.2f} GPa",
f" K (V) = {K:.2f} GPa",
f" G (V) = {G:.2f} GPa",
f" E = {E:.2f} GPa",
]
print("\n".join(lines), flush=True)
# ============================================================
# Module-level segment functions (subprocess-callable)
# ============================================================
#
# These run inside ProcessPoolExecutor workers. Each takes a plain dict
# of args (picklable) and returns a plain dict of results (picklable).
# A fresh LAMMPS instance is created and torn down per call.
def _spawn_lammps(args: dict):
"""Build a LAMMPS instance with the chosen accelerator backend.
``args['accelerator']`` selects the prefix injected into ``cmdargs``:
- ``"kokkos"`` (default): ``-k on [t N] [g M] -sf kk``. ``N`` from
``args['n_threads']``, ``M`` from ``args['n_gpus']``.
- ``"omp"``: ``-pk omp N -sf omp`` (LAMMPS USER-OMP package). ``N``
from ``args['n_threads']``.
- ``"gpu"``: ``-pk gpu M -sf gpu`` (LAMMPS GPU package). ``M`` from
``args['n_gpus']``.
- ``"none"``: no accelerator prefix; pure serial / MPI LAMMPS.
The pair_style ``-sf X`` makes LAMMPS auto-append ``/X`` to the
pair_style, so ``eam/alloy`` becomes ``eam/alloy/kk`` etc. — no need
to mangle the user's pair_style string.
Stage-level log goes to ``args['stage_log']`` if set, else suppressed.
"""
from lammps import lammps # local import: subprocess-safe
acc = args.get("accelerator", "kokkos")
n_threads = int(args.get("n_threads", 1))
n_gpus = int(args.get("n_gpus", 0))
if n_threads > 1:
os.environ["OMP_NUM_THREADS"] = str(n_threads)
if acc == "kokkos":
kk = []
if n_gpus > 0:
kk += ["g", str(n_gpus)]
if n_threads > 1:
kk += ["t", str(n_threads)]
prefix = ["-k", "on", *kk, "-sf", "kk"]
elif acc == "omp":
prefix = ["-pk", "omp", str(max(n_threads, 1)), "-sf", "omp"]
elif acc == "gpu":
prefix = ["-pk", "gpu", str(max(n_gpus, 1)), "-sf", "gpu"]
elif acc in ("none", None, "serial"):
prefix = []
else:
raise ValueError(
f"unknown accelerator {acc!r}; expected 'kokkos'|'omp'|'gpu'|'none'"
)
user_args = list(args.get("lammps_cmdargs") or [])
cmdargs = prefix + user_args
if "-screen" not in cmdargs:
cmdargs += ["-screen", "none"]
if "-log" not in cmdargs:
cmdargs += ["-log", args.get("stage_log") or "none"]
return lammps(cmdargs=cmdargs)
def _apply_thermostat(lmp, T, thermostat, tdamp, seed):
"""Add fix integ (+ optional langevin) for the equilibration phase."""
if thermostat == "nose-hoover":
lmp.command(f"fix integ all nvt temp {T} {T} {tdamp}")
else: # langevin
lmp.command("fix integ all nve")
lmp.command(f"fix thermo all langevin {T} {T} {tdamp} {seed}")
def _switch_to_nve(lmp, thermostat):
"""Adiabatic averaging: drop the thermostat, keep an integrator."""
if thermostat == "nose-hoover":
lmp.command("unfix integ")
lmp.command("fix integ all nve")
else: # langevin
lmp.command("unfix thermo")
def _stop_thermostat(lmp, thermostat, ensemble):
"""Remove integ + thermo fixes (call before clear/restart)."""
lmp.command("unfix integ")
if thermostat == "langevin" and ensemble != "adiabatic":
lmp.command("unfix thermo")
def _read_avp(lmp) -> np.ndarray:
"""Read fix avp -> 6-vector pressure in bar in elastic Voigt order
[pxx, pyy, pzz, pyz, pxz, pxy]."""
comps = np.array(
[lmp.extract_fix("avp", 0, 1, i, 0) for i in range(6)], dtype=float
)
return np.array(
[comps[0], comps[1], comps[2], comps[5], comps[4], comps[3]], dtype=float
)
def _run_reference_segment(args: dict) -> dict:
"""Single-T reference: NPT pre-relax (optional) + reference NVT/NVE.
Saves a restart file at args['restart_path']. If args['npt_log'] is
provided, the NPT phase writes step / temp / press / lx ly lz / vol /
pe ke etotal / pxx-pyz to that log file (every 100 steps).
"""
lmp = _spawn_lammps(args)
try:
lmp.commands_string(f"""
units metal
dimension 3
boundary p p p
atom_modify map array
read_data {args['data_path']}
change_box all triclinic
""")
lmp.command(f"pair_style {args['pair_style']}")
lmp.command(f"pair_coeff {args['pair_coeff']}")
lmp.command(f"timestep {args['timestep']}")
lmp.command(f"velocity all create {args['T']} {args['seed']} loop geom")
# Configure thermo style (used for the NPT log + console output).
lmp.commands_string("""
thermo_style custom step temp press lx ly lz vol pe ke etotal pxx pyy pzz pxy pxz pyz
thermo_modify norm no flush yes
thermo 100
""")
T = args["T"]
# Stage 1a: NH NPT relax. The first half equilibrates the
# barostat; the second half time-averages box dimensions
# (lx, ly, lz). The averaged values feed V_eq and len0, giving a
# noise-free V(T) without rescaling the cell. The NVT averaging
# stage runs at the NPT-end size (slightly noisy), but the
# deformation Cij is differential so the cell-size mismatch
# cancels in (sigma_pos - sigma_neg)/(2δ).
len_avg = None
if args["relax_volume"]:
lmp.command(
f"fix npt_relax all npt temp {T} {T} {args['tdamp']} "
f"{args['pressure_coupling']} "
f"{args['P']} {args['P']} {args['pdamp']}"
)
n_eq = max(args["n_relax"] // 2, args["nfreq"])
n_avg = max(args["n_relax"] - n_eq, args["nfreq"])
lmp.command(f"run {n_eq}")
lmp.commands_string("""
variable rlx equal lx
variable rly equal ly
variable rlz equal lz
""")
lmp.command(
f"fix r_avg all ave/time {args['nevery']} {args['nrepeat']} "
f"{args['nfreq']} v_rlx v_rly v_rlz mode scalar ave running"
)
lmp.command(f"run {n_avg}")
len_avg = tuple(
float(lmp.extract_fix("r_avg", 0, 1, i, 0)) for i in range(3)
)
lmp.command("unfix r_avg")
lmp.command("unfix npt_relax")
# Stage 1b: reference NVT equilibration + stress averaging.
_apply_thermostat(lmp, T, args["thermostat"], args["tdamp"], args["seed"])
lmp.command(f"run {args['n_equil']}")
if args["ensemble"] == "adiabatic":
_switch_to_nve(lmp, args["thermostat"])
# `ave running` accumulates samples across the whole n_run window,
# so the final extracted value is the time-average over the entire
# averaging phase (not just the last nfreq steps).
lmp.command(
f"fix avp all ave/time {args['nevery']} {args['nrepeat']} "
f"{args['nfreq']} c_thermo_press mode vector ave running"
)
lmp.command(
f"fix avT all ave/time {args['nevery']} {args['nrepeat']} "
f"{args['nfreq']} c_thermo_temp ave running"
)
lmp.command(f"run {args['n_run']}")
sigma_ref = _read_avp(lmp)
T_actual = float(lmp.extract_fix("avT", 0, 0, 0, 0))
if len_avg is not None:
# Use box dims time-averaged over the NPT second half. The
# actual NVT cell is at the noisier NPT-end size, but
# deformation Cij is differential so this drops out. V_eq
# and len0 reported here come from the clean averages.
lx0, ly0, lz0 = len_avg
V_eq = lx0 * ly0 * lz0
else:
V_eq = float(lmp.get_thermo("vol"))
lx0 = float(lmp.get_thermo("lx"))
ly0 = float(lmp.get_thermo("ly"))
lz0 = float(lmp.get_thermo("lz"))
lmp.command("unfix avp")
lmp.command("unfix avT")
_stop_thermostat(lmp, args["thermostat"], args["ensemble"])
lmp.command(f"write_restart {args['restart_path']}")
finally:
lmp.close()
return dict(
sigma_ref_bar=sigma_ref.tolist(),
T_actual=T_actual,
V_eq=V_eq,
len0=(lx0, ly0, lz0),
restart_path=args["restart_path"],
T=args["T"],
)
def _run_deform_segment(args: dict) -> dict:
"""One deformation segment (single direction, single sign)."""
lmp = _spawn_lammps(args)
try:
lmp.commands_string("""
units metal
dimension 3
boundary p p p
atom_modify map array
""")
lmp.command(f"read_restart {args['restart_path']}")
lmp.command(f"pair_style {args['pair_style']}")
lmp.command(f"pair_coeff {args['pair_coeff']}")
lmp.command(f"timestep {args['timestep']}")
lmp.commands_string("""
thermo_style custom step temp press lx ly lz vol pe ke etotal pxx pyy pzz pxy pxz pyz
thermo_modify norm no flush yes
thermo 100
""")
d = args["direction"]
signed_delta = args["signed_delta"]
len0 = args["len0"]
# Reference length per direction (matches LAMMPS displace.mod):
# dir 1: lx0; dir 2: ly0; dirs 3, 4, 5: lz0; dir 6: ly0.
ref_len = {
1: len0[0],
2: len0[1],
3: len0[2],
4: len0[2],
5: len0[2],
6: len0[1],
}[d]
d_abs = signed_delta * ref_len
xy = float(lmp.get_thermo("xy"))
xz = float(lmp.get_thermo("xz"))
yz = float(lmp.get_thermo("yz"))
dxy = signed_delta * xy
dxz = signed_delta * xz
dyz = signed_delta * yz
if d == 1:
lmp.command(
f"change_box all x delta 0 {d_abs} "
f"xy delta {dxy} xz delta {dxz} remap units box"
)
elif d == 2:
lmp.command(
f"change_box all y delta 0 {d_abs} yz delta {dyz} remap units box"
)
elif d == 3:
lmp.command(f"change_box all z delta 0 {d_abs} remap units box")
elif d == 4:
lmp.command(f"change_box all yz delta {d_abs} remap units box")
elif d == 5:
lmp.command(f"change_box all xz delta {d_abs} remap units box")
elif d == 6:
lmp.command(f"change_box all xy delta {d_abs} remap units box")
else:
raise ValueError(f"bad direction {d}")
T = args["T"]
_apply_thermostat(lmp, T, args["thermostat"], args["tdamp"], args["seed"])
lmp.command(f"run {args['n_equil']}")
if args["ensemble"] == "adiabatic":
_switch_to_nve(lmp, args["thermostat"])
lmp.command(
f"fix avp all ave/time {args['nevery']} {args['nrepeat']} "
f"{args['nfreq']} c_thermo_press mode vector"
)
lmp.command(f"run {args['n_run']}")
sigma = _read_avp(lmp)
finally:
lmp.close()
return dict(
direction=int(d),
sign=int(np.sign(signed_delta)),
sigma_bar=sigma.tolist(),
)
# ============================================================
# Main class
# ============================================================
[docs]
class MDElastic:
"""
Finite-T elastic constants via stress-strain MD (LAMMPS).
Parameters
----------
system : System
mdapy System with element labels; will be written to a LAMMPS data
file internally.
pair_style : str
LAMMPS ``pair_style`` argument, e.g. ``"eam/alloy"``, ``"nep"``,
``"nep/kk"``, ``"meam"``.
pair_coeff : str
Full ``pair_coeff`` line *after* ``pair_coeff``, e.g.
``"* * NiCoCr.lammps.eam Ni Co Cr"``. Element order here must match
``elements`` below.
elements : list of str
Element symbol order used to map ``system.data["element"]`` to LAMMPS
atom types (1..N). Must match the species list in ``pair_coeff``.
temperature : float
Target temperature (K).
pressure : float, optional
Target pressure for the optional NPT relax stage (bar). Default 0.
pressure_coupling : {"iso", "aniso"}, optional
``fix npt`` coupling. ``iso`` keeps the cell shape isotropic (rescales
all three lattice vectors together — appropriate for nominally cubic
SQS / chemically-disordered cells). ``aniso`` lets each axis relax
independently (use for non-cubic structures). Default ``iso``.
delta : float, optional
Voigt strain magnitude. LAMMPS reference uses 2e-2; 5e-3..2e-2 is
the useful range. Default 0.01.
timestep : float, optional
MD timestep (ps). Default 0.001.
n_equil : int, optional
Equilibration steps per stage. Default 10000.
n_run : int, optional
Averaging steps per stage. Default 5000.
nevery, nrepeat : int, optional
Parameters of LAMMPS ``fix ave/time``. The averaging window is
``nevery * nrepeat``. Defaults 10 and 10.
ensemble : {"isothermal", "adiabatic"}, optional
Integration during the *averaging* run. Default "isothermal".
thermostat : {"nose-hoover", "langevin"}, optional
Default "nose-hoover" (Nose-Hoover NVT, lower stress noise).
thermostat_damp : float, optional
Thermostat tau in ps. Default ``100 * timestep``.
seed : int, optional
RNG seed. Default 87287.
relax_volume : bool, optional
Run NPT pre-relax. Default True.
n_relax : int, optional
NPT pre-relax steps. The first half equilibrates the barostat;
the second half time-averages box dimensions (lx, ly, lz). The
averaged values are reported as ``V_eq`` and used as ``len0`` for
the deformation strains, giving a noise-free V(T) without
rescaling the actual cell. Default 50000.
pdamp : float, optional
Barostat tau in ps. Default ``1000 * timestep``.
accelerator : {"kokkos", "omp", "gpu", "none"}, optional
LAMMPS accelerator backend. ``kokkos`` (default) injects
``-k on [t N] [g M] -sf kk`` and uses ``pair_style/kk`` variants.
``omp`` uses the USER-OMP package: ``-pk omp N -sf omp``.
``gpu`` uses the GPU package: ``-pk gpu M -sf gpu``. ``none``
runs serial / MPI LAMMPS without any per-process accelerator.
Pair-style suffix is added automatically by ``-sf X`` so the
user's ``pair_style`` string need not be modified.
n_workers : int, optional
Number of *deformation* worker processes. The 12 deformations are
scheduled across this pool. Default ``min(12, os.cpu_count())``.
Set to 1 to run sequentially in the main process.
n_threads_ref, n_threads_def : int, optional
OpenMP threads per LAMMPS process for the reference / deformation
phases. Used by ``accelerator="kokkos"`` (as ``-k on t N``) and
``accelerator="omp"`` (as ``-pk omp N``). Default 1.
n_gpus_ref, n_gpus_def : int, optional
GPUs per LAMMPS process. Used by ``accelerator="kokkos"`` (as
``-k on g M``) and ``accelerator="gpu"`` (as ``-pk gpu M``).
Default 0.
log_dir : str or Path, optional
Per-stage log files written here. The reference (NPT + ref NVT)
log is ``log_dir/ref_T{T}.log`` (one per temperature); each
deformation gets ``log_dir/def_T{T}_d{dir}_{sign}.log``. Each log
carries thermo every 100 steps with columns
``step temp press lx ly lz vol pe ke etotal pxx pyy pzz pxy pxz pyz``.
lammps_cmdargs : list of str, optional
Extra args appended to ``lammps(cmdargs=...)``. The accelerator
prefix is injected automatically based on ``accelerator`` /
``n_threads_*`` / ``n_gpus_*``.
work_dir : str, optional
Directory for data + restart files. Default a tmpdir cleaned up
on completion.
quiet : bool
Suppress mdapy progress prints.
"""
_STRAIN_DIRS = (1, 2, 3, 4, 5, 6)
def __init__(
self,
system: System,
pair_style: str,
pair_coeff: str,
elements: Sequence[str],
temperature: float,
pressure: float = 0.0,
pressure_coupling: str = "iso",
delta: float = 0.01,
timestep: float = 1e-3,
n_equil: int = 10000,
n_run: int = 5000,
nevery: int = 10,
nrepeat: int = 10,
ensemble: str = "isothermal",
thermostat: str = "nose-hoover",
thermostat_damp: Optional[float] = None,
seed: int = 87287,
relax_volume: bool = True,
n_relax: int = 50000,
pdamp: Optional[float] = None,
accelerator: str = "kokkos",
n_workers: Optional[int] = None,
n_threads_ref: int = 1,
n_threads_def: int = 1,
n_gpus_ref: int = 0,
n_gpus_def: int = 0,
log_dir: Optional[Union[str, os.PathLike]] = None,
lammps_cmdargs: Optional[List[str]] = None,
work_dir: Optional[str] = None,
quiet: bool = False,
) -> None:
assert (
"element" in system.data.columns
), "system must contain element information."
if ensemble not in ("isothermal", "adiabatic"):
raise ValueError(
f"ensemble must be 'isothermal' or 'adiabatic', got {ensemble!r}"
)
if thermostat not in ("nose-hoover", "langevin"):
raise ValueError(
f"thermostat must be 'nose-hoover' or 'langevin', got {thermostat!r}"
)
if pressure_coupling not in ("iso", "aniso"):
raise ValueError(
f"pressure_coupling must be 'iso' or 'aniso', got {pressure_coupling!r}"
)
if accelerator not in ("kokkos", "omp", "gpu", "none"):
raise ValueError(
f"accelerator must be 'kokkos'|'omp'|'gpu'|'none', got {accelerator!r}"
)
self.system = system
self.pair_style = pair_style
self.pair_coeff = pair_coeff
self.elements = list(elements)
self.temperature = float(temperature)
self.pressure = float(pressure)
self.pressure_coupling = pressure_coupling
self.delta = float(delta)
self.timestep = float(timestep)
self.n_equil = int(n_equil)
self.n_run = int(n_run)
self.nevery = int(nevery)
self.nrepeat = int(nrepeat)
self.nfreq = self.nevery * self.nrepeat
if self.n_run < self.nfreq:
raise ValueError(
f"n_run ({self.n_run}) must be >= nevery*nrepeat ({self.nfreq})."
)
self.ensemble = ensemble
self.thermostat = thermostat
self.thermostat_damp = (
float(thermostat_damp)
if thermostat_damp is not None
else 100.0 * self.timestep
)
self.seed = int(seed)
self.relax_volume = bool(relax_volume)
self.n_relax = int(n_relax)
self.pdamp = float(pdamp) if pdamp is not None else 1000.0 * self.timestep
self.accelerator = accelerator
self.n_workers = (
int(n_workers) if n_workers is not None else min(12, os.cpu_count() or 1)
)
self.n_threads_ref = int(n_threads_ref)
self.n_threads_def = int(n_threads_def)
self.n_gpus_ref = int(n_gpus_ref)
self.n_gpus_def = int(n_gpus_def)
self.log_dir = Path(log_dir) if log_dir is not None else None
if self.log_dir is not None:
self.log_dir.mkdir(parents=True, exist_ok=True)
self.lammps_cmdargs = list(lammps_cmdargs) if lammps_cmdargs else []
self.quiet = bool(quiet)
self._user_work_dir = work_dir
self._owns_work_dir = work_dir is None
# ----------------------------------------------------------
# Public API
# ----------------------------------------------------------
[docs]
def run(self) -> MDElasticResult:
"""Execute the protocol for a single temperature.
Reference NPT+NVT runs in the main process; the 12 deformations
run on a ``ProcessPoolExecutor`` of size ``min(n_workers, 12)``.
"""
if self._owns_work_dir:
self._work_dir = Path(tempfile.mkdtemp(prefix="md_elastic_"))
else:
self._work_dir = Path(self._user_work_dir)
self._work_dir.mkdir(parents=True, exist_ok=True)
try:
self._write_initial_data()
return self._execute_single()
finally:
if self._owns_work_dir and self._work_dir.exists():
import shutil
shutil.rmtree(self._work_dir, ignore_errors=True)
[docs]
@staticmethod
def scan_parallel(
system: System,
temperatures: Sequence[float],
work_dir: Union[str, os.PathLike],
n_workers_ref: Optional[int] = None,
n_workers_def: Optional[int] = None,
n_workers: Optional[int] = None, # alias: same for both phases
log_dir: Optional[Union[str, os.PathLike]] = None,
**kwargs,
) -> pl.DataFrame:
"""Run MDElastic at multiple temperatures with two-phase parallelism.
Phase 1: each temperature's reference (NPT + NVT) runs in its own
process; up to ``n_workers_ref`` simultaneously. Each process can
use ``n_threads_ref`` OpenMP threads (passed via kwargs).
Phase 2: each (T, direction, sign) deformation runs in its own
process; up to ``n_workers_def`` simultaneously, each typically
single-threaded (``n_threads_def`` via kwargs).
Typical 16-core CPU + 4 temperatures::
scan_parallel(
..., n_workers_ref=4, n_threads_ref=4, # 4*4 = 16 cores
n_workers_def=12, n_threads_def=1, # 12 procs single-thread
)
Parameters
----------
system, temperatures : as named.
work_dir : str or Path
Persistent directory for LAMMPS data + per-T restart files.
n_workers_ref : int, optional
Number of reference processes. Default ``len(temperatures)``.
n_workers_def : int, optional
Number of deformation processes. Default
``min(12 * len(temperatures), os.cpu_count())``.
n_workers : int, optional
Convenience alias: sets both ``n_workers_ref`` and
``n_workers_def``. Ignored if either is set explicitly.
log_dir : str or Path, optional
Directory for per-stage LAMMPS log files. Each T's stage 1
(NPT + ref NVT + averaging) goes to ``log_dir/ref_T{T}.log``;
each deformation goes to ``log_dir/def_T{T}_d{dir}_{sign}.log``.
Useful for checking convergence + debugging.
**kwargs : forwarded to MDElastic.__init__ for each T (including
``n_threads_ref`` / ``n_threads_def`` / ``thermostat`` / etc.).
Returns
-------
pl.DataFrame with one row per T, columns:
T, V_eq, T_actual, C11, C12, C44 (cubic averages),
K_VRH, G_VRH, E_VRH, nu_VRH, stable,
stress_ref_max, c{ij} (full 6x6 entries).
"""
work_dir = Path(work_dir)
work_dir.mkdir(parents=True, exist_ok=True)
if log_dir is not None:
log_dir = Path(log_dir)
log_dir.mkdir(parents=True, exist_ok=True)
if n_workers_ref is None:
n_workers_ref = n_workers if n_workers is not None else len(temperatures)
if n_workers_def is None:
n_workers_def = (
n_workers
if n_workers is not None
else min(12 * len(temperatures), os.cpu_count() or 1)
)
# Build a per-T MDElastic stub (gives us config dict construction);
# we don't call .run() on it because we use the segment functions
# directly across the shared pool.
stubs = []
data_paths = []
for i, T in enumerate(temperatures):
sub_dir = work_dir / f"T_{int(T)}"
sub_dir.mkdir(exist_ok=True)
data_path = sub_dir / "init.data"
system.write_data(str(data_path), element_list=kwargs["elements"])
data_paths.append(data_path)
# Drop kwargs that we set explicitly so the user can still pass
# `quiet=...` via kwargs without TypeError.
kw = {
k: v
for k, v in kwargs.items()
if k not in ("temperature", "log_dir", "work_dir")
}
stub = MDElastic(
system,
temperature=float(T),
log_dir=log_dir,
work_dir=str(sub_dir),
**kw,
)
# Manually fix up data path so segment functions see it.
stub._data_path = data_path
stub._work_dir = sub_dir
stubs.append(stub)
ref_tasks = [s._build_reference_args() for s in stubs]
quiet = kwargs.get("quiet", False)
n_threads_ref = kwargs.get("n_threads_ref", 1)
n_threads_def = kwargs.get("n_threads_def", 1)
if not quiet:
print(
f"[MDElastic.scan_parallel] phase 1: {len(ref_tasks)} ref runs "
f"on {n_workers_ref} workers x {n_threads_ref} threads ...",
flush=True,
)
ctx = _mp.get_context("spawn")
with ProcessPoolExecutor(max_workers=n_workers_ref, mp_context=ctx) as ex:
ref_results = list(ex.map(_run_reference_segment, ref_tasks))
# Sort ref_results by temperature in case of out-of-order completion.
ref_results.sort(key=lambda r: r["T"])
deform_tasks = []
for i, ref in enumerate(ref_results):
for d in (1, 2, 3, 4, 5, 6):
for sign in (+1, -1):
deform_tasks.append(stubs[i]._build_deform_args(d, sign, ref))
if not quiet:
print(
f"[MDElastic.scan_parallel] phase 2: {len(deform_tasks)} deformation "
f"runs on {n_workers_def} workers x {n_threads_def} threads ...",
flush=True,
)
with ProcessPoolExecutor(max_workers=n_workers_def, mp_context=ctx) as ex:
def_results = list(ex.map(_run_deform_segment, deform_tasks))
# Aggregate per-T C_ij.
rows = []
per_T_def = {} # T -> list of deform-result dicts
# Group deform_tasks by T (preserved by order: 12 per T in the order built).
for i, T in enumerate(temperatures):
chunk = def_results[i * 12 : (i + 1) * 12]
per_T_def[float(T)] = chunk
for i, T in enumerate(temperatures):
ref = ref_results[i]
sigma_pos = np.zeros((6, 6))
sigma_neg = np.zeros((6, 6))
for r in per_T_def[float(T)]:
d = r["direction"] - 1
sb = np.array(r["sigma_bar"])
if r["sign"] > 0:
sigma_pos[d] = sb
else:
sigma_neg[d] = sb
cij = np.zeros((6, 6))
delta = stubs[i].delta
for d in range(6):
for k in range(6):
cij[k, d] = (
-(sigma_pos[d, k] - sigma_neg[d, k])
/ (2.0 * delta)
* _EAA_PER_BAR
)
cij = 0.5 * (cij + cij.T)
res = MDElasticResult(
cij_voigt=cij,
V_eq=ref["V_eq"],
T_actual=ref["T_actual"],
stress_ref=np.array(ref["sigma_ref_bar"]) * _EAA_PER_BAR,
ensemble=stubs[i].ensemble,
temperature=float(T),
)
c11, c12, c44 = res.cubic_average()
v = res.vrh()
row = {
"T": float(T),
"V_eq": res.V_eq,
"T_actual": res.T_actual,
"C11": c11,
"C12": c12,
"C44": c44,
"K_VRH": v["K_H"],
"G_VRH": v["G_H"],
"E_VRH": v["E"],
"nu_VRH": v["nu"],
"stable": res.born_stable_cubic(),
"stress_ref_max": float(np.abs(res.stress_ref).max()),
}
for ii in range(6):
for jj in range(6):
row[f"c{ii + 1}{jj + 1}"] = res.cij_voigt[ii, jj]
rows.append(row)
return pl.DataFrame(rows)
# ----------------------------------------------------------
# Lattice constant + thermal expansion helpers
# ----------------------------------------------------------
[docs]
@staticmethod
def thermal_expansion(
df: pl.DataFrame,
n_unit_cells: int,
) -> pl.DataFrame:
"""Compute lattice constant and thermal expansion from a scan_parallel
result DataFrame.
Parameters
----------
df : pl.DataFrame
Output of ``MDElastic.scan_parallel``. Must contain ``T`` and
``V_eq`` columns.
n_unit_cells : int
Number of *conventional* unit cells in the simulation cell. For a
3x3x3-of-conventional-FCC SQS (108 atoms with 4 atoms/unit cell),
pass ``n_unit_cells=27``.
Returns
-------
pl.DataFrame
Original columns plus ``a`` (lattice constant in Å, equal to
``(V / n_unit_cells)**(1/3)``), ``alpha_V`` (volumetric thermal
expansion in 1/K, centred finite-difference of ``V(T)``) and
``alpha_L`` (linear thermal expansion, ``alpha_V / 3`` under the
cubic-symmetry assumption).
"""
assert "T" in df.columns and "V_eq" in df.columns, (
"thermal_expansion() needs T and V_eq columns "
"(use MDElastic.scan_parallel output)."
)
out = df.sort("T")
T = out["T"].to_numpy()
V = out["V_eq"].to_numpy()
n = len(T)
alpha_V = np.zeros(n)
for i in range(n):
if n == 1:
break
if i == 0:
alpha_V[i] = (V[1] - V[0]) / (T[1] - T[0]) / V[0]
elif i == n - 1:
alpha_V[i] = (V[i] - V[i - 1]) / (T[i] - T[i - 1]) / V[i]
else:
alpha_V[i] = (V[i + 1] - V[i - 1]) / (T[i + 1] - T[i - 1]) / V[i]
a = (V / float(n_unit_cells)) ** (1.0 / 3.0)
return out.with_columns(
pl.Series("a", a),
pl.Series("alpha_V", alpha_V),
pl.Series("alpha_L", alpha_V / 3.0),
)
[docs]
@staticmethod
def parse_log(
log_path: Union[str, os.PathLike],
skip_fraction: float = 0.5,
) -> dict:
"""Parse a per-stage LAMMPS log written by MDElastic and return
time-averages of the thermo columns over the *equilibrated* portion.
Reads the ``thermo_style custom step temp press lx ly lz vol pe ke
etotal pxx pyy pzz pxy pxz pyz`` block emitted by stage 1 / stage 2
runs. The first ``skip_fraction`` of rows is discarded as
equilibration; the rest is averaged.
Parameters
----------
log_path : path to ``ref_T{T}.log`` or similar.
skip_fraction : float in [0, 1), default 0.5.
Returns
-------
dict with keys ``T_avg``, ``press_avg``, ``a_avg`` (Å, from lx ly lz),
``V_avg`` (Å^3), ``n_samples``, plus ``stress_voigt`` 6-vector
``[pxx, pyy, pzz, pyz, pxz, pxy]`` in bar.
"""
cols = (
"step",
"temp",
"press",
"lx",
"ly",
"lz",
"vol",
"pe",
"ke",
"etotal",
"pxx",
"pyy",
"pzz",
"pxy",
"pxz",
"pyz",
)
n_cols = len(cols)
rows: List[List[float]] = []
in_block = False
with open(log_path) as f:
for line in f:
stripped = line.strip()
if stripped.startswith("Step"):
in_block = True
continue
if in_block:
toks = stripped.split()
if len(toks) == n_cols:
try:
rows.append([float(t) for t in toks])
except ValueError:
in_block = False
else:
in_block = False
if not rows:
raise ValueError(f"no thermo data parsed from {log_path}")
arr = np.asarray(rows)
n_skip = int(skip_fraction * len(arr))
eq = arr[n_skip:]
i = {name: idx for idx, name in enumerate(cols)}
return dict(
n_samples=len(eq),
T_avg=float(eq[:, i["temp"]].mean()),
press_avg=float(eq[:, i["press"]].mean()),
V_avg=float(eq[:, i["vol"]].mean()),
a_avg=float(
(
(
eq[:, i["lx"]].mean()
* eq[:, i["ly"]].mean()
* eq[:, i["lz"]].mean()
)
** (1.0 / 3.0)
)
),
stress_voigt=np.array(
[
eq[:, i["pxx"]].mean(),
eq[:, i["pyy"]].mean(),
eq[:, i["pzz"]].mean(),
eq[:, i["pyz"]].mean(),
eq[:, i["pxz"]].mean(),
eq[:, i["pxy"]].mean(),
]
),
)
# ----------------------------------------------------------
# Internals
# ----------------------------------------------------------
def _write_initial_data(self) -> None:
self._data_path = self._work_dir / "init.data"
self.system.write_data(str(self._data_path), element_list=self.elements)
def _build_reference_args(self) -> dict:
ref_log = None
if self.log_dir is not None:
ref_log = str(self.log_dir / f"ref_T{int(self.temperature)}.log")
return dict(
lammps_cmdargs=self.lammps_cmdargs,
stage_log=ref_log,
accelerator=self.accelerator,
n_threads=self.n_threads_ref,
n_gpus=self.n_gpus_ref,
data_path=str(self._data_path),
pair_style=self.pair_style,
pair_coeff=self.pair_coeff,
T=self.temperature,
P=self.pressure,
pressure_coupling=self.pressure_coupling,
timestep=self.timestep,
n_relax=self.n_relax,
n_equil=self.n_equil,
n_run=self.n_run,
nevery=self.nevery,
nrepeat=self.nrepeat,
nfreq=self.nfreq,
ensemble=self.ensemble,
thermostat=self.thermostat,
tdamp=self.thermostat_damp,
pdamp=self.pdamp,
seed=self.seed,
relax_volume=self.relax_volume,
restart_path=str(self._work_dir / "ref.restart"),
)
def _build_deform_args(self, direction: int, sign: int, ref: dict) -> dict:
def_log = None
if self.log_dir is not None:
sgn = "+" if sign > 0 else "-"
def_log = str(
self.log_dir
/ f"def_T{int(self.temperature)}_d{int(direction)}_{sgn}.log"
)
return dict(
lammps_cmdargs=self.lammps_cmdargs,
stage_log=def_log,
accelerator=self.accelerator,
n_threads=self.n_threads_def,
n_gpus=self.n_gpus_def,
restart_path=ref["restart_path"],
pair_style=self.pair_style,
pair_coeff=self.pair_coeff,
T=self.temperature,
timestep=self.timestep,
direction=int(direction),
signed_delta=sign * self.delta,
len0=ref["len0"],
n_equil=self.n_equil,
n_run=self.n_run,
nevery=self.nevery,
nrepeat=self.nrepeat,
nfreq=self.nfreq,
ensemble=self.ensemble,
thermostat=self.thermostat,
tdamp=self.thermostat_damp,
seed=self.seed,
)
def _execute_single(self) -> MDElasticResult:
# Phase 1: reference NPT+NVT (this process).
if not self.quiet:
print(
f"[MDElastic] T={self.temperature} K reference "
f"({self.thermostat}, ensemble={self.ensemble}) ...",
flush=True,
)
ref = _run_reference_segment(self._build_reference_args())
# Phase 2: 12 deformations across n_workers processes.
deform_args = [
self._build_deform_args(d, sign, ref)
for d in self._STRAIN_DIRS
for sign in (+1, -1)
]
n_w = min(self.n_workers, len(deform_args))
if not self.quiet:
print(
f"[MDElastic] T={self.temperature} K: "
f"{len(deform_args)} deformations on {n_w} workers ...",
flush=True,
)
if n_w > 1:
ctx = _mp.get_context("spawn")
with ProcessPoolExecutor(max_workers=n_w, mp_context=ctx) as ex:
results = list(ex.map(_run_deform_segment, deform_args))
else:
results = [_run_deform_segment(a) for a in deform_args]
# Aggregate.
sigma_pos = np.zeros((6, 6))
sigma_neg = np.zeros((6, 6))
for r in results:
d = r["direction"] - 1
sb = np.array(r["sigma_bar"])
if r["sign"] > 0:
sigma_pos[d] = sb
else:
sigma_neg[d] = sb
cij = np.zeros((6, 6))
for d in range(6):
for i in range(6):
cij[i, d] = (
-(sigma_pos[d, i] - sigma_neg[d, i])
/ (2.0 * self.delta)
* _EAA_PER_BAR
)
cij = 0.5 * (cij + cij.T)
return MDElasticResult(
cij_voigt=cij,
V_eq=ref["V_eq"],
T_actual=ref["T_actual"],
stress_ref=np.array(ref["sigma_ref_bar"]) * _EAA_PER_BAR,
ensemble=self.ensemble,
temperature=self.temperature,
)
if __name__ == "__main__":
from mdapy import SQS, build_hea
hea = build_hea(
("Cr", "Co", "Ni"),
(1 / 3, 1 / 3, 1 / 3),
"fcc",
3.53,
nx=5,
ny=5,
nz=5,
random_seed=1,
)
sqs = SQS(
hea,
cutoffs={2: 4.0, 3: 3.0, 4: 3.0},
max_steps=20000,
n_replicas=4,
seed=1,
T=1.0,
).compute()
sqs.is_sqs()
temperatures = (200.0, 400.0, 600.0, 800.0, 1000.0)
# accelerator + threading combos:
# Kokkos OMP (default) : accelerator="kokkos", n_threads_ref/def=N
# Kokkos GPU : accelerator="kokkos", n_gpus_ref/def=1
# USER-OMP package : accelerator="omp", n_threads_ref/def=N
# GPU package : accelerator="gpu", n_gpus_ref/def=1
# serial / MPI : accelerator="none"
df = MDElastic.scan_parallel(
sqs.system,
temperatures=temperatures,
pair_style="eam/alloy",
pair_coeff="* * tests/input_files/NiCoCr.lammps.eam Cr Co Ni",
elements=["Cr", "Co", "Ni"],
delta=0.01,
timestep=1e-3,
n_relax=30000,
n_equil=30000,
n_run=20000,
relax_volume=True,
pressure_coupling="iso",
thermostat="nose-hoover",
ensemble="isothermal",
accelerator="kokkos",
n_workers_ref=5,
n_threads_ref=1,
n_workers_def=15,
n_threads_def=1,
work_dir="md_elastic_run",
log_dir="md_elastic_run/logs",
)
# nx*ny*nz conventional FCC unit cells in the simulation box.
df = MDElastic.thermal_expansion(df, n_unit_cells=5 * 5 * 5)
print(df)
df.write_csv("md_elastic_run/md_elastic_CrCoNi.csv")