# Copyright (c) 2022-2026, Yongchao Wu in Aalto University
# This file is from the mdapy project, released under the BSD 3-Clause License.
"""
Core System Class for MDAPY
===========================
This module provides the central System class that serves as the main interface
for atomic simulation data and analysis in mdapy. The System class integrates
all analysis modules and provides a unified API for structural analysis,
property calculations, and data manipulation.
Classes
-------
System : Core class representing an atomic system
"""
from mdapy.box import Box
from mdapy.load_save import BuildSystem, SaveSystem
from mdapy.knn import NearestNeighbor
from mdapy.neighbor import Neighbor
from mdapy.voronoi import Voronoi
from mdapy.structure_entropy import StructureEntropy
from mdapy.cluster_analysis import ClusterAnalysis
from mdapy.calculator import CalculatorMP
from mdapy.identify_diamond_structure import IdentifyDiamondStructure
from mdapy.polyhedral_template_matching import PolyhedralTemplateMatching
from mdapy.common_neighbor_analysis import CommonNeighborAnalysis
from mdapy.steinhardt_bond_orientation import SteinhardtBondOrientation
from mdapy.radial_distribution_function import RadialDistributionFunction
from mdapy.structure_factor import StructureFactor
from mdapy.centro_symmetry_parameter import CentroSymmetryParameter
from mdapy.warren_cowley_parameter import WarrenCowleyParameter
from mdapy.identify_fcc_planar_faults import IdentifyFccPlanarFaults
from mdapy.ackland_jones_analysis import AcklandJonesAnalysis
from mdapy.common_neighbor_parameter import CommonNeighborParameter
from mdapy.atomic_temperature import AtomicTemperature
from mdapy.bond_analysis import BondAnalysis
from mdapy.build_bond import build_bond as _build_bond_pairs
from mdapy.angular_distribution_function import AngularDistributionFunction
import mdapy.tool_function as tool
from mdapy.data import atomic_numbers, vdw_radii
from typing import Optional, Dict, TYPE_CHECKING, Union, Iterable, Any, List, Tuple
import numpy as np
import polars as pl
if TYPE_CHECKING:
from ase import Atoms
from ovito.data import DataCollection
[docs]
class System:
"""
Core class representing an atomic system with integrated analysis capabilities.
The System class is the central data structure in mdapy, providing a unified
interface for loading, manipulating, and analyzing atomic configurations. It
integrates various structural analysis methods, property calculators, and
visualization tools.
Parameters
----------
filename : str, optional
Path to an atomic configuration file. Supports various formats including
POSCAR, LAMMPS data/dump, XYZ, mp etc. Format is auto-detected or can
be specified via the `format` parameter.
data : pl.DataFrame, optional
Polars DataFrame containing atomic information. Must include columns
'x', 'y', 'z' for positions. Other common columns include 'type',
'element', 'vx', 'vy', 'vz', 'fx', 'fy', 'fz', etc.
pos : np.ndarray, optional
Array of shape (N, 3) containing atomic positions. Used in conjunction
with `box` parameter.
box : int, float, Iterable[float], np.ndarray, or Box, optional
Simulation box specification. Required when using `data` or `pos` parameters.
See :class:`~mdapy.box.Box` for supported formats.
ase_atom : ase.Atoms, optional
ASE Atoms object to initialize the system from.
ovito_atom : ovito.data.DataCollection, optional
OVITO DataCollection object to initialize the system from.
format : str, optional
File format specification when loading from `filename`.
If None, format is auto-detected.
global_info : dict, optional
Dictionary containing global system properties such as total energy,
virial, stress tensor, timestep, etc. Defaults to empty dict.
Attributes
----------
box : Box
Simulation box object containing cell vectors and boundary conditions.
verlet_list : np.ndarray, optional
Neighbor list array when neighbors are built.
distance_list : np.ndarray, optional
Distance list array when neighbors are built.
neighbor_number : np.ndarray, optional
Number of neighbors for each atom when neighbors are built.
rc : float, optional
Cutoff radius used for neighbor list construction.
Notes
-----
The System class uses lazy evaluation for many analyses - neighbor lists
and other expensive computations are only performed when needed. Results
are cached and reused when possible.
At initialization, exactly one of the following must be provided:
- `filename`
- `data` and `box`
- `pos` and `box`
- `ase_atom`
- `ovito_atom`
Examples
--------
Creating a system from different sources:
.. code-block:: python
from mdapy.system import System
import numpy as np
# Load from file (auto-detect format)
system = System("config.dump")
# Load from file with explicit format
system = System("POSCAR", format="poscar")
# Create from numpy array
pos = np.random.rand(100, 3) * 10
system = System(pos=pos, box=10)
# Create from polars DataFrame
import polars as pl
data = pl.DataFrame(
{
"x": np.random.rand(100),
"y": np.random.rand(100),
"z": np.random.rand(100),
"type": np.ones(100, dtype=int),
}
)
system = System(data=data, box=[10, 10, 10])
Performing structural analysis:
.. code-block:: python
# Identify crystal structures
system.cal_polyhedral_template_matching()
# Calculate radial distribution function
rdf = system.cal_radial_distribution_function(rc=10.0, nbin=200)
# Perform cluster analysis
system.cal_cluster_analysis(rc=3.0)
Accessing and modifying data:
.. code-block:: python
# Get positions
pos = system.data.select("x", "y", "z")
# Add a new column
new_data = system.data.with_columns(energy=np.random.rand(system.N))
system.update_data(new_data)
# Save to file
system.write("output.dump")
See Also
--------
mdapy.box.Box : Simulation box representation
mdapy.calculator.CalculatorMP : Abstract calculator interface
mdapy.load_save.BuildSystem : System construction utilities
mdapy.load_save.SaveSystem : System export utilities
"""
def __init__(
self,
filename: Optional[str] = None,
data: Optional[pl.DataFrame] = None,
pos: Optional[np.ndarray] = None,
box: Union[int, float, Iterable[float], np.ndarray, Box] = None,
ase_atom: Optional["Atoms"] = None,
ovito_atom: Optional["DataCollection"] = None,
format: Optional[str] = None,
global_info: Optional[Dict[str, Any]] = {},
):
"""Initialize a System object from various data sources."""
self.__global_info = {}
if isinstance(filename, str):
self.__data, self.box, self.__global_info = BuildSystem.from_file(
filename, format
)
elif data is not None and box is not None:
self.__data, self.box = BuildSystem.from_data(data, box)
elif pos is not None and box is not None:
self.__data, self.box = BuildSystem.from_array(pos, box)
elif ase_atom is not None:
self.__data, self.box = BuildSystem.from_ase(ase_atom)
elif ovito_atom is not None:
self.__data, self.box, self.__global_info = BuildSystem.from_ovito(
ovito_atom
)
else:
raise RuntimeError(
"One must at least provide filename or [data, box] or [pos, box] or ase_atom or ovito_atom."
)
if not len(self.__global_info):
self.__global_info = global_info
self.__calc: Optional[CalculatorMP] = None
@property
def calc(self) -> Optional[CalculatorMP]:
return self.__calc
@calc.setter
def calc(self, value):
if not isinstance(value, CalculatorMP):
raise TypeError(
f"calc must be CalculatorMP, instead of {type(value).__name__}"
)
value.results = {}
self.__calc = value
@property
def global_info(self) -> Dict[str, Any]:
"""
Obtain global system information.
Returns
-------
dict
Dictionary containing global properties such as total energy,
virial tensor, stress tensor, timestep, temperature, etc.
Keys and values depend on the data source and attached calculator.
"""
return self.__global_info
@property
def data(self) -> pl.DataFrame:
"""
Access the atomic data DataFrame.
Returns
-------
pl.DataFrame
Polars DataFrame containing per-atom properties. Always includes
columns 'x', 'y', 'z' for Cartesian coordinates. May include
additional columns such as 'type', 'element', 'vx', 'vy', 'vz'
(velocities), 'fx', 'fy', 'fz' (forces), and various computed
properties from analysis methods.
Examples
--------
>>> system = System("config.dump")
>>> print(system.data.columns)
['x', 'y', 'z', 'type', 'vx', 'vy', 'vz']
"""
return self.__data
@property
def N(self) -> int:
"""
Get the number of atoms in the system.
Returns
-------
int
Total number of atoms/particles in the system.
Examples
--------
>>> system = System(pos=np.random.rand(100, 3), box=10)
>>> print(system.N)
100
"""
return self.__data.shape[0]
def __repr__(self) -> str:
"""
Return string representation of the System.
Returns
-------
str
Multi-line string containing atom number, box information,
global properties, and a summary of the data DataFrame.
"""
if not self.__global_info:
return f"Atom Number: {self.N}\n{self.box}\nParticle Information:\n{self.__data}"
else:
info = f"Atom Number: {self.N}\n{self.box}\n"
for key in self.__global_info.keys():
info += f"{key}: {self.__global_info[key]}\n"
info += f"Particle Information:\n{self.__data}"
return info
[docs]
def set_element(self, element: Union[str, List[str], Tuple[str], np.ndarray]):
"""
Set element names for atoms in the system.
Parameters
----------
element : str, list of str, tuple of str, or np.ndarray
Element specification:
* **str**: Assigns the same element to all atoms
* **list/tuple/array**: Must have length equal to N, specifying
element for each atom individually
Examples
--------
Assign same element to all atoms:
>>> system.set_element("Cu")
Assign different elements:
>>> elements = ["Cu"] * 50 + ["Al"] * 50
>>> system.set_element(elements)
Notes
-----
This method updates the 'element' column in the data DataFrame.
If an 'element' column exists, it is replaced.
"""
if isinstance(element, str):
df = self.__data.with_columns(pl.lit(element).alias("element"))
else:
assert len(element) == self.data.shape[0], (
"Length of element should be equal to the atom number."
)
if "element" in self.data.columns:
df = self.__data.select(pl.all().exclude("element")).with_columns(
pl.lit(np.array(element)).alias("element")
)
else:
df = self.__data.with_columns(
pl.lit(np.array(element)).alias("element")
)
self.update_data(df, True)
[docs]
def set_type_by_element(
self, element_list: Union[List[str], Tuple[str], np.ndarray]
):
"""
Set atom types based on element ordering.
Parameters
----------
element_list : list of str, tuple of str, or np.ndarray
Ordered list of unique elements. Atoms are assigned type numbers
based on the index of their element in this list (starting from 1).
Raises
------
AssertionError
If data does not contain 'element' column or if any element in
data is not present in the provided element list.
Examples
--------
>>> system.set_element(["Cu", "Cu", "Al", "Al"])
>>> system.set_type_by_element(["Cu", "Al"])
>>> print(system.data.select("element", "type"))
┌─────────┬──────┐
│ element │ type │
├─────────┼──────┤
│ Cu │ 1 │
│ Cu │ 1 │
│ Al │ 2 │
│ Al │ 2 │
└─────────┴──────┘
Notes
-----
This method creates or updates the 'type' column in the data DataFrame.
Type numbers start from 1 and correspond to the order in the element list.
"""
assert "element" in self.data.columns, "Data must contain element column."
for i in self.data["element"].unique():
assert i in element_list, f"Element should have {i}."
ele2type = {j: i for i, j in enumerate(element_list, start=1)}
self.update_data(
self.data.with_columns(
pl.col("element")
.replace_strict(ele2type, return_dtype=pl.Int32)
.rechunk()
.alias("type")
),
True,
)
[docs]
def get_positions(self, reduced: bool = False) -> pl.DataFrame:
"""
Extract atomic positions from the system.
Parameters
----------
reduced : bool, optional
If True, return fractional (reduced) coordinates in the box basis.
If False (default), return Cartesian coordinates.
Returns
-------
pl.DataFrame
DataFrame containing position columns:
* **reduced=False**: Columns 'x', 'y', 'z' (Cartesian coordinates)
* **reduced=True**: Columns 'r_x', 'r_y', 'r_z' (fractional coordinates)
Examples
--------
>>> system = System("POSCAR")
>>> cart_pos = system.get_positions(reduced=False)
>>> frac_pos = system.get_positions(reduced=True)
Notes
-----
Fractional coordinates are computed using the inverse box matrix:
reduced = positions @ box.inverse_box
"""
if reduced:
inv = self.box.inverse_box
# pos @ inv
return self.data.select(
r_x=pl.col("x") * inv[0, 0]
+ pl.col("y") * inv[1, 0]
+ pl.col("z") * inv[2, 0],
r_y=pl.col("x") * inv[0, 1]
+ pl.col("y") * inv[1, 1]
+ pl.col("z") * inv[2, 1],
r_z=pl.col("x") * inv[0, 2]
+ pl.col("y") * inv[1, 2]
+ pl.col("z") * inv[2, 2],
)
else:
return self.data.select("x", "y", "z")
[docs]
def get_velocities(self) -> pl.DataFrame:
"""
Extract atomic velocities from the system.
Returns
-------
pl.DataFrame
DataFrame with columns 'vx', 'vy', 'vz' containing velocity
components for each atom.
Raises
------
AssertionError
If the data does not contain velocity columns ('vx', 'vy', 'vz').
Examples
--------
>>> system = System("relax.dump") # File with velocities
>>> velocities = system.get_velocities()
"""
for i in ["vx", "vy", "vz"]:
assert i in self.data.columns
return self.data.select("vx", "vy", "vz")
[docs]
def set_pka(
self,
energy: float,
direction: np.ndarray,
index: Optional[int] = None,
element: Optional[str] = None,
factor: float = 1.0,
):
"""
Set primary knock-on atom (PKA) for radiation damage simulation.
This method assigns initial kinetic energy and direction to a PKA atom,
commonly used to initialize cascade simulations in radiation damage studies.
We assume that the velocity in the units of A/fs, you need to use `factor` to convert your velocity to this units.
Parameters
----------
energy : float
Kinetic energy to assign to the PKA (in eV).
direction : np.ndarray
1D array of length 3 specifying the velocity direction vector.
Will be normalized automatically.
index : int, optional
Index of the atom to set as PKA. If None, selects by nearest center atom.
element : str, optional
Element name to select PKA. If provided and index is None,
selects by nearest center atom with same element.
factor : float
Convert your velocity to the units of A/fs.
Defaults to 1.0.
Notes
-----
This method updates the velocity columns ('vx', 'vy', 'vz') in the
data DataFrame.
Examples
--------
Set PKA by atom index:
>>> system.set_pka(energy=1000.0, direction=np.array([1, 3, 5]), index=50)
Set PKA by element:
>>> system.set_pka(energy=2000.0, direction=np.array([1, 3, 5]), element="Fe")
"""
self.update_data(
tool._set_pka(
self.data.with_columns(
pl.col("vx") * factor, pl.col("vy") * factor, pl.col("vz") * factor
),
self.box,
energy,
direction,
index,
element,
).with_columns(
pl.col("vx") / factor, pl.col("vy") / factor, pl.col("vz") / factor
)
)
[docs]
def get_energy(self) -> float:
"""
Calculate total potential energy of the system.
Returns
-------
float
Total potential energy in the units eV.
Raises
------
AssertionError
If no calculator is attached to the system.
Examples
--------
>>> from mdapy.eam import EAM
>>> system = System("config.dump")
>>> system.calc = EAM("potential.eam")
>>> total_energy = system.get_energy()
"""
assert isinstance(self.calc, CalculatorMP), "Must assign a calc first."
return self.calc.get_energy(self.data, self.box)
[docs]
def get_energies(self) -> np.ndarray:
"""
Calculate per-atom potential energies in the units eV.
Returns
-------
np.ndarray
1D array of shape (N,) containing potential energy for each atom.
Raises
------
AssertionError
If no calculator is attached to the system.
"""
assert isinstance(self.calc, CalculatorMP), "Must assign a calc first."
return self.calc.get_energies(self.data, self.box)
[docs]
def get_force(self) -> np.ndarray:
"""
Calculate forces acting on each atom in the units eV/A.
Returns
-------
np.ndarray
2D array of shape (N, 3) containing force components [fx, fy, fz]
for each atom.
Raises
------
AssertionError
If no calculator is attached to the system.
Examples
--------
>>> from mdapy.eam import EAM
>>> system = System("config.dump")
>>> system.calc = EAM("potential.eam")
>>> forces = system.get_force()
>>> print(forces.shape)
(1000, 3)
"""
assert isinstance(self.calc, CalculatorMP), "Must assign a calc first."
return self.calc.get_forces(self.data, self.box)
[docs]
def get_stress(self) -> np.ndarray:
"""
Calculate stress tensor of the system in the units eV/A^3.
Returns
-------
np.ndarray
1D array of shape (6,) containing stress tensor components in
Voigt notation: [σ_xx, σ_yy, σ_zz, σ_yz, σ_xz, σ_xy].
Raises
------
AssertionError
If no calculator is attached to the system.
Examples
--------
>>> from mdapy.eam import EAM
>>> system = System("config.dump")
>>> system.calc = EAM("potential.eam")
>>> stress = system.get_stress()
>>> print(stress) # [σ_xx, σ_yy, σ_zz, σ_yz, σ_xz, σ_xy]
"""
assert isinstance(self.calc, CalculatorMP), "Must assign a calc first."
return self.calc.get_stress(self.data, self.box)
[docs]
def get_virials(self) -> np.ndarray:
"""
Calculate per-atom virial tensors in the units eV.
Returns
-------
np.ndarray
2D array of shape (N, 9) containing virial tensor components for
each atom, ordered as: [v_xx, v_xy, v_xz, v_yx, v_yy, v_yz, v_zx, v_zy, v_zz].
Raises
------
AssertionError
If no calculator is attached to the system.
Examples
--------
>>> from mdapy.eam import EAM
>>> system = System("config.dump")
>>> system.calc = EAM("potential.eam")
>>> virials = system.get_virials()
>>> print(virials.shape)
(1000, 9)
"""
assert isinstance(self.calc, CalculatorMP), "Must assign a calc first."
return self.calc.get_virials(self.data, self.box)
[docs]
def update_data(
self,
data: pl.DataFrame,
reset_calcolator: bool = False,
reset_neighbor: bool = False,
) -> None:
"""
Update the atomic data DataFrame.
Parameters
----------
data : pl.DataFrame
New DataFrame containing updated atomic information.
Should maintain the same structure (columns) as the original data.
reset_calcolator : bool, optional
If True, clears cached results from the attached calculator.
Set to True when atomic positions or types/element change. Default is False.
reset_neighbor : bool, optional
If True, clear all neighbor information.
Set to True when atomic positions change. Default is False.
Examples
--------
>>> # Add a new property column
>>> new_data = system.data.with_columns(
... temperature=np.random.rand(system.N) * 300
... )
>>> system.update_data(new_data)
>>> # Update positions (reset calculator)
>>> new_data = system.data.with_columns(x=system.data["x"] + 0.1)
>>> system.update_data(new_data, reset_calcolator=True)
Notes
-----
When `reset_calcolator=True`, any cached energy, force, or stress
calculations are invalidated and will be recomputed on next access.
"""
self.__data = data
if reset_calcolator and isinstance(self.calc, CalculatorMP):
self.calc.results = {}
if reset_neighbor:
if hasattr(self, "verlet_list"):
del self.verlet_list, self.neighbor_number, self.distance_list
if hasattr(self, "rc"):
del self.rc
if hasattr(self, "bond"):
del self.bond
if hasattr(self, "voro_verlet_list"):
del (
self.voro_verlet_list,
self.voro_distance_list,
self.voro_face_area,
self.voro_neighbor_number,
)
if hasattr(self, "_enlarge_box"):
del self._enlarge_box, self._enlarge_data
[docs]
def update_box(
self,
box: Union[int, float, Iterable[float], np.ndarray, Box],
scale_pos: bool = False,
) -> None:
"""
Update the simulation box.
Parameters
----------
box : int, float, Iterable[float], np.ndarray, or Box
New box specification. See :class:`~mdapy.box.Box` for accepted formats.
scale_pos : bool, optional
If True, scales atomic positions affinely with the box change.
Useful for applying strain or changing lattice constants while
maintaining fractional coordinates. Default is False.
Raises
------
AssertionError
If `scale_pos=True` but not all boundaries are periodic.
Examples
--------
Change box without scaling positions:
>>> system.update_box([20, 20, 20])
Apply uniform strain by scaling box and positions:
>>> system.update_box([11, 11, 11], scale_pos=True)
Notes
-----
When `scale_pos=True`, the transformation is:
new_pos = old_pos @ (old_box^-1 @ new_box)
This preserves fractional coordinates in the new box.
"""
new_box = Box(box)
if scale_pos:
assert sum(new_box.boundary) == 3, (
"only support all periodic boundary condition."
)
map_matrix = np.linalg.solve(self.box.box, new_box.box)
self.__data = self.__data.with_columns(
pl.col("x") - self.box.origin[0],
pl.col("y") - self.box.origin[1],
pl.col("z") - self.box.origin[2],
).with_columns(
x=pl.col("x") * map_matrix[0, 0]
+ pl.col("y") * map_matrix[1, 0]
+ pl.col("z") * map_matrix[2, 0]
+ new_box.origin[0],
y=pl.col("x") * map_matrix[0, 1]
+ pl.col("y") * map_matrix[1, 1]
+ pl.col("z") * map_matrix[2, 1]
+ new_box.origin[1],
z=pl.col("x") * map_matrix[0, 2]
+ pl.col("y") * map_matrix[1, 2]
+ pl.col("z") * map_matrix[2, 2]
+ new_box.origin[2],
)
self.box = new_box
if isinstance(self.calc, CalculatorMP):
self.calc.results = {}
if hasattr(self, "verlet_list"):
del self.verlet_list, self.neighbor_number, self.distance_list
if hasattr(self, "rc"):
del self.rc
if hasattr(self, "bond"):
del self.bond
if hasattr(self, "voro_verlet_list"):
del (
self.voro_verlet_list,
self.voro_distance_list,
self.voro_face_area,
self.voro_neighbor_number,
)
if hasattr(self, "_enlarge_box"):
del self._enlarge_box, self._enlarge_data
[docs]
def wrap_pos(self) -> None:
"""Wrap positions into box for PBC boundary."""
self.update_data(tool.wrap_pos(self.data, self.box), True, True)
[docs]
def replicate(self, nx: int, ny: int, nz: int) -> None:
"""
Replicate the system by creating a supercell.
Parameters
----------
nx : int
Number of replications along x-direction.
ny : int
Number of replications along y-direction.
nz : int
Number of replications along z-direction.
Examples
--------
Create a 2x2x2 supercell:
>>> system = System("unit_cell.poscar")
>>> system.replicate(2, 2, 2)
>>> print(system.N) # 8 times the original number
Notes
-----
This method modifies the system in place. The box is scaled by
[nx, ny, nz] and atoms are replicated accordingly.
"""
assert nx > 0
assert ny > 0
assert nz > 0
data, box = tool.replicate(self.__data, self.box, int(nx), int(ny), int(nz))
self.update_data(data, True, True)
self.update_box(box)
[docs]
def to_ovito(self) -> "DataCollection":
"""
Convert system to OVITO DataCollection object.
Returns
-------
ovito.data.DataCollection
OVITO data structure containing the atomic configuration.
Raises
------
ImportError
If OVITO is not installed.
Examples
--------
>>> data = system.to_ovito()
>>> # Use OVITO's analysis or visualization tools
>>> from ovito.modifiers import CommonNeighborAnalysisModifier
>>> data.apply(CommonNeighborAnalysisModifier())
"""
return SaveSystem.to_ovito(self.__data, self.box, self.__global_info)
[docs]
def to_ase(self) -> "Atoms":
"""
Convert system to ASE Atoms object.
Returns
-------
ase.Atoms
ASE Atoms object containing the atomic configuration.
Raises
------
ImportError
If ASE is not installed.
AssertionError
If data does not contain an 'element' column.
Examples
--------
>>> atoms = system.to_ase()
>>> # Use ASE's functionality
>>> from ase.io import write
>>> write("output.xyz", atoms)
"""
return SaveSystem.to_ase(self.__data, self.box)
[docs]
def write_mp(self, output_name: str) -> None:
"""
Save system to mp format file.
Parameters
----------
nx : int
Number of replications along x-direction.
ny : int
Number of replications along y-direction.
nz : int
Number of replications along z-direction.
Raises
------
ImportError
If ASE is not installed.
AssertionError
If data does not contain an 'element' column.
Examples
--------
>>> atoms = system.to_ase()
>>> # Use ASE's functionality
>>> from ase.io import write
>>> write("output.xyz", atoms)
"""
SaveSystem.write_mp(output_name, self.data, self.box, self.global_info)
[docs]
def write_xyz(self, output_name: str, classical: bool = False, compress=False):
SaveSystem.write_xyz(
output_name, self.box, self.data, classical, compress, **self.global_info
)
[docs]
def write_poscar(
self,
output_name: str,
reduced_pos: bool = False,
selective_dynamics: bool = False,
compress=False,
):
SaveSystem.write_poscar(
output_name,
self.box,
self.data,
reduced_pos=reduced_pos,
selective_dynamics=selective_dynamics,
compress=compress,
)
[docs]
def write_data(
self,
output_name: str,
element_list: Optional[List[str]] = None,
num_type: Optional[int] = None,
data_format: str = "atomic",
compress=False,
):
if element_list is not None:
self.set_type_by_element(element_list)
SaveSystem.write_data(
output_name,
self.box,
self.data,
element_list=element_list,
num_type=num_type,
data_format=data_format,
compress=compress,
)
[docs]
def write_dump(self, output_name: str, timestep: float = 0, compress=False):
SaveSystem.write_dump(
output_name, self.box, self.data, timestep=timestep, compress=compress
)
[docs]
def build_neighbor(
self,
rc: float,
max_neigh: Optional[int] = None,
) -> None:
"""
Build neighbor list for the system.
This method constructs a Verlet neighbor list within a cutoff radius,
which is required for many analysis methods. For small systems, the
box may be automatically replicated to ensure sufficient neighbors.
Parameters
----------
rc : float
Cutoff radius for neighbor search (in simulation units).
max_neigh : int, optional
Maximum number of neighbors per atom. If None, automatically
determined based on the actual neighbor number Increase this if you get warnings
about insufficient neighbor slots.
Attributes Set
--------------
verlet_list : np.ndarray
2D array of shape (N, max_neigh) containing neighbor indices.
Unfilled slots contain -1.
distance_list : np.ndarray
2D array of shape (N, max_neigh) containing distances to neighbors.
neighbor_number : np.ndarray
1D array of shape (N,) containing number of neighbors for each atom.
rc : float
The cutoff radius used.
_enlarge_box : Box, optional
replicated box for small system.
_enlarge_data : pl.DataFrame, optional
replicated atom dataframe for small system
Examples
--------
>>> system.build_neighbor(rc=5.0)
>>> print(system.verlet_list.shape)
(1000, 50) # 1000 atoms, up to 50 neighbors each
>>> print(system.neighbor_number)
[12 12 12 ... 12 12 12] # FCC coordination
See :class:`~mdapy.neighbor.Neighbor` for implementation details.
"""
neigh = Neighbor(rc, self.box, self.data, max_neigh)
neigh.compute()
self.rc = rc
if hasattr(neigh, "_enlarge_box"):
self._enlarge_box: Box = neigh._enlarge_box
if hasattr(neigh, "_enlarge_data"):
self._enlarge_data: pl.DataFrame = neigh._enlarge_data
self.verlet_list, self.distance_list, self.neighbor_number = (
neigh.verlet_list,
neigh.distance_list,
neigh.neighbor_number,
)
[docs]
def build_voronoi_neighbor(
self, a_face_area_threshold: float = -1.0, r_face_area_threshold: float = -1.0
) -> None:
"""
Calculate Voronoi neighbors and face properties for all atoms.
This method performs Voronoi tessellation to identify nearest neighbors
based on shared Voronoi cell faces. It can filter neighbors based on
face area thresholds to exclude insignificant contacts.
Parameters
----------
a_face_area_threshold : float, optional
Absolute face area threshold.
Faces with area below this value are ignored as neighbors.
Default is -1.0 (no filtering).
r_face_area_threshold : float, optional
Relative face area threshold (fraction of average face area).
Faces with relative area below this value are ignored.
Default is -1.0 (no filtering).
Attributes Set
--------------
voro_verlet_list : np.ndarray
2D array of shape (N, max_neigh) containing neighbor indices.
Unfilled slots contain -1.
voro_distance_list : np.ndarray
2D array of shape (N, max_neigh) containing distances to neighbors.
voro_neighbor_number : np.ndarray
1D array of shape (N,) containing number of neighbors for each atom.
voro_face_area : np.ndarray
2D array of shape (N, max_neighbors) containing the area of
the Voronoi face shared with each neighbor.
_enlarge_box : Box, optional
replicated box for small system.
_enlarge_data : pl.DataFrame, optional
replicated atom dataframe for small system
Examples
--------
>>> system.build_voronoi_neighbor()
>>> print(system.voro_verlet_list.shape)
(1000, 50) # 1000 atoms, up to 50 neighbors each
See :class:`~mdapy.voronoi.Voronoi` for implementation details.
"""
vor = Voronoi(self.box, self.data)
(
self.voro_verlet_list,
self.voro_distance_list,
self.voro_face_area,
self.voro_neighbor_number,
) = vor.get_neighbor(a_face_area_threshold, r_face_area_threshold)
if hasattr(vor, "_enlarge_box"):
self._enlarge_box: Box = vor._enlarge_box
if hasattr(vor, "_enlarge_data"):
self._enlarge_data: pl.DataFrame = vor._enlarge_data
[docs]
def build_nearest_neighbor(self, k: int) -> None:
"""
Calculate `k` nearest neighbor information.
Parameters
----------
k : int
Maximum number of neighbors to find per atom.
Attributes Set
--------------
verlet_list : np.ndarray
2D array of shape (N, k) containing neighbor indices.
distance_list : np.ndarray
2D array of shape (N, k) containing distances to neighbors.
neighbor_number : np.ndarray
1D array of shape (N,) containing number of neighbors for each atom.
_enlarge_box : Box, optional
replicated box for small system.
_enlarge_data : pl.DataFrame, optional
replicated atom dataframe for small system
Examples
--------
>>> system.build_nearest_neighbor(12)
>>> print(system.verlet_list.shape)
(1000, 12) # 1000 atoms, up to 12 sorted neighbors each
See :class:`~mdapy.knn.NearestNeighbor` for implementation details.
"""
kdt = NearestNeighbor(self.data, self.box, k)
kdt.compute()
if hasattr(kdt, "_enlarge_box"):
self._enlarge_box: Box = kdt._enlarge_box
if hasattr(kdt, "_enlarge_data"):
self._enlarge_data: pl.DataFrame = kdt._enlarge_data
self.verlet_list, self.distance_list = kdt.indices_py, kdt.distances_py
self.neighbor_number = np.full(self.verlet_list.shape[0], k, np.int32)
@staticmethod
def _normalize_bond_cutoff(
rc: Union[float, Dict[Tuple[Union[int, str], Union[int, str]], float], np.ndarray],
data: pl.DataFrame,
) -> Tuple[float, np.ndarray, np.ndarray]:
if np.isscalar(rc):
rc = float(rc)
assert rc > 0, "rc should be larger than 0."
return rc, np.zeros(data.shape[0], np.int32), np.array([[rc]], float)
if isinstance(rc, dict):
assert len(rc) > 0, "pairwise rc should not be empty."
first_pair = next(iter(rc))
assert len(first_pair) == 2, "pairwise rc key should have two entries."
first_value = first_pair[0]
if isinstance(first_value, str):
assert "element" in data.columns, (
"Data must contain element column for element-pair bond cutoff."
)
labels = data["element"].to_numpy()
unique_labels = np.unique(labels)
else:
assert "type" in data.columns, (
"Data must contain type column for type-pair bond cutoff."
)
labels = data["type"].to_numpy(allow_copy=False)
unique_labels = np.unique(labels)
compact_type = np.searchsorted(unique_labels, labels).astype(np.int32)
ntype = unique_labels.shape[0]
cutoff_matrix = np.full((ntype, ntype), -1.0, float)
label_to_index = {value: idx for idx, value in enumerate(unique_labels.tolist())}
for pair, cutoff in rc.items():
assert len(pair) == 2, "pairwise rc key should have two type indices."
i, j = pair[0], pair[1]
assert i in label_to_index and j in label_to_index, (
f"type/element pair {(i, j)} is not in current system."
)
cutoff = float(cutoff)
assert cutoff > 0, "pairwise rc should be larger than 0."
ii, jj = label_to_index[i], label_to_index[j]
cutoff_matrix[ii, jj] = cutoff
cutoff_matrix[jj, ii] = cutoff
if np.any(cutoff_matrix < 0):
missing_pairs = []
for i in range(ntype):
for j in range(i, ntype):
if cutoff_matrix[i, j] < 0:
missing_pairs.append((unique_labels[i], unique_labels[j]))
raise ValueError(f"Missing rc for type pairs: {missing_pairs}")
else:
assert "type" in data.columns, (
"Data must contain type column for matrix bond cutoff."
)
type_list = data["type"].to_numpy(allow_copy=False)
unique_labels = np.unique(type_list)
compact_type = np.searchsorted(unique_labels, type_list).astype(np.int32)
ntype = unique_labels.shape[0]
cutoff_matrix = np.asarray(rc, float)
assert cutoff_matrix.ndim == 2, "pairwise rc should be a 2D array."
assert cutoff_matrix.shape == (ntype, ntype), (
f"pairwise rc shape should be {(ntype, ntype)}."
)
assert np.all(cutoff_matrix > 0), "pairwise rc should be larger than 0."
return float(cutoff_matrix.max()), compact_type, cutoff_matrix
[docs]
def cal_build_bond(
self,
rc: Union[
float,
Dict[Tuple[Union[int, str], Union[int, str]], float],
np.ndarray,
],
max_neigh: Optional[int] = None,
) -> np.ndarray:
"""
Build bond pairs from neighbor information.
Parameters
----------
rc : float, dict, or np.ndarray
Bond cutoff definition. If a float is given, all bonds use the same
cutoff. If a dict is given, it should map ``(type_i, type_j)`` or
``(element_i, element_j)`` to the corresponding cutoff. If a 2D
array is given, its rows and columns follow the sorted unique atom
types in the system.
max_neigh : int, optional
Maximum number of neighbors per atom when building the neighbor list.
Returns
-------
np.ndarray
A 2D integer array of shape ``(Nbond, 2)`` containing 0-based atom
indices for each bond pair.
Notes
-----
This method stores the result in ``self.bond`` and also returns it.
"""
if np.isscalar(rc):
max_rc = float(rc)
else:
if isinstance(rc, dict):
max_rc = float(np.max(np.asarray(list(rc.values()), float)))
else:
assert "type" in self.data.columns, (
"Data must contain type column for matrix bond cutoff."
)
max_rc = float(np.max(np.asarray(rc, float)))
assert max_rc > 0, "rc should be larger than 0."
has_neigh = False
if hasattr(self, "rc") and self.rc >= max_rc:
has_neigh = True
if not has_neigh:
self.build_neighbor(max_rc, max_neigh)
if hasattr(self, "_enlarge_data"):
data = self._enlarge_data
else:
data = self.__data
_, compact_type, cutoff_matrix = self._normalize_bond_cutoff(rc, data)
bond = np.asarray(
_build_bond_pairs(
self.verlet_list,
self.distance_list,
self.neighbor_number,
compact_type,
cutoff_matrix,
),
dtype=np.int32,
)
if bond.size == 0:
self.bond = np.empty((0, 2), np.int32)
return self.bond
if hasattr(self, "_enlarge_data"):
bond %= self.N
bond.sort(axis=1)
bond = bond[bond[:, 0] != bond[:, 1]]
if bond.size == 0:
self.bond = np.empty((0, 2), np.int32)
return self.bond
self.bond = np.unique(bond, axis=0)
return self.bond
[docs]
def cal_identify_diamond_structure(self) -> None:
"""
Identify diamond structure atoms. This will generate 'ids' column in self.data:
- 0 Other
- 1 Cubic diamond
- 2 Cubic diamond (1st neighbor)
- 3 Cubic diamond (2nd neighbor)
- 4 Hexagonal diamond
- 5 Hexagonal diamond (1st neighbor)
- 6 Hexagonal diamond (2nd neighbor)
Notes
-----
See :class:`~mdapy.identify_diamond_structure.IdentifyDiamondStructure`
for implementation details.
"""
verlet_list = None
safe_L = 15
repeat = np.ceil(safe_L / self.box.get_thickness()).astype(int)
for i in range(3):
if self.box.boundary[i] == 0:
repeat[i] = 1
if sum(repeat) == 3:
if hasattr(self, "neighbor_number"):
if self.neighbor_number.min() >= 4:
tool.sort_neighbor(
self.verlet_list, self.distance_list, self.neighbor_number, 4
)
verlet_list = self.verlet_list
if hasattr(self, "_enlarge_data"):
box = self._enlarge_box
data = self._enlarge_data
else:
box = self.box
data = self.__data
ids = IdentifyDiamondStructure(data, box, verlet_list)
ids.compute()
self.update_data(self.__data.with_columns(ids=ids.pattern[: self.N]))
[docs]
def cal_common_neighbor_parameter(
self, rc: float, max_neigh: Optional[int] = None
) -> None:
"""
Calculate common neighbor parameter for structure analysis. This will save results to self.data['cnp'].
Parameters
----------
rc : float
Cutoff radius for neighbor search.
max_neigh : int, optional
Maximum number of neighbors per atom.
Notes
-----
See :class:`~mdapy.common_neighbor_parameter.CommonNeighborParameter`
for implementation details.
"""
has_neigh = False
if hasattr(self, "rc"):
if self.rc >= rc:
has_neigh = True
if not has_neigh:
self.build_neighbor(rc, max_neigh)
if hasattr(self, "_enlarge_data"):
data = self._enlarge_data
box = self._enlarge_box
else:
data = self.__data
box = self.box
cnp = CommonNeighborParameter(
data, box, rc, self.verlet_list, self.distance_list, self.neighbor_number
)
cnp.compute()
self.update_data(self.__data.with_columns(cnp=cnp.cnp[: self.N]))
[docs]
def cal_ackland_jones_analysis(self) -> None:
"""
Perform Ackland-Jones structure analysis. This will save results to self.data['aja']:
- 0 = Other (unknown coordination)
- 1 = FCC (face-centered cubic)
- 2 = HCP (hexagonal close-packed)
- 3 = BCC (body-centered cubic)
- 4 = ICO (icosahedral coordination)
Notes
-----
See :class:`~mdapy.ackland_jones_analysis.AcklandJonesAnalysis`
for implementation details.
"""
N_neigh = 14
if self.data.shape[0] < N_neigh and sum(self.box.boundary) == 0:
self.update_data(self.__data.with_columns(pl.lit(0, pl.Int32).alias("aja")))
return
if hasattr(self, "neighbor_number") and self.neighbor_number.min() >= N_neigh:
tool.sort_neighbor(
self.verlet_list, self.distance_list, self.neighbor_number, N_neigh
)
else:
self.build_nearest_neighbor(N_neigh)
if hasattr(self, "_enlarge_data"):
box = self._enlarge_box
data = self._enlarge_data
else:
box = self.box
data = self.__data
aja = AcklandJonesAnalysis(data, box, self.verlet_list, self.distance_list)
aja.compute()
self.update_data(self.__data.with_columns(aja=aja.aja[: self.N]))
[docs]
def cal_warren_cowley_parameter(
self, rc: float, max_neigh: Optional[int] = None
) -> WarrenCowleyParameter:
"""
Calculate Warren-Cowley short-range order parameter.
Parameters
----------
rc : float
Cutoff radius for neighbor search.
max_neigh : int, optional
Maximum number of neighbors per atom.
Returns
-------
WarrenCowleyParameter
Object containing Warren-Cowley parameters.
Notes
-----
See :class:`~mdapy.warren_cowley_parameter.WarrenCowleyParameter`
for implementation details and returned attributes.
Examples
--------
>>> wcp = system.cal_warren_cowley_parameter(rc=3.0)
>>> print(wcp.wcp) # Access Warren-Cowley parameters
"""
has_neigh = False
if hasattr(self, "rc"):
if self.rc >= rc:
has_neigh = True
if not has_neigh:
self.build_neighbor(rc, max_neigh)
if hasattr(self, "_enlarge_data"):
data = self._enlarge_data
else:
data = self.__data
wcp = WarrenCowleyParameter(self.verlet_list, self.neighbor_number, data)
wcp.compute()
return wcp
[docs]
def cal_atomic_temperature(
self, rc: float, factor: float = 1.0, max_neigh: Optional[int] = None
) -> None:
"""
Calculate the local atomic temperature for each atom by analyzing the velocity fluctuations of the atom and its neighbors.
We assume velocity in units of Å/fs, you can use `factor` to convert your velocity.
This will save results to self.data['atomic_temp'] in the unit of K.
Parameters
----------
rc : float
Cutoff radius for local averaging.
factor : float
Scaling factor for velocities (e.g., for unit conversion).
Default is 1.0.
max_neigh : int, optional
Maximum number of neighbors per atom.
Notes
-----
See :class:`~mdapy.atomic_temperature.AtomicTemperature` for
implementation details.
"""
has_neigh = False
if hasattr(self, "rc"):
if self.rc >= rc:
has_neigh = True
if not has_neigh:
self.build_neighbor(rc, max_neigh)
if hasattr(self, "_enlarge_data"):
data = self._enlarge_data
else:
data = self.__data
atomTemp = AtomicTemperature(
data, self.verlet_list, self.distance_list, rc, factor
)
atomTemp.compute()
self.update_data(self.data.with_columns(atomic_temp=atomTemp.T[: self.N]))
[docs]
def cal_steinhardt_bond_orientation(
self,
llist: Union[np.ndarray, List[int]],
use_voronoi: bool = False,
nnn: int = 0,
rc: float = -1.0,
average: bool = False,
use_weight: bool = False,
weight: Optional[np.ndarray] = None,
wl: bool = False,
wlhat: bool = False,
a_face_area_threshold: float = -1,
r_face_area_threshold: float = -1,
identify_liquid: bool = False,
threshold: float = 0.7,
n_bond: int = 7,
max_neigh: Optional[int] = None,
) -> None:
r"""Calculate Steinhardt bond orientation order parameters. One can also use it to identify the solid or liquid atom.
Parameters
----------
llist : np.ndarry or List[int]
Degrees list, such as [4, 6, 8], should be positive, even integer.
use_voronoi : bool
If True, compute using Voronoi neighbor. Default is False.
nnn : int
If `use_voronoi` is False and this value is postive, compute using `nnn` nearest neighbors. Default is 0.
rc : float
If `use_voronoi` is False and `nnn` is zero, make sure this parameter is postive, and use it to build neighbor. Default is -1.0.
average : bool
If True, qlm will be averaged. Default is False.
use_weight : bool
If True, the neighbor will be weighed by `weight` array, otherwise the weight of each neighbor is 1.0. Default is False.
weight : np.ndarray, optional
If it is not None, it should be has the same shape with the `self.verlet_list`.
If it is None and the `use_voronoi` is True, then the voronoi_face_area will be used as weight. Default is None.
wl : bool
If True, compute third-order invariant :math:`w_l` parameters. Default is False.
wlhat : bool
If True, compute normalized third-order invariant :math:`\hat{w}_l` parameters. Default is False.
a_face_area_threshold : float
Absolute face area threshold.
Faces with area below this value are ignored as neighbors.
Default is -1.0 (no filtering).
r_face_area_threshold : float
Relative face area threshold (fraction of average face area).
Faces with relative area below this value are ignored.
Default is -1.0 (no filtering).
identify_liquid : bool
Enable solid-liquid classification (requires :math:`l=6` in llist).
threshold : float
Threshold for solid-liquid identification (default: 0.7 for normalized :math:`q_6`).
n_bond : int
Minimum number of "solid-like" bonds for solid classification. Default to 7.
max_neigh : int, optional
Maximum number of neighbors per atom.
Notes
-----
See :class:`~mdapy.steinhardt_bond_orientation.SteinhardtBondOrientation`
for implementation details. The results will add to self.data['ql{l}'] for l in `llist`.
If set `wl` is True, also will add self.data['wl{l}']. If set `wlhat` is True, also will add
self.data['wlh{l}']. If set `identify_liquid`, it will add self.data['solidliquid'] and self.data['nbond'] results.
"""
if use_voronoi:
self.build_voronoi_neighbor(a_face_area_threshold, r_face_area_threshold)
verlet_list, distance_list, neighbor_number = (
self.voro_verlet_list,
self.voro_distance_list,
self.voro_neighbor_number,
)
if use_weight and weight is None:
weight = self.voro_face_area
else:
if nnn > 0:
has_sort_neigh = False
if hasattr(self, "neighbor_number"):
if self.neighbor_number.min() >= nnn:
tool.sort_neighbor(
self.verlet_list,
self.distance_list,
self.neighbor_number,
nnn,
)
has_sort_neigh = True
if not has_sort_neigh:
self.build_nearest_neighbor(nnn)
else:
assert rc > 0, (
"At least use voronoi, or set positive nnn, or positive rc."
)
if hasattr(self, "rc"):
if self.rc < rc:
self.build_neighbor(rc, max_neigh)
else:
self.build_neighbor(rc, max_neigh)
verlet_list, distance_list, neighbor_number = (
self.verlet_list,
self.distance_list,
self.neighbor_number,
)
if hasattr(self, "_enlarge_data"):
box = self._enlarge_box
data = self._enlarge_data
else:
box = self.box
data = self.__data
SBO = SteinhardtBondOrientation(
box,
data,
np.asarray(llist, int),
nnn,
rc,
average,
use_voronoi,
use_weight,
weight,
verlet_list,
distance_list,
neighbor_number,
wl,
wlhat,
identify_liquid,
threshold,
n_bond,
)
SBO.compute()
if SBO.qnarray.shape[1] > 1:
columns = []
for i in llist:
columns.append(f"ql{i}")
if wl:
for i in llist:
columns.append(f"wl{i}")
if wlhat:
for i in llist:
columns.append(f"wlh{i}")
for i, name in enumerate(columns):
self.__data = self.__data.with_columns(
pl.lit(SBO.qnarray[: self.N, i]).alias(name)
)
else:
self.__data = self.__data.with_columns(
pl.lit(SBO.qnarray.flatten()[: self.N]).alias(f"ql{llist[0]}")
)
if identify_liquid:
self.__data = self.__data.with_columns(
pl.lit(SBO.solidliquid[: self.N]).alias("solidliquid")
)
self.__data = self.__data.with_columns(
pl.lit(SBO.nbond[: self.N]).alias("nbond")
)
[docs]
def cal_polyhedral_template_matching(
self,
structure="fcc-hcp-bcc",
rmsd_threshold=0.1,
return_ordering=False,
return_rmsd=False,
return_atomic_distance=False,
return_orientation=False,
identify_fcc_planar_faults=False,
identify_esf=True,
):
"""
Identify crystal structures using polyhedral template matching, one can also further distiguish the planar defects in FCC structures.
For PTM, structure results will add as self.data['ptm'] and self.data['ordering']
- Structure type (integer, 0=Other, 1=FCC, 2=HCP, 3=BCC, 4=ICO, 5=SC, 6=DCUB, 7=DHEX, 8=Graphene)
- Ordering type (interger, 0=Other, 1=L10, 2=L12 (A-site), 3=L12 (B-site), 4=B2, 5=zincblende / wurtzite)
For planar defects, structure results will add as self.data['pft']
- 0: Non-hcp atoms (e.g. perfect fcc or disordered)
- 1: Indeterminate hcp-like (isolated hcp-like atoms, not forming a planar defect)
- 2: Intrinsic stacking fault (ISF, two adjacent hcp-like layers)
- 3: Coherent twin boundary (TB, one hcp-like layer)
- 4: Multi-layer stacking fault (three or more adjacent hcp-like layers)
- 5: Extrinsic Stacking Fault (ESF, if cal_esf=True)
Parameters
----------
structure : str
String specifying the structure types to consider, separated by hyphens (Defaults is "fcc-hcp-bcc").
Supported values: "fcc", "hcp", "bcc", "ico", "sc", "dcub", "dhex", "graphene".
Special values: "all" (all types), "default" (fcc, hcp, bcc).
rmsd_threshold : float
Maximum RMSD for a valid structure match. Particles exceeding this are classified as "Other".
Default is 0.1.
return_ordering : bool
If True, return the ordering type.
Default is False.
return_rmsd : bool
If True, return the RMSD value.
Default is False.
return_atomic_distance : bool
If True, return the interatomic distance.
Default is False.
return_orientation : bool
If True, return the orientation value.
Default is False.
identify_fcc_planar_faults : bool
If True, further detect the plannar defects.
Default is False.
identify_esf : bool
If True, further defect the ESF.
Default is True.
Notes
-----
See :class:`~mdapy.polyhedral_template_matching.PolyhedralTemplateMatching` and class:`~mdapy.identify_fcc_planar_faults.IdentifyFccPlanarFaults`
for implementation details.
"""
verlet_list = None
safe_L = 15
repeat = np.ceil(safe_L / self.box.get_thickness()).astype(int)
for i in range(3):
if self.box.boundary[i] == 0:
repeat[i] = 1
if sum(repeat) == 3:
if hasattr(self, "neighbor_number"):
if self.neighbor_number.min() >= 18:
tool.sort_neighbor(
self.verlet_list, self.distance_list, self.neighbor_number, 18
)
verlet_list = self.verlet_list
if hasattr(self, "_enlarge_data"):
box = self._enlarge_box
data = self._enlarge_data
else:
box = self.box
data = self.__data
ptm = PolyhedralTemplateMatching(
structure, data, box, rmsd_threshold, verlet_list
)
ptm.compute()
output = ptm.output[: self.N]
data = self.__data.with_columns(pl.lit(output[:, 0], pl.Int32).alias("ptm"))
if return_ordering:
data = data.with_columns(pl.lit(output[:, 1]).alias("ordering"))
if return_rmsd:
data = data.with_columns(pl.lit(output[:, 2]).alias("rmsd"))
if return_atomic_distance:
data = data.with_columns(pl.lit(output[:, 3]).alias("interatomic_distance"))
if return_orientation:
data = data.with_columns(
qx=output[:, 5],
qy=output[:, 6],
qz=output[:, 7],
qw=output[:, 4],
)
if identify_fcc_planar_faults:
structure_types = np.array(ptm.output[:, 0], np.int32)
ptm_indices = np.ascontiguousarray(ptm.ptm_indices[:, 1:13])
ifpt = IdentifyFccPlanarFaults(structure_types, ptm_indices, identify_esf)
ifpt.compute()
data = data.with_columns(pl.lit(ifpt.fault_types[: self.N]).alias("pft"))
self.update_data(data)
[docs]
def cal_centro_symmetry_parameter(self, N: int):
"""
Calculate centro-symmetry parameter for defect identification. The results will be added as self.data['csp'].
Parameters
----------
N : int
Number of nearest neighbors to consider, should be a positive, even integer. 12 for FCC and 8 for BCC.
Notes
-----
See :class:`~mdapy.centro_symmetry_parameter.CentroSymmetryParameter`
for implementation details.
"""
assert N % 2 == 0 and N > 0, f"N must be a positive even number: {N}."
if self.N <= N and sum(self.box.boundary) == 0:
res = np.full(self.N, 10000, float)
else:
has_verlet = False
if hasattr(self, "neighbor_number"):
if self.neighbor_number.min() >= N and hasattr(self, "rc"):
tool.sort_neighbor(
self.verlet_list, self.distance_list, self.neighbor_number, 18
)
has_verlet = True
if not has_verlet:
self.build_nearest_neighbor(N)
if hasattr(self, "_enlarge_data"):
box = self._enlarge_box
data = self._enlarge_data
else:
box = self.box
data = self.__data
csp = CentroSymmetryParameter(data, box, N, self.verlet_list)
csp.compute()
res = csp.csp[: self.N]
self.update_data(self.data.with_columns(csp=res))
[docs]
def cal_common_neighbor_analysis(
self, rc: Optional[float] = None, max_neigh: Optional[int] = None
):
"""
Perform common neighbor analysis for structure identification. This will generate 'cna' column in self.data:
- 0 = Other (unknown coordination)
- 1 = FCC (face-centered cubic)
- 2 = HCP (hexagonal close-packed)
- 3 = BCC (body-centered cubic)
- 4 = ICO (icosahedral coordination)
Parameters
----------
rc : float, optional
Cutoff radius. If not given, it will use adaptive mode to do CNA.
max_neigh : int, optional
Maximum number of neighbors per atom.
Notes
-----
See :class:`~mdapy.common_neighbor_analysis.CommonNeighborAnalysis`
for implementation details.
"""
verlet_list = None
neighbor_number = None
safe_L = 15
repeat = np.ceil(safe_L / self.box.get_thickness()).astype(int)
for i in range(3):
if self.box.boundary[i] == 0:
repeat[i] = 1
if sum(repeat) == 3:
if hasattr(self, "rc"):
if rc is None:
if self.neighbor_number.min() >= 14:
tool.sort_neighbor(
self.verlet_list,
self.distance_list,
self.neighbor_number,
14,
)
verlet_list = self.verlet_list
else:
if self.rc < rc:
self.build_neighbor(rc, max_neigh)
verlet_list = self.verlet_list
neighbor_number = self.neighbor_number
else:
if rc is not None:
self.build_neighbor(rc, max_neigh)
verlet_list = self.verlet_list
neighbor_number = self.neighbor_number
if hasattr(self, "_enlarge_data"):
box = self._enlarge_box
data = self._enlarge_data
else:
box = self.box
data = self.__data
cna = CommonNeighborAnalysis(data, box, verlet_list, neighbor_number, rc)
cna.compute()
self.update_data(self.__data.with_columns(cna=cna.pattern[: self.N]))
[docs]
def cal_structure_factor(
self,
k_min: float,
k_max: float,
nbins: int,
cal_partial: bool = False,
atomic_form_factors: bool = False,
mode: str = "direct",
) -> StructureFactor:
"""
Calculate static structure factor S(k).
Parameters
----------
k_min : float
Minimum wave vector magnitude.
k_max : float
Maximum wave vector magnitude.
nbins : int
Number of bins for k-space averaging.
cal_partial : bool, optional
If True, calculate partial structure factors. Default is False.
atomic_form_factors : bool, default=False
If True, use atomic form factors f to weigh the atoms' individual contributions to S(k). Atomic form factors are taken from `TU Graz <https://lampz.tugraz.at/~hadley/ss1/crystaldiffraction/atomicformfactors/formfactors.php>`_.
mode : str, optional
Calculation mode: 'direct' or 'debye'. Default is 'direct'.
Returns
-------
StructureFactor
Object containing structure factor data.
Notes
-----
See :class:`~mdapy.structure_factor.StructureFactor` for
implementation details and returned attributes.
"""
sfc = StructureFactor(
self.data,
self.box,
k_min,
k_max,
nbins,
cal_partial,
atomic_form_factors,
mode,
)
sfc.compute()
return sfc
[docs]
def cal_bond_analysis(
self, rc: float, nbin: int, max_neigh: Optional[int] = None
) -> BondAnalysis:
"""
Analyze bond length and angle distributions.
Parameters
----------
rc : float
Cutoff radius for bond search.
nbin : int
Number of bins for histograms.
max_neigh : int, optional
Maximum number of neighbors per atom.
Returns
-------
BondAnalysis
Object containing bond distribution data.
Notes
-----
See :class:`~mdapy.bond_analysis.BondAnalysis` for implementation
details and returned attributes.
Examples
--------
>>> ba = system.cal_bond_analysis(rc=5.0, nbin=100)
>>> print(ba.bond_length_distribution)
"""
has_neigh = False
if hasattr(self, "rc"):
if self.rc >= rc:
has_neigh = True
if not has_neigh:
self.build_neighbor(rc, max_neigh)
if hasattr(self, "_enlarge_box"):
box = self._enlarge_box
data = self._enlarge_data
else:
box = self.box
data = self.data
ba = BondAnalysis(
data,
box,
rc,
nbin,
self.verlet_list,
self.distance_list,
self.neighbor_number,
)
ba.compute()
return ba
[docs]
def cal_angular_distribution_function(
self,
rc_dict: Dict[str, List[float]],
nbin: int,
max_neigh: Optional[int] = None,
) -> AngularDistributionFunction:
"""
Calculate angular distribution function (ADF).
Parameters
----------
rc_dict : dict of str to list of float
Dictionary mapping triplet patterns to cutoff radii.
Format: {'A-B-C': [rc_AB_min, rc_AB_max, rc_AC_min, rc_AC_max]}
where A, B, C are element symbols and A is the central atom.
Example: {'O-H-H': [0, 2.0, 0, 2.0]} for water molecules.
nbin : int
Number of angular bins.
max_neigh : int, optional
Maximum number of neighbors per atom.
Returns
-------
AngularDistributionFunction
Object containing ADF data.
Notes
-----
See :class:`~mdapy.angular_distribution_function.AngularDistributionFunction`
for implementation details and returned attributes.
"""
assert "element" in self.data.columns
rc = np.array(list(rc_dict.values())).max()
has_neigh = False
if hasattr(self, "rc"):
if self.rc >= rc:
has_neigh = True
if not has_neigh:
self.build_neighbor(rc, max_neigh)
if hasattr(self, "_enlarge_box"):
box = self._enlarge_box
data = self._enlarge_data
else:
box = self.box
data = self.data
adf = AngularDistributionFunction(
data,
box,
rc_dict,
nbin,
self.verlet_list,
self.distance_list,
self.neighbor_number,
)
adf.compute()
return adf
[docs]
def cal_radial_distribution_function(
self, rc: float, nbin: int = 100, max_neigh: Optional[int] = None
) -> RadialDistributionFunction:
"""
Calculate radial distribution function g(r).
Parameters
----------
rc : float
Maximum distance for RDF calculation.
nbin : int, optional
Number of distance bins. Default is 100.
max_neigh : int, optional
Maximum number of neighbors per atom.
Returns
-------
RadialDistributionFunction
Object containing RDF data and partial RDFs if applicable.
Notes
-----
See :class:`~mdapy.radial_distribution_function.RadialDistributionFunction`
for implementation details and returned attributes.
"""
has_neigh = False
if hasattr(self, "rc"):
if self.rc >= rc:
has_neigh = True
if not has_neigh:
self.build_neighbor(rc, max_neigh)
if hasattr(self, "_enlarge_box"):
box = self._enlarge_box
data = self._enlarge_data
else:
box = self.box
data = self.data
if "element" in data.columns:
ele2type = {j: i + 1 for i, j in enumerate(data["element"].unique().sort())}
type_list = data.with_columns(
pl.col("element").replace_strict(ele2type).rechunk().alias("type")
)["type"].to_numpy()
self.__data = self.data.with_columns(type=type_list[: self.N])
elif "type" in data.columns:
type_list = data["type"].to_numpy()
else:
type_list = np.zeros(data.shape[0], np.int32)
rdf = RadialDistributionFunction(
rc,
nbin,
box,
self.verlet_list,
self.distance_list,
self.neighbor_number,
type_list,
)
rdf.compute()
return rdf
[docs]
def average_by_neighbor(
self,
average_rc: float,
property_name: str,
include_self: bool = True,
output_name: Optional[str] = None,
max_neigh: Optional[int] = None,
) -> None:
"""
Average a property over local neighborhoods.
Parameters
----------
average_rc : float
Cutoff radius for local averaging.
property_name : str
Name of the column in data to average.
include_self : bool
Whether to include the central atom in averaging. Default is True.
output_name : str, optional
Name for the output column. If None, uses '{property_name}_ave'.
max_neigh : int, optional
Maximum number of neighbors per atom.
Raises
------
AssertionError
If property_name is not a column in data.
"""
assert property_name in self.__data.columns, f"{property_name} not in data."
if hasattr(self, "rc"):
if self.rc < average_rc:
self.build_neighbor(average_rc, max_neigh)
else:
self.build_neighbor(average_rc, max_neigh)
assert not hasattr(self, "_enlarge_data"), "Only supprot for big box."
data = self.data
if hasattr(self, "_enlarge_data"):
data = self._enlarge_data
data = tool.average_by_neighbor(
average_rc,
data,
property_name,
self.verlet_list,
self.distance_list,
self.neighbor_number,
include_self,
output_name,
)[: self.N].rechunk()
self.update_data(data)
[docs]
def cal_cluster_analysis(
self,
rc: Union[float, int, Dict[str, float]] = 5.0,
max_neigh: Optional[int] = None,
) -> None:
"""
Perform cluster analysis to identify connected atomic groups. The results will be added as self.data['cluster_id'].
Parameters
----------
rc : float, int, or dict, optional
Cutoff radius for cluster connectivity. Can be:
* **float/int**: Single cutoff for all atom pairs
* **dict**: Type-specific cutoffs, e.g., {'1-1': 1.5, '1-2': 1.3}
Default is 5.0.
max_neigh : int, optional
Maximum number of neighbors per atom.
Notes
-----
See :class:`~mdapy.cluster_analysis.ClusterAnalysis` for
implementation details.
Examples
--------
Single cutoff:
>>> system.cal_cluster_analysis(rc=3.0)
Type-specific cutoffs:
>>> rc_dict = {"1-1": 2.8, "1-2": 3.0, "2-2": 3.2}
>>> system.cal_cluster_analysis(rc=rc_dict)
>>> # cluster_id column added to system.data
"""
if isinstance(rc, float) or isinstance(rc, int):
max_rc = rc
elif isinstance(rc, dict):
max_rc = max([i for i in rc.values()])
else:
raise "rc should be a positive number, or a dict like {'1-1':1.5, '1-2':1.3}"
if hasattr(self, "rc"):
if self.rc < max_rc:
self.build_neighbor(max_rc, max_neigh)
else:
self.build_neighbor(max_rc, max_neigh)
type_list = None
if isinstance(rc, dict):
assert "type" in self.data.columns, (
"Must have type for multi rc cluster calculation."
)
type_list = self.data["type"].to_numpy(allow_copy=False)
ca = ClusterAnalysis(
rc, self.verlet_list, self.distance_list, self.neighbor_number, type_list
)
ca.compute()
self.__data = self.data.with_columns(cluster_id=ca.particleClusters[: self.N])
[docs]
def cal_structure_entropy(
self,
rc: float,
sigma: float,
use_local_density: bool = False,
average_rc: float = 0.0,
max_neigh: Optional[int] = None,
) -> None:
"""
Calculate structural entropy for disorder quantification.
Parameters
----------
rc : float
Cutoff radius for neighbor search.
sigma : float
Width parameter for Gaussian kernel.
use_local_density : bool, optional
If True, use local density normalization. Default is False.
average_rc : float, optional
Radius for local averaging of entropy. If 0, no averaging.
Default is 0.0.
max_neigh : int, optional
Maximum number of neighbors per atom.
Notes
-----
See :class:`~mdapy.structure_entropy.StructureEntropy` for
implementation details.
Examples
--------
>>> system.cal_structure_entropy(rc=5.0, sigma=0.5)
>>> # entropy column added to system.data
With local averaging:
>>> system.cal_structure_entropy(rc=5.0, sigma=0.5, average_rc=3.0)
>>> # entropy and entropy_ave columns added to system.data
"""
if hasattr(self, "rc"):
if self.rc < rc:
self.build_neighbor(rc, max_neigh)
else:
self.build_neighbor(rc, max_neigh)
if hasattr(self, "_enlarge_data"):
box = self._enlarge_box
else:
box = self.box
SE = StructureEntropy(
box,
self.verlet_list,
self.distance_list,
self.neighbor_number,
rc,
sigma,
use_local_density,
average_rc,
)
SE.compute()
data = self.data.with_columns(entropy=SE.entropy[: self.N])
if average_rc > 0:
data = self.data.with_columns(entropy_ave=SE.entropy_ave[: self.N])
self.update_data(data)
[docs]
def cal_voronoi_volume(self) -> None:
"""
Calculate Voronoi cell volumes and related properties.
This method computes the Voronoi tessellation and adds three columns
to the data DataFrame:
* **volume**: Atomic volume from Voronoi cell
* **neighbor_number**: Coordination number from Voronoi neighbors
* **cavity_radius**: This is the distance from the atom to the farthest vertex of its Voronoi cell, representing the radius of the largest empty sphere (containing no other atoms) that touches the atom.
Notes
-----
See :class:`~mdapy.voronoi.Voronoi` for implementation details.
Examples
--------
>>> system.cal_voronoi_volume()
>>> # volume, neighbor_number, cavity_radius columns added to data
>>> avg_volume = system.data["volume"].mean()
"""
vor = Voronoi(self.box, self.data)
volume, neighbor_number, cavity_radius = vor.get_volume()
self.update_data(
self.data.with_columns(
volume=volume,
neighbor_number=neighbor_number,
cavity_radius=cavity_radius,
)
)
[docs]
def cal_chemical_species(
self,
search_species: Optional[List[str]] = None,
element_list: Optional[List[str]] = None,
check_most: int = 10,
add_mol_id: bool = False,
scale: float = 0.6,
) -> Dict[str, int]:
"""
Identify chemical species based on atomic connectivity.
Two atoms *i* and *j* are considered bonded if their interatomic distance
satisfies:
rij <= (vdW_radius_i + vdW_radius_j) * scale
where vdW_radius is the van der Waals radius of the corresponding element,
and ``scale`` is an empirical scaling factor. This approach is similar to
the connectivity detection method used in OpenBabel.
Parameters
----------
search_species : list of str, optional
Molecular formulas to search for, e.g. ``['H2O', 'CO2', 'Cl2', 'N2']``.
If provided, only these species will be counted. Otherwise, the most
frequent species will be returned.
element_list : list of str, optional
List of element symbols corresponding to atom types, e.g.
``['C', 'H', 'O']``. This is required if the ``'element'`` column is
not present in ``self.data``.
check_most : int, default=10
Number of most frequent species to return when ``search_species`` is None.
add_mol_id : bool, default=False
If True and ``search_species`` is provided, add a new column ``'mol_id'``
to the internal data. The molecule IDs are assigned according to the order
of ``search_species`` (zero-based index). Atoms not belonging to any
searched species are assigned ``-1``.
scale : float, default=0.6
Scaling factor applied to the sum of van der Waals radii when determining
bonding cutoff distances.
Returns
-------
dict of str to int
Dictionary mapping species formulas to their corresponding counts.
Examples
--------
>>> species = system.cal_chemical_species(["H2O", "CO2"])
>>> print(species)
{'H2O': 125, 'CO2': 42}
"""
import re
if "element" in self.data.columns:
element_list = self.data["element"].unique().sort()
ele2type = {j: i + 1 for i, j in enumerate(element_list)}
self.__data = self.__data.with_columns(
pl.col("element").replace_strict(ele2type).rechunk().alias("type")
)
else:
assert element_list is not None, (
"'element_list' must be provided because no 'element' column "
"is present in the data."
)
assert "type" in self.data.columns, (
"'type' column is required when 'element' column is not available."
)
Ntype = self.data["type"].max()
assert len(element_list) >= Ntype, (
f"'element_list' must contain at least {Ntype} elements to match "
f"the maximum atom type index ({Ntype})."
)
partial_cutoff = {}
for type1 in range(len(element_list)):
for type2 in range(type1, len(element_list)):
partial_cutoff[f"{type1 + 1}-{type2 + 1}"] = (
vdw_radii[atomic_numbers[element_list[type1]]]
+ vdw_radii[atomic_numbers[element_list[type2]]]
) * scale
self.cal_cluster_analysis(partial_cutoff, max_neigh=None)
if search_species is not None:
pattern = r"([A-Z][a-z]?)(\d*)"
trans_search_species = []
for old_name in search_species:
matches = re.findall(pattern, old_name)
matches.sort()
name = ""
for i, j in matches:
if len(j):
name += i * int(j)
else:
name += i
trans_search_species.append(name)
first_step = (
self.__data.with_columns(
pl.lit(
np.array(element_list)[(self.__data["type"] - 1).to_numpy()]
).alias("type_name")
)
.group_by("cluster_id")
.agg(pl.col("type_name"))
.with_columns(pl.col("type_name").list.sort())
.with_columns(pl.col("type_name").list.join(""))
)
res = (first_step["type_name"].value_counts()).filter(
pl.col("type_name").is_in(trans_search_species)
)
res = dict(zip(res[:, 0], res[:, 1]))
species = {}
for i, j in zip(search_species, trans_search_species):
if j not in res.keys():
species[i] = 0
else:
species[i] = res[j]
if add_mol_id:
clus_mol = first_step.with_columns(
pl.col("type_name")
.replace_strict(
{j: i for i, j in enumerate(trans_search_species)}, default=-1
)
.rechunk()
.alias("mol_id")
)
self.__data = self.__data.with_columns(
pl.col("cluster_id")
.replace_strict(
dict(zip(clus_mol["cluster_id"], clus_mol["mol_id"]))
)
.rechunk()
.alias("mol_id")
)
else:
res = (
(
self.__data.with_columns(
pl.lit(
np.array(element_list)[(self.__data["type"] - 1).to_numpy()]
).alias("type_name")
)
.group_by("cluster_id")
.agg(pl.col("type_name"))
.with_columns(pl.col("type_name").list.sort())
.with_columns(pl.col("type_name").list.join(""))["type_name"]
.value_counts()
)
.sort("count", descending=True)
.limit(check_most)
)
species = dict(zip(res[:, 0], res[:, 1]))
return species
if __name__ == "__main__":
system = System("tests/input_files/water.xyz")
# res = system.cal_chemical_species(search_species=['H2O'], add_mol_id=True)
res = system.cal_chemical_species(scale=0.4)
print(res)
# system.write_xyz('mol.xyz')